Flink SQL Query 語(yǔ)法(一)

主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/overview/

SELECT 語(yǔ)句和 VALUES 語(yǔ)句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,會(huì)以 Table 的形式返回 SELECT (或 VALUE)的查詢(xún)結(jié)果。Table 可被用于 SQL 或 Table API 查詢(xún)、轉(zhuǎn)換為 DataSet 或 DataStream、輸出到 TableSink。SQL 與 Table API 的查詢(xún)可以進(jìn)行無(wú)縫融合、整體優(yōu)化。

為了可以在 SQL 查詢(xún)中訪問(wèn)到表,需要先在 TableEnvironment 中注冊(cè)表(可以通過(guò) TableSource、Table、CREATE TABLE 語(yǔ)句、DataStream 或 DataSet 注冊(cè))。為方便起見(jiàn) Table.toString() 將會(huì)在其 TableEnvironment 中以唯一的名稱(chēng)自動(dòng)注冊(cè)表,并返回名稱(chēng)。

注意: 查詢(xún)?nèi)舭瞬恢С值?SQL 特性,將會(huì)拋出 TableException。

指定查詢(xún)

以下示例顯示如何在已注冊(cè)和內(nèi)聯(lián)表上指定 SQL 查詢(xún)。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 從外部數(shù)據(jù)源獲取一個(gè) DataStream
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// 查詢(xún)一個(gè)未注冊(cè)的 Table
Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// 查詢(xún)一個(gè)注冊(cè)的 Table
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"));

// 執(zhí)行 sqlQuery() 返回 Table 對(duì)象
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

final Schema schema = new Schema()
    .field("product", DataTypes.STRING())
    .field("amount", DataTypes.INT());

// 創(chuàng)建并注冊(cè) TableSink
tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(...)
    .withSchema(schema)
    .createTemporaryTable("RubberOrders");

// 調(diào)用 executeSql() 執(zhí)行 INSERT SQL,查詢(xún)結(jié)果寫(xiě)入 TableSink
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

執(zhí)行查詢(xún)

SELECT 語(yǔ)句或者 VALUES 語(yǔ)句可以通過(guò) TableEnvironment.executeSql() 方法來(lái)執(zhí)行,該方法返回 TableResult 對(duì)象用于包裝查詢(xún)的結(jié)果,一個(gè) Table 對(duì)象可以通過(guò) Table.execute() 方法執(zhí)行獲取查詢(xún)結(jié)果。TableResult.collect() 方法返回一個(gè)可以關(guān)閉的行迭代器(除非所有的數(shù)據(jù)都被收集到本地,否則一個(gè)查詢(xún)作業(yè)永遠(yuǎn)不會(huì)結(jié)束。所以通過(guò) CloseableIterator#close() 方法主動(dòng)地關(guān)閉作業(yè)以防止資源泄露)。 還可以通過(guò) TableResult.print() 方法將查詢(xún)結(jié)果打印到控制臺(tái)。TableResult 中的結(jié)果數(shù)據(jù)只能被訪問(wèn)一次,因此一個(gè) TableResult 實(shí)例中,collect() 方法和 print() 方法不能被同時(shí)使用。

TableResult.collect()TableResult.print() 的行為在不同的 checkpointing 模式下略有不同。

  • 對(duì)于批作業(yè)或沒(méi)有配置任何 checkpointing 的流作業(yè),TableResult.collect()TableResult.print() 既不保證 Exactly-once、也不保證 At-least-once。查詢(xún)結(jié)果在產(chǎn)生后可被客戶端即刻訪問(wèn),但作業(yè)失敗和重啟時(shí)將會(huì)報(bào)錯(cuò)。
  • 對(duì)于配置了 Exactly-once checkpointing 的流作業(yè),TableResult.collect()TableResult.print() 保證 Exactly-once。一條結(jié)果數(shù)據(jù)只有在其關(guān)聯(lián)的 checkpointing 完成后才能在客戶端被訪問(wèn)。
  • 對(duì)于配置了 At-least-once checkpointing 的流作業(yè),TableResult.collect()TableResult.print() 保證 At-least-once。查詢(xún)結(jié)果在產(chǎn)生后可被客戶端即刻訪問(wèn),同一條結(jié)果可能被多次傳遞給客戶端。

語(yǔ)法

Flink 通過(guò)支持標(biāo)準(zhǔn) ANSI SQL的 Apache Calcite 解析 SQL。以下“BNF-語(yǔ)法”描述了批處理和流處理查詢(xún)中所支持的 SQL 特性的超集。

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

tablePath:
  [ [ catalogName . ] schemaName . ] tableName

systemTimePeriod:
  FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'

Flink SQL 對(duì)于標(biāo)識(shí)符(表、屬性、函數(shù)名)的命名策略類(lèi)似于 Java 的詞法約定:

  • 標(biāo)識(shí)符大小寫(xiě)敏感

  • 通過(guò)反引號(hào),可以允許標(biāo)識(shí)符帶有非字母的字符

    • SELECT a AS `my field` FROM t
      

字符串文本常量需要被單引號(hào)包起來(lái)(如 SELECT 'Hello World' )。兩個(gè)單引號(hào)表示轉(zhuǎn)義(如 SELECT 'It''s me.')。字符串文本常量支持 Unicode 字符,如需明確使用 Unicode 編碼,請(qǐng)使用以下語(yǔ)法:

  • 使用反斜杠(\)作為轉(zhuǎn)義字符(默認(rèn)):SELECT U&'\263A'
  • 使用自定義的轉(zhuǎn)義字符: SELECT U&'#263A' UESCAPE '#'

操作符

WITH

WITH 提供了編寫(xiě)輔助語(yǔ)句的方法,以便在更大的查詢(xún)中使用。這些語(yǔ)句通常被稱(chēng)為公共表表達(dá)式(Common Table Expression,CTE),可以認(rèn)為它定義了只存在于一個(gè)查詢(xún)中的臨時(shí)視圖。

WITH 語(yǔ)法:

WITH <with_item_definition> [ , ... ]
SELECT ... FROM ...;

<with_item_defintion>:
    with_item_name (column_name[, ...n]) AS ( <select_query> )

下面的示例定義了一個(gè) CTE:orders_with_total,并在 GROUP BY 查詢(xún)中使用它。

WITH orders_with_total AS (
    SELECT order_id, price + tax AS total
    FROM Orders
)

SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

SELECT & WHERE

SELECT 語(yǔ)句的一般語(yǔ)法為:

SELECT select_list FROM table_expression [ WHERE boolean_expression ]

table_expression 可以是任何數(shù)據(jù)源(表、視圖、VALUES 子句、多個(gè)表的 Join 結(jié)果、子查詢(xún))。下面的事例讀取 Orders 表的所有列:

SELECT * FROM Orders

select_list 指定 * 表示解析所有的列,但是不建議在生產(chǎn)環(huán)境中使用,會(huì)降低性能,建議只查詢(xún)需要的列:

SELECT order_id, price + tax FROM Orders

查詢(xún)可以使用 VALUES 子句,每個(gè)元組(Tuple)對(duì)應(yīng)一個(gè) Row,并且可以設(shè)置別名:

SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)

WHERE 語(yǔ)句可以過(guò)濾 Row:

SELECT price + tax FROM Orders WHERE id = 10

可以對(duì)每行數(shù)據(jù)的指定列調(diào)用函數(shù)(內(nèi)置、自定義函數(shù),自定義函數(shù)必須提前注冊(cè)):

SELECT PRETTY_PRINT(order_id) FROM Orders

SELECT DISTINCT

如果指定 SELECT DISTINCT,則將從結(jié)果集中刪除重復(fù)行(每組重復(fù)中保留一行)。

SELECT DISTINCT id FROM Orders

對(duì)于流式查詢(xún),計(jì)算查詢(xún)結(jié)果所需的狀態(tài)(State)可能會(huì)無(wú)限增長(zhǎng)。狀態(tài)大小取決于不同行的數(shù)量???lt;u>以為查詢(xún)配置適當(dāng)?shù)臓顟B(tài)生存時(shí)間(TTL),以防止?fàn)顟B(tài)大小過(guò)大。這可能會(huì)影響查詢(xún)結(jié)果的正確性</u>。

Windowing TVF(1.13)

Window 是流處理的核心。Windows 將流拆分為有限大小的片段應(yīng)用計(jì)算。只有流處理支持。

Flink 1.13 提供了幾個(gè) Table-valued functions(TVF,區(qū)別于 Group Window Function),將表中的元素劃分為 windows,包括:

- 滾動(dòng)窗口(Tumbling windows)

- 滑動(dòng)窗口(Hop, Sliding windows)

- 累加窗口(Cumulate windows)

- 會(huì)話窗口(Session windows,TVF 暫不支持)

每個(gè)元素在邏輯上可以屬于多個(gè)窗口,具體取決于所使用的窗口函數(shù)。TVF 必須和聚合操作一起使用:

假設(shè)存在一個(gè) Bid

Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 | C    |
| 2020-04-15 08:07 |  2.00 | A    |
| 2020-04-15 08:09 |  5.00 | D    |
| 2020-04-15 08:11 |  3.00 | B    |
| 2020-04-15 08:13 |  1.00 | E    |
| 2020-04-15 08:17 |  6.00 | F    |
+------------------+-------+------+

滾動(dòng)窗口(Tumbling windows)

指定一個(gè)固定大小的窗口,并且不重疊,語(yǔ)法:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

-- data: 表名,表需要有時(shí)間屬性字段
-- timecol: 表中的時(shí)間屬性字段,用于劃分窗口
-- size: 窗口大小

設(shè)定一個(gè)10分鐘大小的滾動(dòng)窗口,

SELECT window_start, window_end, SUM(price)
FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
  
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

滑動(dòng)窗口(Hop, Sliding windows)

指定一個(gè)固定大小的窗口,設(shè)定滑動(dòng)間隔,元素會(huì)被指定給多個(gè)窗口,語(yǔ)法:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

-- data: 表名,表需要有時(shí)間屬性字段
-- timecol: 表中的時(shí)間屬性字段,用于劃分窗口
-- size: 窗口大小
-- slide:窗口滑動(dòng)的大小

設(shè)定一個(gè)10分鐘大小,每5分鐘滑動(dòng)的窗口,

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
  
  
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 |  6.00 |
+------------------+------------------+-------+

累加窗口(Cumulate windows)

指定一個(gè)窗口的最大規(guī)模,按照指定時(shí)間間隔增長(zhǎng)累加,直到達(dá)到窗口的最大規(guī)模,每次窗口增長(zhǎng)會(huì)進(jìn)行一次計(jì)算,可以理解為多次計(jì)算的滾動(dòng)窗口,語(yǔ)法:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

-- data: 表名,表需要有時(shí)間屬性字段
-- timecol: 表中的時(shí)間屬性字段,用于劃分窗口
-- size: 窗口最大大小
-- step:窗口增長(zhǎng)大小

設(shè)定一個(gè)10分鐘大小,每2分鐘累計(jì)一次的窗口,

SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

    
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 |  4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 |  6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 |  3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容