本文根據(jù)的是flink1.12和flink1.13社區(qū)文章及分享整理。個人根據(jù)社區(qū)相關(guān)學(xué)習(xí)理解整理,僅供參考。
流批一體架構(gòu)




A.flink 1.11 及之前
- 統(tǒng)一了Tabel/SQL API & Planner
- 統(tǒng)一shuffle架構(gòu)
B.flink1.12
優(yōu)化總結(jié):
1.DataStreamAPI 批執(zhí)行模式
2.流批統(tǒng)一Source&Sink API
3.Pipeline Region scheduler
1.DataStream API 批執(zhí)行模式
背景:flink 中雖然上層Table/Sql已經(jīng)流批統(tǒng)一,但底層api仍是分開的,DataStream和DataSet。
因為批處理是流處理的特例,所以講兩種合并成統(tǒng)一的API,這樣的好處是:
a. 具有好的復(fù)用性,作業(yè)可以在流和批這兩種執(zhí)行模式之間自由地切換,而無需重寫任何代碼。因此,用戶可以復(fù)用同一個作業(yè),來處理實時數(shù)據(jù)和歷史數(shù)據(jù)。
b.維護簡單,統(tǒng)一的 API 意味著流和批可以共用同一組 connector,維護同一套代碼,并能夠輕松地實現(xiàn)流批混合執(zhí)行,例如 backfilling 之類的場景。
考慮到這些優(yōu)點,社區(qū)已朝著流批統(tǒng)一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。
從長遠來看,這意味著 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。
■ 有限流上的批處理
您已經(jīng)可以使用 DataStream API 來處理有限流(例如文件)了,但需要注意的是,運行時并不“知道”作業(yè)的輸入是有限的。為了優(yōu)化在有限流情況下運行時的執(zhí)行性能,新的 BATCH 執(zhí)行模式,對于聚合操作,全部在內(nèi)存中進行,且使用 sort-based shuffle(FLIP-140)和優(yōu)化過的調(diào)度策略(請參見 Pipelined Region Scheduling 了解更多詳細信息)。因此,DataStream API 中的 BATCH 執(zhí)行模式已經(jīng)非常接近 Flink 1.12 中 DataSet API 的性能。有關(guān)性能的更多詳細信息,請查看 FLIP-140。
在 Flink 1.12 中,默認執(zhí)行模式為 STREAMING,要將作業(yè)配置為以 BATCH 模式運行,可以在提交作業(yè)的時候,設(shè)置參數(shù) execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通過編程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
2. 流批統(tǒng)一Source & Sink API
- 1.11版本已經(jīng)支持了 source connector 工作在流批兩種模式下。
TODO - 1.12支持了對Data Sink API的重構(gòu)。

現(xiàn)有只支持FileSInkConnector。替換現(xiàn)有的StreamingFileSink Connector。
新的抽象引入了 write/commit 協(xié)議和一個更加模塊化的接口。Sink 的實現(xiàn)者只需要定義 what和 how:
SinkWriter,用于寫數(shù)據(jù),并輸出需要 commit 的內(nèi)容(例如,committables);
Committer 和 GlobalCommitter,封裝了如何處理 committables。
框架會負責(zé) when 和 where:即在什么時間,以及在哪些機器或進程中 commit。
3. Pipelined Region 調(diào)度 (FLIP-119)
1). 1.12之前方案
(1)前提:兩種模式:
a. pipelined result: 數(shù)據(jù)順序一個一個的消費
b.blocking result: 在上游所有數(shù)據(jù)生成完成才開始執(zhí)行。
(2)版本1.12之前是流批分開的
- sreaming: (使用的是a方式)

- batch: (stage內(nèi)部使用的是pipelined,stage之間使用的是blocking)
這種方式優(yōu)點:a.只用調(diào)度有數(shù)據(jù)的stage,所以更高效。 b.stage fail可以單獨重啟,不需要重新計算其他stage。

(3) before 1.19 調(diào)度策略
統(tǒng)一的調(diào)度器需要對每個階段,包括流處理和批處理,都要好的資源調(diào)度。1.12之前采用的是不同的調(diào)度策略,分別解決流批問題。
a. “all at once”
立刻執(zhí)行,用于流處理。對于批處理,立刻執(zhí)行可能會影響資源利用率,可能導(dǎo)致資源預(yù)先分配,等待上游數(shù)據(jù)而導(dǎo)致資源浪費。
b."lazy from sources"
對于批處理,使用懶加載方式,即input數(shù)據(jù)準備好之后再分配后續(xù)operator的資源。
這個策略獨立運行在每個子任務(wù)中,所以不會識別同時在運行的所有subtask.
舉例:
A 是批數(shù)據(jù),B是流表,C是需要join。
slot=1:B-C chain,那么C因為A未完成而無法執(zhí)行。flink會嘗試部署A,因為沒有slot導(dǎo)致job失敗。
slot=2:這時可用,flink能部署A,job能成功執(zhí)行,但是當(dāng)A在執(zhí)行的時候,第一個slot會被B和C占用而浪費資源。
失敗情況: 如果B→C失敗,我們不用再重新執(zhí)行A,但是1.9之前是不支持的。
社區(qū)為支持流批統(tǒng)一,設(shè)計了一個統(tǒng)一的調(diào)度和失敗策略,Pipelined region scheduling.
2). pilelined region scheduling

新調(diào)度策略在開始substask之前,通過分析ExecutionGraph,識別出pipelined region。
region內(nèi)部使用的是pipelined方式,外部使用的是blocking方式。
(1)調(diào)度策略:
在region內(nèi),消費者需要不斷消費生產(chǎn)的數(shù)據(jù),以保證生產(chǎn)者不被block,并且避免背壓。因此region的所有子任務(wù)必須被調(diào)度,失敗是整體重啟,同時運行。
圖中r1→( r2,r3)→ r4,如果jobmanager有足夠資源,那么在上游數(shù)據(jù)finished之后,將盡可能的執(zhí)行更多的下游region。子任務(wù)執(zhí)行是根據(jù)region分配的,要么成功,要么失敗。
(2)失敗策略
當(dāng)然子任務(wù)失敗,那么region重啟,重新消費輸入數(shù)據(jù)。如果一些輸入數(shù)據(jù)丟失,那么flink會重新執(zhí)行上游生產(chǎn)region。
好處:
1.可以在有限資源情況下,盡可能的執(zhí)行批任務(wù)。
2.可以提高資源利用率并消除死鎖。
參考:https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html
C. flink1.13
優(yōu)化總結(jié):
1.大規(guī)模作業(yè)調(diào)度優(yōu)化
2.sort-Merge Shuffle
3.有限作業(yè)一致性保證
1.大規(guī)模作業(yè)調(diào)度
背景:

由于在創(chuàng)建圖的時候,邊會存儲對象,那么在大規(guī)模作業(yè)調(diào)度時,會占用大量內(nèi)存。
引入
A. 在ExecutionGraph中有兩種,一種是pointwise模式(一對一或一對多),還有一種是alltoall(多堆多)
B. 什么情況是pointwise模式?
partition 分區(qū)方式
image.png
參考:區(qū)別https://blog.csdn.net/lvwenyuan_1/article/details/103722226
代碼中:


ForwardPartitioner 和RescalePartitioner 屬于pointwise模式,其他的均屬于多對多。
C. 針對這兩種方式,將多消費者合成消費組,減少對象創(chuàng)建,將O(n)變成了O(1)


2.sort-merge shuffle
中間數(shù)據(jù)是如何保存和讀取的?
在1.10以前實現(xiàn)了統(tǒng)一shuffle機制,參考:https://ververica.cn/developers/shuffle-mechanism/
- flink 網(wǎng)絡(luò)流控和反壓機制:https://ververica.cn/developers/advanced-tutorial-2-analysis-of-network-flow-control-and-back-pressure/
背景:
針對批作業(yè),在數(shù)據(jù)shuffle的優(yōu)化。
上游跑完寫中間文件
節(jié)省資源,不需要上游和下游同時起來。
失敗不需要重新執(zhí)行。

flink 默認的shuffle,給每個下游輸出單獨文件。
- 大量小文件
- 內(nèi)存浪費,每個文件至少用1個buffer
- 下游數(shù)據(jù)讀取產(chǎn)生大量隨機I/O
新方案:sort shuffle

-
先寫緩沖區(qū),把數(shù)據(jù)按照不同的下游分組,最后寫入文件
image.png
(1)申請固定大小緩沖區(qū),避免緩沖區(qū)隨著規(guī)模增大而增大
(2)數(shù)據(jù)寫入緩沖區(qū),在緩沖區(qū)滿的時候會對數(shù)據(jù)進行排序(合并分區(qū)),然后寫入單獨文件。后邊數(shù)據(jù)接著寫到文件后邊。文件有多個段,每個段內(nèi)有序。
沒有采用外排序,merge不劃算。
-
下游上層做I/O調(diào)度,下游讀取是通過一個調(diào)度器。
image.png
image.png
3. 有限作業(yè)一致性保證

背景:有限流不能做checkpoint,無法保證一致性。
優(yōu)化:

所有subtask結(jié)束,只存標記
部分subtask結(jié)束,會存儲剩下部分的數(shù)據(jù)。
結(jié)束語義整理:
數(shù)據(jù)有限正常結(jié)束
savepoint結(jié)束

endofinput 通知,統(tǒng)一做checkpoint,保證最后數(shù)據(jù)一定會提交到系統(tǒng)中。
stopwithsavepoint,不同統(tǒng)一做checkpoint,
正常結(jié)束,認為任務(wù)不再重啟,調(diào)用endofinput,提交最后數(shù)據(jù)。
stop-with-savepoint,通過savepoint結(jié)束,后期會重啟,不會提交最后數(shù)據(jù)
stop-with-savepoint --drain ,通過savepoint結(jié)束,后期不會重啟,調(diào)用endofinput,提交最后數(shù)據(jù)。

參考:https://developer.aliyun.com/live/246712?spm=a2c6h.12873639.0.0.2f9612a824wQIq



