Flink容錯(cuò)機(jī)制-Checkpoints和Savepoints

一、Checkpoints的算法原理

Checkpoints是flink自動(dòng)存儲(chǔ)快照

//1. 啟用Checkpoint
env.enableCheckpointing(200);
//2. 高級(jí)選項(xiàng)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);//checkpoint超時(shí)時(shí)間
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //最大并行checkpoint數(shù)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100); //最小checkpoint的間隔時(shí)間,讓兩個(gè)checkpoint之間留出一定時(shí)間
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); //容忍checkpoint失敗多少次,默認(rèn)0,不容忍

//3. 重啟策略配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000));//每隔10s重啟1次,固定延遲重啟
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10),Time.minutes(1)));//失敗率重啟,在10分鐘內(nèi)重啟,每隔1分鐘重啟一次,重啟3次

1. Barrier(checkpoint分割線)

二、Savepoints(保存點(diǎn))

Savepoints是手動(dòng)存儲(chǔ)快照,多出了一些額外元數(shù)據(jù);
Savepoints一般是手動(dòng)使用命令保存當(dāng)前flink任務(wù)快照到HDFS上指定目錄中,重啟或恢復(fù)故障時(shí)可以使用指定的Savepoints進(jìn)行啟動(dòng)

$ ./bin/flink savepoint \
      $JOB_ID \ 
      /tmp/flink-savepoints

使用 YARN 觸發(fā) Savepoint

#$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

使用 Savepoint 取消作業(yè)

$ bin/flink cancel -s [:targetDirectory] :jobId

從 Savepoint 恢復(fù)

$ bin/flink run -s :savepointPath [:runArgs]

跳過(guò)無(wú)法映射的狀態(tài)恢復(fù)

$ bin/flink run -s :savepointPath -n [:runArgs]
  • 保存點(diǎn)除了故障恢復(fù)外,保存點(diǎn)還可以用于:有計(jì)劃的手動(dòng)備份,更新應(yīng)用程序,版本遷移,暫停和重啟應(yīng)用等等

三、狀態(tài)一致性

  • 有狀態(tài)的流處理,內(nèi)部每個(gè)算子任務(wù)都可以有自己的狀態(tài)
  • 對(duì)于流處理器內(nèi)部來(lái)說(shuō),所謂的狀態(tài)一致性,就是所說(shuō)的計(jì)算結(jié)果要保證準(zhǔn)確
  • 一條數(shù)據(jù)不應(yīng)該丟失,也不應(yīng)該重復(fù)計(jì)算
  • 在遇到故障時(shí)可以恢復(fù)狀態(tài),恢復(fù)以后的重新計(jì)算,結(jié)果應(yīng)該也是完全正確的。

狀態(tài)一致性分類(lèi)

  • AT-MOST-ONCE(最多一次)
  • AT-LEAST-ONCE (至少一次)
  • EXACTLY-ONCE (精確一次)

一致性檢查點(diǎn)(checkpoints)

  • Flink使用了一種輕量級(jí)快照機(jī)制---checkpoint保證exactly-once
  • 有狀態(tài)流應(yīng)用的一致性檢查點(diǎn),其實(shí)就是:所有任務(wù)的狀態(tài),在某個(gè)時(shí)間點(diǎn)的一份拷貝(一份快照)。而這個(gè)時(shí)間點(diǎn),應(yīng)該是所有任務(wù)都恰好處理完一個(gè)相同的輸入數(shù)據(jù)的時(shí)候。(使用checkpoint的Barrier實(shí)現(xiàn),Barrier對(duì)下游分發(fā)是廣播出去的,下游需要所有的Barrier都到了才checkpoint,這也叫Barrier對(duì)齊)
  • 應(yīng)用狀態(tài)的一致檢查點(diǎn),是Flink故障恢復(fù)機(jī)制的核心

端到端(end-to-end)狀態(tài)一致性

  • 目前我們看到的一致性保證都是由流處理器實(shí)現(xiàn)的,也就是說(shuō)都是在Flink流處理器內(nèi)部保證的;而在真實(shí)應(yīng)用中,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如Kafka)和輸出到持久化系統(tǒng)

  • 端到端的一致性保證,意味著結(jié)果的正確性貫穿了整個(gè)流處理應(yīng)用的始終;每一個(gè)組件都保證了它自己的一致性

  • 整個(gè)端到端的一致性級(jí)別取決于所有組件中一致性最弱的組件
    exactly-once怎么保證,狀態(tài)一致就行,不是操作一致

  • 內(nèi)部保證 ---checkpoint

  • source端---可重設(shè)數(shù)據(jù)的讀取位置

  • sink端---從故障恢復(fù)時(shí),數(shù)據(jù)不會(huì)重復(fù)寫(xiě)入外部系統(tǒng)

  1. 冪等寫(xiě)入
  2. 事務(wù)寫(xiě)入

冪等寫(xiě)入(Idempotent Writes)

  • 所謂冪等操作,是說(shuō)一個(gè)操作,可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改,也就是說(shuō),后面再重復(fù)執(zhí)行就不起作用了
    典型應(yīng)用:hashmap,key-value Pairs, Redis的hash表,ES指定_id都滿足冪等操作
    冪等寫(xiě)入只保證最終結(jié)果不變,中間過(guò)程會(huì)有重復(fù)寫(xiě)入

事務(wù)寫(xiě)入(Transactional Writes)

  • 事務(wù)(Transaction)
    1.應(yīng)用程序中一系列嚴(yán)密的操作,所有操作必須成功完成,否則在每個(gè)操作中所作的所有更改都會(huì)被撤銷(xiāo)
    1. 具有原子性:一個(gè)事務(wù)中的一系列的操作要么全部成功,要么一個(gè)都不做
  • 實(shí)現(xiàn)思想:構(gòu)建的事務(wù)對(duì)應(yīng)著checkpoint,等到checkpoint真正完成的時(shí)候,才把所有對(duì)應(yīng)的結(jié)果寫(xiě)入sink系統(tǒng)中
  • 實(shí)現(xiàn)方式
    1. 預(yù)寫(xiě)日志
    2. 兩階段提交

預(yù)寫(xiě)日志(Write-Ahead-Log, WAL)

  • 把結(jié)果數(shù)據(jù)先當(dāng)成狀態(tài)保存,然后在收到checkpoint完成的通知時(shí),一次性寫(xiě)入sink系統(tǒng)
  • 簡(jiǎn)單易于實(shí)現(xiàn),由于數(shù)據(jù)提前在狀態(tài)后端中做了緩存,所以無(wú)論什么sink系統(tǒng),都能用這種方式一批搞定
  • DataStream API提供了一個(gè)模板類(lèi):GenericWriteAheadSink,來(lái)實(shí)現(xiàn)這種事務(wù)性sink

缺點(diǎn):會(huì)加大延遲,不能?chē)?yán)格意義實(shí)現(xiàn)Exactly once

兩階段提交(Two-Phase-Commit, 2PC)

  • 對(duì)于每個(gè)checkpoint, sink任務(wù)會(huì)啟動(dòng)一個(gè)事務(wù),并將接下來(lái)所有接收的數(shù)據(jù)添加到事務(wù)里
  • 然后將這些數(shù)據(jù)寫(xiě)入外部sink系統(tǒng),但不提交他們 ---這時(shí)只是“預(yù)提交”
  • 當(dāng)它收到checkpoint完成的通知時(shí),它才正式提交事務(wù),實(shí)現(xiàn)結(jié)果的真正寫(xiě)入
  • 這種方式真正實(shí)現(xiàn)了exactly-once,他需要一個(gè)提供事務(wù)支持的外部sink系統(tǒng)。flink提供了TwoPhaseCommitSinkFunction接口(關(guān)系型數(shù)據(jù)庫(kù),mysql,SQL server,Oracle,Posteglsql, kafka)

2PC對(duì)外部sink系統(tǒng)的要求

  • 外部sink系統(tǒng)必須提供事務(wù)支持,或者sink任務(wù)必須能夠模擬外部系統(tǒng)上的事務(wù)
  • 在checkpoint的間隔期間里,必須能夠開(kāi)啟一個(gè)事務(wù)并接受數(shù)據(jù)寫(xiě)入
  • 在收到checkpoint完成的通知之前,事務(wù)必須是“等待提交”的狀態(tài),在故障恢復(fù)的情況下,這可能需要一些時(shí)間。如果這個(gè)時(shí)候sink系統(tǒng)關(guān)閉事務(wù)(例如超時(shí)了),那么未提交的數(shù)據(jù)就會(huì)丟失。
  • sink任務(wù)必須能夠在進(jìn)程失敗后恢復(fù)事務(wù)
  • 提交事務(wù)必須是冪等操作
    優(yōu)點(diǎn):對(duì)性能影響比較低

四、kafka的Exactly-once兩階段提交步驟

  • 第一條數(shù)據(jù)來(lái)了之后,開(kāi)啟一個(gè)kafka的事務(wù)(tranasaction),正常寫(xiě)入kafka分區(qū)日志但標(biāo)記為未提交,這就是“預(yù)提交”
  • jobmanager觸發(fā)checkpoint操作,barrier從source開(kāi)始向下傳遞,遇到barrier的算子講狀態(tài)存入狀態(tài)后端,并通知jobmanager
  • sink連接器收到barrier,保存當(dāng)前狀態(tài),存入checkpoint,通知jobmanager,并開(kāi)啟下一個(gè)階段的事務(wù),用于提交下個(gè)檢查點(diǎn)的數(shù)據(jù)
  • jobmanager收到所有任務(wù)的通知,發(fā)出確認(rèn)信息,表示checkpoint完成
  • sink任務(wù)收到j(luò)obmanager的確認(rèn)信息,正式提交這段時(shí)間數(shù)據(jù)
  • 外部kafka關(guān)閉事務(wù),提交的數(shù)據(jù)可以正常消費(fèi)了
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容