Flink CDC & MongoDB 聯合實時數倉的探索實踐

685.jpg

摘要:本文整理自 XTransfer 技術專家, Flink CDC Maintainer 孫家寶,在 Flink Forward Asia 2022 數據集成專場的分享。本篇內容主要分為四個部分:

  1. MongoDB 在實時數倉的探索

  2. MongoDB CDC Connector 的實現原理和使用實踐

  3. FLIP-262 MongoDB Connector 的功能預覽

  4. 總結和展望

點擊查看原文視頻 & 演講PPT

一、MongoDB 在實時數倉的探索

MongoDB 是一款非關系型的文檔數據庫,支持大規(guī)模的數據存儲和靈活的存儲結構,在 XTransfer 內部有著比較大規(guī)模的應用。

另外,XTransfer 在實時數倉方面也有著積極的探索,除了目前比較流行的基于湖技術的構建實時數倉的方式外,Flink 和 MongoDB 也有著構建輕量級實時數倉的潛力。

1.1 MongoDB 簡介

1.jpg

MongoDB 是一種面向文檔的非關系型數據庫,支持半結構化的數據存儲。它還是一種分布式的數據庫,提供副本集和分片級兩種集群部署模式,具有高可用和水平擴展的能力,適合大規(guī)模的數據存儲。另外,MongoDB 在 3.0 版本之后還引入了 Change Streams 特性,支持并簡化了數據庫的變更訂閱。

1.2 常見的實時架構選型

2.jpg
  • Flink 和 Kafka 純實時鏈路的實時數倉。

優(yōu)勢包括,數據新鮮度高;數據寫入較快;Kafka 的周邊組件生態(tài)較好。

缺陷包括,中間結果不可查。Kafka 是線性存儲,記錄了數據的每一次變更,因此如果要得到最新的鏡像值,需要遍歷所有在 Kafka 中的記錄,因此也無法進行比較靈活快速的 OLAP 查詢,對于排查問題方面也比較困難;Kafka 的冷熱分離還有待實現,不能充分利用一些廉價存儲;這套架構一般需要額外維護兩套流批架構,對部署開發(fā)運維成本會較高。

3.jpg
  • 基于湖存儲的實時數倉架構。

目前比較流行的數據湖 Iceberg、Hudi,同時支持了批式讀取和流式讀取的能力,可以通過 Flink 實現流批一體的計算能力,其次,湖存儲在存儲上會充分考慮如何利用廉價存儲,相對于 Kafka 具有更低的存儲成本。

但基于湖存儲的實時數倉也有一些缺點,包括部署成本較高,例如需要額外部署一些 OLAP 查詢引擎。其次,對于數據權限也需要額外的組件來支持。

4.jpg
  • 基于 MongoDB 的實時數倉。

MongoDB 本身支持大規(guī)模數據集存儲,也支持靈活的數據格式;MongoDB 的部署成本低,組件依賴少,并且有完整的權限控制。相比于其他的實時數倉架構,Flink 和 MongoDB 也有著構建輕量級實時數倉的潛力。這種模式要求 Flink 對 MongoDB 擁有流式讀取、批式讀取、維表查詢和行級寫入的能力。

目前全增量一體化流式查詢可以通過 Flink CDC MongoDB Connector 提供,批式讀取維表查詢寫入的功能可以由 FLIP-262 MongoDB Connector 提供。

二、MongoDB CDC Connector 的實現原理和使用實踐

2.1 MongoDB CDC Connector

5.jpg

MongoDB CDC Connector 由 XTransfer 基礎架構團隊開發(fā),并已貢獻給了 Flink CDC 社區(qū)。在 Flink CDC 2.1.0 版本中正式引入,支持了全增量一體化的 CDC 讀取以及元數據提取的功能;2.1.1 版本中,支持連接未開啟認證的 MongoDB;2.2.0 版本中,支持正則表達式篩選的功能;2.3.0 版本中,基于增量快照讀框架,實現了并行增量快照讀的功能。

2.2 Change Streams 特性

6.jpg

MongoDB CDC Connector 是基于 MongoDB Change Streams 特性來實現的。MongoDB 是一個分布式的數據庫,在分布式的環(huán)境中,集群成員之間一般會進行相互復制,來確保數據的完整性和容錯性。與 MySQL 的 Binlog 比較類似,MongoDB 也提供了 oplog 來記錄數據的操作變更,次要節(jié)點之間通過訪問主節(jié)點的變更日志來進行數據的同步。

我們也可以通過直接遍歷 MongoDB oplog 的方式來獲取數據庫的變更。但分片集群一般由多個 shard 組成,每個 shard 一般也是一個獨立的副本集。在分片上的 oplog 僅包含在它分片范圍里的數據,因此我們需要遍歷所有 shard 上的 oplog,并把它們根據時間進行排序合并,這顯然會比較復雜。

值得一提的是,在 MongoDB 3.6 版本之后,引入了 Change Streams 特性,提供了更簡單的 API 來簡化數據訂閱。

7.jpg

使用 Change Streams 的 API,我們可以屏蔽遍歷 oplog 并整合的復雜度,并且支持實例、庫、集合等多種級別的訂閱方式,以及完整的故障恢復機制。

2.3 Change Streams 的故障恢復

8.jpg

MongoDB 通過 ResumeToken 來進行斷點恢復,Change Streams 返回的每條記錄都會攜帶一個 ResumeToken,每個 ResumeToken 都對應了 oplog 中的一條具體記錄,表示已經讀到的 oplog 的位置。另外,還記錄了變更時間以及變更文檔主鍵的信息。通過 ResumeAfter、startAfter 等方法,將 ResumeToken 作為起始參數可以對中斷的 Change Streams 進行恢復。

Change Streams 的 ResumeToken 是由 MongoDB KeyStream 編碼的一個字符串,它的結構如上圖左側所示。ts 代表數據發(fā)生變更的時間,ui 代表發(fā)生變更 collection 的 UUID,o2 代表發(fā)生變更的文檔的主鍵。詳細的 oplog 字段描述可以參考 oplog_entry 。

上圖右側是一個 oplog 的具體記錄,它描述了在 107 結尾主鍵下的一條記錄的一次更改,將 weight 字段修改成了 5.4。值得一提的是 MongoDB 在 6.0 版本中并沒有提供變更前和變更后完整的鏡像值。這也是我們沒有直接采用 oplog 去實現 MongoDB CDC Connector 的一個原因。

2.4 Change Streams 的演進

9.jpg

MongoDB 在 3.6 版本中正式引入了變更流特性,但僅支持對于單個集合的訂閱。在 4.0 版本支持了實例、庫級別的訂閱,也支持了指定時間戳開啟變更流的功能。在 4.0.7 版本引入了 postBatchResumeToken:

在 4.0 版本之前打開一個變更流后,如果沒有新的變更數據產生,那么將不會獲取到最新的 ResumeToken。如果此時發(fā)生故障,并且嘗試使用了比較老舊的 ResumeToken 來恢復,可能會降低服務器的性能,因為服務器可能會需要掃描更多的 oplog 的條目。如果 ResumeToken 對應的 oplog 被清除了,那么這個變更流將無法進行恢復。

為了解決這個問題,MongoDB 4.0 提供了 postBatchResumeToken,標記已經掃描的 oplog 的位置,并且會隨時間持續(xù)推進。另外,利用這個特性,我們可以比較準確的定位當前 Change Streams 消費的位置,進而實現增量快照讀的功能。

在 MongoDB 4.2 版本,可以使用 startAfter 去處理一些 invalid 的事件,在 MongoDB 5.1 版本對 Change Streams 進行了一系列的優(yōu)化。在 MongoDB 6.0 版本,提供了 Change Streams 前置、后置鏡像值完整信息,以及 Schema 變更的訂閱機制。

2.5 MongoDB CDC Connector

10.jpg

MongoDB CDC Connector 的實現原理,是利用了 Change Streams 的特性,將增、刪、改等變更事件轉換成 Flink 的 upsert 類型的變更流。在 Flink SQL 場景下,Planner 會加上 Changelog Normalize 的算子,將 upsert 類型的變更流進行標準化。結合 Flink 強大的計算能力,容易實現實時 ETL 甚至異構異構數據源的計算場景。

11.jpg

在 Flink CDC 2.3 版本,依托于增量快照讀框架實現了無鎖快照讀的功能,支持并發(fā)快照,大大縮短了快照時間。關于增量快照讀的總體流程是如上圖所示。為了讓 snapshot 并行化,首先要將完整的數據集切分成多個區(qū)塊。將這些區(qū)塊分配給不同的 Source Reader 并行讀取,以提升整個 snapshot 的速度。但 MongoDB 的主鍵它多為 ObjectId,不能按照簡單的增加范圍的方式去切分,因此對于 MongoDB 的切分策略需要單獨去設計。

12.jpg

MongoDB 有以下三種切分策略,這些切分策略參考了 Mongo Spark 項目。

  • 第一種切分策略使用了 Sample 命令對集合進行隨機采樣,再通過文檔的平均大小計算出分桶數量。然后將采樣數據分配到各個桶中,構成 Chunk 的邊界。優(yōu)點是速度快,適用于數據量大且未分片的集合。缺點是采用抽樣預估,Chunk 的大小不能做到絕對均勻。

  • 第二種切分策略使用了 SplitVector 命令。SplitVector 是 MongoDB 分片式計算分裂點的內部命令,通過訪問指定索引來計算每個節(jié)點 Chunk 的邊界。優(yōu)點是速度快,并且 Chunk 的大小均勻,但它額外需要 SplitVector 命令的執(zhí)行權限。

  • 第三種切分策略針對于分片集合,對于已經分片好的集合,我們不用重新計算它的分片結果,可以直接讀取 MongoDB 已經分片好的結果作為 Check 的邊界。優(yōu)點是速度快,Chunk 的大小均勻,但是 Chunk 的大小無法調節(jié),依賴于 MongoDB 自身對于每個分片的配置,默認大小為 64mb,另外它額外需要對 config 庫的讀取權限。

13.jpg

接下來介紹一下增量快照讀的過程。對于一個已經切分好的區(qū)塊,在快照執(zhí)行前后分別記錄當前 Change Streams 的位置。在快照結束之后,根據快照起始、結束的位點范圍,對變更流進行回放,最后將快照記錄和變更記錄按 Key 進行合并,得到完整的結果,避免了重復數據的發(fā)送。

14.jpg

在單個 Chunk 的增量讀階段,我們讀取了 Chunk 范圍內的快照數據以及 Chunk 范圍內的增量數據,并將其進行合并。但整體的 snapshot 的過程可能并沒有結束,那么已經完成 snapshot 的區(qū)塊,在后邊的時間仍然可能會發(fā)生變更,因此我們需要對這些變更數據進行補償。從全局最低的高水位點處開始啟動變更流,對于變更時間高于 Chunk 高位點的變更數據進行補償。當達到全局 snapshot 最高位點的時候,我們的補償便可以結束。

15.jpg

接下來介紹一些關于 MongoDB CDC Connector 的生產建議。

  • 第一,使用增量快照特性,MongoDB 的最小可用版本在 4.0 版本。因為在 4.0 版本之前,沒有發(fā)生變更時無法獲取 ResumeToken,且也不能夠從指定的時間點進行啟動,因此難以實現增量快照特性。

    在 MongoDB 4.0.7 版本之后,引入了 postBatchResumeToken,可以比較容易的獲取當前 Change Streams 的位置,因此比較推薦的版本在 4.0.7 以上。

16.jpg
  • 第二,控制文檔的大小不要超過 8mb,因為 MongoDB 對單條文檔有 16 mb 的限制,變更文檔因為包含一些額外的信息,比如修改的字段是哪些等等,即使原文檔沒有超過 16mb,變更文檔也會超過 16mb 的限制,從而導致 Change Streams 異常終止。這個應該屬于 MongoDB Change Streams 的一個缺陷。

    關于 MongoDB 的變更文檔可以超過 16mb 的限制,已在 MongoDB 的 issue 中進行推進。

17.jpg
  • 第三,在 MongoDB 中分片件其實在開啟事務之后允許被修改。但修改分片鍵可能會引起分片的頻繁移動,引起額外的性能開銷。另外,修改分片鍵還可能導致 Update Lookup 功能失效,在 CDC 的場景中可能會導致結果的不一致。

三、FLIP-262 MongoDB Connector 的功能預覽

上面我們介紹了 MongoDB CDC Connector,可以對 MongoDB 進行增量的 CDC 讀取,但如果要在 MongoDB 上構建實時數倉,我們還需要對 MongoDB 進行批量讀取、寫入以及 Lookup 的能力。這些功能在 FLIP-262 MongoDB Connector 中進行實現,目前已經發(fā)布第一個版本。

3.1 FLIP-262 Introduce MongoDB Connector

18.jpg

在并行讀取方面,MongoDB Connector 基于 FLIP-27 新的 Source API 實現;支持批量讀??;支持 Lookup。在并行寫入方面,基于 FLIP-177 Sink API 實現;支持 Upsert 寫入。在 Table API 方面,實現了 FLIP-95 Table API 使用 Flink SQL 進行讀取或寫入。

3.2 讀取 MongoDB

19.jpg

首先我們在 MongoDB 中插入一些測試數據,然后使用 Flink SQL 定義一張 users 表,通過 select 語句我們可以得到右邊所示的結果??梢园l(fā)現右邊的結果和我們插入的測試數據是一致的。

3.3 寫入 MongoDB

20.jpg

首先我們定義一張 users snapshot 的結果表,對應 MongoDB users snapshot 的集合。然后我們通過 Flink SQL 的 insert 語句,將上面定義的 users 表集合的數據,讀取并寫入到 MongoDB。

最后查詢一下我們新定義的這張結果表,它的結果如右邊所示??梢园l(fā)現它的結果和之前源表的結果是一致的,這代表著我們寫入一張新的集合是成功的。

3.4 用作維表關聯

21.jpg

接下來來演示一下,將上面定義的 user 表作為維表進行 Lookup 的場景。

首先我們定義一張 pageviews 的事實表,user_id 作為 Lookup Key,對應于我們之前定義的 users 表的主鍵。然后我們查詢 pageviews 表可以得到右邊的結果。

22.jpg

接著定義一張結果表代表打款以后的結果,這個結果表對于 users 是作為維表關聯去補充一些區(qū)域信息。然后我們通過 Flink SQL 將 pageviews 事實表和 users 維表進行關聯,寫入到結果表。然后查詢結果表可以得到打寬后 user_region 的信息。如右圖所示,打寬以后的 user_region 在最后一列,這說明我們的 Lookup 是成功的。

四、總結和展望

4.1 總結

23.jpg

至此,Flink 聯合 MongoDB 的實時數倉架構便可以實現,在建設實時數倉時多了一份選擇。如圖所示,通過 CDC Connector 完成整套流式鏈路,輔助 Lookup 進行數據打寬。通過 Source Connector 完成一整套批式鏈路,最后將計算的中間結果通過 Sink Connector 進行存儲,那么整套實時數倉的架構便得以實現。

4.2 存在的問題

目前還存在著以下問題:

  • Changelog Normalize 是一個有狀態(tài)的算子,它會一些額外的狀態(tài)開銷。

  • Update Lookup 的完整狀態(tài)提取,也需要一定的查詢開銷。

  • 在 MongoDB 中的文檔大小有 16mb 限制。如果有一些很大的單條數據,那么可能并不適合采用這種架構。

4.3 未來規(guī)劃

在 MongoDB CDC Connector 方面,我們需要:

  • 支持 MongoDB 6.0 版本。

  • 支持指定時間點啟動。

  • 推進推進 Changelog Normalize 優(yōu)化。

在 MongoDB Connector 方面,我們需要:

  • 支持謂詞下推。

  • 支持 AsyncLookup。

  • 支持 AsyncSink。

點擊查看原文視頻 & 演講PPT

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容