前言
Flink的檢查點和恢復機制定期的會保存應用程序狀態(tài)的一致性檢查點。在故障的情況下,應用程序的狀態(tài)將會從最近一次完成的檢查點恢復,并繼續(xù)處理。盡管如此,可以使用檢查點來重置應用程序的狀態(tài)無法完全達到令人滿意的一致性保證。相反,source和sink的連接器需要和Flink的檢查點和恢復機制進行集成才能提供有意義的一致性保證。
狀態(tài)一致性
對于流處理器內(nèi)部來說,所謂的狀態(tài)一致性,其實就是我們所說的計算結果要保證準確。 一條數(shù)據(jù)不應該丟失,也不應該重復計算 在遇到故障時可以恢復狀態(tài),恢復以后的重新計算,結果應該也是完全正確的。
分類
AT-MOST-ONCE(最多一次) 當任務故障時,最簡單的做法是什么都不干,既不恢復丟失的狀態(tài),也不重播丟失的數(shù)據(jù)。At-most-once 語義的含義是最多處理一次事件。
AT-LEAST-ONCE(至少一次) 在大多數(shù)的真實應用場景,我們希望不丟失事件。這種類型的保障稱為 at-least-once,意思是所有的事件都得到了處理,而一些事件還可能被處理多次。
EXACTLY-ONCE(精確一次) 恰好處理一次是最嚴格的保證,也是最難實現(xiàn)的。恰好處理一次語義不僅僅意味著沒有事件丟失,還意味著針對每一個數(shù)據(jù),內(nèi)部狀態(tài)僅僅更新一次。
EXACTLY-ONCE 的保證
Flink的 checkpoint機制和故障恢復機制給Flink內(nèi)部提供了精確一次的保證,需要注意的是,所謂精確一次并不是說精確到每個event只執(zhí)行一次,而是每個event對狀態(tài)(計算結果)的影響只有一次。
端到端 EXACTLY-ONCE
目前我們看到的一致性保證都是由流處理器實現(xiàn)的,也就是說都是在 Flink 流處理器內(nèi)部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數(shù)據(jù)源(例如 Kafka)和輸出到持久化系統(tǒng)
端到端的一致性保證,意味著結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性
不同Source 和Sink的一致性保證
| source/sink | 不可重置 | 可重置 |
|---|---|---|
| 任意(Any) | At-most-once | At-least-once |
| 冪等 | At-most-once | Exactly-once(故障恢復時會出現(xiàn)暫時不一致) |
| 預寫日志(WAL) | At-most-once | At-least-once |
| 兩階段提交(2PC) | At-most-once | Exactly-once |
整個端到端的一致性級別取決于所有組件中一致性最弱的組件
內(nèi)部保證 —— checkpoint
source 端 —— 可重設數(shù)據(jù)的讀取位置
-
sink 端 —— 從故障恢復時,數(shù)據(jù)不會重復寫入外部系統(tǒng)
冪等寫入
事務寫入
Fink的檢查點和恢復機制和可以重置讀位置的source連接器結合使用,可以保證應用程序不會丟失任何數(shù)據(jù)。盡管如此,應用程序可能會發(fā)出兩次計算結果,因為從上一次檢查點恢復的應用程序所計算的結果將會被重新發(fā)送一次(一些結果已經(jīng)發(fā)送出去了,這時任務故障,然后從上一次檢查點恢復,這些結果將被重新計算一次然后發(fā)送出去)。所以,可重置讀位置的source和Flink的恢復機制不足以提供端到端的恰好處理一次語義,即使應用程序的狀態(tài)是恰好處理一次一致性級別。
端到端恰好處理一次語義一致性的應用程序需要特殊的sink連接器。sink連接器可以在不同的情況下使用兩種技術來達到恰好處理一次一致性語義:冪等性寫入和事務性寫入。
冪等寫入
冪等概念
所謂冪等操作,是說一個操作,可以重復執(zhí)行很多次,但只導致一次結果更改,也就是說,后面再重復執(zhí)行就不起作用了
實現(xiàn)思想
必須保證在從檢查點恢復以后,它將會覆蓋之前已經(jīng)寫入的結果。
優(yōu)缺點
從Flink程序sink到的key-value存儲中讀取數(shù)據(jù)的應用,在Flink從檢查點恢復的過程中,可能會看到不想看到的結果。當重播開始時,之前已經(jīng)發(fā)出的計算結果可能會被更早的結果所覆蓋(因為在恢復過程中)。所以,一個消費Flink程序輸出數(shù)據(jù)的應用,可能會觀察到時間回退,例如讀到了比之前小的計數(shù)。
事務寫入
事務概念
應用程序中一系列嚴密的操作,所有操作必須成功完成,否則在每個操作中所作的所有更改都會被撤消
具有原子性:一個事務中的一系列的操作要么全部成功,要么一個都不做
實現(xiàn)思想
構建的事務對應著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統(tǒng)中
優(yōu)缺點
事務性的方法將不會遭受冪等性寫入所遭受的重播不一致的問題。但是,事務性寫入?yún)s帶來了延遲,因為只有在檢查點完成以后,我們才能看到計算結果。
Flink提供了兩種構建模塊來實現(xiàn)事務性sink連接器:write-ahead-log(WAL,預寫式日志)sink和兩階段提交sink。
實現(xiàn)方式
預寫日志
把結果數(shù)據(jù)先當成狀態(tài)保存,然后在收到 checkpoint 完成的通知時,一次性寫入 sink 系統(tǒng)
簡單易于實現(xiàn),由于數(shù)據(jù)提前在狀態(tài)后端中做了緩存,所以無論什么 sink 系統(tǒng),都能用這種方式一批搞定
DataStream API 提供了一個模板類:GenericWriteAheadSink,來實現(xiàn)這種事務性 sink
兩階段提交
對于每個 checkpoint,sink 任務會啟動一個事務,并將接下來所有接收的數(shù)據(jù)添加到事務里
然后將這些數(shù)據(jù)寫入外部 sink 系統(tǒng),但不提交它們 —— 這時只是“預提交”
當它收到 checkpoint 完成的通知時,它才正式提交事務,實現(xiàn)結果的真正寫入
這種方式真正實現(xiàn)了 exactly-once,它需要一個提供事務支持的外部 sink 系統(tǒng)。
Flink 提供了 TwoPhaseCommitSinkFunction 接口。
2PC 對外部 sink 系統(tǒng)的要求
外部 sink 系統(tǒng)必須提供事務支持,或者 sink 任務必須能夠模擬外部系統(tǒng)上的事務
在 checkpoint 的間隔期間里,必須能夠開啟一個事務并接受數(shù)據(jù)寫入
在收到 checkpoint 完成的通知之前,事務必須是“等待提交”的狀態(tài)。在故障恢復的情況下,這可能需要一些時間。如果這個時候sink系統(tǒng)關閉事務(例如超時了),那么未提交的數(shù)據(jù)就會丟失
sink 任務必須能夠在進程失敗后恢復事務
提交事務必須是冪等操作
Flink+Kafka 端到端狀態(tài)一致性的保證
使用flink+kafka來實現(xiàn)一個端對端一致性保證,source -> transform -> sink
內(nèi)部 —— 利用 checkpoint 機制,把狀態(tài)存盤,發(fā)生故障的時候可以恢復,保證內(nèi)部的狀態(tài)一致性
source —— kafka consumer 作為 source,可以將偏移量保存下來,如果后續(xù)任務出現(xiàn)了故障,恢復的時候可以由連接器重置偏移量,重新消費數(shù)據(jù),保證一致性
sink —— kafka producer 作為sink,采用兩階段提交 sink,需要實現(xiàn)一個 TwoPhaseCommitSinkFunction
圖解Exactly-Once 兩階段提交

Exactly-once 兩階段提交1:
JobManager 協(xié)調(diào)各個 TaskManager 進行 checkpoint 存儲 checkpoint保存在 StateBackend中,默認StateBackend是內(nèi)存級的,也可以改為文件級的進行持久化保存

Exactly-once 兩階段提交2:
當開啟了checkpoint ,JobManager 會將檢查點分界線(barrier)注入數(shù)據(jù)流 barrier會在算子間傳遞下去

每個算子會對當前的狀態(tài)做個快照,保存到狀態(tài)后端
checkpoint 機制可以保證內(nèi)部的狀態(tài)一致性

每個內(nèi)部的 transform 任務遇到 barrier 時,都會把狀態(tài)存到 checkpoint 里
sink 任務首先把數(shù)據(jù)寫入外部 kafka,這些數(shù)據(jù)都屬于預提交的事務;
遇到 barrier 時,把狀態(tài)保存到狀態(tài)后端,并開啟新的預提交事務

當所有算子任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發(fā)通知,確認這次 checkpoint 完成
sink 任務收到確認通知,正式提交之前的事務,kafka 中未確認數(shù)據(jù)改為“已確認”
總結 Exactly-once 兩階段提交步驟
第一條數(shù)據(jù)來了之后,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區(qū)日志但標記為未提交,這就是“預提交”
jobmanager 觸發(fā) checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的算子將狀態(tài)存入狀態(tài)后端,并通知 jobmanager sink 連接器收到 barrier,保存當前狀態(tài),存入 checkpoint,通知 jobmanager,并開啟下一階段的事務,用于提交下個檢查點的數(shù)據(jù)
jobmanager 收到所有任務的通知,發(fā)出確認信息,表示 checkpoint 完成
sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數(shù)據(jù)
外部kafka關閉事務,提交的數(shù)據(jù)可以正常消費了。
在使用kafka011 sink 時注意的點:
1.為了保證事務特性,在使用其他程序去消費我們flink sink 數(shù)據(jù)的kafka時,這個consumer需要設置了isolation.level = read_committed,那么它只會讀取已經(jīng)提交了的消息。
2.Checkpoint超時時間 必需大于 kafka 提交事務時間。
假如checkpoint失敗時間高于 kafka事務等待時間,比如,設置了一個checkpoint最多等待10分鐘,10分鐘后會失敗這個checkpoint的保存。而kafka 的事務只能等待5分鐘,5分鐘后把uncommitted的事務關掉。這個時候6分鐘checkpoint成功了,但是對應kafka數(shù)據(jù)的事務已經(jīng)失敗。這樣就無法保證Exactly-once的實現(xiàn)