一、Checkpoints的算法原理
Checkpoints是flink自動(dòng)存儲(chǔ)快照
//1. 啟用Checkpoint
env.enableCheckpointing(200);
//2. 高級(jí)選項(xiàng)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);//checkpoint超時(shí)時(shí)間
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //最大并行checkpoint數(shù)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100); //最小checkpoint的間隔時(shí)間,讓兩個(gè)checkpoint之間留出一定時(shí)間
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); //容忍checkpoint失敗多少次,默認(rèn)0,不容忍
//3. 重啟策略配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000));//每隔10s重啟1次,固定延遲重啟
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10),Time.minutes(1)));//失敗率重啟,在10分鐘內(nèi)重啟,每隔1分鐘重啟一次,重啟3次
1. Barrier(checkpoint分割線)
二、Savepoints(保存點(diǎn))
Savepoints是手動(dòng)存儲(chǔ)快照,多出了一些額外元數(shù)據(jù);
Savepoints一般是手動(dòng)使用命令保存當(dāng)前flink任務(wù)快照到HDFS上指定目錄中,重啟或恢復(fù)故障時(shí)可以使用指定的Savepoints進(jìn)行啟動(dòng)
$ ./bin/flink savepoint \
$JOB_ID \
/tmp/flink-savepoints
使用 YARN 觸發(fā) Savepoint
#$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
使用 Savepoint 取消作業(yè)
$ bin/flink cancel -s [:targetDirectory] :jobId
從 Savepoint 恢復(fù)
$ bin/flink run -s :savepointPath [:runArgs]
跳過(guò)無(wú)法映射的狀態(tài)恢復(fù)
$ bin/flink run -s :savepointPath -n [:runArgs]
- 保存點(diǎn)除了故障恢復(fù)外,保存點(diǎn)還可以用于:有計(jì)劃的手動(dòng)備份,更新應(yīng)用程序,版本遷移,暫停和重啟應(yīng)用等等
三、狀態(tài)一致性
- 有狀態(tài)的流處理,內(nèi)部每個(gè)算子任務(wù)都可以有自己的狀態(tài)
- 對(duì)于流處理器內(nèi)部來(lái)說(shuō),所謂的狀態(tài)一致性,就是所說(shuō)的計(jì)算結(jié)果要保證準(zhǔn)確
- 一條數(shù)據(jù)不應(yīng)該丟失,也不應(yīng)該重復(fù)計(jì)算
- 在遇到故障時(shí)可以恢復(fù)狀態(tài),恢復(fù)以后的重新計(jì)算,結(jié)果應(yīng)該也是完全正確的。
狀態(tài)一致性分類(lèi)
- AT-MOST-ONCE(最多一次)
- AT-LEAST-ONCE (至少一次)
- EXACTLY-ONCE (精確一次)
一致性檢查點(diǎn)(checkpoints)
- Flink使用了一種輕量級(jí)快照機(jī)制---checkpoint保證exactly-once
- 有狀態(tài)流應(yīng)用的一致性檢查點(diǎn),其實(shí)就是:所有任務(wù)的狀態(tài),在某個(gè)時(shí)間點(diǎn)的一份拷貝(一份快照)。而這個(gè)時(shí)間點(diǎn),應(yīng)該是所有任務(wù)都恰好處理完一個(gè)相同的輸入數(shù)據(jù)的時(shí)候。(使用checkpoint的Barrier實(shí)現(xiàn),Barrier對(duì)下游分發(fā)是廣播出去的,下游需要所有的Barrier都到了才checkpoint,這也叫Barrier對(duì)齊)
- 應(yīng)用狀態(tài)的一致檢查點(diǎn),是Flink故障恢復(fù)機(jī)制的核心
端到端(end-to-end)狀態(tài)一致性
目前我們看到的一致性保證都是由流處理器實(shí)現(xiàn)的,也就是說(shuō)都是在Flink流處理器內(nèi)部保證的;而在真實(shí)應(yīng)用中,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如Kafka)和輸出到持久化系統(tǒng)
端到端的一致性保證,意味著結(jié)果的正確性貫穿了整個(gè)流處理應(yīng)用的始終;每一個(gè)組件都保證了它自己的一致性
整個(gè)端到端的一致性級(jí)別取決于所有組件中一致性最弱的組件
exactly-once怎么保證,狀態(tài)一致就行,不是操作一致內(nèi)部保證 ---checkpoint
source端---可重設(shè)數(shù)據(jù)的讀取位置
sink端---從故障恢復(fù)時(shí),數(shù)據(jù)不會(huì)重復(fù)寫(xiě)入外部系統(tǒng)
- 冪等寫(xiě)入
- 事務(wù)寫(xiě)入
冪等寫(xiě)入(Idempotent Writes)
- 所謂冪等操作,是說(shuō)一個(gè)操作,可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改,也就是說(shuō),后面再重復(fù)執(zhí)行就不起作用了
典型應(yīng)用:hashmap,key-value Pairs, Redis的hash表,ES指定_id都滿足冪等操作
冪等寫(xiě)入只保證最終結(jié)果不變,中間過(guò)程會(huì)有重復(fù)寫(xiě)入
事務(wù)寫(xiě)入(Transactional Writes)
- 事務(wù)(Transaction)
1.應(yīng)用程序中一系列嚴(yán)密的操作,所有操作必須成功完成,否則在每個(gè)操作中所作的所有更改都會(huì)被撤銷(xiāo)- 具有原子性:一個(gè)事務(wù)中的一系列的操作要么全部成功,要么一個(gè)都不做
- 實(shí)現(xiàn)思想:構(gòu)建的事務(wù)對(duì)應(yīng)著checkpoint,等到checkpoint真正完成的時(shí)候,才把所有對(duì)應(yīng)的結(jié)果寫(xiě)入sink系統(tǒng)中
- 實(shí)現(xiàn)方式
- 預(yù)寫(xiě)日志
- 兩階段提交
預(yù)寫(xiě)日志(Write-Ahead-Log, WAL)
- 把結(jié)果數(shù)據(jù)先當(dāng)成狀態(tài)保存,然后在收到checkpoint完成的通知時(shí),一次性寫(xiě)入sink系統(tǒng)
- 簡(jiǎn)單易于實(shí)現(xiàn),由于數(shù)據(jù)提前在狀態(tài)后端中做了緩存,所以無(wú)論什么sink系統(tǒng),都能用這種方式一批搞定
- DataStream API提供了一個(gè)模板類(lèi):GenericWriteAheadSink,來(lái)實(shí)現(xiàn)這種事務(wù)性sink
缺點(diǎn):會(huì)加大延遲,不能?chē)?yán)格意義實(shí)現(xiàn)Exactly once
兩階段提交(Two-Phase-Commit, 2PC)
- 對(duì)于每個(gè)checkpoint, sink任務(wù)會(huì)啟動(dòng)一個(gè)事務(wù),并將接下來(lái)所有接收的數(shù)據(jù)添加到事務(wù)里
- 然后將這些數(shù)據(jù)寫(xiě)入外部sink系統(tǒng),但不提交他們 ---這時(shí)只是“預(yù)提交”
- 當(dāng)它收到checkpoint完成的通知時(shí),它才正式提交事務(wù),實(shí)現(xiàn)結(jié)果的真正寫(xiě)入
- 這種方式真正實(shí)現(xiàn)了exactly-once,他需要一個(gè)提供事務(wù)支持的外部sink系統(tǒng)。flink提供了TwoPhaseCommitSinkFunction接口(關(guān)系型數(shù)據(jù)庫(kù),mysql,SQL server,Oracle,Posteglsql, kafka)
2PC對(duì)外部sink系統(tǒng)的要求
- 外部sink系統(tǒng)必須提供事務(wù)支持,或者sink任務(wù)必須能夠模擬外部系統(tǒng)上的事務(wù)
- 在checkpoint的間隔期間里,必須能夠開(kāi)啟一個(gè)事務(wù)并接受數(shù)據(jù)寫(xiě)入
- 在收到checkpoint完成的通知之前,事務(wù)必須是“等待提交”的狀態(tài),在故障恢復(fù)的情況下,這可能需要一些時(shí)間。如果這個(gè)時(shí)候sink系統(tǒng)關(guān)閉事務(wù)(例如超時(shí)了),那么未提交的數(shù)據(jù)就會(huì)丟失。
- sink任務(wù)必須能夠在進(jìn)程失敗后恢復(fù)事務(wù)
- 提交事務(wù)必須是冪等操作
優(yōu)點(diǎn):對(duì)性能影響比較低
四、kafka的Exactly-once兩階段提交步驟
- 第一條數(shù)據(jù)來(lái)了之后,開(kāi)啟一個(gè)kafka的事務(wù)(tranasaction),正常寫(xiě)入kafka分區(qū)日志但標(biāo)記為未提交,這就是“預(yù)提交”
- jobmanager觸發(fā)checkpoint操作,barrier從source開(kāi)始向下傳遞,遇到barrier的算子講狀態(tài)存入狀態(tài)后端,并通知jobmanager
- sink連接器收到barrier,保存當(dāng)前狀態(tài),存入checkpoint,通知jobmanager,并開(kāi)啟下一個(gè)階段的事務(wù),用于提交下個(gè)檢查點(diǎn)的數(shù)據(jù)
- jobmanager收到所有任務(wù)的通知,發(fā)出確認(rèn)信息,表示checkpoint完成
- sink任務(wù)收到j(luò)obmanager的確認(rèn)信息,正式提交這段時(shí)間數(shù)據(jù)
- 外部kafka關(guān)閉事務(wù),提交的數(shù)據(jù)可以正常消費(fèi)了