主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create
CREATE 語句用于注冊表、視圖或函數。注冊后的表、視圖和函數可以在 SQL 查詢中使用。目前 Flink SQL 支持下列 CREATE 語句:
- CREATE TABLE
- CREATE DATABASE
- CREATE VIEW
- CREATE FUNCTION
執(zhí)行 CREATE
可以使用 TableEnvironment 的 executeSql() 方法執(zhí)行 CREATE 語句。若 CREATE 操作執(zhí)行成功,executeSql() 方法返回OK,否則會拋出異常。
EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 對已注冊的表進行 SQL 查詢
// 注冊名為 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 SQL 查詢,并把得到的結果作為一個新的表
Table result = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// 對已注冊的表進行 INSERT 操作
// 注冊 TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 INSERT 語句并向 TableSink 發(fā)出結果
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
SQL CLI
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...
CREATE TABLE
以下語法概述了 CREATE TABLE 語法:
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
Columns
Physical / Regular Columns
Pyhsical Columns 數據庫中已知的常規(guī)字段,定義了字段的名稱、類型和順序。Connectors 和 Formats 使用這些列(按定義的順序)來進行配置。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
Metadata Columns
Metadata Column 是 SQL 標準的擴展(可選項),允許訪問連接器、格式化表中每一行數據,由 METADATA 關鍵字表示。
例如,可以使用 Metadata column 從 Kafka 記錄中讀取和寫入時間戳,以進行基于時間的操作。
根據 Connector 和 Format 選擇可使用的 Metadata column 。
下面定義了一個表,聲明字段 record_time 使用 Metadata column timestamp:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH (
'connector' = 'kafka'
...
);
record_time 成為表結構的一部分,可以像常規(guī)列一樣進行轉換和存儲:
INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
如果列名使用 Metadata column 的標識鍵(kafka 中的 timesamp),則可以簡略寫法:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA -- use column name as metadata key
) WITH (
'connector' = 'kafka'
...
);
如果列的數據類型與 Metadata column 的數據類型不同,則運行時將執(zhí)行顯式強制轉換(這要求這兩種數據類型是兼容的)。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` BIGINT METADATA -- cast the timestamp as BIGINT
) WITH (
'connector' = 'kafka'
...
);
默認情況下,Planner 假定 Metadata column 可以用于讀寫。但是,在許多情況下,外部系統提供的只讀多于可寫。因此,可以使用 VIRTUAL 關鍵字從持久化中排除元數據列(下面例子中的 offset)。
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA, -- part of the query-to-sink schema
`offset` BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
Computed Columns
計算列是一個使用 column_name AS computed_column_expression 語法生成的虛擬列。由使用同一表中其他列的表達式生成,并且不會在表中進行物理存儲。這個表達式可以包含物理列、常量、函數或變量的任意組合,但這個表達式不能存在任何子查詢。
下面的例子是使用 cost AS price * quantity 定義的一個計算列
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity, -- evaluate expression and supply the result to queries
) WITH (
'connector' = 'kafka'
...
);
定義在 source table 上的計算列會在從數據源讀取數據后被計算,可以在 SELECT 查詢語句中使用。與使用 VIRTUAL 的 Metadata column 類似,計算列不會持久化。因此計算列不能作為 INSERT INTO 語句的目標(在 INSERT 語句中,SELECT 語句的 schema 需要與目標表不帶有計算列的 schema 一致)。
計算列可用于為 CREATE TABLE 語句定義時間屬性。Processing time 可以簡單地通過使用了系統函數 PROCTIME() 的 proc AS PROCTIME() 語句進行定義。 Event time 可能需要從現有的字段中獲得(例如,原始字段的類型不是 TIMESTAMP 或嵌套在 JSON 字符串中)。
WATERMARK
WATERMARK 定義了表的事件時間屬性,其形式為 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
rowtime_column_name 把一個現有的列定義為事件時間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列(top-level column),也可以是一個計算列。
watermark_strategy_expression 定義了 watermark 的生成策略。允許使用包括計算列在內的任意非子查詢表達式來計算 watermark;表達式的返回類型必須是 TIMESTAMP(3)。僅當返回的 watermark 不為空且大于之前發(fā)出的 watermark 時才會被發(fā)出(以保證 watermark 遞增)。
Flink 為每條記錄的計算 watermark,定期(pipeline.auto-watermark-interval)發(fā)出所生成的最大的 watermark(如果 watermark 為空或不大于之前的 watermark 不發(fā)出)。若 watermark 的間隔(pipeline.auto-watermark-interval)是 0ms,那么每條記錄都會產生一個 watermark(根據前述的規(guī)則發(fā)出)。
使用事件時間(Event time)語義時,表必須包含事件時間屬性和 watermark 策略。
Flink 提供了幾種常用的 watermark 策略:
嚴格遞增時間戳(Strictly ascending timestamps):
WATERMARK FOR rowtime_column AS rowtime_column- 發(fā)出到目前為止已觀察到的最大時間戳的 watermark,時間戳大于最大時間戳的 Row 被認為沒有遲到
遞增時間戳(Ascending timestamps):
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND- 發(fā)出到目前為止已觀察到的最大時間戳減 1 的 watermark,時間戳大于或等于最大時間戳的 Row 被認為沒有遲到
有界亂序時間戳(Bounded out of orderness timestamps):
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit- 發(fā)出到目前為止已觀察到的最大時間戳減去指定延遲的 watermark,例如,
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND是一個 5 秒延遲的 watermark 策略
- 發(fā)出到目前為止已觀察到的最大時間戳減去指定延遲的 watermark,例如,
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
PRIMARY KEY
主鍵約束是 Flink 優(yōu)化的一種提示信息,表明一張表或視圖的某個(些)列是唯一的并且不包含 Null 值。主鍵聲明的列都是非空的,可以被用作表中每行的唯一標識。
主鍵可以和列的定義一起聲明,也可以獨立聲明為表的限制屬性,不管是哪種方式,主鍵都不可以重復定義,否則 Flink 會報錯。
有效性檢查
SQL 標準主鍵限制可以有兩種模式:ENFORCED 或者 NOT ENFORCED。 申明了是否輸入/輸出數據做檢查(是否唯一)。Flink 只支持 NOT ENFORCED 模式,<u>用戶需要自己保證唯一性</u>。
Flink 假定聲明了主鍵的列都是不包含 Null 值的,Connector 在處理數據時需要自己保證語義正確。
在 CREATE TABLE 語句中,創(chuàng)建主鍵會修改列的 nullable 屬性,主鍵聲明的列默認都是非 Nullable 的。
PARTITIONED BY
根據指定的列對已經創(chuàng)建的表進行分區(qū)。若表使用 filesystem sink ,則將會為每個分區(qū)創(chuàng)建一個目錄。
WITH Options
Table properties 用于創(chuàng)建 table source/sink,一般用于尋找和創(chuàng)建底層的連接器(Connector)。
表達式 key1=val1 的鍵和值必須為字符串字面量。不同 Connector 有各自的 properties。
Note. 表名可以為以下三種格式 1. catalog_name.db_name.table_name 2. db_name.table_name 3. table_name。
使用 catalog_name.db_name.table_name 的表將會與名為 catalog_name 的 catalog 和名為 db_name 的數據庫一起注冊到 metastore 中;使用 db_name.table_name 的表將會被注冊到當前執(zhí)行的 table environment 中的 catalog 且數據庫會被命名為 db_name;對于 table_name,數據表將會被注冊到當前正在運行的 catalog 和數據庫中。
Note. 使用 CREATE TABLE 語句注冊的表均可用作 table source 和 table sink。 在被 DML 語句引用前,無法決定其實際用于 source 或是 sink。
LIKE
LIKE 子句可以基于現有表的定義去創(chuàng)建新表,并且可以擴展或排除原始表中的某些部分。LIKE 子句必須在 CREATE 語句中定義,并且是基于 CREATE 語句的更上層定義。LIKE 子句可以用于定義表的多個部分,而不僅僅是 schema 部分(可以重用或改寫指定的連接器配置屬性或者添加 watermark 定義)。
示例如下:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- 添加 watermark 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 改寫 startup-mode 屬性
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
Orders_with_watermark 表等效于使用以下語句創(chuàng)建的表:
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
Merge Table
表屬性的合并邏輯可以用 like options 來控制??梢钥刂坪喜⒌谋韺傩匀缦拢?/p>
- CONSTRAINTS - 主鍵和唯一鍵約束
- GENERATED - 計算列
- OPTIONS - 連接器信息、格式化方式等配置項
- PARTITIONS - 表分區(qū)信息
- WATERMARKS - watermark 定義
并且有三種不同的表屬性合并策略:
- INCLUDING - 新表包含源表(source table)所有的表屬性,如果和源表的表屬性重復則會直接失敗,如新表和源表存在相同 key 的屬性。
- EXCLUDING - 新表不包含源表指定的任何表屬性。
- OVERWRITING - 新表包含源表的表屬性,但如果出現重復項,則會用新表的表屬性覆蓋源表中的重復表屬性,如新表和源表存在相同 key 的屬性,則會使用當前語句中定義的 key 的屬性值。
可以使用 INCLUDING/EXCLUDING ALL 這種聲明方式來指定使用怎樣的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,代表只有源表的 WATERMARKS 屬性才會被包含進新表。示例如下:
-- 存儲在文件系統的源表
CREATE TABLE Orders_in_file (
`user` BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
)
PARTITIONED BY (`user`)
WITH (
'connector' = 'filesystem',
'path' = '...'
);
-- 對應存儲在 kafka 的源表
CREATE TABLE Orders_in_kafka (
-- 添加 watermark 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
)
LIKE Orders_in_file (
-- 排除需要生成 watermark 的計算列之外的所有內容。
-- 去除不適用于 kafka 的所有分區(qū)和文件系統的相關屬性。
EXCLUDING ALL
INCLUDING GENERATED
);
默認將使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。
無法選擇 physical columns 的合并策略,會按照 INCLUDING 策略合并。
CREATE CATALOG
CREATE CATALOG catalog_name
WITH (key1=val1, key2=val2, ...)
Catalog 提供了元數據信息,例如數據庫、表、分區(qū)、視圖以及數據庫或其他外部系統中存儲的函數和信息。
更多參考官方文檔
CREATE DATABASE
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
根據給定的屬性創(chuàng)建數據庫。若數據庫中已存在同名表會拋出異常。
IF NOT EXISTS
若數據庫已經存在,則不會進行任何操作。
WITH OPTIONS
數據庫屬性一般用于存儲關于這個數據庫額外的信息。 表達式 key1=val1 中的鍵和值都需要是字符串字面量。
CREATE VIEW
CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
[( columnName [, columnName ]* )] [COMMENT view_comment]
AS query_expression
根據給定的 query 語句創(chuàng)建一個視圖。若數據庫中已經存在同名視圖會拋出異常。
TEMPORARY
創(chuàng)建一個有 catalog 和數據庫命名空間的臨時視圖,并覆蓋原有的視圖。
IF NOT EXISTS
若該視圖已經存在,則不會進行任何操作。
CREATE FUNCTION
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
創(chuàng)建 function,可以指定 catalog 和 database,若 catalog 中,已經有同名的函數注冊了,則無法注冊。
LANGUAGE JAVA|SCALA|PYTHON 用于指定 Flink runtime 如何執(zhí)行這個函數。目前,只支持 JAVA, SCALA 和 PYTHON,且函數的默認語言為 JAVA。
如果是 JAVA 或者 SCALA,則 identifier 是 UDF 實現類的全限定名(更多參考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/)。
如果是 PYTHON,則 identifier 是 UDF 對象的全限定名(更多參考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/)。 如果是 PYTHON,而當前程序是 Java/Scala 程序或者 SQL 程序,則需要配置 Python 相關的依賴(更多參考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-dependency-in-javascala-program)。
TEMPORARY
創(chuàng)建一個臨時 catalog function,有 catalog 和 database,并覆蓋原有的 function 。
TEMPORARY SYSTEM
創(chuàng)建一個臨時 system function,有 catalog,沒有 database,并覆蓋系統內置的 function。
IF NOT EXISTS
若該函數已經存在,則不會進行任何操作。