【轉(zhuǎn)】Flink 原理與實(shí)現(xiàn):如何處理反壓?jiǎn)栴}

本文所討論的計(jì)算資源是指用來(lái)執(zhí)行 Task 的資源,是一個(gè)邏輯概念。本文會(huì)介紹 Flink 計(jì)算資源相關(guān)的一些核心概念,如:Slot、SlotSharingGroup、CoLocationGroup、Chain等。并會(huì)著重討論 Flink 如何對(duì)計(jì)算資源進(jìn)行管理和隔離,如何將計(jì)算資源利用率最大化等等。理解 Flink 中的計(jì)算資源對(duì)于理解 Job 如何在集群中運(yùn)行的有很大的幫助,也有利于我們更透徹地理解 Flink 原理,更快速地定位問(wèn)題。

Operator Chains

為了更高效地分布式執(zhí)行,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task。每個(gè)task在一個(gè)線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。

我們?nèi)砸越?jīng)典的 WordCount 為例(參考前文Job例子),下面這幅圖,展示了Source并行度為1,F(xiàn)latMap、KeyAggregation、Sink并行度均為2,最終以5個(gè)并行的線程來(lái)執(zhí)行的優(yōu)化過(guò)程。

image

上圖中將KeyAggregation和Sink兩個(gè)operator進(jìn)行了合并,因?yàn)檫@兩個(gè)合并后并不會(huì)改變整體的拓?fù)浣Y(jié)構(gòu)。但是,并不是任意兩個(gè) operator 就能 chain 一起的。其條件還是很苛刻的:

  1. 上下游的并行度一致
  2. 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
  3. 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
  4. 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是ALWAYS)
  5. 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD)
  6. 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū)
  7. 用戶沒(méi)有禁用 chain

Operator chain的行為可以通過(guò)編程API中進(jìn)行指定。可以通過(guò)在DataStream的operator后面(如someStream.map(..))調(diào)用startNewChain()來(lái)指示從該operator開(kāi)始一個(gè)新的chain(與前面截?cái)啵粫?huì)被chain到前面)?;蛘哒{(diào)用disableChaining()來(lái)指示該operator不參與chaining(不會(huì)與前后的operator chain一起)。在底層,這兩個(gè)方法都是通過(guò)調(diào)整operator的 chain 策略(HEAD、NEVER)來(lái)實(shí)現(xiàn)的。另外,也可以通過(guò)調(diào)用StreamExecutionEnvironment.disableOperatorChaining()來(lái)全局禁用chaining。

原理與實(shí)現(xiàn)

那么 Flink 是如何將多個(gè) operators chain在一起的呢?chain在一起的operators是如何作為一個(gè)整體被執(zhí)行的呢?它們之間的數(shù)據(jù)流又是如何避免了序列化/反序列化以及網(wǎng)絡(luò)傳輸?shù)哪??下圖展示了operators chain的內(nèi)部實(shí)現(xiàn):

image

如上圖所示,F(xiàn)link內(nèi)部是通過(guò)OperatorChain這個(gè)類來(lái)將多個(gè)operator鏈在一起形成一個(gè)新的operator。OperatorChain形成的框框就像一個(gè)黑盒,F(xiàn)link 無(wú)需知道黑盒中有多少個(gè)ChainOperator、數(shù)據(jù)在chain內(nèi)部是怎么流動(dòng)的,只需要將input數(shù)據(jù)交給 HeadOperator 就可以了,這就使得OperatorChain在行為上與普通的operator無(wú)差別,上面的OperaotrChain就可以看做是一個(gè)入度為1,出度為2的operator。所以在實(shí)現(xiàn)中,對(duì)外可見(jiàn)的只有HeadOperator,以及與外部連通的實(shí)線輸出,這些輸出對(duì)應(yīng)了JobGraph中的JobEdge,在底層通過(guò)RecordWriterOutput來(lái)實(shí)現(xiàn)。另外,框中的虛線是operator chain內(nèi)部的數(shù)據(jù)流,這個(gè)流內(nèi)的數(shù)據(jù)不會(huì)經(jīng)過(guò)序列化/反序列化、網(wǎng)絡(luò)傳輸,而是直接將消息對(duì)象傳遞給下游的 ChainOperator 處理,這是性能提升的關(guān)鍵點(diǎn),在底層是通過(guò) ChainingOutput 實(shí)現(xiàn)的,源碼如下方所示,

注:HeadOperator和ChainOperator并不是具體的數(shù)據(jù)結(jié)構(gòu),前者指代chain中的第一個(gè)operator,后者指代chain中其余的operator,它們實(shí)際上都是StreamOperator

private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
    // 注冊(cè)的下游operator
    protected final OneInputStreamOperator<T, ?> operator;
    public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
        this.operator = operator;
    }
    @Override
    // 發(fā)送消息方法的實(shí)現(xiàn),直接將消息對(duì)象傳遞給operator處理,不經(jīng)過(guò)序列化/反序列化、網(wǎng)絡(luò)傳輸
    public void collect(StreamRecord<T> record) {
        try {
            operator.setKeyContextElement1(record);
            // 下游operator直接處理消息對(duì)象
            operator.processElement(record);
        }
        catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
    ...
}

Task Slot

架構(gòu)概覽中我們介紹了 TaskManager 是一個(gè) JVM 進(jìn)程,并會(huì)以獨(dú)立的線程來(lái)執(zhí)行一個(gè)task或多個(gè)subtask。為了控制一個(gè) TaskManager 能接受多少個(gè) task,F(xiàn)link 提出了 Task Slot 的概念。

Flink 中的計(jì)算資源通過(guò) Task Slot 來(lái)定義。每個(gè) task slot 代表了 TaskManager 的一個(gè)固定大小的資源子集。例如,一個(gè)擁有3個(gè)slot的 TaskManager,會(huì)將其管理的內(nèi)存平均分成三分分給各個(gè) slot。將資源 slot 化意味著來(lái)自不同job的task不會(huì)為了內(nèi)存而競(jìng)爭(zhēng),而是每個(gè)task都擁有一定數(shù)量的內(nèi)存儲(chǔ)備。需要注意的是,這里不會(huì)涉及到CPU的隔離,slot目前僅僅用來(lái)隔離task的內(nèi)存。

通過(guò)調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的。每個(gè) TaskManager 有一個(gè)slot,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的話,也就是說(shuō)多個(gè)task運(yùn)行在同一個(gè)JVM中。而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸。也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個(gè)task的消耗。

每一個(gè) TaskManager 會(huì)擁有一個(gè)或多個(gè)的 task slot,每個(gè) slot 都能跑由多個(gè)連續(xù) task 組成的一個(gè) pipeline,比如 MapFunction 的第n個(gè)并行實(shí)例和 ReduceFunction 的第n個(gè)并行實(shí)例可以組成一個(gè) pipeline。

如上文所述的 WordCount 例子,5個(gè)Task可能會(huì)在TaskManager的slots中如下圖分布,2個(gè)TaskManager,每個(gè)有3個(gè)slot:

image

SlotSharingGroup 與 CoLocationGroup

默認(rèn)情況下,F(xiàn)link 允許subtasks共享slot,條件是它們都來(lái)自同一個(gè)Job的不同task的subtask。結(jié)果可能一個(gè)slot持有該job的整個(gè)pipeline。允許slot共享有以下兩點(diǎn)好處:

  1. Flink 集群所需的task slots數(shù)與job中最高的并行度一致。也就是說(shuō)我們不需要再去計(jì)算一個(gè)程序總共會(huì)起多少個(gè)task了。
  2. 更容易獲得更充分的資源利用。如果沒(méi)有slot共享,那么非密集型操作source/flatmap就會(huì)占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,將基線的2個(gè)并行度增加到6個(gè),能充分利用slot資源,同時(shí)保證每個(gè)TaskManager能平均分配到重的subtasks。
image

我們將 WordCount 的并行度從之前的2個(gè)增加到6個(gè)(Source并行度仍為1),并開(kāi)啟slot共享(所有operator都在default共享組),將得到如上圖所示的slot分布圖。首先,我們不用去計(jì)算這個(gè)job會(huì)其多少個(gè)task,總之該任務(wù)最終會(huì)占用6個(gè)slots(最高并行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個(gè) TaskManager。

SlotSharingGroup是Flink中用來(lái)實(shí)現(xiàn)slot共享的類,它盡可能地讓subtasks共享一個(gè)slot。相應(yīng)的,還有一個(gè) CoLocationGroup 類用來(lái)強(qiáng)制將 subtasks 放到同一個(gè) slot 中。CoLocationGroup主要用于迭代流中,用來(lái)保證迭代頭與迭代尾的第i個(gè)subtask能被調(diào)度到同一個(gè)TaskManager上。這里我們不會(huì)詳細(xì)討論CoLocationGroup的實(shí)現(xiàn)細(xì)節(jié)。

怎么判斷operator屬于哪個(gè) slot 共享組呢?默認(rèn)情況下,所有的operator都屬于默認(rèn)的共享組default,也就是說(shuō)默認(rèn)情況下所有的operator都是可以共享一個(gè)slot的。而當(dāng)所有input operators具有相同的slot共享組時(shí),該operator會(huì)繼承這個(gè)共享組。最后,為了防止不合理的共享,用戶也能通過(guò)API來(lái)強(qiáng)制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強(qiáng)制指定了filter的slot共享組為group1。

原理與實(shí)現(xiàn)

那么多個(gè)tasks(或者說(shuō)operators)是如何共享slot的呢?

我們先來(lái)看一下用來(lái)定義計(jì)算資源的slot的類圖:

image

抽象類Slot定義了該槽位屬于哪個(gè)TaskManager(instance)的第幾個(gè)槽位(slotNumber),屬于哪個(gè)Job(jobID)等信息。最簡(jiǎn)單的情況下,一個(gè)slot只持有一個(gè)task,也就是SimpleSlot的實(shí)現(xiàn)。復(fù)雜點(diǎn)的情況,一個(gè)slot能共享給多個(gè)task使用,也就是SharedSlot的實(shí)現(xiàn)。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一個(gè)SharedSlot能定義出一棵slots樹(shù)。

接下來(lái)我們來(lái)看看 Flink 為subtask分配slot的過(guò)程。關(guān)于Flink調(diào)度,有兩個(gè)非常重要的原則我們必須知道:(1)同一個(gè)operator的各個(gè)subtask是不能呆在同一個(gè)SharedSlot中的,例如FlatMap[1]FlatMap[2]是不能在同一個(gè)SharedSlot中的。(2)Flink是按照拓?fù)漤樞驈腟ource一個(gè)個(gè)調(diào)度到Sink的。例如WordCount(Source并行度為1,其他并行度為2),那么調(diào)度的順序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。假設(shè)現(xiàn)在有2個(gè)TaskManager,每個(gè)只有1個(gè)slot(為簡(jiǎn)化問(wèn)題),那么分配slot的過(guò)程如圖所示:

image

注:圖中 SharedSlot 與 SimpleSlot 后帶的括號(hào)中的數(shù)字代表槽位號(hào)(slotNumber)

  1. Source分配slot。首先,我們從TaskManager1中分配出一個(gè)SharedSlot。并從SharedSlot中為Source分配出一個(gè)SimpleSlot。如上圖中的①和②。
  2. FlatMap[1]分配slot。目前已經(jīng)有一個(gè)SharedSlot,則從該SharedSlot中分配出一個(gè)SimpleSlot用來(lái)部署FlatMap[1]。如上圖中的③。
  3. FlatMap[2]分配slot。由于TaskManager1的SharedSlot中已經(jīng)有同operator的FlatMap[1]了,我們只能分配到其他SharedSlot中去。從TaskManager2中分配出一個(gè)SharedSlot,并從該SharedSlot中為FlatMap[2]分配出一個(gè)SimpleSlot。如上圖的④和⑤。
  4. Key->Sink[1]分配slot。目前兩個(gè)SharedSlot都符合條件,從TaskManager1的SharedSlot中分配出一個(gè)SimpleSlot用來(lái)部署Key->Sink[1]。如上圖中的⑥。
  5. Key->Sink[2]分配slot。TaskManager1的SharedSlot中已經(jīng)有同operator的Key->Sink[1]了,則只能選擇另一個(gè)SharedSlot中分配出一個(gè)SimpleSlot用來(lái)部署Key->Sink[2]。如上圖中的⑦。

最后Source、FlatMap[1]Key->Sink[1]這些subtask都會(huì)部署到TaskManager1的唯一一個(gè)slot中,并啟動(dòng)對(duì)應(yīng)的線程。FlatMap[2]Key->Sink[2]這些subtask都會(huì)被部署到TaskManager2的唯一一個(gè)slot中,并啟動(dòng)對(duì)應(yīng)的線程。從而實(shí)現(xiàn)了slot共享。

總結(jié)

本文主要介紹了Flink中計(jì)算資源的相關(guān)概念以及原理實(shí)現(xiàn)。最核心的是 Task Slot,每個(gè)slot能運(yùn)行一個(gè)或多個(gè)task。為了拓?fù)涓咝У剡\(yùn)行,F(xiàn)link提出了Chaining,盡可能地將operators chain在一起作為一個(gè)task來(lái)處理。為了資源更充分的利用,F(xiàn)link又提出了SlotSharingGroup,盡可能地讓多個(gè)task共享一個(gè)slot。

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容