順序保證難點(diǎn)
本文主要分析 CDC 業(yè)務(wù)場景中任務(wù)級順序保證,技術(shù)選型為:debezium、kafka、flink,其構(gòu)成了順序保證中至關(guān)重要的每一環(huán),應(yīng)該充分考慮、分析各組件的對于順序的支持。
首先 debezium 作為采集組件,其分別為 schema topic 和 data topic 提供了不同的時(shí)間字段,如下圖 schema topic 中提供了事件時(shí)間,data topic 中提供了事件時(shí)間和采集時(shí)間,為后續(xù)數(shù)據(jù)處理提供了依據(jù)。


Kafka 作為一款性能優(yōu)秀的消息隊(duì)列,在分布式事務(wù)中有著廣泛地應(yīng)用,其為了做到水平擴(kuò)展,達(dá)到提高并發(fā)的目的,將一個(gè) topic 分布到多個(gè) broker(服務(wù)器)上,即一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。Kafka 在發(fā)送消息時(shí),producer 可以知道相關(guān) topic 的集群信息,從而將消息按照不同的策略發(fā)送到不同的分區(qū)。常見的分區(qū)策略有很多種(常用包括輪詢、隨機(jī)、按分區(qū)權(quán)重、就近原則、按消息鍵分區(qū)等策略)。各個(gè)分區(qū)中的消息比較獨(dú)立,很難有一種高效的方法來判斷不同分區(qū)的順序。

Flink 程序本質(zhì)上是分布式并行程序。在程序執(zhí)行期間,一個(gè)流有一個(gè)或多個(gè)流分區(qū)(Stream Partition),每個(gè)算子有一個(gè)或多個(gè)算子子任務(wù)(Operator Subtask),每個(gè)子任務(wù)彼此獨(dú)立,并在不同的線程、節(jié)點(diǎn)或容器中運(yùn)行。
Flink 算子之間可以通過一對一(直傳)模式或重新分發(fā)模式傳輸數(shù)據(jù):

一對一模式(例如上圖 condensed view 中的 Source 和 map() 算子之間)可以保留元素的分區(qū)和順序信息。這意味著 map() 算子的輸入的數(shù)據(jù)以及其順序與 Source 算子的輸出的數(shù)據(jù)和順序完全相同,即同一分區(qū)的數(shù)據(jù)只會(huì)進(jìn)入到下游算子的同一分區(qū)。
重新分發(fā)模式(例如上圖 parallelized view 中的 map() 和 keyBy/window 之間,以及 keyBy/window 和 Sink 之間)會(huì)更改數(shù)據(jù)所在的流分區(qū)。當(dāng)你在程序中選擇使用:keyBy()(通過散列鍵重新分區(qū))、broadcast()(廣播)或 rebalance()(隨機(jī)重新分發(fā))會(huì)把數(shù)據(jù)發(fā)送到不同的目標(biāo)子任務(wù)。如上圖所示的 keyBy/window 和 Sink 算子之間數(shù)據(jù)的重新分發(fā)時(shí),不同鍵(key)的聚合結(jié)果到達(dá) Sink 的順序是不確定的。
綜上,順序保證中有兩大難點(diǎn):kafka 多分區(qū)、flink 多并行度。
方案設(shè)計(jì)
用 flink 處理來自 kafka 的數(shù)據(jù)時(shí),將為每一個(gè) topic(schema topic、data topic)創(chuàng)建一個(gè) consumer,對應(yīng)轉(zhuǎn)換為一條流(schema stream、data stream),每一個(gè)流單獨(dú)處理,互不影響。但流內(nèi)數(shù)據(jù)依然存在上述的 kafka 多分區(qū)、flink 多并行度導(dǎo)致的亂序問題。
單分區(qū)順序
解決亂序問題,首先想到的是排序,但是對于一個(gè)無界數(shù)據(jù)數(shù)據(jù)流無法進(jìn)行排序,由此引入窗口的概念,將有界數(shù)據(jù)流切分為一個(gè)個(gè)有界的窗口,在窗口內(nèi)便于執(zhí)行排序操作。

當(dāng)一個(gè)窗口到了關(guān)閉時(shí)間,不應(yīng)該立刻觸發(fā)窗口計(jì)算,而是等待一段時(shí)間,而是等遲到的數(shù)據(jù)來了再關(guān)閉窗口。數(shù)據(jù)流中的 Watermark 用于表示 timestamp 小于 Watermark 的數(shù)據(jù)都已經(jīng)到達(dá)了,并在該窗口內(nèi)按照事件時(shí)間處理該窗口內(nèi)的數(shù)據(jù)即可保證數(shù)據(jù)處理順序。watermark 本質(zhì)上是帶有特殊標(biāo)記的時(shí)間戳,必須單調(diào)遞增,以確保任務(wù)的事件時(shí)間時(shí)鐘在向前推進(jìn),而不是在后退。
注意:watermark 的設(shè)置是開發(fā)者在實(shí)時(shí)性與準(zhǔn)確性之間的權(quán)衡
如果 watermark 設(shè)置的延遲太大,收到結(jié)果的速度可能就會(huì)很慢,解決辦法是在水位線到達(dá)之前輸出一個(gè)近似結(jié)果(增量聚合)。
如果 watermark 到達(dá)得太小,則可能收到錯(cuò)誤結(jié)果,不過 Flink 可以通過側(cè)輸出流、允許的延遲(allowed lateness)來解決這個(gè)問題。
流級順序
上面提到對于對于流處理并行任務(wù)來說順序保證中的兩大難點(diǎn):kafka 多分區(qū)、流處理多并行度。flink 中給出了一個(gè)同時(shí)解決這兩個(gè)問題的解決方案,watermark 是一個(gè)流層面全局的概念,即一個(gè)流中維護(hù)一個(gè)全局的 watermark,保證流中多并行任務(wù)之間的順序,以下圖為例:

流中并行度為 4,partition WM 代表單個(gè)并行子任務(wù)的 watermark,Event-Time clock 代表該流中全局 watermark。
- 該時(shí)刻并行子任務(wù)的 watermark 分別為:2、4、3、6,全局 watermark 為并行子任務(wù) watermark 的最小值 2;
- 第一個(gè)子任務(wù)中 watermark 變?yōu)?4,此時(shí)并行子任務(wù)的 watermark 分別為:4、4、3、6,最小值變?yōu)?3,因此全局 watermark 值為 3;
- 第二個(gè)子任務(wù)中 watermark 變?yōu)?7,此時(shí)并行子任務(wù)的 watermark 分別為:4、7、3、6,最小值仍為 3,全局 watermark 值不變;
- 第三個(gè)子任務(wù)中 watermark 變?yōu)?6,此時(shí)并行子任務(wù)的 watermark 分別為:4、7、6、6,最小值變?yōu)?4,全局 watermark 值變?yōu)?4;
由此可見全局 watermark 的值取決于并行子任務(wù) watermark 的最小值,因此為減小各分區(qū)之間的 watermark 差值,建議 kafka 分區(qū)策略使用輪詢策略。
另外 flink 會(huì)根據(jù) kafka 分區(qū)數(shù)取模 flink 并行度的方式(kafka partitions % flink parallelism)調(diào)整各子任務(wù)具體處理哪一分區(qū)的數(shù)據(jù)。有三種可能的情況:
kafka partitions = flink parallelism:這種情況是最理想的,因?yàn)槊總€(gè)消費(fèi)者負(fù)責(zé)一個(gè)分區(qū)。如果消息在分區(qū)之間是平衡的,那么工作將均勻分布在 flink 并行任務(wù)之間;
kafka partitions < flink parallelism:一些 flink 并行任務(wù)處于空閑狀態(tài),不會(huì)收到任何消息(flink 中提供了定期空閑狀態(tài)檢查機(jī)制);
kafka partitions > flink parallelism:在這種情況下,某些任務(wù)將處理多個(gè)分區(qū),造成分區(qū)數(shù)據(jù)實(shí)際上以串行執(zhí)行。
建議使用第一種 kafka 分區(qū)與 flink 并行度分配方式,將 flink 并行度設(shè)置為 kafka 分區(qū)相同。
任務(wù)級順序
上述流內(nèi)亂序引入 window+watermark 之后即可解決,但是數(shù)據(jù)處理為達(dá)到任務(wù)級別的順序要求,不能只解決流內(nèi)亂序,因?yàn)?schema stream 和 data stream 并非完全相互獨(dú)立,如下:
假設(shè)某表的原始結(jié)構(gòu)為:CREATE TABLE tab1(uid bigint(20), name varchar(50))),下圖中 alter 代表:ALTER TABLE tab1 CHANGE COLUMN name uname varchar(50)。

unknow column name

unknow column uname
以上兩個(gè)實(shí)例說明了多流之間可能出現(xiàn)亂序的情況,為了保證任務(wù)級順序,需要在多流之間進(jìn)行分流與融合的操作,如下:將關(guān)于 tab1 的 schema 流切分出來,將其與 tab1 的 data 流進(jìn)行融合。保證其流內(nèi)順序,即可解決上述問題。

關(guān)注作者公眾號 HEY DATA,一起討論更多