Flink實(shí)時(shí)計(jì)算-深入理解Checkpoint和Savepoint

我是LakeShen,專(zhuān)注大數(shù)據(jù)技術(shù)分享,程序員經(jīng)驗(yàn)分享,互聯(lián)網(wǎng)科技分享。如果我的文章對(duì)你有幫助,希望你能點(diǎn)贊或者關(guān)注我,你的鼓勵(lì),就是我前進(jìn)的最大動(dòng)力。關(guān)注就完事了。

前言

為了保證程序的容錯(cuò)恢復(fù)以及程序啟動(dòng)時(shí)其狀態(tài)恢復(fù),幾乎所有的 Flink 實(shí)時(shí)任務(wù)都會(huì)開(kāi)啟 Checkpoint 或者觸發(fā) Savepoint 進(jìn)行狀態(tài)保存。為了使得用戶(hù)更加理解這兩點(diǎn)區(qū)別,本文結(jié)合 Flink 1.9 版本,重點(diǎn)講述 Flink Checkpoint,Savepoint 相關(guān)概念以及注意事項(xiàng),使得用戶(hù)能夠更好的開(kāi)發(fā)實(shí)時(shí)任務(wù)。

1. Checkpoint,Savepoint 異同

首先,為什么會(huì)在文章開(kāi)頭對(duì)這兩點(diǎn)進(jìn)行介紹,因?yàn)橛袝r(shí)候用戶(hù)在開(kāi)發(fā)實(shí)時(shí)任務(wù)時(shí),會(huì)對(duì)這兩點(diǎn)產(chǎn)生困惑,所以這里直接開(kāi)門(mén)見(jiàn)山對(duì)這兩點(diǎn)進(jìn)行講解。

Flink Checkpoint 是一種容錯(cuò)恢復(fù)機(jī)制。這種機(jī)制保證了實(shí)時(shí)程序運(yùn)行時(shí),即使突然遇到異常也能夠進(jìn)行自我恢復(fù)。Checkpoint 對(duì)于用戶(hù)層面,是透明的,用戶(hù)會(huì)感覺(jué)程序一直在運(yùn)行。Flink Checkpoint 是 Flink 自身的系統(tǒng)行為,用戶(hù)無(wú)法對(duì)其進(jìn)行交互,用戶(hù)可以在程序啟動(dòng)之前,設(shè)置好實(shí)時(shí)程序 Checkpoint 相關(guān)參數(shù),當(dāng)程序啟動(dòng)之后,剩下的就全交給 Flink 自行管理。當(dāng)然在某些情況,比如 Flink On Yarn 模式,某個(gè) Container 發(fā)生 OOM 異常,這種情況程序直接變成失敗狀態(tài),此時(shí) Flink 程序雖然開(kāi)啟 Checkpoint 也無(wú)法恢復(fù),因?yàn)槌绦蛞呀?jīng)變成失敗狀態(tài),所以此時(shí)可以借助外部參與啟動(dòng)程序,比如外部程序檢測(cè)到實(shí)時(shí)任務(wù)失敗時(shí),從新對(duì)實(shí)時(shí)任務(wù)進(jìn)行拉起。

Flink Savepoint 你可以把它當(dāng)做在某個(gè)時(shí)間點(diǎn)程序狀態(tài)全局鏡像,以后程序在進(jìn)行升級(jí),或者修改并發(fā)度等情況,還能從保存的狀態(tài)位繼續(xù)啟動(dòng)恢復(fù)。Flink Savepoint 一般存儲(chǔ)在 HDFS 上面,它需要用戶(hù)主動(dòng)進(jìn)行觸發(fā)。如果是用戶(hù)自定義開(kāi)發(fā)的實(shí)時(shí)程序,比如使用DataStream進(jìn)行開(kāi)發(fā),建議為每個(gè)算子定義一個(gè) uid,這樣我們?cè)谛薷淖鳂I(yè)時(shí),即使導(dǎo)致程序拓?fù)鋱D改變,由于相關(guān)算子 uid 沒(méi)有變,那么這些算子還能夠繼續(xù)使用之前的狀態(tài),如果用戶(hù)沒(méi)有定義 uid , Flink 會(huì)為每個(gè)算子自動(dòng)生成 uid,如果用戶(hù)修改了程序,可能導(dǎo)致之前的狀態(tài)程序不能再進(jìn)行復(fù)用。

Checkpoint 和 Savepoint 差異對(duì)比:

  1. 概念:Checkpoint 是 自動(dòng)容錯(cuò)機(jī)制 ,Savepoint 程序全局狀態(tài)鏡像 。
  2. 目的: Checkpoint 是程序自動(dòng)容錯(cuò),快速恢復(fù) 。Savepoint是 程序修改后繼續(xù)從狀態(tài)恢復(fù),程序升級(jí)等。
  3. 用戶(hù)交互:Checkpoint 是 Flink 系統(tǒng)行為 。Savepoint是用戶(hù)觸發(fā)。
  4. 狀態(tài)文件保留策略:Checkpoint默認(rèn)程序刪除,可以設(shè)置CheckpointConfig中的參數(shù)進(jìn)行保留 。Savepoint會(huì)一直保存,除非用戶(hù)刪除 。

2. Flink Checkpoint

2.1 Flink Checkpoint 原理

Flink Checkpoint 機(jī)制保證 Flink 任務(wù)運(yùn)行突然失敗時(shí),能夠從最近 Checkpoint 進(jìn)行狀態(tài)恢復(fù)啟動(dòng),進(jìn)行錯(cuò)誤容忍。它是一種自動(dòng)容錯(cuò)機(jī)制,而不是具體的狀態(tài)存儲(chǔ)鏡像。Flink Checkpoint 受 Chandy-Lamport 分布式快照啟發(fā),其內(nèi)部使用分布式數(shù)據(jù)流輕量級(jí)異步快照。

Checkpoint 保存的狀態(tài)在程序取消時(shí),默認(rèn)會(huì)進(jìn)行清除。Checkpoint 狀態(tài)保留策略有兩種:

DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示當(dāng)程序取消時(shí),刪除 Checkpoint 存儲(chǔ)文件。
RETAIN_ON_CANCELLATION 表示當(dāng)程序取消時(shí),保存之前的 Checkpoint 存儲(chǔ)文件
用戶(hù)可以結(jié)合業(yè)務(wù)情況,設(shè)置 Checkpoint 保留模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 開(kāi)啟 checkpoint */
env.enableCheckpointing(10000);
/** 設(shè)置 checkpoint 保留策略,取消程序時(shí),保留 checkpoint 狀態(tài)文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

默認(rèn)情況下,F(xiàn)link不會(huì)觸發(fā)一次 Checkpoint 當(dāng)系統(tǒng)有其他 Checkpoint 在進(jìn)行時(shí),也就是說(shuō) Checkpoint 默認(rèn)的并發(fā)為1。針對(duì) Flink DataStream 任務(wù),程序需要經(jīng)歷從 StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖四個(gè)步驟,其中在 ExecutionGraph 構(gòu)建時(shí),會(huì)初始化 CheckpointCoordinator。ExecutionGraph通過(guò)ExecutionGraphBuilder.buildGraph方法構(gòu)建,在構(gòu)建完時(shí),會(huì)調(diào)用 ExecutionGraph 的enableCheckpointing方法創(chuàng)建CheckpointCoordinator:

public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {
        // 前面部分代碼省略....


        // create the coordinator that triggers and commits checkpoints and holds the state
        checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);


        // register the master hooks on the checkpoint coordinator
        for (MasterTriggerRestoreHook<?> hook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(hook)) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
    // 后面部分代碼省略....

CheckpoinCoordinator 是 Flink 任務(wù) Checkpoint 的關(guān)鍵,針對(duì)每一個(gè) Flink 任務(wù),都會(huì)初始化一個(gè) CheckpointCoordinator 類(lèi),來(lái)觸發(fā) Flink 任務(wù) Checkpoint。下面是 Flink 任務(wù) Checkpoint 大致流程:

Flink 會(huì)定時(shí)在任務(wù)的 Source Task 觸發(fā) Barrier,Barrier是一種特殊的消息事件,會(huì)隨著消息通道流入到下游的算子中。只有當(dāng)最后 Sink 端的算子接收到 Barrier 并確認(rèn)該次 Checkpoint 完成時(shí),該次 Checkpoint 才算完成。所以在某些算子的 Task 有多個(gè)輸入時(shí),會(huì)存在 Barrier 對(duì)齊時(shí)間,我們可以在Web UI上面看到各個(gè) Task 的Barrier 對(duì)齊時(shí)間

2.2 Flink Checkpoint 語(yǔ)義

Flink Checkpoint 支持兩種語(yǔ)義:Exactly OnceAt least Once,默認(rèn)的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具體是針對(duì) Flink 狀態(tài)而言。具體語(yǔ)義含義如下:

Exactly Once 含義是:保證每條數(shù)據(jù)對(duì)于 Flink 的狀態(tài)結(jié)果只影響一次。打個(gè)比方,比如 WordCount程序,目前實(shí)時(shí)統(tǒng)計(jì)的 "hello" 這個(gè)單詞數(shù)為5,同時(shí)這個(gè)結(jié)果在這次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又來(lái)2個(gè) "hello" 單詞,突然程序遇到外部異常容錯(cuò)自動(dòng)回復(fù),從最近的 Checkpoint 點(diǎn)開(kāi)始恢復(fù),那么會(huì)從單詞數(shù) 5 這個(gè)狀態(tài)開(kāi)始恢復(fù),Kafka 消費(fèi)的數(shù)據(jù)點(diǎn)位還是狀態(tài) 5 這個(gè)時(shí)候的點(diǎn)位開(kāi)始計(jì)算,所以即使程序遇到外部異常自我恢復(fù),也不會(huì)影響到 Flink 狀態(tài)的結(jié)果。

At Least Once 含義是:每條數(shù)據(jù)對(duì)于 Flink 狀態(tài)計(jì)算至少影響一次。比如在 WordCount 程序中,你統(tǒng)計(jì)到的某個(gè)單詞的單詞數(shù)可能會(huì)比真實(shí)的單詞數(shù)要大,因?yàn)橥粭l消息,你可能將其計(jì)算多次。

Flink 中 Exactly Once 和 At Least Once 具體是針對(duì) Flink 任務(wù)狀態(tài)而言的,并不是 Flink 程序?qū)ζ涮幚硪淮?。舉個(gè)例子,當(dāng)前 Flink 任務(wù)正在做 Checkpoint,該次Checkpoint還么有完成,該次 Checkpoint 時(shí)間端的數(shù)據(jù)其實(shí)已經(jīng)進(jìn)入 Flink 程序處理,只是程序狀態(tài)沒(méi)有最終存儲(chǔ)到遠(yuǎn)程存儲(chǔ)。當(dāng)程序突然遇到異常,進(jìn)行容錯(cuò)恢復(fù),那么就會(huì)從最新的 Checkpoint 進(jìn)行狀態(tài)恢復(fù)重啟,上一部分還會(huì)進(jìn)入 Flink 系統(tǒng)處理:

checkpoint失敗.png

上圖中表示,在進(jìn)行 chk-5 Checkpoint 時(shí),突然遇到程序異常,那么會(huì)從 chk-4 進(jìn)行恢復(fù),那么之前chk-5 處理的數(shù)據(jù),會(huì)再次進(jìn)行處理。

Exactly Once 和 At Least Once 具體在底層實(shí)現(xiàn)大致相同,具體差異表現(xiàn)在 Barrier 對(duì)齊方式處理:

如果是 Exactly Once 模式,某個(gè)算子的 Task 有多個(gè)輸入通道時(shí),當(dāng)其中一個(gè)輸入通道收到 Barrier 時(shí),F(xiàn)link Task 會(huì)阻塞處理該通道,其不會(huì)處理這些數(shù)據(jù),但是會(huì)將這些數(shù)據(jù)存儲(chǔ)到內(nèi)部緩存中,一旦完成了所有輸入通道的 Barrier 對(duì)齊,才會(huì)繼續(xù)對(duì)這些數(shù)據(jù)進(jìn)行消費(fèi)處理。

對(duì)于 At least Once,同樣針對(duì)某個(gè)算子的 Task 有多個(gè)輸入通道的情況下,當(dāng)某個(gè)輸入通道接收到 Barrier 時(shí),它不同于Exactly Once,At Least Once 會(huì)繼續(xù)處理接受到的數(shù)據(jù),即使沒(méi)有完成所有輸入通道 Barrier 對(duì)齊。所以使用At Least Once 不能保證數(shù)據(jù)對(duì)于狀態(tài)計(jì)算只有一次影響。

2.4 Flink Checkpoint 參數(shù)配置及建議

  1. 當(dāng) Checkpoint 時(shí)間比設(shè)置的 Checkpoint 間隔時(shí)間要長(zhǎng)時(shí),可以設(shè)置 Checkpoint 間最小時(shí)間間隔 。這樣在上次 Checkpoint 完成時(shí),不會(huì)立馬進(jìn)行下一次 Checkpoint,而是會(huì)等待一個(gè)最小時(shí)間間隔,然后在進(jìn)行該次 Checkpoint。否則,每次 Checkpoint 完成時(shí),就會(huì)立馬開(kāi)始下一次 Checkpoint,系統(tǒng)會(huì)有很多資源消耗 Checkpoint。
  1. 如果Flink狀態(tài)很大,在進(jìn)行恢復(fù)時(shí),需要從遠(yuǎn)程存儲(chǔ)讀取狀態(tài)恢復(fù),此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢,可以設(shè)置 Flink Task 本地狀態(tài)恢復(fù)。任務(wù)狀態(tài)本地恢復(fù)默認(rèn)沒(méi)有開(kāi)啟,可以設(shè)置參數(shù)state.backend.local-recovery值為true進(jìn)行激活。
  1. Checkpoint保存數(shù),Checkpoint 保存數(shù)默認(rèn)是1,也就是保存最新的 Checkpoint 文件,當(dāng)進(jìn)行狀態(tài)恢復(fù)時(shí),如果最新的Checkpoint文件不可用時(shí)(比如HDFS文件所有副本都損壞或者其他原因),那么狀態(tài)恢復(fù)就會(huì)失敗,如果設(shè)置 Checkpoint 保存數(shù)2,即使最新的Checkpoint恢復(fù)失敗,那么Flink 會(huì)回滾到之前那一次Checkpoint進(jìn)行恢復(fù)。考慮到這種情況,用戶(hù)可以增加 Checkpoint 保存數(shù)。
  1. 建議設(shè)置的 Checkpoint 的間隔時(shí)間最好大于 Checkpoint 的完成時(shí)間。

下圖是不設(shè)置 Checkpoint 最小時(shí)間間隔示例圖,可以看到,系統(tǒng)一致在進(jìn)行 Checkpoint,可能對(duì)運(yùn)行的任務(wù)產(chǎn)生一定影響:

3. Flink Savepoint

3.1 Flink Savepoint 原理

Flink Savepoint 作為實(shí)時(shí)任務(wù)的全局鏡像,其在底層使用的代碼和Checkpoint的代碼是一樣的,因?yàn)镾avepoint可以看做 Checkpoint在特定時(shí)期的一個(gè)狀態(tài)快照。

Flink 在觸發(fā)Savepoint 或者 Checkpoint時(shí),會(huì)根據(jù)這次觸發(fā)的類(lèi)型計(jì)算出在HDFS上面的目錄:


checkpoint和Savepoint目錄.png

如果類(lèi)型是 Savepoint,那么 其 HDFS 上面的目錄為:Savepoint 根目錄+savepoint-jobid前六位+隨機(jī)數(shù)字,具體如下格式:


savepoint目錄.png

Checkpoint 目錄為 chk-checkpoint ID,具體格式如下:


checkpoint目錄.png

一次 Savepoint 目錄下面至少包括一個(gè)文件,既 _metadata文件。當(dāng)然如果實(shí)時(shí)任務(wù)某些算子有狀態(tài)的話(huà),那么在 這次 Savepoint目錄下面會(huì)包含一個(gè) _metadata 文件以及多個(gè)狀態(tài)數(shù)據(jù)文件。_metadata文件以絕對(duì)路徑的形式指向狀態(tài)文件的指針。

社區(qū)方面,在以前的 Flink 版本,當(dāng)用戶(hù)選擇不同的狀態(tài)存儲(chǔ),其底層狀態(tài)存儲(chǔ)的二進(jìn)制格式都不相同。針對(duì)這種情況,目前 FLIP-41 對(duì)于 Keyed State 使用統(tǒng)一的二進(jìn)制文件進(jìn)行存儲(chǔ)。這里的 Keyed State 主要是針對(duì) Savepoint 的狀態(tài),Checkpoint 狀態(tài)的存儲(chǔ)可以根據(jù)具體的狀態(tài)后端進(jìn)行存儲(chǔ),允許狀態(tài)存儲(chǔ)底層格式的差異。對(duì)于 Savepoint 狀態(tài)底層格式的統(tǒng)一,應(yīng)用的狀態(tài)可以在不同的狀態(tài)后端進(jìn)行遷移,更方便應(yīng)用程序的恢復(fù)。重做與狀態(tài)快照和恢復(fù)相關(guān)的抽象,當(dāng)實(shí)現(xiàn)實(shí)現(xiàn)新?tīng)顟B(tài)后端時(shí),可以降低開(kāi)銷(xiāo),同時(shí)減少代碼重復(fù)。

3.2 Flink Savepoint 觸發(fā)方式

Flink Savepoint 觸發(fā)方式目前有三種:

  1. 使用 flink savepoint 命令觸發(fā) Savepoint,其是在程序運(yùn)行期間觸發(fā) savepoint,
  2. 使用 flink cancel -s 命令,取消作業(yè)時(shí),并觸發(fā) Savepoint.
  3. 使用 Rest API 觸發(fā) Savepoint,格式為:**/jobs/:jobid /savepoints**

3.3 Flink Savepoint 注意點(diǎn)

  1. 使用 flink cancel -s 命令取消作業(yè)同時(shí)觸發(fā) Savepoint 時(shí),會(huì)有一個(gè)問(wèn)題,可能存在觸發(fā) Savepoint 失敗。比如實(shí)時(shí)程序處于異常狀態(tài)(比如 Checkpoint失敗),而此時(shí)你停止作業(yè),同時(shí)觸發(fā) Savepoint,這次 Savepoint 就會(huì)失敗,這種情況會(huì)導(dǎo)致,在實(shí)時(shí)平臺(tái)上面看到任務(wù)已經(jīng)停止,但是實(shí)際實(shí)時(shí)作業(yè)在 Yarn 還在運(yùn)行。針對(duì)這種情況,需要捕獲觸發(fā) Savepoint 失敗的異常,當(dāng)拋出異常時(shí),可以直接在 Yarn 上面 Kill 掉該任務(wù)。
  2. 使用 DataStream 程序開(kāi)發(fā)時(shí),最好為每個(gè)算子分配 uid,這樣即使作業(yè)拓?fù)鋱D變了,相關(guān)算子還是能夠從之前的狀態(tài)進(jìn)行恢復(fù),默認(rèn)情況下,F(xiàn)link 會(huì)為每個(gè)算子分配 uid,這種情況下,當(dāng)你改變了程序的某些邏輯時(shí),可能導(dǎo)致算子的 uid 發(fā)生改變,那么之前的狀態(tài)數(shù)據(jù),就不能進(jìn)行復(fù)用,程序在啟動(dòng)的時(shí)候,就會(huì)報(bào)錯(cuò)。
  3. 由于 Savepoint 是程序的全局狀態(tài),對(duì)于某些狀態(tài)很大的實(shí)時(shí)任務(wù),當(dāng)我們觸發(fā) Savepoint,可能會(huì)對(duì)運(yùn)行著的實(shí)時(shí)任務(wù)產(chǎn)生影響,個(gè)人建議如果對(duì)于狀態(tài)過(guò)大的實(shí)時(shí)任務(wù),觸發(fā) Savepoint 的時(shí)間,不要太過(guò)頻繁。根據(jù)狀態(tài)的大小,適當(dāng)?shù)脑O(shè)置觸發(fā)時(shí)間。
  4. 當(dāng)我們從 Savepoint 進(jìn)行恢復(fù)時(shí),需要檢查這次 Savepoint 目錄文件是否可用??赡艽嬖谀闵洗斡|發(fā) Savepoint 沒(méi)有成功,導(dǎo)致 HDFS 目錄上面 Savepoint 文件不可用或者缺少數(shù)據(jù)文件等,這種情況下,如果在指定損壞的 Savepoint 的狀態(tài)目錄進(jìn)行狀態(tài)恢復(fù),任務(wù)會(huì)啟動(dòng)不起來(lái)。

5. 總結(jié)

本文沒(méi)有過(guò)多的講述源碼,考慮大家的都能夠讀懂,其語(yǔ)言竟可能通俗一一點(diǎn)。如果有需要改進(jìn)的地方,希望大家能夠指出。后續(xù)我會(huì)不斷的和大家一起大數(shù)據(jù)相關(guān)的技術(shù),和大家一起交流學(xué)習(xí)。

參考資料

  1. FLIP-41 Unify Binary format for Keyed State
  2. FlIP-47-Checkpoints vs Savepoints
  3. Apache Kafka Connector
  4. Flink Savepoints
  5. Flink Checkpoints
  6. Flink Checkpointing

更多程序員面試經(jīng)驗(yàn)、大數(shù)據(jù)技術(shù)、互聯(lián)網(wǎng)科技分享,請(qǐng)關(guān)注公眾號(hào):LakeShen,上面會(huì)進(jìn)行第一時(shí)間首發(fā):

LakeShen.jpg

?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容