
作者|Jingsong Lee jingsonglee0@gmail.com
Apache Flink 社區(qū)很高興地宣布發(fā)布 Apache Flink Table Store 0.2.0。
在這個版本中,增加了 Hive、Spark 和 Trino 等計算引擎的對接支持,并且穩(wěn)定了存儲的格式。歡迎大家試用和反饋!
Flink Table Store 倉庫地址:
https://github.com/apache/flink-table-store
項目文檔和用戶指南請查看:
https://nightlies.apache.org/flink/flink-table-store-docs-release-0.2/
Flink Table Store 是什么
Flink Table Store是一個數(shù)據(jù)湖存儲,用于實時流式 Changelog 攝取 (比如來自 Flink CDC 的數(shù)據(jù)) 和高性能查詢。

作為一種新型的可更新數(shù)據(jù)湖,F(xiàn)link Table Store 具有以下特點:
- 大吞吐量的更新數(shù)據(jù)攝取,同時提供良好的查詢性能。
- 具有主鍵過濾器的高性能查詢,響應(yīng)時間最快可達到百毫秒級別。
- 流式讀取在 Lake Storage 上可用,Lake Storage 還可以與 Kafka 集成,以提供毫秒級流式讀取。
功能
在這個版本中,我們完成了許多令人興奮的功能。
Catalog
此版本引入了 Table Store 自己的 Catalog,在 Catalog 下創(chuàng)建的表,持久化保存表信息等元數(shù)據(jù),可以跨 session 訪問存量表。
默認情況下元數(shù)據(jù)都保存在 DFS 上。也支持配置 Hive Metastore 的自動同步。
CREATE CATALOG tablestore WITH (
'type'='table-store',
'warehouse'='hdfs://nn:8020/warehouse/path',
-- optional hive metastore
'metastore'='hive',
'uri'='thrift://<hive-metastore-host-name>:<port>'
);
USE CATALOG tablestore;
CREATE TABLE my_table ...
當(dāng)開啟 Hive Metastore 時,你可以比較方便的使用 Hive 引擎來查詢 Flink Table Store。
生態(tài)
在本版本中,我們不僅支持 Flink 1.15,也支持了 Flink 1.14,并為多個計算引擎提供讀取支持。
我們會保持 Flink 引擎和 Flink Table Store 的全面結(jié)合,構(gòu)建完整的流批一體計算和存儲的流式數(shù)倉。此外,F(xiàn)link Table Store 也支持了更多的計算引擎,包括 Hive/Spark/Trino 等,從而可以兼容更多的生態(tài),便于在現(xiàn)有生產(chǎn)環(huán)境中使用。

如果你有關(guān)于生態(tài)的需求和想法,比如想讓 Spark 或 Hive 支持寫入 Flink Table Store,歡迎通過掃描文末的二維碼入群交流,或者在 Flink 社區(qū)創(chuàng)建 issue 進行討論。
Append-only 表
Append-only 表功能是一種性能改進,只接受 INSERT_ONLY 的數(shù)據(jù)以 Append 到存儲,而不是更新或刪除現(xiàn)有數(shù)據(jù),適用于不需要更新的用例(如日志數(shù)據(jù)同步)。
CREATE TABLE my_table (
...
) WITH (
'write-mode' = 'append-only',
...
)
流式寫入 Append-only 表也具有異步 Compaction,從而不需要擔(dān)心小文件問題。
Bucket 擴縮容
單個 Bucket 內(nèi)是一個單獨的 LSM 結(jié)構(gòu),Bucket 的數(shù)量會影響性能:
- 過小的 Bucket 數(shù)量會導(dǎo)致寫入作業(yè)有瓶頸,吞吐跟不上寫入速度。
- 過大的 Bucket 數(shù)量會導(dǎo)致有大量小文件,且影響查詢速度。
Flink Table Store 允許用戶通過 ALTER TABLE 命令調(diào)整存儲桶數(shù)量,并通過 INSERT OVERWRITE 重新組織必要的分區(qū),舊分區(qū)保持不變。
性能測試
在以下的模塊里,我們創(chuàng)建了關(guān)于流計算更新和查詢的 Benchmark:
https://github.com/apache/flink-table-store/tree/master/flink-table-store-benchmark
更新性能和查詢性能是互相權(quán)衡的,所以在性能測試中不能單獨衡量更新性能或者查詢性能。
- 如果只考慮查詢性能,那么 Copy On Write (COW) 是最適合的技術(shù)方案,但這種設(shè)計下更新時會覆寫所有數(shù)據(jù),因此是以犧牲更新性能為代價的。
- 如果只考慮更新性能,那么 Merge On Read (MOR) 是最適合的技術(shù)方案,但這種設(shè)計下會在讀取時對數(shù)據(jù)進行合并,從而影響查詢的性能。
- Flink Table Store 目前只支持 MOR 模式,但通過 Data Skipping 等技術(shù)對查詢性能做了優(yōu)化。
下面對比了 Flink Table Store 和 Hudi MOR、Hudi COW,在實時更新場景的寫入(包含插入和更新)與查詢性能。目前湖存儲中,只有 Hudi 比較好的支持了流更新寫入,而 Iceberg 和 Delta 更適合使用批 SQL 的 MERGE INTO 來完成更新,所以這里只對比了 Hudi。
測試方法:
- 通過 Flink SQL 向定義了主鍵的表里寫入定量的隨機數(shù)據(jù),測量耗時以及平均的 Cpu 消耗,以此衡量存儲的更新性能。
- 通過 Spark SQL 查詢寫好數(shù)據(jù)的表,測量三種 Query:查詢?nèi)繑?shù)據(jù)、查詢主鍵的某個范圍、點查某個主鍵,以此衡量存儲的查詢性能。
測試用例:
- 總量:數(shù)據(jù)總條數(shù) 5 億條。
- 主鍵:隨機的數(shù)據(jù),隨機范圍是 1 億。
- 大?。好織l數(shù)據(jù)大概 150 字節(jié)。
此測試用例比較簡單,如有需要可以利用 benchmark 構(gòu)建更復(fù)雜的用例來貼合自己的生產(chǎn)場景。
測試環(huán)境:
- Flink 版本: 1.14.5
- Spark 版本:3.2.2
- Flink Table Store 版本: 0.2.0
- Hudi 版本:0.11.1
- 集群:三臺物理機的 Hadoop 集群
Flink 集群參數(shù):

Spark 集群參數(shù):

Flink Table Store 參數(shù):

Hudi 參數(shù):

寫入性能 (throughput / core):

查詢性能 (秒) (Flink Table Store vs Hudi MOR):

查詢性能 (秒) (Flink Table Store vs Hudi COW):

結(jié)論,面向此測試用例:
- Flink Table Store 對比 Merge On Read 有著比較好的更新性能和查詢性能。
- Flink Table Store 對比 Copy On Write 有著比較好的更新性能,但是查詢所有數(shù)據(jù)不如 COW,F(xiàn)link Table Store 是一個 Merge On Read 的技術(shù),有 Merge 的開銷,但是 Merge 的效率非常高。
- Flink Table Store 因為保持了有序性,直接查詢表可以有很好的 Data Skipping,點查甚至可以達到 100ms 以內(nèi)的延遲。
下一步
在即將發(fā)布的 0.3.0 版本中,您可以期待以下功能:
- Lookup:支持 Flink Dim Lookup Join。(即將來臨)
- 并發(fā)更新:多個 Flink 作業(yè)寫入同一張 Flink Table Store 表。
- Compaction分離:單獨的任務(wù)完成Compaction。
- 物化視圖:Flink Table Store 提供預(yù)聚合模型。
- 變更日志生成:為各種 MergeEngine 生成準確的變更日志。
- 多引擎的寫支持:支持 Spark、Hive 寫入 Flink Table Store。
Flink Table Store 長期目標是滿足批流一體對存儲的所有要求,并構(gòu)建實時低成本的 Streaming Data Warehouse。