我是LakeShen,專(zhuān)注大數(shù)據(jù)技術(shù)分享,程序員經(jīng)驗(yàn)分享,互聯(lián)網(wǎng)科技分享。如果我的文章對(duì)你有幫助,希望你能點(diǎn)贊或者關(guān)注我,你的鼓勵(lì),就是我前進(jìn)的最大動(dòng)力。關(guān)注就完事了。
前言
為了保證程序的容錯(cuò)恢復(fù)以及程序啟動(dòng)時(shí)其狀態(tài)恢復(fù),幾乎所有的 Flink 實(shí)時(shí)任務(wù)都會(huì)開(kāi)啟 Checkpoint 或者觸發(fā) Savepoint 進(jìn)行狀態(tài)保存。為了使得用戶(hù)更加理解這兩點(diǎn)區(qū)別,本文結(jié)合 Flink 1.9 版本,重點(diǎn)講述 Flink Checkpoint,Savepoint 相關(guān)概念以及注意事項(xiàng),使得用戶(hù)能夠更好的開(kāi)發(fā)實(shí)時(shí)任務(wù)。
1. Checkpoint,Savepoint 異同
首先,為什么會(huì)在文章開(kāi)頭對(duì)這兩點(diǎn)進(jìn)行介紹,因?yàn)橛袝r(shí)候用戶(hù)在開(kāi)發(fā)實(shí)時(shí)任務(wù)時(shí),會(huì)對(duì)這兩點(diǎn)產(chǎn)生困惑,所以這里直接開(kāi)門(mén)見(jiàn)山對(duì)這兩點(diǎn)進(jìn)行講解。
Flink Checkpoint 是一種容錯(cuò)恢復(fù)機(jī)制。這種機(jī)制保證了實(shí)時(shí)程序運(yùn)行時(shí),即使突然遇到異常也能夠進(jìn)行自我恢復(fù)。Checkpoint 對(duì)于用戶(hù)層面,是透明的,用戶(hù)會(huì)感覺(jué)程序一直在運(yùn)行。Flink Checkpoint 是 Flink 自身的系統(tǒng)行為,用戶(hù)無(wú)法對(duì)其進(jìn)行交互,用戶(hù)可以在程序啟動(dòng)之前,設(shè)置好實(shí)時(shí)程序 Checkpoint 相關(guān)參數(shù),當(dāng)程序啟動(dòng)之后,剩下的就全交給 Flink 自行管理。當(dāng)然在某些情況,比如 Flink On Yarn 模式,某個(gè) Container 發(fā)生 OOM 異常,這種情況程序直接變成失敗狀態(tài),此時(shí) Flink 程序雖然開(kāi)啟 Checkpoint 也無(wú)法恢復(fù),因?yàn)槌绦蛞呀?jīng)變成失敗狀態(tài),所以此時(shí)可以借助外部參與啟動(dòng)程序,比如外部程序檢測(cè)到實(shí)時(shí)任務(wù)失敗時(shí),從新對(duì)實(shí)時(shí)任務(wù)進(jìn)行拉起。
Flink Savepoint 你可以把它當(dāng)做在某個(gè)時(shí)間點(diǎn)程序狀態(tài)全局鏡像,以后程序在進(jìn)行升級(jí),或者修改并發(fā)度等情況,還能從保存的狀態(tài)位繼續(xù)啟動(dòng)恢復(fù)。Flink Savepoint 一般存儲(chǔ)在 HDFS 上面,它需要用戶(hù)主動(dòng)進(jìn)行觸發(fā)。如果是用戶(hù)自定義開(kāi)發(fā)的實(shí)時(shí)程序,比如使用DataStream進(jìn)行開(kāi)發(fā),建議為每個(gè)算子定義一個(gè) uid,這樣我們?cè)谛薷淖鳂I(yè)時(shí),即使導(dǎo)致程序拓?fù)鋱D改變,由于相關(guān)算子 uid 沒(méi)有變,那么這些算子還能夠繼續(xù)使用之前的狀態(tài),如果用戶(hù)沒(méi)有定義 uid , Flink 會(huì)為每個(gè)算子自動(dòng)生成 uid,如果用戶(hù)修改了程序,可能導(dǎo)致之前的狀態(tài)程序不能再進(jìn)行復(fù)用。
Checkpoint 和 Savepoint 差異對(duì)比:
- 概念:Checkpoint 是 自動(dòng)容錯(cuò)機(jī)制 ,Savepoint 程序全局狀態(tài)鏡像 。
- 目的: Checkpoint 是程序自動(dòng)容錯(cuò),快速恢復(fù) 。Savepoint是 程序修改后繼續(xù)從狀態(tài)恢復(fù),程序升級(jí)等。
- 用戶(hù)交互:Checkpoint 是 Flink 系統(tǒng)行為 。Savepoint是用戶(hù)觸發(fā)。
- 狀態(tài)文件保留策略:Checkpoint默認(rèn)程序刪除,可以設(shè)置CheckpointConfig中的參數(shù)進(jìn)行保留 。Savepoint會(huì)一直保存,除非用戶(hù)刪除 。
2. Flink Checkpoint
2.1 Flink Checkpoint 原理
Flink Checkpoint 機(jī)制保證 Flink 任務(wù)運(yùn)行突然失敗時(shí),能夠從最近 Checkpoint 進(jìn)行狀態(tài)恢復(fù)啟動(dòng),進(jìn)行錯(cuò)誤容忍。它是一種自動(dòng)容錯(cuò)機(jī)制,而不是具體的狀態(tài)存儲(chǔ)鏡像。Flink Checkpoint 受 Chandy-Lamport 分布式快照啟發(fā),其內(nèi)部使用分布式數(shù)據(jù)流輕量級(jí)異步快照。
Checkpoint 保存的狀態(tài)在程序取消時(shí),默認(rèn)會(huì)進(jìn)行清除。Checkpoint 狀態(tài)保留策略有兩種:
DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION
DELETE_ON_CANCELLATION 表示當(dāng)程序取消時(shí),刪除 Checkpoint 存儲(chǔ)文件。
RETAIN_ON_CANCELLATION 表示當(dāng)程序取消時(shí),保存之前的 Checkpoint 存儲(chǔ)文件
用戶(hù)可以結(jié)合業(yè)務(wù)情況,設(shè)置 Checkpoint 保留模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 開(kāi)啟 checkpoint */
env.enableCheckpointing(10000);
/** 設(shè)置 checkpoint 保留策略,取消程序時(shí),保留 checkpoint 狀態(tài)文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
默認(rèn)情況下,F(xiàn)link不會(huì)觸發(fā)一次 Checkpoint 當(dāng)系統(tǒng)有其他 Checkpoint 在進(jìn)行時(shí),也就是說(shuō) Checkpoint 默認(rèn)的并發(fā)為1。針對(duì) Flink DataStream 任務(wù),程序需要經(jīng)歷從 StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖四個(gè)步驟,其中在 ExecutionGraph 構(gòu)建時(shí),會(huì)初始化 CheckpointCoordinator。ExecutionGraph通過(guò)ExecutionGraphBuilder.buildGraph方法構(gòu)建,在構(gòu)建完時(shí),會(huì)調(diào)用 ExecutionGraph 的enableCheckpointing方法創(chuàng)建CheckpointCoordinator:
public void enableCheckpointing(
CheckpointCoordinatorConfiguration chkConfig,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
List<MasterTriggerRestoreHook<?>> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) {
// 前面部分代碼省略....
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
jobInformation.getJobId(),
chkConfig,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
checkpointStateBackend,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY,
failureManager);
// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
if (!checkpointCoordinator.addMasterHook(hook)) {
LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
}
}
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// 后面部分代碼省略....
CheckpoinCoordinator 是 Flink 任務(wù) Checkpoint 的關(guān)鍵,針對(duì)每一個(gè) Flink 任務(wù),都會(huì)初始化一個(gè) CheckpointCoordinator 類(lèi),來(lái)觸發(fā) Flink 任務(wù) Checkpoint。下面是 Flink 任務(wù) Checkpoint 大致流程:
Flink 會(huì)定時(shí)在任務(wù)的 Source Task 觸發(fā) Barrier,Barrier是一種特殊的消息事件,會(huì)隨著消息通道流入到下游的算子中。只有當(dāng)最后 Sink 端的算子接收到 Barrier 并確認(rèn)該次 Checkpoint 完成時(shí),該次 Checkpoint 才算完成。所以在某些算子的 Task 有多個(gè)輸入時(shí),會(huì)存在 Barrier 對(duì)齊時(shí)間,我們可以在Web UI上面看到各個(gè) Task 的Barrier 對(duì)齊時(shí)間
2.2 Flink Checkpoint 語(yǔ)義
Flink Checkpoint 支持兩種語(yǔ)義:Exactly Once 和 At least Once,默認(rèn)的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具體是針對(duì) Flink 狀態(tài)而言。具體語(yǔ)義含義如下:
Exactly Once 含義是:保證每條數(shù)據(jù)對(duì)于 Flink 的狀態(tài)結(jié)果只影響一次。打個(gè)比方,比如 WordCount程序,目前實(shí)時(shí)統(tǒng)計(jì)的 "hello" 這個(gè)單詞數(shù)為5,同時(shí)這個(gè)結(jié)果在這次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又來(lái)2個(gè) "hello" 單詞,突然程序遇到外部異常容錯(cuò)自動(dòng)回復(fù),從最近的 Checkpoint 點(diǎn)開(kāi)始恢復(fù),那么會(huì)從單詞數(shù) 5 這個(gè)狀態(tài)開(kāi)始恢復(fù),Kafka 消費(fèi)的數(shù)據(jù)點(diǎn)位還是狀態(tài) 5 這個(gè)時(shí)候的點(diǎn)位開(kāi)始計(jì)算,所以即使程序遇到外部異常自我恢復(fù),也不會(huì)影響到 Flink 狀態(tài)的結(jié)果。
At Least Once 含義是:每條數(shù)據(jù)對(duì)于 Flink 狀態(tài)計(jì)算至少影響一次。比如在 WordCount 程序中,你統(tǒng)計(jì)到的某個(gè)單詞的單詞數(shù)可能會(huì)比真實(shí)的單詞數(shù)要大,因?yàn)橥粭l消息,你可能將其計(jì)算多次。
Flink 中 Exactly Once 和 At Least Once 具體是針對(duì) Flink 任務(wù)狀態(tài)而言的,并不是 Flink 程序?qū)ζ涮幚硪淮?。舉個(gè)例子,當(dāng)前 Flink 任務(wù)正在做 Checkpoint,該次Checkpoint還么有完成,該次 Checkpoint 時(shí)間端的數(shù)據(jù)其實(shí)已經(jīng)進(jìn)入 Flink 程序處理,只是程序狀態(tài)沒(méi)有最終存儲(chǔ)到遠(yuǎn)程存儲(chǔ)。當(dāng)程序突然遇到異常,進(jìn)行容錯(cuò)恢復(fù),那么就會(huì)從最新的 Checkpoint 進(jìn)行狀態(tài)恢復(fù)重啟,上一部分還會(huì)進(jìn)入 Flink 系統(tǒng)處理:

上圖中表示,在進(jìn)行 chk-5 Checkpoint 時(shí),突然遇到程序異常,那么會(huì)從 chk-4 進(jìn)行恢復(fù),那么之前chk-5 處理的數(shù)據(jù),會(huì)再次進(jìn)行處理。
Exactly Once 和 At Least Once 具體在底層實(shí)現(xiàn)大致相同,具體差異表現(xiàn)在 Barrier 對(duì)齊方式處理:

如果是 Exactly Once 模式,某個(gè)算子的 Task 有多個(gè)輸入通道時(shí),當(dāng)其中一個(gè)輸入通道收到 Barrier 時(shí),F(xiàn)link Task 會(huì)阻塞處理該通道,其不會(huì)處理這些數(shù)據(jù),但是會(huì)將這些數(shù)據(jù)存儲(chǔ)到內(nèi)部緩存中,一旦完成了所有輸入通道的 Barrier 對(duì)齊,才會(huì)繼續(xù)對(duì)這些數(shù)據(jù)進(jìn)行消費(fèi)處理。
對(duì)于 At least Once,同樣針對(duì)某個(gè)算子的 Task 有多個(gè)輸入通道的情況下,當(dāng)某個(gè)輸入通道接收到 Barrier 時(shí),它不同于Exactly Once,At Least Once 會(huì)繼續(xù)處理接受到的數(shù)據(jù),即使沒(méi)有完成所有輸入通道 Barrier 對(duì)齊。所以使用At Least Once 不能保證數(shù)據(jù)對(duì)于狀態(tài)計(jì)算只有一次影響。
2.4 Flink Checkpoint 參數(shù)配置及建議
- 當(dāng) Checkpoint 時(shí)間比設(shè)置的 Checkpoint 間隔時(shí)間要長(zhǎng)時(shí),可以設(shè)置 Checkpoint 間最小時(shí)間間隔 。這樣在上次 Checkpoint 完成時(shí),不會(huì)立馬進(jìn)行下一次 Checkpoint,而是會(huì)等待一個(gè)最小時(shí)間間隔,然后在進(jìn)行該次 Checkpoint。否則,每次 Checkpoint 完成時(shí),就會(huì)立馬開(kāi)始下一次 Checkpoint,系統(tǒng)會(huì)有很多資源消耗 Checkpoint。
- 如果Flink狀態(tài)很大,在進(jìn)行恢復(fù)時(shí),需要從遠(yuǎn)程存儲(chǔ)讀取狀態(tài)恢復(fù),此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢,可以設(shè)置 Flink Task 本地狀態(tài)恢復(fù)。任務(wù)狀態(tài)本地恢復(fù)默認(rèn)沒(méi)有開(kāi)啟,可以設(shè)置參數(shù)
state.backend.local-recovery值為true進(jìn)行激活。
- Checkpoint保存數(shù),Checkpoint 保存數(shù)默認(rèn)是1,也就是保存最新的 Checkpoint 文件,當(dāng)進(jìn)行狀態(tài)恢復(fù)時(shí),如果最新的Checkpoint文件不可用時(shí)(比如HDFS文件所有副本都損壞或者其他原因),那么狀態(tài)恢復(fù)就會(huì)失敗,如果設(shè)置 Checkpoint 保存數(shù)2,即使最新的Checkpoint恢復(fù)失敗,那么Flink 會(huì)回滾到之前那一次Checkpoint進(jìn)行恢復(fù)。考慮到這種情況,用戶(hù)可以增加 Checkpoint 保存數(shù)。
- 建議設(shè)置的 Checkpoint 的間隔時(shí)間最好大于 Checkpoint 的完成時(shí)間。
下圖是不設(shè)置 Checkpoint 最小時(shí)間間隔示例圖,可以看到,系統(tǒng)一致在進(jìn)行 Checkpoint,可能對(duì)運(yùn)行的任務(wù)產(chǎn)生一定影響:

3. Flink Savepoint
3.1 Flink Savepoint 原理
Flink Savepoint 作為實(shí)時(shí)任務(wù)的全局鏡像,其在底層使用的代碼和Checkpoint的代碼是一樣的,因?yàn)镾avepoint可以看做 Checkpoint在特定時(shí)期的一個(gè)狀態(tài)快照。
Flink 在觸發(fā)Savepoint 或者 Checkpoint時(shí),會(huì)根據(jù)這次觸發(fā)的類(lèi)型計(jì)算出在HDFS上面的目錄:

如果類(lèi)型是 Savepoint,那么 其 HDFS 上面的目錄為:Savepoint 根目錄+savepoint-jobid前六位+隨機(jī)數(shù)字,具體如下格式:

Checkpoint 目錄為 chk-checkpoint ID,具體格式如下:

一次 Savepoint 目錄下面至少包括一個(gè)文件,既 _metadata文件。當(dāng)然如果實(shí)時(shí)任務(wù)某些算子有狀態(tài)的話(huà),那么在 這次 Savepoint目錄下面會(huì)包含一個(gè) _metadata 文件以及多個(gè)狀態(tài)數(shù)據(jù)文件。_metadata文件以絕對(duì)路徑的形式指向狀態(tài)文件的指針。
社區(qū)方面,在以前的 Flink 版本,當(dāng)用戶(hù)選擇不同的狀態(tài)存儲(chǔ),其底層狀態(tài)存儲(chǔ)的二進(jìn)制格式都不相同。針對(duì)這種情況,目前 FLIP-41 對(duì)于 Keyed State 使用統(tǒng)一的二進(jìn)制文件進(jìn)行存儲(chǔ)。這里的 Keyed State 主要是針對(duì) Savepoint 的狀態(tài),Checkpoint 狀態(tài)的存儲(chǔ)可以根據(jù)具體的狀態(tài)后端進(jìn)行存儲(chǔ),允許狀態(tài)存儲(chǔ)底層格式的差異。對(duì)于 Savepoint 狀態(tài)底層格式的統(tǒng)一,應(yīng)用的狀態(tài)可以在不同的狀態(tài)后端進(jìn)行遷移,更方便應(yīng)用程序的恢復(fù)。重做與狀態(tài)快照和恢復(fù)相關(guān)的抽象,當(dāng)實(shí)現(xiàn)實(shí)現(xiàn)新?tīng)顟B(tài)后端時(shí),可以降低開(kāi)銷(xiāo),同時(shí)減少代碼重復(fù)。
3.2 Flink Savepoint 觸發(fā)方式
Flink Savepoint 觸發(fā)方式目前有三種:
- 使用
flink savepoint命令觸發(fā) Savepoint,其是在程序運(yùn)行期間觸發(fā) savepoint, - 使用
flink cancel -s命令,取消作業(yè)時(shí),并觸發(fā) Savepoint. - 使用 Rest API 觸發(fā) Savepoint,格式為:
**/jobs/:jobid /savepoints**
3.3 Flink Savepoint 注意點(diǎn)
- 使用
flink cancel -s命令取消作業(yè)同時(shí)觸發(fā) Savepoint 時(shí),會(huì)有一個(gè)問(wèn)題,可能存在觸發(fā) Savepoint 失敗。比如實(shí)時(shí)程序處于異常狀態(tài)(比如 Checkpoint失敗),而此時(shí)你停止作業(yè),同時(shí)觸發(fā) Savepoint,這次 Savepoint 就會(huì)失敗,這種情況會(huì)導(dǎo)致,在實(shí)時(shí)平臺(tái)上面看到任務(wù)已經(jīng)停止,但是實(shí)際實(shí)時(shí)作業(yè)在 Yarn 還在運(yùn)行。針對(duì)這種情況,需要捕獲觸發(fā) Savepoint 失敗的異常,當(dāng)拋出異常時(shí),可以直接在 Yarn 上面 Kill 掉該任務(wù)。 - 使用 DataStream 程序開(kāi)發(fā)時(shí),最好為每個(gè)算子分配
uid,這樣即使作業(yè)拓?fù)鋱D變了,相關(guān)算子還是能夠從之前的狀態(tài)進(jìn)行恢復(fù),默認(rèn)情況下,F(xiàn)link 會(huì)為每個(gè)算子分配uid,這種情況下,當(dāng)你改變了程序的某些邏輯時(shí),可能導(dǎo)致算子的uid發(fā)生改變,那么之前的狀態(tài)數(shù)據(jù),就不能進(jìn)行復(fù)用,程序在啟動(dòng)的時(shí)候,就會(huì)報(bào)錯(cuò)。 - 由于 Savepoint 是程序的全局狀態(tài),對(duì)于某些狀態(tài)很大的實(shí)時(shí)任務(wù),當(dāng)我們觸發(fā) Savepoint,可能會(huì)對(duì)運(yùn)行著的實(shí)時(shí)任務(wù)產(chǎn)生影響,個(gè)人建議如果對(duì)于狀態(tài)過(guò)大的實(shí)時(shí)任務(wù),觸發(fā) Savepoint 的時(shí)間,不要太過(guò)頻繁。根據(jù)狀態(tài)的大小,適當(dāng)?shù)脑O(shè)置觸發(fā)時(shí)間。
- 當(dāng)我們從 Savepoint 進(jìn)行恢復(fù)時(shí),需要檢查這次 Savepoint 目錄文件是否可用??赡艽嬖谀闵洗斡|發(fā) Savepoint 沒(méi)有成功,導(dǎo)致 HDFS 目錄上面 Savepoint 文件不可用或者缺少數(shù)據(jù)文件等,這種情況下,如果在指定損壞的 Savepoint 的狀態(tài)目錄進(jìn)行狀態(tài)恢復(fù),任務(wù)會(huì)啟動(dòng)不起來(lái)。
5. 總結(jié)
本文沒(méi)有過(guò)多的講述源碼,考慮大家的都能夠讀懂,其語(yǔ)言竟可能通俗一一點(diǎn)。如果有需要改進(jìn)的地方,希望大家能夠指出。后續(xù)我會(huì)不斷的和大家一起大數(shù)據(jù)相關(guān)的技術(shù),和大家一起交流學(xué)習(xí)。
參考資料
- FLIP-41 Unify Binary format for Keyed State
- FlIP-47-Checkpoints vs Savepoints
- Apache Kafka Connector
- Flink Savepoints
- Flink Checkpoints
- Flink Checkpointing
更多程序員面試經(jīng)驗(yàn)、大數(shù)據(jù)技術(shù)、互聯(lián)網(wǎng)科技分享,請(qǐng)關(guān)注公眾號(hào):LakeShen,上面會(huì)進(jìn)行第一時(shí)間首發(fā):

?