
摘要:本文主要介紹巴別時(shí)代基于 Apache Paimon(Incubating) 構(gòu)建 Streaming Lakehouse 的生產(chǎn)實(shí)踐經(jīng)驗(yàn)。我們基于 Apache Paimon(Incubating) 構(gòu)建 Streaming Lakehouse 的落地實(shí)踐主要分為三期:
第一期是在調(diào)研驗(yàn)證的基礎(chǔ)上進(jìn)行數(shù)倉(cāng)分層,并且上線一些簡(jiǎn)單的業(yè)務(wù)驗(yàn)證效果;第二期是實(shí)現(xiàn)流式數(shù)倉(cāng)的基礎(chǔ)設(shè)施建設(shè),以便優(yōu)先替換當(dāng)前基于 Apache Kafka 構(gòu)建的實(shí)時(shí)數(shù)倉(cāng);第三期主要是完善 Paimon 的生態(tài)建設(shè),包括數(shù)據(jù)資產(chǎn)、數(shù)據(jù)服務(wù)等平臺(tái)服務(wù)建設(shè),主要目標(biāo)是提供完整的基于 Apache Paimon(Incubating) 端到端的平臺(tái)服務(wù)能力。目前基本完成第一期的數(shù)倉(cāng)分層,同時(shí)進(jìn)行數(shù)據(jù)質(zhì)量驗(yàn)證,基本可以滿足業(yè)務(wù)需求。
我們基于 Apache Paimon(Incubating) 構(gòu)建 Streaming Lakehouse 的落地實(shí)踐主要分為三期:第一期是在調(diào)研驗(yàn)證的基礎(chǔ)上進(jìn)行數(shù)倉(cāng)分層,并且上線一些簡(jiǎn)單的業(yè)務(wù)驗(yàn)證效果;第二期是實(shí)現(xiàn)流式數(shù)倉(cāng)的基礎(chǔ)設(shè)施建設(shè),以便優(yōu)先替換當(dāng)前基于 Apache Kafka 構(gòu)建的實(shí)時(shí)數(shù)倉(cāng);第三期主要是完善 Paimon 的生態(tài)建設(shè),包括數(shù)據(jù)資產(chǎn)、數(shù)據(jù)服務(wù)等平臺(tái)服務(wù)建設(shè),主要目標(biāo)是提供完整的基于 Apache Paimon(Incubating) 端到端的平臺(tái)服務(wù)能力。目前基本完成第一期的數(shù)倉(cāng)分層,同時(shí)進(jìn)行數(shù)據(jù)質(zhì)量驗(yàn)證,基本可以滿足業(yè)務(wù)需求。
點(diǎn)擊進(jìn)入 Apache Paimon 官網(wǎng)
1. 業(yè)務(wù)背景
基于 Apache Kafka 構(gòu)建的實(shí)時(shí)數(shù)倉(cāng)過(guò)程中我們遇到一些痛點(diǎn),例如中間層數(shù)據(jù)不可分析,數(shù)據(jù)保留時(shí)間短等問(wèn)題,同時(shí)我們的實(shí)時(shí)數(shù)倉(cāng)是基于 Flink+Kafka+Redis+ClickHouse 構(gòu)建的,難以查詢和分析 Kafka 的中間層數(shù)據(jù)和 Redis 的維表數(shù)據(jù)。目前只有 ADS 層數(shù)據(jù)最終寫入到 ClickHouse 里才能分析,但是 ClickHouse 對(duì)于數(shù)據(jù)更新支持的不是很好,所以我們需要通過(guò)寫入重復(fù)數(shù)據(jù)的方式以達(dá)到更新的效果,ClickHouse 去重表執(zhí)行操作也是異步的,這就需要在業(yè)務(wù)端進(jìn)行數(shù)據(jù)去重,大大增加了業(yè)務(wù) SQL 的復(fù)雜度,也有一定程度的性能損耗,并且 ClickHouse 不支持事務(wù),很難做到 Flink 到 ClickHouse 端到端的數(shù)據(jù)一致性保障。
基于以上痛點(diǎn),我們希望能夠借助當(dāng)下比較流行的數(shù)據(jù)湖存儲(chǔ)方案簡(jiǎn)化我們的數(shù)倉(cāng)架構(gòu),提高數(shù)據(jù)分析的效率,降低數(shù)據(jù)存儲(chǔ)和開(kāi)發(fā)成本,最終選擇 Apache Paimon 作為湖倉(cāng)底座,主要是基于以下幾個(gè)方面的考量:
- Apache Paimon(Incubating) 基于 LSM 的強(qiáng)大的數(shù)據(jù)更新能力正是我們需要的,基于PK進(jìn)行數(shù)據(jù)更新以及 Partial Update 的部分更新和 Aggregate 表的預(yù)聚合能力能夠大大簡(jiǎn)化我們的業(yè)務(wù)開(kāi)發(fā)的復(fù)雜度。
- Apache Paimon(Incubating) 當(dāng)時(shí)作為 Apache Flink 的子項(xiàng)目,對(duì)于 Flink 集成的成熟度也是我們所考量的,Apache Paimon(Incubating) 支持所有的 Flink SQL 語(yǔ)法,對(duì)于 Flink 集成的優(yōu)先支持是較其他數(shù)據(jù)湖框架優(yōu)勢(shì)的地方。
- Flink Forward Asia 2021 的主題演講里,Apache Flink 中文社區(qū)發(fā)起人王峰老師提出流式數(shù)倉(cāng)的概念,即整個(gè)數(shù)倉(cāng)的數(shù)據(jù)全部實(shí)時(shí)流動(dòng)起來(lái),Paimon 就是在此背景下推出的流批一體的存儲(chǔ),是 Flink 在推動(dòng)流批一體演進(jìn)中存儲(chǔ)領(lǐng)域上的重要一環(huán),流式數(shù)倉(cāng)作為新型數(shù)倉(cāng)架構(gòu)演進(jìn)的一種方案,而 Paimon 作為流式湖倉(cāng)的標(biāo)桿,毋庸置疑成為構(gòu)建流式數(shù)倉(cāng)的首選,隨著社區(qū)不斷發(fā)展和框架本身的成熟,Paimon 將成為 Streaming Lakehouse 領(lǐng)域的標(biāo)準(zhǔn)。
- 調(diào)研測(cè)試過(guò)程中發(fā)現(xiàn)之前遇到的業(yè)務(wù)問(wèn)題和需求通過(guò)在社區(qū)群中提問(wèn),能夠得到社區(qū)各位老師的耐心答疑,反饋的相關(guān)問(wèn)題能夠得到社區(qū)的快速響應(yīng)和 Bug 修復(fù),促成最終選擇 Apache Paimon(Incubating) 方案,打消使用 Apache Paimon(Incubating) 的諸多疑慮。
在此特別鳴謝之信老師、曉峰老師以及 Paimon 社區(qū)的各位開(kāi)發(fā)者的支持。
2. 數(shù)倉(cāng)架構(gòu)
目前我們完成基于 Paimon 的數(shù)倉(cāng)分層的設(shè)計(jì),包括 ODS,DWD,DIM 層的搭建以及 DWS 層一些業(yè)務(wù)模型的建設(shè),整體架構(gòu)如下:

2.1 數(shù)據(jù)來(lái)源
我們的數(shù)據(jù)源主要包括前端和后端打點(diǎn)日志以及業(yè)務(wù)數(shù)據(jù)庫(kù)的 Binlog,打點(diǎn)日志按項(xiàng)目通過(guò) Filebeat 采集到 Kafka 對(duì)應(yīng)的 Topic, 然后通過(guò) Flink SQL 同步到湖倉(cāng)的 ODS 層,業(yè)務(wù)庫(kù)的數(shù)據(jù)通過(guò) FlinkCDC 整庫(kù)同步到湖倉(cāng)的 ODS 層的 Paimon 表。
2.2 湖倉(cāng)建設(shè)
湖倉(cāng)主要基于 Apache Paimon(Incubating) 構(gòu)建,各層都是通過(guò) Flink SQL 進(jìn)行數(shù)據(jù)的準(zhǔn)實(shí)時(shí)同步。ODS 層采用 Paimon 的 Append Only 表,保留數(shù)據(jù)原貌不做更新。DIM 層采用 Paimon 的 PK 表,部分維表需要使用 Partial Update 能力保留最新的維表數(shù)據(jù)。
DWD 層也采用 Paimon 的 PK 表,ODS 層的表數(shù)據(jù)經(jīng)由 Flink SQL 做 ETL 清洗,并通過(guò) Retry Lookup Join 關(guān)聯(lián)維表拉寬后寫入到 DWD 層對(duì)應(yīng)的 Paimon 表里,由于維表數(shù)據(jù)可能晚于事實(shí)數(shù)據(jù)到達(dá)湖倉(cāng),存在 Join 不上的情況,所以這里需要增加重試機(jī)制。DWS 層主要是分主題進(jìn)行數(shù)倉(cāng)建模,目前主要采用 Paimon 的 Agg 表進(jìn)行一些預(yù)聚合模型及大寬表的建設(shè),ADS 層主要將 DWS 層的結(jié)果數(shù)據(jù)和 DWD 層的一些明細(xì)表數(shù)據(jù)流讀到 ClickHouse 在線系統(tǒng),提供在線服務(wù)使用。
2.3 在線系統(tǒng)
通過(guò) Flink SQL 將 DWS 層的結(jié)果數(shù)據(jù)和 DWD 的一些明細(xì)表數(shù)據(jù)近實(shí)時(shí)地流讀到 ClickHouse 在線系統(tǒng)進(jìn)行 OLAP 分析,提供 BI 實(shí)時(shí)報(bào)表,大屏展示以及用戶行為分析系統(tǒng)等使用,同時(shí)擴(kuò)展 Paimon 的 Presto 連接器,數(shù)據(jù)分析師可以使用 Presto 引擎進(jìn)行 Adhoc 查詢和數(shù)據(jù)撈取工作。
2.4 平臺(tái)服務(wù)
我們的湖倉(cāng)目前使用 Paimon 的 Hive Catalog, 基于 HMS 做元數(shù)據(jù)的統(tǒng)一管理,其中數(shù)據(jù)開(kāi)發(fā)是基于 Dinky 做得二次開(kāi)發(fā),使用 Dinky 在 Flink SQL 開(kāi)發(fā)這塊兒的能力,數(shù)據(jù)指標(biāo)基于不同類型的游戲進(jìn)行梳理,以便構(gòu)建統(tǒng)一的指標(biāo)體系。后面考慮基于 Paimon 構(gòu)建數(shù)據(jù)資產(chǎn)以及數(shù)據(jù)服務(wù)。
3. 生產(chǎn)實(shí)踐
介紹業(yè)務(wù)生產(chǎn)實(shí)踐之前,首先介紹一些 Paimon 的正確使用姿勢(shì),以便更好理解以下的業(yè)務(wù)建表實(shí)踐。
Merge Engine
指定 Merge Engine 的作用是把寫到 Paimon 表的多條相同 PK 的數(shù)據(jù)合并為一條,用戶可以通過(guò) merge-engine 配置項(xiàng)選擇以何種方式合并同 PK 的數(shù)據(jù)。
Paimon 支持的 Merge Engine 包括:
- deduplicate:如果用戶建表時(shí)不指定 merge-engine 配置,創(chuàng)建的 PK 表默認(rèn)的 Merge Engine 是 deduplicate 即只保留最新的記錄,其他的同 PK 數(shù)據(jù)則被丟棄,如果最新的記錄是 DELETE 記錄,那么相同 PK 的所有數(shù)據(jù)都將被刪除。
- partial-update:如果用戶建表時(shí)指定'merge-engine' = 'partial-update',那么就會(huì)使用部分更新表引擎,可以做到多個(gè) Flink 流任務(wù)去更新同一張表,每條流任務(wù)只更新一張表的部分列,最終實(shí)現(xiàn)一行完整的數(shù)據(jù)的更新,對(duì)于需要拉寬表的業(yè)務(wù)場(chǎng)景,partial-update 非常適合此場(chǎng)景,而且構(gòu)建寬表的操作也相對(duì)簡(jiǎn)單。這里所說(shuō)的多個(gè) Flink 流任務(wù)并不是指多個(gè) Flink Job 并發(fā)寫同一張 Paimon 表,這樣需要拆分 Compaction 任務(wù),就不能在每個(gè) Job 的 Writer 端做 Compaction, 需要一個(gè)獨(dú)立的 Compaction 任務(wù),比較麻煩。目前推薦將多條 Flink 流任務(wù) UNION ALL 起來(lái),啟動(dòng)一個(gè) Job 寫 Paimon 表。這里需要注意的是,對(duì)于流讀場(chǎng)景,partial-update 表引擎需要結(jié)合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,同時(shí) partial-update 不能接收和處理 DELETE 消息,為了避免接收到 DELETE 消息報(bào)錯(cuò),需要通過(guò)配置 'partial-update.ignore-delete' = 'true' 忽略 DELETE 消息。
- aggregation:如果用戶建表時(shí)指定 'merge-engine' = 'aggregation',此時(shí)使用聚合表引擎,可以通過(guò)聚合函數(shù)做一些預(yù)聚合,每個(gè)除主鍵以外的列都可以指定一個(gè)聚合函數(shù),相同主鍵的數(shù)據(jù)就可以按照列字段指定的聚合函數(shù)進(jìn)行相應(yīng)的預(yù)聚合,如果不指定則默認(rèn)為 last-non-null-value ,空值不會(huì)覆蓋。Agg 表引擎也需要結(jié)合 Lookup 或者 full-compaction 的 Changelog Producer 一起使用,需要注意的是除了 SUM 函數(shù),其他的 Agg 函數(shù)都不支持 Retraction,為了避免接收到 DELETE 和 UPDATEBEFORE 消息報(bào)錯(cuò),需要通過(guò)給指定字段配置 'fields.${field_name}.ignore-retract'='true' 忽略。
Changelog Producer
Changelog 主要應(yīng)用在流讀場(chǎng)景,在數(shù)倉(cāng)各層的建設(shè)過(guò)程中,我們需要流讀上游的數(shù)據(jù)寫入到下游,完成各層之間的數(shù)據(jù)同步,做到讓整個(gè)數(shù)倉(cāng)的數(shù)據(jù)全實(shí)時(shí)地流動(dòng)起來(lái)。如果上游流讀的 Source 是業(yè)務(wù)庫(kù)的 Binlog 或者 Kafka 等消息系統(tǒng)的消息,直接生成完整的 Changelog 以供流讀的。
但是目前數(shù)倉(cāng)分層是在 Paimon 里做的,數(shù)據(jù)以 Table Format 的形式存儲(chǔ)在文件系統(tǒng)上,如果下游的 Flink 任務(wù)要流讀 Paimon 表數(shù)據(jù),需要存儲(chǔ)幫助生成 Changelog(成本較低,但延遲相對(duì)較高),以便下游流讀的,這時(shí)就需要我們?cè)诮ū頃r(shí)指定 Paimon 的 Changelog Producer 決定以何種方式在何時(shí)生成 Changelog。如果不指定則不會(huì)在寫入 Paimon 表的時(shí)候生成 Changelog,那么下游任務(wù)需要在流讀時(shí)生成一個(gè)物化節(jié)點(diǎn)來(lái)產(chǎn)生 Changelog。這種方式的成本相對(duì)較高,同時(shí)官方不建議這樣使用,因?yàn)橄掠稳蝿?wù)在 State 中存儲(chǔ)一份全量的數(shù)據(jù),即每條數(shù)據(jù)以及其變更記錄都需要保存在狀態(tài)中。
Paimon 支持的 Changelog Produer 包括:
- none:如果不指定,默認(rèn)就是 none,成本較高,不建議使用。
- input:如果我們的 Source 源是業(yè)務(wù)庫(kù)的 Binlog ,即寫入 Paimon 表 Writer 任務(wù)的輸入是完整的 Changelog,此時(shí)能夠完全依賴輸入端的 Changelog, 并且將輸入端的 Changelog 保存到 Paimon 的 Changelog 文件,由 Paimon Source 提供給下游流讀。通過(guò)配置 'changelog-producer' = 'input',將 Changelog Producer 設(shè)置為 input 。
- lookup:如果我們的輸入不是完整的 Changelog, 并且不想在下游流讀時(shí)通過(guò) Normalize 節(jié)點(diǎn)生成 Changelog, 通過(guò)配置 'changelog-producer' = 'lookup',通過(guò) Lookup 的方式在數(shù)據(jù)寫入的時(shí)候生成 Changelog,此 Changelog Produer 目前處于實(shí)驗(yàn)狀態(tài),暫未經(jīng)過(guò)大量的生產(chǎn)驗(yàn)證。
- full-compaction:除了以上幾種方式,通過(guò)配置 'changelog-producer' = 'full-compaction' 將 Changelog Producer 設(shè)置為 full-compaction,Writer 端在 Compaction 后產(chǎn)生完整的 Changelog,并且寫入到 Changelog 文件。通過(guò)設(shè)置 changelog-producer.compaction-interval 配置項(xiàng)控制 Compaction 的間隔和頻率,不過(guò)此參數(shù)計(jì)劃棄用,建議使用 full-compaction.delta-commits,此配置下默認(rèn)為1 即每次提交都做 Compaction。
Append Only Table
建表時(shí)配置 'write-mode' = 'append-only',用戶可以創(chuàng)建 Append Only 表。Append Only 表采用追加寫的方式,只能插入一條完整的記錄,不能更新和刪除,也無(wú)需定義主鍵。Append Only 表主要用于無(wú)需更新的場(chǎng)景,例如 ODS 層數(shù)據(jù)將 Kafka 埋點(diǎn)日志數(shù)據(jù)解析后寫入到 Paimon 表,保留原貌不做任何更新,此時(shí)推薦采用 Paimon 的 Append Only 表。
需要注意的是由于 Append Only 表沒(méi)有主鍵,用戶必須指定 bucket-key,否則采用整行數(shù)據(jù)做 Hash 效率偏低。
3.1 ODS 層入湖
3.1.1 業(yè)務(wù)庫(kù)數(shù)據(jù)入湖
業(yè)務(wù)庫(kù)數(shù)據(jù)入湖,我們使用的是 FlinkCDC 的整庫(kù)同步,目前是基于 Dinky 實(shí)現(xiàn)的 FlinkCDC 到 Paimon 的整庫(kù)同步能力(這里要特別鳴謝文末老師的支持),可以自動(dòng)建表,多表或整庫(kù)同步業(yè)務(wù)庫(kù)數(shù)據(jù)到 Paimon 的對(duì)應(yīng)庫(kù)。由于我們是每個(gè)項(xiàng)目一個(gè)業(yè)務(wù)庫(kù),所以在 Paimon 中也是按項(xiàng)目建庫(kù),與 MySQL 中業(yè)務(wù)庫(kù)對(duì)應(yīng),以下是部分項(xiàng)目的圖示:

入湖 SQL:
下面以一個(gè)項(xiàng)目的入湖 SQL為例:
EXECUTE CDCSOURCE cdc_demo WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'username',
'password' = 'password',
'checkpoint' = '30000',
'scan.startup.mode' = 'initial',
'source.server-time-zone' = 'Asia/Tokyo',
'parallelism' = '4',
'database-name' = 'demo',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'fts_hive',
'sink.catalog.type' = 'fts_hive',
'sink.catalog.uri' = 'thrift://localhost:9083',
'sink.bucket' = '4',
'sink.snapshot.time-retained' = '24h',
'table-list' = 'A01,A02,A03,A04,A05',
'sink.changelog-producer' = 'input',
'sink.catalog.warehouse' = 'hdfs://cluster/warehouse/table_store',
'sink.sink.db' = 'fts_ods_db_demo'
);
FlinkCDC 整庫(kù)同步目前還是基于單表單 Task 的形式, 執(zhí)行效果如下所示:

3.1.2 日志數(shù)據(jù)入湖
日志入湖是通過(guò) Flink SQL 將 Kafka 中的日志數(shù)據(jù)同步到 ODS 層的 Paimon 表,ODS 層埋點(diǎn)日志沒(méi)有確定類型,避免由于類型轉(zhuǎn)換過(guò)濾掉數(shù)據(jù),這里以用戶登錄日志為例,介紹日志數(shù)據(jù)入湖:
入湖 SQL:
--CREATE TABLE
create table t_ods_table(
......
gn string,
dt string
) partitioned by (gn,dt)
WITH (
'bucket' = '8',
'bucket-key' = 'id',
'write-mode' = 'append-only', --創(chuàng)建 Append Anly 表
'snapshot.time-retained' = '24h'
);
--INSERT
create table default_catalog.default_database.role_login (
message string,
fields row < project_id int,
topic string,
gn string >
) with (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = '${kafka_server}',
'properties.group.id' = 'topic_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
insert into
fts_ods_log.t_ods_table
select
......
cast(SPLIT_INDEX(message, '|', 5) as int) log_create_unix_time,
fields.gn gn,
FROM_UNIXTIME(
cast(SPLIT_INDEX(message, '|', 5) as int),
'yyyy-MM-dd'
) dt
from
default_catalog.default_database.role_login
where
try_cast(SPLIT_INDEX(message, '|', 5) as int) is not null
and cast(SPLIT_INDEX(message, '|', 5) as int) between 0 and 2147483647;
日志數(shù)據(jù)入湖的執(zhí)行效果如下所示:

3.2 DIM 層入湖
DIM 層數(shù)據(jù)主要是將 ODS 層多個(gè)業(yè)務(wù)庫(kù)的相同表的數(shù)據(jù)同步到 DIM 層對(duì)應(yīng)的表,比如 fts_ods_db_A 和 fts_ods_db_B 都有同名的表 A01,需要 ODS 不同業(yè)務(wù)庫(kù)中同名的表同步 DIM 層的 fts_dim 庫(kù)中的 t_dim_A01 表中,該表的更新頻率較低且數(shù)據(jù)量較小。也有的是業(yè)務(wù)庫(kù)表和日志表數(shù)據(jù)通過(guò) Partial Update 能力拉寬后形成的維表。這里以 tdim_A01 表為例,介紹 DIM 層數(shù)據(jù)入湖:
入湖 SQL:
--CREATE TABLE
create table t_dim_A01(
......
gn string,
PRIMARY KEY (gn,lid) NOT ENFORCED
) WITH (
'bucket' = '4',
'snapshot.time-retained' = '24h'
);
--INSERT
insert into
fts_dim.t_dim_A01
select
'AA' as gn,
......
from
fts_ods_db_A.A01
union all
select
'BB' as gn,
......
from
fts_ods_db_B.A01
......
3.3 DWD 層入湖
DWD 層數(shù)據(jù)入湖是通過(guò) Flink SQL 清洗過(guò)濾,關(guān)聯(lián)維表后形成寬表寫入到 DWD 層的 Paimon 表。維表也是在 Paimon 中,所以這里很方便通過(guò) Lookup Join 關(guān)聯(lián)維表,由于維表數(shù)據(jù)可能會(huì)晚于事實(shí)表數(shù)據(jù)到達(dá) Paimon, 所以使用 Retry Lookup Join,如果事實(shí)表一開(kāi)始關(guān)聯(lián)不上維表,可以增加一些重試,以便能夠關(guān)聯(lián)上維表數(shù)據(jù),這里以用戶登錄表為例。
入湖 SQL:
--CREATE TABLE
create table t_dwd_table(
......
id string,
gn string,
dt string,
PRIMARY KEY (gn, id, log_create_unix_time, dt) NOT ENFORCED
) partitioned by (gn, dt) WITH (
'bucket' = '8',
'bucket-key' = 'id',
'changelog-producer' = 'full-compaction',
'changelog-producer.compaction-interval' = '54s',
'snapshot.time-retained' = '24h'
);
--INSERT
create view default_catalog.default_database.t_table_view as (
select
......
PROCTIME() proc_time,
gn,
dt
from
fts_ods_log.t_ods_table
where
AA is not null
and try_cast(BB as int) is not null
and try_cast(CC as int) is not null
)
insert into
fts_dwd.t_dwd_table
select
/*+ LOOKUP('table'='fts_dim.t_dim_A01', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30'),
LOOKUP('table'='fts_dim.t_dim_A02', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30'),
LOOKUP('table'='fts_dim.t_dim_A03', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30')*/
......
cast(d.open_date_time as int) open_date_time,
cast(d.merge_server_time as int) merge_server_time,
CONCAT(a.aa, a.bb) id,
a.gn,
a.dt
from
default_catalog.default_database.t_table_view as a
left join fts_dim.t_dim_A01 for SYSTEM_TIME AS OF a.proc_time as b on a.AA = b.AA
and a.BB = b.BB
left join fts_dim.t_B01 for SYSTEM_TIME AS OF a.proc_time as c on a.AA = c.AA
and a.BB = c.BB
left join fts_dim.t_dim_C01 for SYSTEM_TIME AS OF a.proc_time as d on a.AA = d.AA
and a.BB = d.BB;
3.4 DWS 層入湖
DWS 主要是分主題,按不同的維度進(jìn)行聚合,我們也有一些寬表需要有聚合的列,也放在 DWS 層構(gòu)建。這里以角色域的一張角色寬表為例,介紹 DWS 層使用 Paimon 的 Agg 表做預(yù)聚合的場(chǎng)景。
角色寬表的建表語(yǔ)句如下:
CREATE TABLE t_dws_role(
......
gn string,
id bigint,
aa int,
bb int,
acc int,
PRIMARY KEY (gn, id) NOT ENFORCED
) WITH (
'bucket' = '16',
'bucket-key' = 'id',
'merge-engine' = 'aggregation', --指定使用 Agg 表引擎
'changelog-producer' = 'full-compaction', --指定 changelog producer 為 full-compaction
'changelog-producer.compaction-interval' = '40s', --指定 campaction 的間隔為40秒
'fields.aa.aggregate-function' = 'max',
'fields.bb.aggregate-function' = 'min',
'fields.acc.aggregate-function' = 'sum',
'fields.aa.ignore-retract' = 'true', --忽略掉 retract,避免接收到 DELETE 消息出錯(cuò)
......
'snapshot.time-retained' = '24h' --指定 snapshot 文件保留24小時(shí)
);
角色寬表需要由 DWD 層的多張表關(guān)聯(lián)生成?;?Paimon 的 Agg 表引擎創(chuàng)建,PK 為 gn + id 構(gòu)成的聯(lián)合主鍵, 只要我們需要關(guān)聯(lián)的表與角色表有相同的主鍵就可以很方便的做到部分更新和預(yù)聚合,所以角色表,登錄登出表等都比較好處理,但是注冊(cè)表和前端日志表是沒(méi)有 roleid的,沒(méi)法直接寫入角色寬表做處理,因?yàn)橹麈I不同。
這里我們首先想到是不是可以通過(guò)流 Join 的形式,分別給注冊(cè)表和前端日志表添上 roleid, 這樣就與角色表有相同的 PK, 就可以更新和聚合了。但是流 Join 的方式需要保存的狀態(tài)就相對(duì)要大。
所以我們最終是通過(guò) Paimon 的 Partial Update 將注冊(cè)表和前端日志表做成一張維表,然后 Flink SQL 流讀 DWD 層角色表寫入角色寬表的時(shí)候和維表做 Lookup Join, 最終補(bǔ)全一些字段。由于注冊(cè)表和前端日志表的數(shù)據(jù)都可能先于角色表的數(shù)據(jù)到達(dá) Paimon,所以需要用 Retry Lookup Join,保證能夠 Join 上。
還有需要做特殊處理的就是訂單表,比如角色寬表中有累積付費(fèi)字段,來(lái)自于訂單表,每個(gè)角色的累積付費(fèi)需要用訂單表中的充值金額做 SUM 聚合,但是訂單表可能出現(xiàn)重復(fù)數(shù)據(jù),比如發(fā)現(xiàn)訂單數(shù)據(jù)有問(wèn)題或是缺數(shù),都可能在 CDC 端進(jìn)行重跑來(lái)修復(fù)或補(bǔ)全數(shù)據(jù),由于 DWD 層的訂單表是 PK 表,過(guò)來(lái)重復(fù)數(shù)據(jù)就會(huì)在changelog 文件中保存 -U 和 +U 的記錄,這樣 Flink SQL 流讀訂單表寫到角色寬表做聚合時(shí),過(guò)來(lái)重復(fù)的數(shù)據(jù)就會(huì)重復(fù)求和,計(jì)算結(jié)果就不準(zhǔn)了,這里使用 audit_log 系統(tǒng)表過(guò)濾 Changelog, 只要 +I 的記錄,過(guò)濾掉 -U 和 +U 的記錄,忽略掉更新的訂單數(shù)據(jù),這樣也會(huì)存在一定的問(wèn)題,比如丟失訂單的更新數(shù)據(jù),不過(guò)充值金額一般是極少更新的。
audit_log 系統(tǒng)表使用示例:
SELECT * FROM MyTable$audit_log where rowkind='+I'
下面先介紹下注冊(cè)表和 foreend 表通過(guò) Partial Update 構(gòu)建維表的示例:
建表語(yǔ)句:
CREATE TABLE t_dim_table (
gn string,
......
PRIMARY KEY (gn, id) NOT ENFORCED
) WITH (
'bucket' = '8',
'bucket-key' = 'id'
'merge-engine' = 'partial-update', --指定 merge engine 為部分更新列
'changelog-producer' = 'full-compaction', --指定 changelog producer 為 full-compaction
'changelog-producer.compaction-interval' = '48s', --compaction 間隔48秒
'snapshot.time-retained' = '24h',
'partial-update.ignore-delete' = 'true' --忽略 DELETE 數(shù)據(jù)
);
插入 SQL:
--省略 Flink SQL 參數(shù)設(shè)置
INSERT INTO
fts_dim.t_dim_table
SELECT
gn,
id,
......
CAST(NULL AS STRING),
CAST(NULL AS STRING),
unix_timestamp()
FROM
fts_dwd.t_dwd_table
UNION ALL
SELECT
gn,
id,
CAST(NULL AS STRING),
CAST(NULL AS INT),
......
unix_timestamp()
FROM
fts_dwd.t_dwd_foreend
WHERE pid <> '0'
執(zhí)行效果如下所示:

可以看到,在數(shù)據(jù)寫入頻率比較高,Compaction時(shí)間間隔設(shè)置的比較短的時(shí)候,Writer端存在一定的壓力,經(jīng)常處于Busy狀態(tài)。
角色寬表入湖 SQL:
--省略 Flink SQL 參數(shù)設(shè)置
CREATE view default_catalog.default_database.t_view AS
SELECT
......
id,
gn,
PROCTIME() AS proc_time
FROM
fts_dwd.t_dwd_table
--插入數(shù)據(jù)
INSERT INTO
fts_dws.t_dws_role
SELECT
......
id,
CAST(NULL AS STRING),
unix_timestamp(),
FROM
(
SELECT
/*+ LOOKUP('table'='fts_dim.t_dim_register', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s','max-attempts'='30')*/
a.*,
b.aa,
b.bb
FROM
(
SELECT
*
FROM
default_catalog.default_database.t_view
) AS a
LEFT JOIN fts_dim.t_dim_table for SYSTEM_TIME AS OF a.proc_time AS b ON a.AA = b.AA
AND a.BB = b.BB
)
UNION ALL
SELECT
......
createt_time,
unix_timestamp(),
CAST(NULL AS INT)
FROM
fts_dwd.`t_dwd_o_table$audit_log`
WHERE
rowkind = '+I'
UNION ALL
......
寬表入湖執(zhí)行效果如下:

可以看到,第一個(gè) Source 因?yàn)橛玫搅?Retry Lookup Join, 后面來(lái)的數(shù)據(jù)在排隊(duì),需要等前面的數(shù)據(jù) Join 上或是重試次數(shù)用完,后面的數(shù)據(jù)才會(huì)處理,很多情況下這個(gè)節(jié)點(diǎn)處于 Busy 狀態(tài),導(dǎo)致效率很低。
3.5 ADS 層流讀到 ClickHouse
--CREATE TABLE
CREATE TABLE t_role (
gn string,
id string,
......
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://localhoust:8123',
'database-name' = 'streaming_warehouse',
'table-name' = 't_role_all',
'use-local' = 'false',
'is-changelog' = 'true',
'sink.batch-size' = '5000',
'sink.flush-interval' = '2000',
'sink.max-retries' = '3',
'username' = 'username',
'password' = 'password'
);
-- INSERT
INSERT INTO
t_role
SELECT
*
FROM
fts_dws.t_dws_role
WHERE
is_role = 1
綜上所述,基于 Apache Paimon(Incubating) 構(gòu)建 Streaming Lakehouse 能夠解決我們當(dāng)前實(shí)時(shí)數(shù)倉(cāng)中間層數(shù)據(jù)不可分析,保留時(shí)間短,問(wèn)題排查困難、數(shù)據(jù)更新處理復(fù)雜等痛點(diǎn),并能夠?qū)?Hive 離線數(shù)倉(cāng) T+1 的延遲縮小到分鐘級(jí),上線后將會(huì)優(yōu)先替換實(shí)時(shí)數(shù)倉(cāng),后期慢慢賦能離線業(yè)務(wù),用流批一體的計(jì)算引擎+流批一體的存儲(chǔ)做到真正的流批一體的湖倉(cāng)體驗(yàn)。
4. 實(shí)踐總結(jié)
- 使用 full-compaction Changelog Producer 時(shí),changelog-producer.compaction-interval 和 checkpoint interval 設(shè)置值較小,比如一分鐘以下時(shí),Writer 端在寫入數(shù)據(jù)和 Compaction 時(shí)的壓力較大,需要較大的資源,如果任務(wù)暫停一段時(shí)間,再?gòu)?Savepoint 恢復(fù)時(shí),Writer 端反壓嚴(yán)重,需不斷調(diào)整資源。后面考慮測(cè)試 lookup Changelog Producer,和 full-compaction Changelog Producer 對(duì)比,測(cè)試是否可以滿足生產(chǎn)環(huán)境低延遲流讀場(chǎng)景需求。
- 當(dāng)數(shù)倉(cāng)某一層的數(shù)據(jù)出現(xiàn)問(wèn)題,需要通過(guò) Time Travel 重新讀取某個(gè)快照或是某個(gè)時(shí)間點(diǎn)開(kāi)始的數(shù)據(jù)修復(fù)問(wèn)題,此時(shí)需要 Snaphot 文件保留時(shí)間能夠滿足問(wèn)題回溯周期,但是目前 Checkpoint Interval 設(shè)置較小,寫入數(shù)據(jù)延遲較小,導(dǎo)致 Snapshot 保留時(shí)間越長(zhǎng),生成越多的小文件,存在小文件過(guò)多的問(wèn)題。
- 目前的 Retry Lookup Join 是有序的,如果前面一條數(shù)據(jù)一直 Join 不上,那么后面來(lái)的數(shù)據(jù)也會(huì)排隊(duì),并不會(huì)處理,需要一直等到前面的數(shù)據(jù)Join上維表數(shù)據(jù)或重試次數(shù)用完,這樣造成數(shù)據(jù)處理效率很低,此時(shí)如果使用 Unordered Output,則需要 Paimon 實(shí)現(xiàn)異步 Lookup Join,目前社區(qū)正在支持:(https://github.com/apache/incubator-paimon/issues/848)。
5. 未來(lái)規(guī)劃
- 完善基于 Apache Paimon(Incubating) 的流式數(shù)倉(cāng)的建設(shè)。
- 優(yōu)化 Presto 查詢,幫助基于 Paimon 進(jìn)行即席查詢發(fā)揮作用。
- 目前 Paimon 需要對(duì)接 BI 報(bào)表或是分析系統(tǒng)的數(shù)據(jù)都是事先流讀到 ClickHouse, 然后再于 ClickHouse 進(jìn)行可視化展示,后面考慮是否直接在 Paimon 里預(yù)聚合完結(jié)果數(shù)據(jù),與前端交互直接查詢 Paimon,減少數(shù)據(jù)處理鏈路以及降低復(fù)雜度。
- 完善基于 Apache Paimon(Incubating) 的平臺(tái)服務(wù)建設(shè)。
6. Paimon 信息
- 官方網(wǎng)站:https://paimon.apache.org/
- Github 項(xiàng)目:https://github.com/apache/incubator-paimon (歡迎大家 star&fork 支持)
- 釘釘交流群:10880001919 Apache Paimon 交流群
作者簡(jiǎn)介
石在虎,大數(shù)據(jù)研發(fā)工程師,專注于實(shí)時(shí)計(jì)算,對(duì)數(shù)據(jù)湖和數(shù)據(jù)集成有著濃厚的興趣
史亞光,大數(shù)據(jù)平臺(tái)工程師,專注于數(shù)據(jù)集成與平臺(tái)開(kāi)發(fā),對(duì)流式數(shù)倉(cāng)和數(shù)據(jù)湖有著濃厚的興趣