Flink實時計算-深入理解Checkpoint和Savepoint

我是LakeShen,專注大數(shù)據(jù)技術(shù)分享,程序員經(jīng)驗分享,互聯(lián)網(wǎng)科技分享。如果我的文章對你有幫助,希望你能點贊或者關(guān)注我,你的鼓勵,就是我前進的最大動力。關(guān)注就完事了。

前言

為了保證程序的容錯恢復(fù)以及程序啟動時其狀態(tài)恢復(fù),幾乎所有的 Flink 實時任務(wù)都會開啟 Checkpoint 或者觸發(fā) Savepoint 進行狀態(tài)保存。為了使得用戶更加理解這兩點區(qū)別,本文結(jié)合 Flink 1.9 版本,重點講述 Flink Checkpoint,Savepoint 相關(guān)概念以及注意事項,使得用戶能夠更好的開發(fā)實時任務(wù)。

1. Checkpoint,Savepoint 異同

首先,為什么會在文章開頭對這兩點進行介紹,因為有時候用戶在開發(fā)實時任務(wù)時,會對這兩點產(chǎn)生困惑,所以這里直接開門見山對這兩點進行講解。

Flink Checkpoint 是一種容錯恢復(fù)機制。這種機制保證了實時程序運行時,即使突然遇到異常也能夠進行自我恢復(fù)。Checkpoint 對于用戶層面,是透明的,用戶會感覺程序一直在運行。Flink Checkpoint 是 Flink 自身的系統(tǒng)行為,用戶無法對其進行交互,用戶可以在程序啟動之前,設(shè)置好實時程序 Checkpoint 相關(guān)參數(shù),當程序啟動之后,剩下的就全交給 Flink 自行管理。當然在某些情況,比如 Flink On Yarn 模式,某個 Container 發(fā)生 OOM 異常,這種情況程序直接變成失敗狀態(tài),此時 Flink 程序雖然開啟 Checkpoint 也無法恢復(fù),因為程序已經(jīng)變成失敗狀態(tài),所以此時可以借助外部參與啟動程序,比如外部程序檢測到實時任務(wù)失敗時,從新對實時任務(wù)進行拉起。

Flink Savepoint 你可以把它當做在某個時間點程序狀態(tài)全局鏡像,以后程序在進行升級,或者修改并發(fā)度等情況,還能從保存的狀態(tài)位繼續(xù)啟動恢復(fù)。Flink Savepoint 一般存儲在 HDFS 上面,它需要用戶主動進行觸發(fā)。如果是用戶自定義開發(fā)的實時程序,比如使用DataStream進行開發(fā),建議為每個算子定義一個 uid,這樣我們在修改作業(yè)時,即使導(dǎo)致程序拓撲圖改變,由于相關(guān)算子 uid 沒有變,那么這些算子還能夠繼續(xù)使用之前的狀態(tài),如果用戶沒有定義 uid , Flink 會為每個算子自動生成 uid,如果用戶修改了程序,可能導(dǎo)致之前的狀態(tài)程序不能再進行復(fù)用。

Checkpoint 和 Savepoint 差異對比:

  1. 概念:Checkpoint 是 自動容錯機制 ,Savepoint 程序全局狀態(tài)鏡像 。
  2. 目的: Checkpoint 是程序自動容錯,快速恢復(fù) 。Savepoint是 程序修改后繼續(xù)從狀態(tài)恢復(fù),程序升級等。
  3. 用戶交互:Checkpoint 是 Flink 系統(tǒng)行為 。Savepoint是用戶觸發(fā)。
  4. 狀態(tài)文件保留策略:Checkpoint默認程序刪除,可以設(shè)置CheckpointConfig中的參數(shù)進行保留 。Savepoint會一直保存,除非用戶刪除 。

2. Flink Checkpoint

2.1 Flink Checkpoint 原理

Flink Checkpoint 機制保證 Flink 任務(wù)運行突然失敗時,能夠從最近 Checkpoint 進行狀態(tài)恢復(fù)啟動,進行錯誤容忍。它是一種自動容錯機制,而不是具體的狀態(tài)存儲鏡像。Flink Checkpoint 受 Chandy-Lamport 分布式快照啟發(fā),其內(nèi)部使用分布式數(shù)據(jù)流輕量級異步快照。

Checkpoint 保存的狀態(tài)在程序取消時,默認會進行清除。Checkpoint 狀態(tài)保留策略有兩種:

DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示當程序取消時,刪除 Checkpoint 存儲文件。
RETAIN_ON_CANCELLATION 表示當程序取消時,保存之前的 Checkpoint 存儲文件
用戶可以結(jié)合業(yè)務(wù)情況,設(shè)置 Checkpoint 保留模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 開啟 checkpoint */
env.enableCheckpointing(10000);
/** 設(shè)置 checkpoint 保留策略,取消程序時,保留 checkpoint 狀態(tài)文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

默認情況下,F(xiàn)link不會觸發(fā)一次 Checkpoint 當系統(tǒng)有其他 Checkpoint 在進行時,也就是說 Checkpoint 默認的并發(fā)為1。針對 Flink DataStream 任務(wù),程序需要經(jīng)歷從 StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖四個步驟,其中在 ExecutionGraph 構(gòu)建時,會初始化 CheckpointCoordinator。ExecutionGraph通過ExecutionGraphBuilder.buildGraph方法構(gòu)建,在構(gòu)建完時,會調(diào)用 ExecutionGraph 的enableCheckpointing方法創(chuàng)建CheckpointCoordinator:

public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig,
            List<ExecutionJobVertex> verticesToTrigger,
            List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo,
            List<MasterTriggerRestoreHook<?>> masterHooks,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore checkpointStore,
            StateBackend checkpointStateBackend,
            CheckpointStatsTracker statsTracker) {
        // 前面部分代碼省略....


        // create the coordinator that triggers and commits checkpoints and holds the state
        checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);


        // register the master hooks on the checkpoint coordinator
        for (MasterTriggerRestoreHook<?> hook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(hook)) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
    // 后面部分代碼省略....

CheckpoinCoordinator 是 Flink 任務(wù) Checkpoint 的關(guān)鍵,針對每一個 Flink 任務(wù),都會初始化一個 CheckpointCoordinator 類,來觸發(fā) Flink 任務(wù) Checkpoint。下面是 Flink 任務(wù) Checkpoint 大致流程:

Flink 會定時在任務(wù)的 Source Task 觸發(fā) Barrier,Barrier是一種特殊的消息事件,會隨著消息通道流入到下游的算子中。只有當最后 Sink 端的算子接收到 Barrier 并確認該次 Checkpoint 完成時,該次 Checkpoint 才算完成。所以在某些算子的 Task 有多個輸入時,會存在 Barrier 對齊時間,我們可以在Web UI上面看到各個 Task 的Barrier 對齊時間

2.2 Flink Checkpoint 語義

Flink Checkpoint 支持兩種語義:Exactly OnceAt least Once,默認的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具體是針對 Flink 狀態(tài)而言。具體語義含義如下:

Exactly Once 含義是:保證每條數(shù)據(jù)對于 Flink 的狀態(tài)結(jié)果只影響一次。打個比方,比如 WordCount程序,目前實時統(tǒng)計的 "hello" 這個單詞數(shù)為5,同時這個結(jié)果在這次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又來2個 "hello" 單詞,突然程序遇到外部異常容錯自動回復(fù),從最近的 Checkpoint 點開始恢復(fù),那么會從單詞數(shù) 5 這個狀態(tài)開始恢復(fù),Kafka 消費的數(shù)據(jù)點位還是狀態(tài) 5 這個時候的點位開始計算,所以即使程序遇到外部異常自我恢復(fù),也不會影響到 Flink 狀態(tài)的結(jié)果。

At Least Once 含義是:每條數(shù)據(jù)對于 Flink 狀態(tài)計算至少影響一次。比如在 WordCount 程序中,你統(tǒng)計到的某個單詞的單詞數(shù)可能會比真實的單詞數(shù)要大,因為同一條消息,你可能將其計算多次。

Flink 中 Exactly Once 和 At Least Once 具體是針對 Flink 任務(wù)狀態(tài)而言的,并不是 Flink 程序?qū)ζ涮幚硪淮?。舉個例子,當前 Flink 任務(wù)正在做 Checkpoint,該次Checkpoint還么有完成,該次 Checkpoint 時間端的數(shù)據(jù)其實已經(jīng)進入 Flink 程序處理,只是程序狀態(tài)沒有最終存儲到遠程存儲。當程序突然遇到異常,進行容錯恢復(fù),那么就會從最新的 Checkpoint 進行狀態(tài)恢復(fù)重啟,上一部分還會進入 Flink 系統(tǒng)處理:

checkpoint失敗.png

上圖中表示,在進行 chk-5 Checkpoint 時,突然遇到程序異常,那么會從 chk-4 進行恢復(fù),那么之前chk-5 處理的數(shù)據(jù),會再次進行處理。

Exactly Once 和 At Least Once 具體在底層實現(xiàn)大致相同,具體差異表現(xiàn)在 Barrier 對齊方式處理:

如果是 Exactly Once 模式,某個算子的 Task 有多個輸入通道時,當其中一個輸入通道收到 Barrier 時,F(xiàn)link Task 會阻塞處理該通道,其不會處理這些數(shù)據(jù),但是會將這些數(shù)據(jù)存儲到內(nèi)部緩存中,一旦完成了所有輸入通道的 Barrier 對齊,才會繼續(xù)對這些數(shù)據(jù)進行消費處理。

對于 At least Once,同樣針對某個算子的 Task 有多個輸入通道的情況下,當某個輸入通道接收到 Barrier 時,它不同于Exactly Once,At Least Once 會繼續(xù)處理接受到的數(shù)據(jù),即使沒有完成所有輸入通道 Barrier 對齊。所以使用At Least Once 不能保證數(shù)據(jù)對于狀態(tài)計算只有一次影響。

2.4 Flink Checkpoint 參數(shù)配置及建議

  1. 當 Checkpoint 時間比設(shè)置的 Checkpoint 間隔時間要長時,可以設(shè)置 Checkpoint 間最小時間間隔 。這樣在上次 Checkpoint 完成時,不會立馬進行下一次 Checkpoint,而是會等待一個最小時間間隔,然后在進行該次 Checkpoint。否則,每次 Checkpoint 完成時,就會立馬開始下一次 Checkpoint,系統(tǒng)會有很多資源消耗 Checkpoint。
  1. 如果Flink狀態(tài)很大,在進行恢復(fù)時,需要從遠程存儲讀取狀態(tài)恢復(fù),此時可能導(dǎo)致任務(wù)恢復(fù)很慢,可以設(shè)置 Flink Task 本地狀態(tài)恢復(fù)。任務(wù)狀態(tài)本地恢復(fù)默認沒有開啟,可以設(shè)置參數(shù)state.backend.local-recovery值為true進行激活。
  1. Checkpoint保存數(shù),Checkpoint 保存數(shù)默認是1,也就是保存最新的 Checkpoint 文件,當進行狀態(tài)恢復(fù)時,如果最新的Checkpoint文件不可用時(比如HDFS文件所有副本都損壞或者其他原因),那么狀態(tài)恢復(fù)就會失敗,如果設(shè)置 Checkpoint 保存數(shù)2,即使最新的Checkpoint恢復(fù)失敗,那么Flink 會回滾到之前那一次Checkpoint進行恢復(fù)。考慮到這種情況,用戶可以增加 Checkpoint 保存數(shù)。
  1. 建議設(shè)置的 Checkpoint 的間隔時間最好大于 Checkpoint 的完成時間。

下圖是不設(shè)置 Checkpoint 最小時間間隔示例圖,可以看到,系統(tǒng)一致在進行 Checkpoint,可能對運行的任務(wù)產(chǎn)生一定影響:

3. Flink Savepoint

3.1 Flink Savepoint 原理

Flink Savepoint 作為實時任務(wù)的全局鏡像,其在底層使用的代碼和Checkpoint的代碼是一樣的,因為Savepoint可以看做 Checkpoint在特定時期的一個狀態(tài)快照。

Flink 在觸發(fā)Savepoint 或者 Checkpoint時,會根據(jù)這次觸發(fā)的類型計算出在HDFS上面的目錄:


checkpoint和Savepoint目錄.png

如果類型是 Savepoint,那么 其 HDFS 上面的目錄為:Savepoint 根目錄+savepoint-jobid前六位+隨機數(shù)字,具體如下格式:


savepoint目錄.png

Checkpoint 目錄為 chk-checkpoint ID,具體格式如下:


checkpoint目錄.png

一次 Savepoint 目錄下面至少包括一個文件,既 _metadata文件。當然如果實時任務(wù)某些算子有狀態(tài)的話,那么在 這次 Savepoint目錄下面會包含一個 _metadata 文件以及多個狀態(tài)數(shù)據(jù)文件。_metadata文件以絕對路徑的形式指向狀態(tài)文件的指針。

社區(qū)方面,在以前的 Flink 版本,當用戶選擇不同的狀態(tài)存儲,其底層狀態(tài)存儲的二進制格式都不相同。針對這種情況,目前 FLIP-41 對于 Keyed State 使用統(tǒng)一的二進制文件進行存儲。這里的 Keyed State 主要是針對 Savepoint 的狀態(tài),Checkpoint 狀態(tài)的存儲可以根據(jù)具體的狀態(tài)后端進行存儲,允許狀態(tài)存儲底層格式的差異。對于 Savepoint 狀態(tài)底層格式的統(tǒng)一,應(yīng)用的狀態(tài)可以在不同的狀態(tài)后端進行遷移,更方便應(yīng)用程序的恢復(fù)。重做與狀態(tài)快照和恢復(fù)相關(guān)的抽象,當實現(xiàn)實現(xiàn)新狀態(tài)后端時,可以降低開銷,同時減少代碼重復(fù)。

3.2 Flink Savepoint 觸發(fā)方式

Flink Savepoint 觸發(fā)方式目前有三種:

  1. 使用 flink savepoint 命令觸發(fā) Savepoint,其是在程序運行期間觸發(fā) savepoint,
  2. 使用 flink cancel -s 命令,取消作業(yè)時,并觸發(fā) Savepoint.
  3. 使用 Rest API 觸發(fā) Savepoint,格式為:**/jobs/:jobid /savepoints**

3.3 Flink Savepoint 注意點

  1. 使用 flink cancel -s 命令取消作業(yè)同時觸發(fā) Savepoint 時,會有一個問題,可能存在觸發(fā) Savepoint 失敗。比如實時程序處于異常狀態(tài)(比如 Checkpoint失敗),而此時你停止作業(yè),同時觸發(fā) Savepoint,這次 Savepoint 就會失敗,這種情況會導(dǎo)致,在實時平臺上面看到任務(wù)已經(jīng)停止,但是實際實時作業(yè)在 Yarn 還在運行。針對這種情況,需要捕獲觸發(fā) Savepoint 失敗的異常,當拋出異常時,可以直接在 Yarn 上面 Kill 掉該任務(wù)。
  2. 使用 DataStream 程序開發(fā)時,最好為每個算子分配 uid,這樣即使作業(yè)拓撲圖變了,相關(guān)算子還是能夠從之前的狀態(tài)進行恢復(fù),默認情況下,F(xiàn)link 會為每個算子分配 uid,這種情況下,當你改變了程序的某些邏輯時,可能導(dǎo)致算子的 uid 發(fā)生改變,那么之前的狀態(tài)數(shù)據(jù),就不能進行復(fù)用,程序在啟動的時候,就會報錯。
  3. 由于 Savepoint 是程序的全局狀態(tài),對于某些狀態(tài)很大的實時任務(wù),當我們觸發(fā) Savepoint,可能會對運行著的實時任務(wù)產(chǎn)生影響,個人建議如果對于狀態(tài)過大的實時任務(wù),觸發(fā) Savepoint 的時間,不要太過頻繁。根據(jù)狀態(tài)的大小,適當?shù)脑O(shè)置觸發(fā)時間。
  4. 當我們從 Savepoint 進行恢復(fù)時,需要檢查這次 Savepoint 目錄文件是否可用??赡艽嬖谀闵洗斡|發(fā) Savepoint 沒有成功,導(dǎo)致 HDFS 目錄上面 Savepoint 文件不可用或者缺少數(shù)據(jù)文件等,這種情況下,如果在指定損壞的 Savepoint 的狀態(tài)目錄進行狀態(tài)恢復(fù),任務(wù)會啟動不起來。

5. 總結(jié)

本文沒有過多的講述源碼,考慮大家的都能夠讀懂,其語言竟可能通俗一一點。如果有需要改進的地方,希望大家能夠指出。后續(xù)我會不斷的和大家一起大數(shù)據(jù)相關(guān)的技術(shù),和大家一起交流學(xué)習(xí)。

參考資料

  1. FLIP-41 Unify Binary format for Keyed State
  2. FlIP-47-Checkpoints vs Savepoints
  3. Apache Kafka Connector
  4. Flink Savepoints
  5. Flink Checkpoints
  6. Flink Checkpointing

更多程序員面試經(jīng)驗、大數(shù)據(jù)技術(shù)、互聯(lián)網(wǎng)科技分享,請關(guān)注公眾號:LakeShen,上面會進行第一時間首發(fā):

LakeShen.jpg

?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容