SQL API 是 Flink 中最頂級(jí)的 API , 它構(gòu)建了 Table API 之上, 也可以方便的和 Table 做轉(zhuǎn)換, 構(gòu)建 SQL 所使用的Environment 也是 Table Environment . Flink SQL 底層使用 Apache Calcite 框架, 將標(biāo)準(zhǔn)的 Flink SQL 語句解析并轉(zhuǎn)換成底層的算子處理邏輯. 下面就直接用 Flink 官方倉庫中的 案例 Code Link來做一個(gè)演示.
- 獲取執(zhí)行環(huán)境
// 首先同樣有流處理和批處理的區(qū)別,
// 獲取對(duì)應(yīng)的environment之后直接轉(zhuǎn)換為Table environment ,
// 就可以使用SQL API,
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- 拿到要操作的表
// 將 Stream 轉(zhuǎn)換為 Table, 可以采用不同的辦法
// 將 DataStream 轉(zhuǎn)換為 Table
Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
// 將 DataStream 注冊(cè)成 Table
tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
- 執(zhí)行SQL語句
// TableEnvironment 有 SqlQuery 和 SqlUpdate 兩種操作符可以使用
// union 兩個(gè) table
Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +"SELECT * FROM OrderB WHERE amount > 2");
SQL可以執(zhí)行許多復(fù)雜的操作,本文先簡(jiǎn)單的了解下 SQL 的API