Flink Table API 和 SQL

Apache Flink 具有兩個關(guān)系型API:Table API 和SQL,用于統(tǒng)一流和批處理。
Table API 是用于 Scala 和 Java 語言的查詢API,允許以非常直觀的方式組合關(guān)系運算符的查詢,例如 select,filter 和 join。Flink SQL 的支持是基于實現(xiàn)了SQL標準的 Apache Calcite。無論輸入是批輸入(DataSet)還是流輸入(DataStream),任一接口中指定的查詢都具有相同的語義并指定相同的結(jié)果。

Table API 和 SQL 還沒有完全支持并且正在積極開發(fā)中。

要使用 Table API 和SQL,需要將以下依賴引入項目:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.6.1</version>
</dependency>

Table API 和SQL

批處理和流式傳輸?shù)?Table API 和SQL程序都遵循相同的模式。以下代碼示例顯示了常見的程序結(jié)構(gòu):

// 批處理使用 ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 創(chuàng)建 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注冊 Table
tableEnv.registerTable("table1", ...)

// Table API query
val tapiResult = tableEnv.scan("table1").select(...)

// SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// Sink query result
tapiResult.writeToSink(...)

// execute
env.execute()

TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念,它負責:

  • 在內(nèi)部目錄中注冊表
  • 注冊外部目錄
  • 執(zhí)行SQL查詢
  • 注冊用戶定義的函數(shù)
  • DataStream 或 DataSet 轉(zhuǎn)換為 Table
  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 總是與特定的 TableEnvironment 綁定。不能在同一查詢中組合不同 TableEnvironments 的表(例如,union 或 join)。創(chuàng)建 TableEnvironment:

// STREAMING QUERY
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// BATCH QUERY
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

注冊 Table

TableEnvironment 維護一個按名稱注冊的表的目錄。有兩種類型的表,輸入表(input table)和輸出表(output table)。輸入表可以在 Table API 和SQL查詢中引用,并提供輸入數(shù)據(jù)。輸出表可用于將 Table API 或SQL查詢的結(jié)果發(fā)送到外部系統(tǒng)。

輸入表的注冊源:

  • Table API 或SQL查詢的結(jié)果表
  • 訪問外部數(shù)據(jù)的 TableSource,例如文件,數(shù)據(jù)庫或消息系統(tǒng)
  • DataStream 或 DataSet。

輸出表的注冊源:TableSink

代碼示例:

val tableEnv = TableEnvironment.getTableEnvironment(env)

// from Table API or SQL
val projTable: Table = tableEnv.scan("X").select(...)
tableEnv.registerTable("projectedTable", projTable)

// from TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)

// from TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

注冊外部目錄

外部目錄(external catalog)可以提供有關(guān)外部數(shù)據(jù)庫和表的信息(如名稱,schema,統(tǒng)計信息以及訪問信息)。可以通過實現(xiàn) ExternalCatalog 接口創(chuàng)建外部目錄,并在 TableEnvironment 中注冊:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 創(chuàng)建一個外部目錄
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 注冊外部目錄
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

查詢

Table API

Table API 是一個 Scala 和 Java 的語言集成查詢API,是基于 Table類。Table類代表了一個流或者批表,并提供方法來使用關(guān)系型操作。這些方法返回一個新的 Table 對象,這個新的 Table 對象代表著輸入的 Table 應用關(guān)系型操作后的結(jié)果。下面的例子展示了一個簡單的 Table API 聚合查詢:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注冊一個名叫 Orders 的表 ...

// 掃描注冊的 Orders 表
val orders = tableEnv.scan("Orders")

// 計算所有來自法國的客戶的收入
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName')
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 執(zhí)行查詢

SQL

Flink SQL 集成是基于 Apache Calcite,Apache Calcite 實現(xiàn)了標準的SQL。下面的例子展示了如何指定一個查詢并返回結(jié)果:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注冊一個名叫 Orders 的表

// 計算所有來自法國的客戶的收入
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 執(zhí)行查詢

指定將其結(jié)果插入已注冊表的更新查詢:

// 注冊一個名叫 RevenueFrance 的輸出表

// 計算所有來自法國的客戶的收入,并更新到 RevenueFrance 表
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 執(zhí)行查詢

混合使用 Table API 和SQL,Table API 和SQL查詢可以很容易地合并因為它們都返回 Table 對象:

  1. Table API 查詢可以基于SQL查詢結(jié)果的 Table 來進行
  2. SQL查詢可以基于 Table API 查詢的結(jié)果來定義

輸出表

要輸出 Table 可以寫入 TableSink。TableSink 是通用接口,支持各種文件格式(如:CSV,Apache Parquet,Apache Avro)、存儲系統(tǒng)(如:JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系統(tǒng)(如:Apache Kafka,RabbitMQ)。

批處理 Table 只能寫入 BatchTableSink,而流式處理 Table 需要 AppendStreamTableSink,RetractStreamTableSink 或 UpsertStreamTableSink。有關(guān)可用接收器的詳細信息,請參閱 Sources & Sinks

有兩種方法可以發(fā)送表:

  • Table.writeToSink(TableSink sink) 自動匹配 schema
  • Table.insertInto(String sinkTable) 使用特定 schema

以下示例顯示如何發(fā)出Table:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查詢獲取一個 Table
val result: Table = ...

// 創(chuàng)建一個 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// METHOD 1:
//   將結(jié)果表寫入 TableSink
result.writeToSink(sink)

// METHOD 2:
//   注冊指定 schema 的 TableSink
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
//   將結(jié)果表寫入 TableSink
result.insertInto("CsvSinkTable")

// 執(zhí)行程序

與 DataStream 和 DataSet API 集成

Table API 和SQL查詢可以很容易地進行集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查詢一個外部表(來自關(guān)系型數(shù)據(jù)庫的表),做一些處理(如過濾、映射、聚合或者關(guān)聯(lián)元數(shù)據(jù)),然后使用 DataStream 或者 DataSet API(以及在這些API之上構(gòu)建的任何庫,例如CEP或 Gelly) 進行進一步處理。

同樣,Table API 或者SQL查詢也可以應用于 DataStream 或者 DataSet 程序的結(jié)果中。這種交互可以通過將 DataStream 或者 DataSet 轉(zhuǎn)換成一個 Table 及將 Table 轉(zhuǎn)換成 DataStream 或者 DataSet 來實現(xiàn)。

Scala 隱式轉(zhuǎn)換

Scala Table API 支持 DataSet,DataStream 以及 Table 間的隱式轉(zhuǎn)換。需要引入 org.apache.flink.table.api.scala._org.apache.flink.api.scala._。

DataStream 或 DataSet 轉(zhuǎn)換為 Table

DataStream 或 DataSet 可以在 TableEnvironment 中注冊為表,表的 schema 根據(jù)注冊的 DataStream 或 DataSet 的數(shù)據(jù)類型來定:

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

也可以直接轉(zhuǎn)換為表,而不需要注冊:

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Table 轉(zhuǎn)換為 DataStream 或 DataSet

Table 可以轉(zhuǎn)換為 DataStream 或者 DataSet,通過這種方式,DataStream 或者 DataSet 程序就可以基于 Table API 或者SQL查詢的結(jié)果來執(zhí)行了。

當將一個 Table 轉(zhuǎn)換為 DataStream 或 DataSet 時,需要指定生成的 DataStream 或 DataSet 的數(shù)據(jù)類型,即需要轉(zhuǎn)換表的行的數(shù)據(jù)類型,通常最方便的轉(zhuǎn)換類型是 Row,下面列表描述了不同選項的功能:

  1. Row:字段按位置、任意數(shù)量字段映射,支持 null 值,無類型安全訪問
  2. POJO:字段按名稱(POJO 字段命名為 Table 字段)、任意數(shù)量字段映射,支持 null 值,類型安全訪問
  3. Case Class:字段按位置映射,不支持 null 值,類型安全訪問
  4. Tuple:字段按位置映射,不得多于22(Scala)或 25(Java)個字段,不支持 null 值,類型安全訪問
  5. Atomic Type:Table 必須有一個字段,不支持 null 值,類型安全訪問

Table 轉(zhuǎn)換 DataStream

流式查詢的結(jié)果表會動態(tài)地更新,每個新的記錄到達輸入流時結(jié)果就會發(fā)生變化。有兩種模式將 Table 轉(zhuǎn)換為 DataStream:

  1. Append Mode:只適用于當動態(tài)表僅由 INSERT 修改時,之前的結(jié)果不會被更新。
  2. Retract Mode:始終都可以使用此模式,使用一個 boolean 標識來編碼 INSERTDELETE 更改。
// 有兩個字段的 Table(String name, Integer age)
val table: Table = ...

// 將 Table 轉(zhuǎn)換為 Row 類型的 Append DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// 將 Table 轉(zhuǎn)換為 Tuple2<String, Integer> 類型的 Append DataStream  
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// 將 Table 轉(zhuǎn)換為 Row 類型的 Retact DataStream
//   一個 ReactDataStream 的類型X為表示為 DataStream[(Boolean, X)]
//   boolean 字段指定了更改的類型
//   True 是 INSERT, false 是 DELETE
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Table 轉(zhuǎn)換 DataSet

// 有兩個字段的 Table(String name, Integer age)
val table: Table = ...

// 將 Table 轉(zhuǎn)換為 Row 類型的 DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

//  將 Table 轉(zhuǎn)換為 Tuple2<String, Integer> 類型的 DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

將數(shù)據(jù)類型映射到表模式(Table schema)

DataStream 和 DataSet API 支持多種數(shù)據(jù)類型,如:Tuple、POJO、case class 及 Row 類型。

原子類型

Flink 將原生類型(Integer、Double、String...)或泛型類型視為原子類型(Atomic type)。一個原子類型的 DataStream 或 DataSet 可以轉(zhuǎn)換為只有一個屬性的 Table,屬性的類型根據(jù)原子類型推算,并且必須指定屬性的名稱。

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuple 和 Case Class

Flink 支持 Scala 原生的 Tuple 類型,也為 Java 提供了 Tuple 類。兩種類型的 DataStream 和 DataSet 都可以被轉(zhuǎn)換為 Table。通過為所有字段提供名稱(基于位置的映射),可以重命名字段。如果未指定字段名,則使用默認字段名。基于名稱的映射允許使用別名(as)重新排序字段。

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

POJO

Flink 支持使用 POJO 作為復合類型。當將一個 POJO 類型的 DataStream 或 DataSet 轉(zhuǎn)換為 Table 而不指定字段名稱時,Table 的字段名稱將采用 POJO 原生的字段名稱。重命名原始的 POJO 字段需要關(guān)鍵字AS,因為 POJO 沒有固定的順序,名稱映射需要原始名稱并且不能通過位置來完成。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

Row 類型支持任意數(shù)量的字段,并且支持 null 值。字段名稱可以通過 RowTypeInfo 來指定或者將一個 Row 類型的 DataStream 或 DataSet 轉(zhuǎn)換為 Table 時指定。Row 類型支持按位置和名字映射??梢酝ㄟ^為所有字段提供名稱(基于位置)或為 映射/排序/重命名(基于名稱)單獨選擇字段來重命名字段。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容