Flink+Kafka 0.11端到端精確一次處理語義實(shí)現(xiàn)

2017年12月Apache Flink社區(qū)發(fā)布了1.4版本。該版本正式引入了一個(gè)里程碑式的功能:兩階段提交Sink,即TwoPhaseCommitSinkFunction。該SinkFunction提取并封裝了兩階段提交協(xié)議中的公共邏輯,自此Flink搭配特定source和sink(特別是0.11版本Kafka)搭建精確一次處理語義( exactly-once semantics)應(yīng)用成為了可能。作為一個(gè)抽象類TwoPhaseCommitSinkFunction提供了一個(gè)抽象層供用戶自行實(shí)現(xiàn)特定方法來支持 exactly-once semantics。

用戶可以閱讀Java文檔來學(xué)習(xí)如何使用TwoPhaseCommitSinkFunction,或者參考Flink官網(wǎng)文檔來了解FlinkKafkaProducer011是如何支持 exactly-once semantics的,因?yàn)楹笳哒腔赥woPhaseCommitSinkFunction實(shí)現(xiàn)的。

本文將深入討論一下Flink 1.4這個(gè)新特性以及其背后的設(shè)計(jì)思想。在本文中我們將:

1. 描述Flink應(yīng)用中的checkpoint如何幫助確保exactly-once semantics

2. 展示Flink如何通過兩階段提交協(xié)議與source和sink交互以實(shí)現(xiàn)端到端的 exactly-once semantics交付保障

3. 給出一個(gè)使用TwoPhaseCommitSinkFunction實(shí)現(xiàn) exactly-once semantics的文件Sink實(shí)例

Flink應(yīng)用的僅一次處理

當(dāng)談及僅一次處理時(shí),我們真正想表達(dá)的是每條輸入消息只會(huì)影響最終結(jié)果一次!【譯者:影響應(yīng)用狀態(tài)一次,而非被處理一次】即使出現(xiàn)機(jī)器故障或軟件崩潰,F(xiàn)link也要保證不會(huì)有數(shù)據(jù)被重復(fù)處理或壓根就沒有被處理從而影響狀態(tài)。長久以來Flink一直宣稱支持 exactly-once semantics是指在一個(gè)Flink應(yīng)用內(nèi)部。在過去的幾年間,F(xiàn)link開發(fā)出了checkpointing機(jī)制,而它則是提供這種應(yīng)用內(nèi)僅一次處理的基石。

在繼續(xù)之前我們簡要總結(jié)一下checkpointing算法,這對于我們了解本文內(nèi)容至關(guān)重要。簡單來說,一個(gè)Flink checkpoint是一個(gè)一致性快照,它包含:

1. 應(yīng)用的當(dāng)前狀態(tài)

2. 消費(fèi)的輸入流位置

Flink會(huì)定期地產(chǎn)生checkpoint并且把這些checkpoint寫入到一個(gè)持久化存儲上,比如S3或HDFS。這個(gè)寫入過程是異步的,這就意味著Flink即使在checkpointing過程中也是不斷處理輸入數(shù)據(jù)的。

如果出現(xiàn)機(jī)器或軟件故障,F(xiàn)link應(yīng)用重啟后會(huì)從最新成功完成的checkpoint中恢復(fù)——重置應(yīng)用狀態(tài)并回滾狀態(tài)到checkpoint中輸入流的正確位置,之后再開始執(zhí)行數(shù)據(jù)處理,就好像該故障或崩潰從未發(fā)生過一般。

在Flink 1.4版本之前,僅一次處理只限于Flink應(yīng)用內(nèi)。Flink處理完數(shù)據(jù)后需要將結(jié)果發(fā)送到外部系統(tǒng),這個(gè)過程中Flink并不保證僅一次處理。但是Flink應(yīng)用通常都需要接入很多下游子系統(tǒng),而開發(fā)人員很希望能在多個(gè)系統(tǒng)上維持僅一次處理語義,即維持端到端的僅一次處理語義。

為了提供端到端的僅一次處理語義,僅一次處理語義必須也要應(yīng)用于Flink寫入數(shù)據(jù)的外部系統(tǒng)——故這些外部系統(tǒng)必須提供一種手段允許提交或回滾這些寫入操作,同時(shí)還要保證與Flink checkpoint能夠協(xié)調(diào)使用。

在分布式系統(tǒng)中協(xié)調(diào)提交和回滾的一個(gè)常見方法就是使用兩階段提交協(xié)議。下一章節(jié)中我們將討論下Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協(xié)議來實(shí)現(xiàn)exactly-once semantics的。

Flink實(shí)現(xiàn)僅一次語義的應(yīng)用

下面將給出一個(gè)實(shí)例來幫助了解兩階段提交協(xié)議以及Flink如何使用它來實(shí)現(xiàn)僅一次處理語義。該實(shí)例從Kafka中讀取數(shù)據(jù),經(jīng)處理之后再寫回到Kafka。Kafka是非常受歡迎的消息隊(duì)列,而Kafka 0.11.0.0版本正式發(fā)布了對于事務(wù)的支持——這是與Kafka交互的Flink應(yīng)用要實(shí)現(xiàn)端到端僅一次語義的必要條件。

當(dāng)然,F(xiàn)link支持這種僅一次處理語義并不只是限于與Kafka的結(jié)合,可以使用任何source/sink,只要它們提供了必要的協(xié)調(diào)機(jī)制。舉個(gè)例子,Pravega是Dell/EMC的一個(gè)開源流式存儲系統(tǒng),F(xiàn)link搭配它也可以實(shí)現(xiàn)端到端的exactly-once semantics。

image.jpeg

本例中的Flink應(yīng)用包含以下組件,如上圖所示:

1. 一個(gè)source,從Kafka中讀取數(shù)據(jù)(即KafkaConsumer)

2. 一個(gè)時(shí)間窗口化的聚會(huì)操作

3. 一個(gè)sink,將結(jié)果寫回到Kafka(即KafkaProducer)

若要sink支持 exactly-once semantics,它必須以事務(wù)的方式寫數(shù)據(jù)到Kafka,這樣當(dāng)提交事務(wù)時(shí)兩次checkpoint間的所有寫入操作當(dāng)作為一個(gè)事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時(shí)這些寫入操作能夠被回滾。

當(dāng)然了,在一個(gè)分布式且含有多個(gè)并發(fā)執(zhí)行sink的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因?yàn)樗薪M件都必須對這些提交或回滾達(dá)成共識,這樣才能保證得到一個(gè)一致性的結(jié)果。Flink使用兩階段提交協(xié)議以及預(yù)提交(pre-commit)階段來解決這個(gè)問題。

Flink checkpointing開始時(shí)便進(jìn)入到pre-commit階段。具體來說,一旦checkpoint開始,F(xiàn)link的JobManager向輸入流中寫入一個(gè)checkpoint barrier將流中所有消息分割成屬于本次checkpoint的消息以及屬于下次checkpoint的。barrier也會(huì)在操作算子間流轉(zhuǎn)。對于每個(gè)operator來說,該barrier會(huì)觸發(fā)operator狀態(tài)后端為該operator狀態(tài)打快照。

image.jpeg

眾所周知,flink kafka source保存Kafka消費(fèi)offset,一旦完成位移保存,它會(huì)將checkpoint barrier傳給下一個(gè)operator。

這個(gè)方法對于opeartor只有內(nèi)部狀態(tài)的場景是可行的。所謂的內(nèi)部狀態(tài)就是完全由Flink狀態(tài)保存并管理的——本例中的第二個(gè)opeartor:時(shí)間窗口上保存的求和數(shù)據(jù)就是這樣的例子。當(dāng)只有內(nèi)部狀態(tài)時(shí),pre-commit階段無需執(zhí)行額外的操作,僅僅是寫入一些已定義的狀態(tài)變量即可。當(dāng)chckpoint成功時(shí)Flink負(fù)責(zé)提交這些寫入,否則就終止取消掉它們。

image.jpeg

當(dāng)時(shí),一旦operator包含外部狀態(tài),事情就不一樣了。我們不能像處理內(nèi)部狀態(tài)一樣處理這些外部狀態(tài)。因?yàn)橥獠繝顟B(tài)通常都涉及到與外部系統(tǒng)的交互。如果是這樣的話,外部系統(tǒng)必須要支持可與兩階段提交協(xié)議捆綁使用的事務(wù)才能確保實(shí)現(xiàn)整體的exactly-once semantics。

顯然本例中的data sink是有外部狀態(tài)的,因?yàn)樗枰獙懭霐?shù)據(jù)到Kafka。此時(shí)的pre-commit階段下data sink在保存狀態(tài)到狀態(tài)存儲的同時(shí)還必須預(yù)提交它的外部事務(wù),如下圖所示:

image.jpeg

當(dāng)checkpoint barrier在所有operator都傳遞了一遍且對應(yīng)的快照也都成功完成之后,pre-commit階段才算完成。該過程中所有創(chuàng)建的快照都被視為是checkpoint的一部分。其實(shí),checkpoint就是整個(gè)應(yīng)用的全局狀態(tài),當(dāng)然也包含pre-commit階段提交的外部狀態(tài)。當(dāng)出現(xiàn)崩潰時(shí),我們可以回滾狀態(tài)到最新已成功完成快照時(shí)的時(shí)間點(diǎn)。

下一步就是通知所有的operator,告訴它們checkpoint已成功完成。這便是兩階段提交協(xié)議的第二個(gè)階段:commit階段。該階段中JobManager會(huì)為應(yīng)用中每個(gè)operator發(fā)起checkpoint已完成的回調(diào)邏輯。

本例中的data source和窗口操作無外部狀態(tài),因此在該階段,這兩個(gè)opeartor無需執(zhí)行任何邏輯,但是data sink是有外部狀態(tài)的,因此此時(shí)我們必須提交外部事務(wù),如下圖所示:

image.jpeg

匯總以上所有信息,總結(jié)一下:

1. 一旦所有operator完成各自的pre-commit,它們會(huì)發(fā)起一個(gè)commit操作

2. 倘若有一個(gè)pre-commit失敗,所有其他的pre-commit必須被終止,并且Flink會(huì)回滾到最近成功完成decheckpoint

3. 一旦pre-commit完成,必須要確保commit也要成功——operator和外部系統(tǒng)都需要對此進(jìn)行保證。倘若commit失敗(比如網(wǎng)絡(luò)故障等),F(xiàn)link應(yīng)用就會(huì)崩潰,然后根據(jù)用戶重啟策略執(zhí)行重啟邏輯,之后再次重試commit。這個(gè)過程至關(guān)重要,因?yàn)樘热鬰ommit無法順利執(zhí)行,就可能出現(xiàn)數(shù)據(jù)丟失的情況

因此,所有opeartor必須對checkpoint最終結(jié)果達(dá)成共識:即所有operator都必須認(rèn)定數(shù)據(jù)提交要么成功執(zhí)行,要么被終止然后回滾。

Flink中實(shí)現(xiàn)兩階段提交

這種operator的管理有些復(fù)雜,這也是為什么Flink提取了公共邏輯并封裝進(jìn)TwoPhaseCommitSinkFunction抽象類的原因。

下面討論一下如何擴(kuò)展TwoPhaseCommitSinkFunction類來實(shí)現(xiàn)一個(gè)簡單的基于文件的sink。若要實(shí)現(xiàn)支持exactly-once semantics的文件sink,我們需要實(shí)現(xiàn)以下4個(gè)方法:

1. beginTransaction:開啟一個(gè)事務(wù),在臨時(shí)目錄下創(chuàng)建一個(gè)臨時(shí)文件,之后,寫入數(shù)據(jù)到該文件中

2. preCommit:在pre-commit階段,flush緩存數(shù)據(jù)塊到磁盤,然后關(guān)閉該文件,確保再不寫入新數(shù)據(jù)到該文件。同時(shí)開啟一個(gè)新事務(wù)執(zhí)行屬于下一個(gè)checkpoint的寫入操作

3. commit:在commit階段,我們以原子性的方式將上一階段的文件寫入真正的文件目錄下。注意:這會(huì)增加輸出數(shù)據(jù)可見性的延時(shí)。通俗說就是用戶想要看到最終數(shù)據(jù)需要等會(huì),不是實(shí)時(shí)的。

4. abort:一旦終止事務(wù),我們離自己刪除臨時(shí)文件

當(dāng)出現(xiàn)崩潰時(shí),F(xiàn)link會(huì)恢復(fù)最新已完成快照中應(yīng)用狀態(tài)。需要注意的是在某些極偶然的場景下,pre-commit階段已成功完成而commit尚未開始(也就是operator尚未來得及被告知要開啟commit),此時(shí)倘若發(fā)生崩潰Flink會(huì)將opeartor狀態(tài)恢復(fù)到已完成pre-commit但尚未commit的狀態(tài)。

在一個(gè)checkpoint狀態(tài)中,對于已完成pre-commit的事務(wù)狀態(tài),我們必須保存足夠多的信息,這樣才能確保在重啟后要么重新發(fā)起commit亦或是終止掉事務(wù)。本例中這部分信息就是臨時(shí)文件所在的路徑以及目標(biāo)目錄。

TwoPhaseCommitSinkFunction考慮了這種場景,因此當(dāng)應(yīng)用從checkpoint恢復(fù)之后TwoPhaseCommitSinkFunction總是會(huì)發(fā)起一個(gè)搶占式的commit。這種commit必須是冪等性的,雖然大部分情況下這都不是問題。本例中對應(yīng)的這種場景就是:臨時(shí)文件不在臨時(shí)目錄下,而是已經(jīng)被移動(dòng)到目標(biāo)目錄下。

總結(jié)

本文的一些關(guān)鍵要點(diǎn):

  1. Flinkcheckpointing機(jī)制是實(shí)現(xiàn)兩階段提交協(xié)議以及提供僅一次語義的基石

  2. 與其他系統(tǒng)持久化傳輸中的數(shù)據(jù)不同,F(xiàn)link不需要將計(jì)算的每個(gè)階段寫入到磁盤中

  3. Flink新的TwoPhaseCommitSinkFunction封裝兩階段提交協(xié)議的公共邏輯使之搭配支持事務(wù)的外部系統(tǒng)來共同構(gòu)建僅一次語義應(yīng)用成為可能

  4. 自1.4版本起,F(xiàn)link + Pravega和Kafka 0.11 producer開始支持僅一次語義

  5. Flink Kafka 0.11 producer基于TwoPhaseCommitSinkFunction實(shí)現(xiàn),比起至少一次語義的producer而言開銷并未顯著增加

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容