Flink SQL Create 語法

主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create

CREATE 語句用于注冊表、視圖或函數。注冊后的表、視圖和函數可以在 SQL 查詢中使用。目前 Flink SQL 支持下列 CREATE 語句:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

執(zhí)行 CREATE

可以使用 TableEnvironment 的 executeSql() 方法執(zhí)行 CREATE 語句。若 CREATE 操作執(zhí)行成功,executeSql() 方法返回OK,否則會拋出異常。

EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);

// 對已注冊的表進行 SQL 查詢
// 注冊名為 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 SQL 查詢,并把得到的結果作為一個新的表
Table result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// 對已注冊的表進行 INSERT 操作
// 注冊 TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上執(zhí)行 INSERT 語句并向 TableSink 發(fā)出結果
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

SQL CLI

Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...

CREATE TABLE

以下語法概述了 CREATE TABLE 語法:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
   
<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

Columns

Physical / Regular Columns

Pyhsical Columns 數據庫中已知的常規(guī)字段,定義了字段的名稱、類型和順序。Connectors 和 Formats 使用這些列(按定義的順序)來進行配置。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);

Metadata Columns

Metadata Column 是 SQL 標準的擴展(可選項),允許訪問連接器、格式化表中每一行數據,由 METADATA 關鍵字表示。

例如,可以使用 Metadata column 從 Kafka 記錄中讀取和寫入時間戳,以進行基于時間的操作。

根據 Connector 和 Format 選擇可使用的 Metadata column 。

下面定義了一個表,聲明字段 record_time 使用 Metadata column timestamp

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp
) WITH (
  'connector' = 'kafka'
  ...
);

record_time 成為表結構的一部分,可以像常規(guī)列一樣進行轉換和存儲:

INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;

如果列名使用 Metadata column 的標識鍵(kafka 中的 timesamp),則可以簡略寫法:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key
) WITH (
  'connector' = 'kafka'
  ...
);

如果列的數據類型與 Metadata column 的數據類型不同,則運行時將執(zhí)行顯式強制轉換(這要求這兩種數據類型是兼容的)。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` BIGINT METADATA    -- cast the timestamp as BIGINT
) WITH (
  'connector' = 'kafka'
  ...
);

默認情況下,Planner 假定 Metadata column 可以用于讀寫。但是,在許多情況下,外部系統提供的只讀多于可寫。因此,可以使用 VIRTUAL 關鍵字從持久化中排除元數據列(下面例子中的 offset)。

CREATE TABLE MyTable (
  `timestamp` BIGINT METADATA,       -- part of the query-to-sink schema
  `offset` BIGINT METADATA VIRTUAL,  -- not part of the query-to-sink schema
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

Computed Columns

計算列是一個使用 column_name AS computed_column_expression 語法生成的虛擬列。由使用同一表中其他列的表達式生成,并且不會在表中進行物理存儲。這個表達式可以包含物理列、常量、函數或變量的任意組合,但這個表達式不能存在任何子查詢。

下面的例子是使用 cost AS price * quantity 定義的一個計算列

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries
) WITH (
  'connector' = 'kafka'
  ...
);

定義在 source table 上的計算列會在從數據源讀取數據后被計算,可以在 SELECT 查詢語句中使用。與使用 VIRTUAL 的 Metadata column 類似,計算列不會持久化。因此計算列不能作為 INSERT INTO 語句的目標(在 INSERT 語句中,SELECT 語句的 schema 需要與目標表不帶有計算列的 schema 一致)。

計算列可用于為 CREATE TABLE 語句定義時間屬性。Processing time 可以簡單地通過使用了系統函數 PROCTIME()proc AS PROCTIME() 語句進行定義。 Event time 可能需要從現有的字段中獲得(例如,原始字段的類型不是 TIMESTAMP 或嵌套在 JSON 字符串中)。

WATERMARK

WATERMARK 定義了表的事件時間屬性,其形式為 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。

rowtime_column_name 把一個現有的列定義為事件時間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列(top-level column),也可以是一個計算列。

watermark_strategy_expression 定義了 watermark 的生成策略。允許使用包括計算列在內的任意非子查詢表達式來計算 watermark;表達式的返回類型必須是 TIMESTAMP(3)。僅當返回的 watermark 不為空且大于之前發(fā)出的 watermark 時才會被發(fā)出(以保證 watermark 遞增)。

Flink 為每條記錄的計算 watermark,定期(pipeline.auto-watermark-interval)發(fā)出所生成的最大的 watermark(如果 watermark 為空或不大于之前的 watermark 不發(fā)出)。若 watermark 的間隔(pipeline.auto-watermark-interval)是 0ms,那么每條記錄都會產生一個 watermark(根據前述的規(guī)則發(fā)出)。

使用事件時間(Event time)語義時,表必須包含事件時間屬性和 watermark 策略。

Flink 提供了幾種常用的 watermark 策略:

  • 嚴格遞增時間戳(Strictly ascending timestamps):WATERMARK FOR rowtime_column AS rowtime_column

    • 發(fā)出到目前為止已觀察到的最大時間戳的 watermark,時間戳大于最大時間戳的 Row 被認為沒有遲到
  • 遞增時間戳(Ascending timestamps): WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    • 發(fā)出到目前為止已觀察到的最大時間戳減 1 的 watermark,時間戳大于或等于最大時間戳的 Row 被認為沒有遲到
  • 有界亂序時間戳(Bounded out of orderness timestamps):WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

    • 發(fā)出到目前為止已觀察到的最大時間戳減去指定延遲的 watermark,例如,WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一個 5 秒延遲的 watermark 策略
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

PRIMARY KEY

主鍵約束是 Flink 優(yōu)化的一種提示信息,表明一張表或視圖的某個(些)列是唯一的并且不包含 Null 值。主鍵聲明的列都是非空的,可以被用作表中每行的唯一標識。

主鍵可以和列的定義一起聲明,也可以獨立聲明為表的限制屬性,不管是哪種方式,主鍵都不可以重復定義,否則 Flink 會報錯。

有效性檢查

SQL 標準主鍵限制可以有兩種模式:ENFORCED 或者 NOT ENFORCED。 申明了是否輸入/輸出數據做檢查(是否唯一)。Flink 只支持 NOT ENFORCED 模式,<u>用戶需要自己保證唯一性</u>。

Flink 假定聲明了主鍵的列都是不包含 Null 值的,Connector 在處理數據時需要自己保證語義正確。

在 CREATE TABLE 語句中,創(chuàng)建主鍵會修改列的 nullable 屬性,主鍵聲明的列默認都是非 Nullable 的。

PARTITIONED BY

根據指定的列對已經創(chuàng)建的表進行分區(qū)。若表使用 filesystem sink ,則將會為每個分區(qū)創(chuàng)建一個目錄。

WITH Options

Table properties 用于創(chuàng)建 table source/sink,一般用于尋找和創(chuàng)建底層的連接器(Connector)。

表達式 key1=val1 的鍵和值必須為字符串字面量。不同 Connector 有各自的 properties。

Note. 表名可以為以下三種格式 1. catalog_name.db_name.table_name 2. db_name.table_name 3. table_name。

使用 catalog_name.db_name.table_name 的表將會與名為 catalog_name 的 catalog 和名為 db_name 的數據庫一起注冊到 metastore 中;使用 db_name.table_name 的表將會被注冊到當前執(zhí)行的 table environment 中的 catalog 且數據庫會被命名為 db_name;對于 table_name,數據表將會被注冊到當前正在運行的 catalog 和數據庫中。

Note. 使用 CREATE TABLE 語句注冊的表均可用作 table source 和 table sink。 在被 DML 語句引用前,無法決定其實際用于 source 或是 sink。

LIKE

LIKE 子句可以基于現有表的定義去創(chuàng)建新表,并且可以擴展或排除原始表中的某些部分。LIKE 子句必須在 CREATE 語句中定義,并且是基于 CREATE 語句的更上層定義。LIKE 子句可以用于定義表的多個部分,而不僅僅是 schema 部分(可以重用或改寫指定的連接器配置屬性或者添加 watermark 定義)。

示例如下:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 添加 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改寫 startup-mode 屬性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

Orders_with_watermark 表等效于使用以下語句創(chuàng)建的表:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

Merge Table

表屬性的合并邏輯可以用 like options 來控制??梢钥刂坪喜⒌谋韺傩匀缦拢?/p>

  • CONSTRAINTS - 主鍵和唯一鍵約束
  • GENERATED - 計算列
  • OPTIONS - 連接器信息、格式化方式等配置項
  • PARTITIONS - 表分區(qū)信息
  • WATERMARKS - watermark 定義

并且有三種不同的表屬性合并策略:

  • INCLUDING - 新表包含源表(source table)所有的表屬性,如果和源表的表屬性重復則會直接失敗,如新表和源表存在相同 key 的屬性。
  • EXCLUDING - 新表不包含源表指定的任何表屬性。
  • OVERWRITING - 新表包含源表的表屬性,但如果出現重復項,則會用新表的表屬性覆蓋源表中的重復表屬性,如新表和源表存在相同 key 的屬性,則會使用當前語句中定義的 key 的屬性值。

可以使用 INCLUDING/EXCLUDING ALL 這種聲明方式來指定使用怎樣的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,代表只有源表的 WATERMARKS 屬性才會被包含進新表。示例如下:

-- 存儲在文件系統的源表
CREATE TABLE Orders_in_file (
    `user` BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
)
PARTITIONED BY (`user`) 
WITH ( 
    'connector' = 'filesystem',
    'path' = '...'
);

-- 對應存儲在 kafka 的源表
CREATE TABLE Orders_in_kafka (
    -- 添加 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    ...
)
LIKE Orders_in_file (
    -- 排除需要生成 watermark 的計算列之外的所有內容。
    -- 去除不適用于 kafka 的所有分區(qū)和文件系統的相關屬性。
    EXCLUDING ALL
    INCLUDING GENERATED
);

默認將使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。

無法選擇 physical columns 的合并策略,會按照 INCLUDING 策略合并。

CREATE CATALOG

CREATE CATALOG catalog_name
  WITH (key1=val1, key2=val2, ...)

Catalog 提供了元數據信息,例如數據庫、表、分區(qū)、視圖以及數據庫或其他外部系統中存儲的函數和信息。

更多參考官方文檔

CREATE DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

根據給定的屬性創(chuàng)建數據庫。若數據庫中已存在同名表會拋出異常。

IF NOT EXISTS
若數據庫已經存在,則不會進行任何操作。

WITH OPTIONS
數據庫屬性一般用于存儲關于這個數據庫額外的信息。 表達式 key1=val1 中的鍵和值都需要是字符串字面量。

CREATE VIEW

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [( columnName [, columnName ]* )] [COMMENT view_comment]
  AS query_expression

根據給定的 query 語句創(chuàng)建一個視圖。若數據庫中已經存在同名視圖會拋出異常。

TEMPORARY
創(chuàng)建一個有 catalog 和數據庫命名空間的臨時視圖,并覆蓋原有的視圖。

IF NOT EXISTS
若該視圖已經存在,則不會進行任何操作。

CREATE FUNCTION

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

創(chuàng)建 function,可以指定 catalog 和 database,若 catalog 中,已經有同名的函數注冊了,則無法注冊。

LANGUAGE JAVA|SCALA|PYTHON 用于指定 Flink runtime 如何執(zhí)行這個函數。目前,只支持 JAVA, SCALA 和 PYTHON,且函數的默認語言為 JAVA。

如果是 JAVA 或者 SCALA,則 identifier 是 UDF 實現類的全限定名(更多參考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/)。

如果是 PYTHON,則 identifier 是 UDF 對象的全限定名(更多參考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/)。 如果是 PYTHON,而當前程序是 Java/Scala 程序或者 SQL 程序,則需要配置 Python 相關的依賴(更多參考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-dependency-in-javascala-program)。

TEMPORARY
創(chuàng)建一個臨時 catalog function,有 catalog 和 database,并覆蓋原有的 function 。

TEMPORARY SYSTEM
創(chuàng)建一個臨時 system function,有 catalog,沒有 database,并覆蓋系統內置的 function。

IF NOT EXISTS
若該函數已經存在,則不會進行任何操作。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容