Flink Job 調(diào)度

[toc]

介紹 Flink 是如何調(diào)度 Job,以及如何在 JobManager 上維護(hù)并跟蹤 Job 狀態(tài)。

調(diào)度

Flink 通過任務(wù)槽(Task slot)定義執(zhí)行資源,每個(gè) TaskManager 都有一或多個(gè) Task Slot,每個(gè) Task Slot 都可以運(yùn)行并行 Task 的一個(gè) pipeline。pipeline 包括多個(gè)連續(xù)的任務(wù),參照下圖說明,由一個(gè) Data source、一個(gè) MapFunction 和一個(gè) ReduceFunction 組成的程序,Data source 和 MapFunction 的并發(fā)度都為 4,ReduceFunction 的并發(fā)度為 3。一個(gè) pipeline 由 Source-Map-Reduce 組成,在具有 2 個(gè) TaskManager,每個(gè) TaskManager 有 3 個(gè) Task slot 的集群上運(yùn)行,程序執(zhí)行情況下:

說明如下:

  • TaskManager 1 和 TaskManager 上,分別有 2 個(gè) pipeline,各占用一個(gè) Task slot

  • 4 個(gè) pipeline 是并行執(zhí)行的

Flink 內(nèi)通過 SlotSharingGroupCoLocationGroup 來定義哪些任務(wù)共享一個(gè) Task slot,哪些任務(wù)必須嚴(yán)格的放到一個(gè) Task slot中。

JobManager數(shù)據(jù)結(jié)構(gòu)

Job 執(zhí)行期間,JobManager 會(huì)持續(xù)跟蹤分布式任務(wù),決定什么時(shí)候調(diào)度下一個(gè) Task(或者一組 Task),并且對(duì)已完成的或執(zhí)行失敗的 Task 進(jìn)行響應(yīng)。

JobManager 接收 JobGraph,JobGraph 是數(shù)據(jù)流的表現(xiàn)形式,包括算子(JobVertex)和中間結(jié)果(IntermediateDataSet)。每個(gè)算子都有并行度和執(zhí)行代碼等屬性。此外,JobGraph 還擁有一些在算子執(zhí)行代碼時(shí)所需要的附加庫。

JobManager 將 JobGraph 轉(zhuǎn)換為 ExecutionGraph ,ExecutionGraph 是 JobGraph 的并行版本:每個(gè) JobVertex 包含并行 SubTask 的 ExecutionVertex 。一個(gè)并行度為 100 的算子將擁有一個(gè) JobVertex 和 100 個(gè) ExecutionVertex。ExecutionVertex 會(huì)跟蹤特定 SubTask 的執(zhí)行狀態(tài)。來自一個(gè) JobVertex 的所有 ExecutionVertex 都由一個(gè) ExecutionJobVertex 管理保存,ExecutionJobVertex 跟蹤算子整體狀態(tài)。除了各個(gè)節(jié)點(diǎn)之外,ExecutionGraph 還包括了 IntermediateResult IntermediateResultPartition,前者跟蹤中間結(jié)果的狀態(tài),后者跟蹤中間結(jié)果每個(gè)分區(qū)的狀態(tài)。

Job 狀態(tài)

每個(gè) ExecutionGraph 都有一個(gè)與其相關(guān)聯(lián)的 Job Status,指示 Job 執(zhí)行的當(dāng)前狀態(tài)。

一個(gè) Flink Job 狀態(tài)機(jī)首先處于創(chuàng)建狀態(tài)(created),然后切換到運(yùn)行狀態(tài)(running),并且在完成所有工作后,切換到完成狀態(tài)(finished)。在失敗的情況下,狀態(tài)機(jī)首先切換到失敗狀態(tài)(failing),取消所有正在運(yùn)行 Task。如果所有 JobVertex 都已達(dá)到最終狀態(tài)(參考 Vertex 狀態(tài)),并且 Job 不可重新啟動(dòng),則狀態(tài)機(jī)將轉(zhuǎn)換為失敗(failed)。如果 Job 可以重新啟動(dòng),那么狀態(tài)機(jī)將進(jìn)入重新啟動(dòng)狀態(tài)(restarting)。一旦完成重新啟動(dòng),狀態(tài)機(jī)將變成創(chuàng)建狀態(tài)(created)。

在用戶取消 Job 的情況下,將進(jìn)入取消狀態(tài)(cancelling),會(huì)取消所有當(dāng)前正在運(yùn)行的 Task 。一旦所有運(yùn)行的 Task 已經(jīng)達(dá)到最終狀態(tài)(參考 Vertex 狀態(tài)),job 狀態(tài)機(jī)將轉(zhuǎn)換到已取消狀態(tài)(canceled)。

完成狀態(tài)(finished),取消狀態(tài)(canceled)和失敗狀態(tài)(failed)表示一個(gè)全局的終結(jié)狀態(tài),并且觸發(fā)清理工作,而暫停狀態(tài)(suspended)僅處于本地終止?fàn)顟B(tài)。意味著 job 的執(zhí)行在相應(yīng)的 JobManager 上終止,但集群的另一個(gè) JobManager 可以從持久化的高可用存儲(chǔ)中恢復(fù)這個(gè) Job 并重新啟動(dòng)。因此,處于暫停狀態(tài)(suspended)的 Job 將不會(huì)被完全清理。

Vertex 狀態(tài)

在執(zhí)行 ExecutionGraph 期間,每個(gè)并行 Task 經(jīng)過多個(gè)階段,從創(chuàng)建(created)到完成(finished)或失敗(failed),下圖說明了它們之間的狀態(tài)和可能的轉(zhuǎn)換。任務(wù)可以執(zhí)行多次(例如故障恢復(fù))。每個(gè) Execution 跟蹤一個(gè) ExecutionVertex 的執(zhí)行,每個(gè) ExecutionVertex 都有一個(gè)當(dāng)前 Execution(current execution)和一個(gè)前置 Execution(prior execution)。

Flink 執(zhí)行圖

在 Flink 中的執(zhí)行圖可以分為四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖

  • StreamGraph:Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)??梢哉{(diào)用 env.getExecutionPlan() 輸出 Json 串,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 可視化該執(zhí)行圖。

  • JobGraph:StreamGraph 經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn)鏈接(chain)在一起作為一個(gè)節(jié)點(diǎn),減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗。

  • ExecutionGraph:JobManager 根據(jù) JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。

  • 物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。

四層執(zhí)行圖的演變過程如下圖所示(來源:Flink 原理與實(shí)現(xiàn):架構(gòu)和拓?fù)涓庞[):


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容