關(guān)于Flink的exactly-once語義

原文地址:https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets

一、關(guān)于Checkpoints

Checkpoints是Flink從故障中恢復(fù)的一種內(nèi)部機(jī)制。
Checkpoints是Flink應(yīng)用程序狀態(tài)的一致性副本,包括輸入的讀取位置點(diǎn)。
如果出現(xiàn)故障,F(xiàn)link通過從檢查點(diǎn)加載應(yīng)用程序狀態(tài)并從恢復(fù)的讀取位置點(diǎn)繼續(xù)執(zhí)行,就好像什么都沒有發(fā)生一樣,從而恢復(fù)應(yīng)用程序。
Flink的Checkpoints是基于Chandy-Lamport算法的分布式一致性快照

Checkpoints使Flink具有容錯(cuò)性,并確保流式應(yīng)用程序的語義在發(fā)生故障時(shí)得到保留。檢查點(diǎn)按應(yīng)用程序可以配置的定期觸發(fā)。

Flink中的Kafka Consumers與Flink的Checkpoints機(jī)制集成為一個(gè)有狀態(tài)operator,其狀態(tài)是所有Kafka分區(qū)中的讀取偏移量 offsets。
當(dāng)一個(gè)檢查點(diǎn)被觸發(fā)時(shí),每個(gè)分區(qū)的偏移量都存儲(chǔ)在檢查點(diǎn)中。
Flink的檢查點(diǎn)機(jī)制確保所有操作任務(wù)的存儲(chǔ)狀態(tài)是一致的,即它們基于相同的輸入數(shù)據(jù)。當(dāng)所有操作任務(wù)成功存儲(chǔ)其狀態(tài)時(shí),檢查點(diǎn)即完成。
因此,當(dāng)從潛在的系統(tǒng)故障中重新啟動(dòng)恢復(fù)時(shí),系統(tǒng)提供exactly-once狀態(tài)更新保證。

二、Flink 中的 Kafka Consumers offsets是如何做檢查點(diǎn)的

栗子
(數(shù)據(jù)被存在了 Flink 的 JobMaster 中,在 POC 或生產(chǎn)用例下,這些數(shù)據(jù)最好是能存到一個(gè)外部文件系統(tǒng)(如HDFS或S3)中)

Step 1:

如下所示,從一個(gè) Kafka topic讀取,有兩個(gè)partition,每個(gè)partition都含有 “A”, “B”, “C”, ”D”, “E” 5條消息。
我們將兩個(gè)partition的offset都設(shè)置為0.


step1.png
Step 2:

Kafka comsumer開始從 partition 0 讀取消息。
消息“A”正在被處理,
第一個(gè) consumer 的 offset 變成了1。


step2.png
Step 3:

消息“A”到達(dá)了 Flink Map Task。
兩個(gè) consumer 都開始讀取下一條消息(partition 0 讀取“B”,partition 1 讀取“A”)。
各自將 offset 更新成 2 和 1 。
同時(shí),F(xiàn)link 的 JobMaster 開始在 source 觸發(fā)了一個(gè)檢查點(diǎn)。


step 3.png
Step 4:

接下來,由于 source 觸發(fā)了檢查點(diǎn),Kafka consumer tasks創(chuàng)建了它們狀態(tài)的第一個(gè)快照(”offset = 2, 1”),并將快照存到了 Flink 的 JobMaster 中。
Source 在消息“B”和“A”從partition 0 和 1 發(fā)出之后,發(fā)出一個(gè) checkpoint barrier。
Checkopint barrier 用于對(duì)齊所有 operator task 的檢查點(diǎn),保證了整個(gè)檢查點(diǎn)的一致性。
消息“A”到達(dá)了 Flink Map Task,而上面的 consumer 繼續(xù)讀取下一條消息(消息“C”)。


step4.png
Step 5:

Flink Map Task 從source和檢查點(diǎn)接收 checkpoint barrier 后,并將其狀態(tài)發(fā)送給 JobMaster。
同時(shí),consumer 會(huì)繼續(xù)從 Kafka 讀取更多的事件。

step5.png
Step 6:

Flink Map Task 完成了它自己狀態(tài)的快照流程后,就會(huì)和Flink JobMaster進(jìn)行通信, 匯報(bào)它已經(jīng)完成了這個(gè) checkpoint。
當(dāng)所有的 task 都確認(rèn)其狀態(tài) checkpoint 后,JobMaster 就會(huì)將這個(gè) checkpoint 標(biāo)記為成功。

從此刻開始,這個(gè) checkpoint 就可以用于故障恢復(fù)了。
值得一提的是,F(xiàn)link 并不依賴 Kafka offset 從系統(tǒng)故障中恢復(fù)。

step6.png

三、故障恢復(fù)

在發(fā)生故障時(shí)(比如,某個(gè) worker 掛了),所有的 operator task 會(huì)被重啟,而他們的狀態(tài)會(huì)被重置到最近一次完成的checkpoint。
Kafka source 分別從 offset 2 和 1 重新開始讀取消息(因?yàn)檫@是完成的 checkpoint 中存的 offset)。
當(dāng)作業(yè)重啟后,我們可以期待正常的系統(tǒng)操作,就好像之前沒有發(fā)生故障一樣。如下圖所示:


故障恢復(fù).png

官網(wǎng)中關(guān)于exactly-once的解釋:

With Flink’s checkpointing enabled, the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.
Besides enabling Flink’s checkpointing, you can also choose three different modes of operating chosen by passing appropriate semantic parameter to the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions):
啟用flink的檢查點(diǎn)之外,可以通過將適當(dāng)?shù)恼Z義參數(shù)傳遞給flinkkafkaproducer011,有三種不同的操作模式:

  • Semantic.NONE:
  • Semantic.AT_LEAST_ONCE (default setting)
  • Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantic. Whenever you write to Kafka using transactions, do not forget about setting desired isolation.level (read_committed or read_uncommitted - the latter one is the default value) for any application consuming records from Kafka.
    使用Kafka事務(wù)提供一次語義。無論何時(shí)使用事務(wù)寫入Kafka,都不要忘記為任何使用Kafka記錄的應(yīng)用程序設(shè)置所需的isolation.level(read_committed或read_uncommitted-后者是默認(rèn)值)。

Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka’s transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time). Having this in mind, please configure your transaction timeout appropriately to your expected down times.
Semantic.EXACTLY_ONCE模式依賴于在從所述檢查點(diǎn)恢復(fù)后,提交在接受檢查點(diǎn)之前啟動(dòng)的事務(wù)的能力。如果Flink崩潰和完成重新啟動(dòng)之間的時(shí)間大于Kafka的事務(wù)超時(shí),則將丟失數(shù)據(jù)(Kafka將自動(dòng)中止超過超時(shí)時(shí)間的事務(wù))??紤]到這一點(diǎn),根據(jù)預(yù)期的停機(jī)時(shí)間適當(dāng)配置事務(wù)超時(shí)。

Kafka Consumers and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
啟用flink的檢查點(diǎn)后,flink-kafka Consumers將使用某個(gè)topic的記錄,并以一致的方式定期檢查其所有kafka偏移量以及其他操作的狀態(tài)。
在作業(yè)失敗的情況下,F(xiàn)link將把流媒體程序恢復(fù)到最新檢查點(diǎn)的狀態(tài),并從存儲(chǔ)在檢查點(diǎn)中的偏移量開始重新使用來自Kafka的記錄。
因此,繪制檢查點(diǎn)的間隔定義了程序在失敗時(shí)最多需要返回多少。
要使用容錯(cuò)kafka Consumers,需要在執(zhí)行環(huán)境中啟用拓?fù)錂z查點(diǎn):

val env = >StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 >msecs

Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
還要注意,只有在有足夠的processing slots來重新啟動(dòng)topology時(shí),F(xiàn)link才能重新啟動(dòng)topology。因此,如果topology由于TaskManager丟失而失敗,那么之后必須有足夠的可用slots。 Flink on YARN支持自動(dòng)重啟丟失的YARN。
如果未啟用檢查點(diǎn),Kafka使用者將定期將偏移提交給Zookeeper。

// checkpoint常用設(shè)置參數(shù)
env.enableCheckpointing(4000)
env.getCheckpointConfig.setCheckpointingMode(Checkpoin>tingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(10000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容