一、數(shù)據(jù)流圖(Dataflow Graph)
所有的 Flink 程序都可以歸納為由三部分構(gòu)成:Source、Transformation 和 Sink。
- Source 表示“源算子”,負(fù)責(zé)讀取數(shù)據(jù)源。
- Transformation 表示“轉(zhuǎn)換算子”,利用各種算子進(jìn)行處理加工。
- Sink 表示“下沉算子”,負(fù)責(zé)數(shù)據(jù)的輸出。
Flink 程序會(huì)被映射成所有算子按照邏輯順序連接在一起的一張圖,這被稱(chēng)為“邏輯數(shù)據(jù)流”(logical dataflow),或者叫“數(shù)據(jù)流圖”(dataflow graph)。我們提交作業(yè)之后,打開(kāi) Flink 自帶的 Web UI,點(diǎn)擊作業(yè)就能看到對(duì)應(yīng)的 dataflow。

在運(yùn)行時(shí),F(xiàn)link 上運(yùn)行的程序會(huì)被映射成“邏輯數(shù)據(jù)流”(dataflows),它包含了這三部分。每一個(gè) dataflow 以一個(gè)或多個(gè) sources 開(kāi)始以一個(gè)或多個(gè) sinks 結(jié)束。dataflow 類(lèi)似于任意的有向無(wú)環(huán)圖(DAG)。在大部分情況下,程序中的轉(zhuǎn)換運(yùn)算(transformations)跟 dataflow 中的算子(operator)是 一 一 對(duì)應(yīng)的關(guān)系,但有時(shí)候,一個(gè) transformation 可能對(duì)應(yīng)多個(gè) operator。
二、并行度(Parallelism)
如果說(shuō) Spark基于 MapReduce 架構(gòu)的思想是“數(shù)據(jù)不動(dòng)代碼動(dòng)”,那么 Flink 就類(lèi)似“代碼不動(dòng)數(shù)據(jù)流動(dòng)”,原因就在于流式數(shù)據(jù)本身是連續(xù)到來(lái)的、我們不會(huì)同時(shí)傳輸所有數(shù)據(jù),這其實(shí)是更符合數(shù)據(jù)流本身特點(diǎn)的處理方式。
一個(gè)特定算子的子任務(wù)(subtask)的個(gè)數(shù)被稱(chēng)之為其并行度。 這樣,包含并行子任務(wù)的數(shù)據(jù)流,就是并行數(shù)據(jù)流,它需要多個(gè)分區(qū)(stream partition)來(lái)分配并行任務(wù)。一般情況下,一個(gè)流程序的并行度,可以認(rèn)為就是其所有算子中最大的并行度。一個(gè)程序中,不同的算子可能具有不同的并行度。
一個(gè)并行的任務(wù),需要占用的slot數(shù)是整個(gè)任務(wù)最大并行的數(shù)量,也就是你設(shè)置的Parallelism在整個(gè)處理流程重最大的數(shù)量。
并行度優(yōu)先級(jí):代碼層面 > 全局層面 > Web UI 層面 > 集群配置文件層面
在不設(shè)置并行度的情況下,并行度的數(shù)量取決于CPU有多少核心
三、算子鏈(Operator Chain)
數(shù)據(jù)傳輸?shù)姆绞剑?/p>
3.1 一對(duì)一(One-to-one,forwarding)
這種模式下,數(shù)據(jù)流維護(hù)著分區(qū)以及元素的順序。比如圖中的 source 和 map 算子,source算子讀取數(shù)據(jù)之后,可以直接發(fā)送給 map 算子做處理,它們之間不需要重新分區(qū),也不需要調(diào)整數(shù)據(jù)的順序。這就意味著 map 算子的子任務(wù),看到的元素個(gè)數(shù)和順序跟 source 算子的子任務(wù)產(chǎn)生的完全一樣,保證著“一對(duì)一”的關(guān)系。map、filter、flatMap 等算子都是這種 one-to-one的對(duì)應(yīng)關(guān)系。
這種關(guān)系類(lèi)似于 Spark 中的窄依賴。
3.2 重分區(qū)(Redistributing)
在這種模式下,數(shù)據(jù)流的分區(qū)會(huì)發(fā)生改變。
每一個(gè)算子的子任務(wù),會(huì)根據(jù)數(shù)據(jù)傳輸?shù)牟呗?,把?shù)據(jù)發(fā)送到不同的下游目標(biāo)任務(wù)。例如,keyBy()是分組操作,本質(zhì)上基于鍵(key)的哈希值(hashCode)進(jìn)行了重分區(qū);而當(dāng)并行度改變時(shí),比如從并行度為 2 的 window 算子,要傳遞到并行度為 1 的 Sink 算子,這時(shí)的數(shù)據(jù)傳輸方式是再平衡(rebalance),會(huì)把數(shù)據(jù)均勻地向下游子任務(wù)分發(fā)出去。這些傳輸方式都會(huì)引起重分區(qū)(redistribute)的過(guò)程,這一過(guò)程類(lèi)似于 Spark 中的 shuffle。
總體說(shuō)來(lái),這種算子間的關(guān)系類(lèi)似于 Spark 中的寬依賴。
在 Flink 中,并行度相同的一對(duì)一(one to one)算子操作,可以直接鏈接在一起形成一個(gè)“大”的任務(wù)(task),這樣原來(lái)的算子就成為了真正任務(wù)里的一部分,如圖 所示。每個(gè) task會(huì)被一個(gè)線程執(zhí)行。這樣的技術(shù)被稱(chēng)為“算子鏈”(Operator Chain)。

Flink 為什么要有算子鏈這樣一個(gè)設(shè)計(jì)呢?
這是因?yàn)閷⑺阕渔溄映?task 是非常有效的優(yōu)化:可以減少線程之間的切換和基于緩存區(qū)的數(shù)據(jù)交換,在減少時(shí)延的同時(shí)提升吞吐量。
Flink 默認(rèn)會(huì)按照算子鏈的原則進(jìn)行鏈接合并,如果我們想要禁止合并或者自行定義,也可以在代碼中對(duì)算子做一些特定的設(shè)置:
// 禁用算子鏈
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 從當(dāng)前算子開(kāi)始新鏈
.map(word -> Tuple2.of(word, 1L)).startNewChain()
四、作業(yè)圖(JobGraph)與執(zhí)行圖(ExecutionGraph)
邏輯流圖也就是數(shù)據(jù)流圖
Flink 中任務(wù)調(diào)度執(zhí)行的圖,按照生成順序可以分成四層:
邏輯流圖(StreamGraph)→ 作業(yè)圖(JobGraph)→ 執(zhí)行圖(ExecutionGraph)→ 物理圖(Physical Graph)
4.1 邏輯流圖(StreamGraph)
這是根據(jù)用戶通過(guò) DataStream API 編寫(xiě)的代碼生成的最初的 DAG 圖,用來(lái)表示程序的拓?fù)浣Y(jié)構(gòu)。這一步一般在客戶端完成。
我們可以看到,邏輯流圖中的節(jié)點(diǎn),完全對(duì)應(yīng)著代碼中的四步算子操作:
源算子 Source(socketTextStream())→扁平映射算子 Flat Map(flatMap()) →分組聚合算子Keyed Aggregation(keyBy/sum()) →輸出算子 Sink(print())。

4.2 作業(yè)圖(JobGraph)
StreamGraph 經(jīng)過(guò)優(yōu)化后生成的就是作業(yè)圖(JobGraph),這是提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu),確定了當(dāng)前作業(yè)中所有任務(wù)的劃分。主要的優(yōu)化為: 將多個(gè)符合條件的節(jié)點(diǎn)鏈接在一起合并成一個(gè)任務(wù)節(jié)點(diǎn),形成算子鏈,這樣可以減少數(shù)據(jù)交換的消耗。JobGraph 一般也是在客戶端生成的,在作業(yè)提交時(shí)傳遞給 JobMaster。
分組聚合算子(Keyed Aggregation)和輸出算子 Sink(print)并行度都為 2,而且是一對(duì)一的關(guān)系,滿足算子鏈的要求,所以會(huì)合并在一起,成為一個(gè)任務(wù)節(jié)點(diǎn)。

4.3 執(zhí)行圖(ExecutionGraph)
JobMaster 收到 JobGraph 后,會(huì)根據(jù)它來(lái)生成執(zhí)行圖(ExecutionGraph)。ExecutionGraph是 JobGraph 的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。與 JobGraph 最大的區(qū)別就是按照并行度對(duì)并行子任務(wù)進(jìn)行了拆分,并明確了任務(wù)間數(shù)據(jù)傳輸?shù)姆绞健?/p>

4.4 物理圖(Physical Graph)
JobMaster 生成執(zhí)行圖后, 會(huì)將它分發(fā)給 TaskManager;各個(gè) TaskManager 會(huì)根據(jù)執(zhí)行圖部署任務(wù),最終的物理執(zhí)行過(guò)程也會(huì)形成一張“圖”,一般就叫作物理圖(Physical Graph)。這只是具體執(zhí)行層面的圖,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。物理圖主要就是在執(zhí)行圖的基礎(chǔ)上,進(jìn)一步確定數(shù)據(jù)存放的位置和收發(fā)的具體方式。有了物理圖,TaskManager 就可以對(duì)傳遞來(lái)的數(shù)據(jù)進(jìn)行處理計(jì)算了。
所以我們可以看到,程序里定義了四個(gè)算子操作:源(Source)->轉(zhuǎn)換(flatMap)->分組聚合(keyBy/sum)->輸出(print);合并算子鏈進(jìn)行優(yōu)化之后,就只有三個(gè)任務(wù)節(jié)點(diǎn)了;再考慮并行度后,一共有 5 個(gè)并行子任務(wù),最終需要 5 個(gè)線程來(lái)執(zhí)行。

參考:
https://blog.csdn.net/weixin_45417821/article/details/124101562