1.前言
在上一篇文章當(dāng)中,也算是比較詳細(xì)且通俗的聊了聊Flink是如何通過checkpoint機(jī)制來完成數(shù)據(jù)精準(zhǔn)一次性的實(shí)現(xiàn)的。并且也在上一章的結(jié)尾表示,要在接下來聊一聊Flink與它的鐵哥們Kafaka之間,是如何實(shí)現(xiàn)數(shù)據(jù)的精準(zhǔn)一次性消費(fèi)的。
本次的聊法,還是要通過以kafka(source)->Flink,Flink(source)->Kafka來分別展開討論。
2.輸入端kafka與Flink之間
kafka是一個(gè)具有數(shù)據(jù)保存、數(shù)據(jù)回放能力的消息隊(duì)列,說白了就是kafka中的每一個(gè)數(shù)據(jù),都有一個(gè)專門的標(biāo)記作為標(biāo)識(shí)。而在Flink消費(fèi)kafka傳入的數(shù)據(jù)的時(shí)候,source任務(wù)就能夠?qū)⑦@個(gè)偏移量以算子狀態(tài)的角色進(jìn)行保存,寫入到設(shè)定好的檢查點(diǎn)中。這樣一旦發(fā)生故障,F(xiàn)link中的FlinkKafkaProduce連接器就i能夠按照自己保存的偏移量,自己去Kafka中重新拉取數(shù)據(jù),也正是通過這種方式,就能夠確保Kafka到Flink之間的精準(zhǔn)一次性。
3.Flink與輸出端之間
在上一篇文章當(dāng)中,已經(jīng)表明了,如果想要讓輸出端能夠進(jìn)行精準(zhǔn)一次性消費(fèi),就需要使用到冪等性或者是事務(wù)。而事務(wù)中的兩階段提交是所有方案里面最好的實(shí)現(xiàn)。
其實(shí)Flink到Kafak之間也是采用了這種方式,具體的可以看一下ctrl進(jìn)到FlinkKafkaProduce連接器內(nèi)部去看一看:
@PublicEvolving
public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<
IN,
FlinkKafkaProducer.KafkaTransactionState,
FlinkKafkaProducer.KafkaTransactionContext> {
}
這也就表明了,當(dāng)數(shù)據(jù)通過Flink發(fā)送給sink端Kafka的時(shí)候,是經(jīng)歷了兩個(gè)階段的處理的。第一階段就是Flink向Kafka中插入數(shù)據(jù),進(jìn)入預(yù)提交階段。當(dāng)JobManager發(fā)送的Ckeckpoint保存成功信號(hào)過來之后,才會(huì)提交事務(wù)進(jìn)行正式的數(shù)據(jù)發(fā)送,也就是讓原來不可用的數(shù)據(jù)可以被使用了。
4.具體步驟
這個(gè)實(shí)現(xiàn)過程到目前階段就很清晰了,它的主體流程無非就是在開啟檢查點(diǎn)之后,由JobManager向各個(gè)階段的處理邏輯發(fā)送有關(guān)于檢查點(diǎn)的barrier。所有的計(jì)算任務(wù)接收到之后,就會(huì)根據(jù)自己當(dāng)前的狀態(tài)做一個(gè)檢查點(diǎn)保存。而當(dāng)這個(gè)barrier來到sink任務(wù)的時(shí)候,sink就會(huì)開啟一個(gè)事務(wù),然后通過這個(gè)事務(wù)向外預(yù)寫數(shù)據(jù)。直到Jobmanager來告訴它這一次的檢查點(diǎn)已經(jīng)保存完成了,sink就會(huì)進(jìn)行第二次提交,數(shù)據(jù)也就算是成功寫出了。
5.實(shí)現(xiàn)精準(zhǔn)一次性的前提
1.必須要保證檢查點(diǎn)被打開了,如果檢查點(diǎn)沒有打開,那么之前說的一切話都是空談。因?yàn)镕link默認(rèn)檢查點(diǎn)是關(guān)著的。
2.在FlinkKafakProducer連接器的構(gòu)造函數(shù)中要傳入?yún)?shù),這個(gè)參數(shù)就是用來保證狀態(tài)一致性的。就是在構(gòu)造函數(shù)的最后一個(gè)參數(shù)輸入如下:
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
3.配置Kafka讀取數(shù)據(jù)的隔離級(jí)別
在kafka中有個(gè)配置,這個(gè)配置用來管理Kafka讀取數(shù)據(jù)的級(jí)別。而這個(gè)配置默認(rèn)是能夠讀取預(yù)提交階段的數(shù)據(jù)的,所以如果你沒改這個(gè)配置,那兩階段提交的第一階段就是白費(fèi)了。所以需要改一下這個(gè)配置,來更換一下隔離級(jí)別:
isolation.level=read_committed
4.事務(wù)超時(shí)時(shí)間
這個(gè)配置也很有意思,大家試想一下。如果要進(jìn)行兩階段提交,就要保證sink端支持事務(wù),Kafka是支持事務(wù)的,但是像這個(gè)組件對于很多機(jī)制都有一個(gè)超時(shí)時(shí)間的概念,也就是說如果時(shí)間到了這個(gè)界限還沒完成工作,那就會(huì)默認(rèn)這個(gè)工作失敗。Kafka中由這個(gè)概念,F(xiàn)link中同樣由這個(gè)概念。但是flink默認(rèn)超時(shí)時(shí)間是1小時(shí),而Kafka默認(rèn)是15分鐘,這就有可能出現(xiàn)檢查點(diǎn)保存東西的時(shí)間大于15分鐘,假如說是16分鐘保存完成然后給sink發(fā)送檢查點(diǎn)保存陳功可以提交事務(wù)的信號(hào),但是這個(gè)時(shí)候Kafka已經(jīng)認(rèn)為事務(wù)失敗,把之前的數(shù)據(jù)都扔了。那數(shù)據(jù)不就是丟失了么。所以說Kafka的超時(shí)時(shí)間要大于Flink的超時(shí)時(shí)間才好。
//kafka超時(shí)時(shí)間配置項(xiàng)
transaction.max.timeout.ms
//Flink超時(shí)時(shí)間配置項(xiàng)
transaction.timeout.ms
6.結(jié)尾
截止到目前為止,基本上把有關(guān)于狀態(tài)維護(hù)的一些東西都說完了,有狀態(tài)后端、有檢查點(diǎn)。還通過檢查點(diǎn)完成可端到端的數(shù)據(jù)精準(zhǔn)一次性消費(fèi)。但是想到這我又感覺,如果有學(xué)習(xí)進(jìn)度比我差一些的,萬一沒辦法很好的理解怎么辦。所以在下一篇文章當(dāng)中我就聊聊Flink中的“狀態(tài)”到底是個(gè)什么東西,都有什么類型,都怎么去用。