我們知道Flink提供了容錯機制,能夠在應(yīng)用失敗的時候重新恢復(fù)任務(wù)。這個機制主要就是通過持續(xù)產(chǎn)生快照的方式實現(xiàn)的。Flink快照主要包括兩部分?jǐn)?shù)據(jù)一部分是數(shù)據(jù)流的數(shù)據(jù),另一部分是operator的狀態(tài)數(shù)據(jù)。對應(yīng)的快照機制的實現(xiàn)有主要兩個部分組成,一個是屏障(Barrier),一個是狀態(tài)(State)。因為Flink這里處理的數(shù)據(jù)流,數(shù)據(jù)在多個operator的DAG拓?fù)渲谐掷m(xù)流動,要想實現(xiàn)某個時刻快照可以用于系統(tǒng)故障恢復(fù),必須保證這個快照,完全能夠確定某一個時刻狀態(tài),這個時刻之前的數(shù)據(jù)全部處理完,之后的數(shù)據(jù)一個都沒有處理。這里就引入了屏障這個概念。這里我們主要介紹一下屏障實現(xiàn)。
屏障 Barrier
Flink 分布式快照里面的一個核心的元素就是流屏障(stream barrier)。這些屏障會被插入(injected)到數(shù)據(jù)流中,并作為數(shù)據(jù)流的一部分隨著數(shù)據(jù)流動。屏障并不會持有任何數(shù)據(jù),而是和數(shù)據(jù)一樣線性的流動。可以看到屏障將數(shù)據(jù)流分成了兩部分?jǐn)?shù)據(jù)(實際上是多個連續(xù)的部分),一部分是當(dāng)前快照的數(shù)據(jù),一部分下一個快照的數(shù)據(jù)。每個屏障會帶有它的快照ID。這個快照的數(shù)據(jù)都在這個屏障的前面。從圖上看,數(shù)據(jù)是從左向右移動(右邊的先進入系統(tǒng)),那么快照n包含的數(shù)據(jù)就是右側(cè)到下一個屏障(n-1)截止的數(shù)據(jù),圖中兩個灰色豎線之間的部分,也就是part of checkpoint n。另外屏障并不會打斷數(shù)的流動,因而屏障是非常輕量的。在同一個時刻,多個快照可以在同一個數(shù)據(jù)流中,這也就是說多個快照可以同時產(chǎn)生。

如果是多個輸入數(shù)據(jù)流,多個數(shù)據(jù)流的屏障會被同時插入到數(shù)據(jù)流中??煺課的屏障被插入到數(shù)據(jù)流的點(我們稱之為Sn),就是數(shù)據(jù)流中一直到的某個位置(包含了當(dāng)前時刻之前時間的所有數(shù)據(jù)),也就是包含的這部分?jǐn)?shù)據(jù)的快照。舉例來說,在Kafka中,這個位置就是這個分區(qū)的最后一條記錄的offset。這個位置Sn就會上報給 checkpoint 的協(xié)調(diào)器(Flink的 JobManager)。
然后屏障開始向下流動。當(dāng)一個中間的operator收到它的所有輸入源的快照n屏障后,它就會向它所有的輸出流發(fā)射一個快照n的屏障,一旦一個sink的operator收到所有輸入數(shù)據(jù)流的屏障n,它就會向checkpoint的協(xié)調(diào)器發(fā)送快照n確認(rèn)。當(dāng)所有的sink都確認(rèn)了快照n,系統(tǒng)才認(rèn)為當(dāng)前快照的數(shù)據(jù)已經(jīng)完成。
一旦快照n已經(jīng)執(zhí)行完成,任務(wù)則不會再請求Sn之前的數(shù)據(jù),因為此刻,這些數(shù)據(jù)都已經(jīng)完全通過了數(shù)據(jù)流拓?fù)鋱D。
對齊機制
接收不止一個數(shù)據(jù)輸入的operator需要基于屏障對齊輸入數(shù)據(jù)流。詳述如下:
整個流程圖如下所示

然后我們挨個看一下:
-
當(dāng)operator接收到快照的屏障n后并不能直接處理之后的數(shù)據(jù),而是需要等待其他輸入快照的屏障n。否則話,將會將快照n的數(shù)據(jù)和快照n+1的數(shù)據(jù)混在一起。圖中第一個所示,operator即將要收到數(shù)據(jù)流1(上面這個我們當(dāng)成數(shù)據(jù)流1(6,5,4,3,2,1),下面的當(dāng)成數(shù)據(jù)流2好了)的屏障n,1,2,3在屏障n之后到達operator,這個時候如果數(shù)據(jù)流1的繼續(xù)處理,那么operator中就會包含n屏障之后的數(shù)據(jù)(1,2,3),但是operator中此刻在接收和處理數(shù)據(jù)流2,數(shù)據(jù)(a,b,c)就會和數(shù)據(jù)流1中的(1,2,3)混合。
image.png
- 快照n的數(shù)據(jù)流會被暫時放到一邊。從這些數(shù)據(jù)流中獲取到的數(shù)據(jù)不會被處理,而是存儲到一個緩沖中。圖中第一個所示,因為數(shù)據(jù)流2的屏障n還沒到,所以operator持續(xù)接收1,2,3然而并不做任何處理。但是需要將1,2,3存入到buffer中。此時第二個數(shù)據(jù)流接到a,b,則直接發(fā)送,接到c發(fā)送c。

- 一旦最后一個數(shù)據(jù)流收到了快照n,opertor就會將發(fā)出所有阻塞的數(shù)據(jù),并發(fā)出自己的屏障。如圖中第三個所示,operator最后收到了另一個數(shù)據(jù)流的屏障n,然后再發(fā)出a,b,c(圖中operator中的c,b,a)以后,發(fā)出自己的屏障,這個時候buffer中又增加了一個4,變成(4,3,2,1)。

- 之后operator會重新開始處理所有的輸入數(shù)據(jù)流,先處理buffer中的數(shù)據(jù),處理完之后再處理輸入數(shù)據(jù)流的數(shù)據(jù)。如圖第四個所示,先將buffer中的1,2,3,4先處理完,在接收并處理這兩個數(shù)據(jù)源的數(shù)據(jù)。

··=-·=···············
