Flink Checkpoint 原理流程以及常見(jiàn)失敗原因分析

本文僅為筆者平日學(xué)習(xí)記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/dKpYz-YvySAyAEFCq5_dGA

前言

目前實(shí)時(shí)任務(wù)主要以 Flink 為主,為了保證實(shí)時(shí)任務(wù)的容錯(cuò)恢復(fù)以及停止重啟時(shí)的狀態(tài)恢復(fù),幾乎所有的實(shí)時(shí)任務(wù)都會(huì)開(kāi)啟 Checkpoint 或者觸發(fā) Savepoint 進(jìn)行狀態(tài)保存。由于 Savepoint 底層原理的實(shí)現(xiàn)和 Checkpoint 幾乎一致,本文結(jié)合 Flink 1.9 版本,重點(diǎn)講述 Flink Checkpoint 原理流程以及常見(jiàn)原因分析,讓用戶(hù)能夠更好的理解 Flink Checkpoint,從而開(kāi)發(fā)出更健壯的實(shí)時(shí)任務(wù)。

一、 什么是 Flink Checkpoint 和狀態(tài)

1.1 Flink Checkpoint 是什么

Flink Checkpoint 是一種容錯(cuò)恢復(fù)機(jī)制。這種機(jī)制保證了實(shí)時(shí)程序運(yùn)行時(shí),即使突然遇到異常或者機(jī)器問(wèn)題時(shí)也能夠進(jìn)行自我恢復(fù)。Flink Checkpoint 對(duì)于用戶(hù)層面來(lái)說(shuō),是透明的,用戶(hù)會(huì)感覺(jué)實(shí)時(shí)任務(wù)一直在運(yùn)行。

Flink Checkpoint 是 Flink 自身的系統(tǒng)行為,用戶(hù)無(wú)法對(duì)其進(jìn)行交互,用戶(hù)可以在程序啟動(dòng)之前,設(shè)置好實(shí)時(shí)任務(wù) Checkpoint 相關(guān)的參數(shù),當(dāng)任務(wù)啟動(dòng)之后,剩下的就全交給 Flink 自行管理。

1.2 為什么要開(kāi)啟 Checkpoint

實(shí)時(shí)任務(wù)不同于批處理任務(wù),除非用戶(hù)主動(dòng)停止,一般會(huì)一直運(yùn)行,運(yùn)行的過(guò)程中可能存在機(jī)器故障、網(wǎng)絡(luò)問(wèn)題、外界存儲(chǔ)問(wèn)題等等,要想實(shí)時(shí)任務(wù)一直能夠穩(wěn)定運(yùn)行,實(shí)時(shí)任務(wù)要有自動(dòng)容錯(cuò)恢復(fù)的功能。而批處理任務(wù)在遇到異常情況時(shí),在重新計(jì)算一遍即可。實(shí)時(shí)任務(wù)因?yàn)闀?huì)一直運(yùn)行的特性,如果在從頭開(kāi)始計(jì)算,成本會(huì)很大,尤其是對(duì)于那種運(yùn)行時(shí)間很久的實(shí)時(shí)任務(wù)來(lái)說(shuō)。

實(shí)時(shí)任務(wù)開(kāi)啟 Checkpoint 功能,也能夠減少容錯(cuò)恢復(fù)的時(shí)間。因?yàn)槊看味际菑淖钚碌?Chekpoint 點(diǎn)位開(kāi)始狀態(tài)恢復(fù),而不是從程序啟動(dòng)的狀態(tài)開(kāi)始恢復(fù)。舉個(gè)列子,如果你有一個(gè)運(yùn)行一年的實(shí)時(shí)任務(wù),如果容錯(cuò)恢復(fù)是從一年前啟動(dòng)時(shí)的狀態(tài)恢復(fù),實(shí)時(shí)任務(wù)可能需要運(yùn)行很久才能恢復(fù)到現(xiàn)在狀態(tài),這一般是業(yè)務(wù)方所不允許的。

1.3 Flink 任務(wù)狀態(tài)是什么

Flink Checkpoint 會(huì)將實(shí)時(shí)任務(wù)的狀態(tài)存儲(chǔ)到遠(yuǎn)端存儲(chǔ),比如 HDFS ,亞馬遜的 S3 等等。Flink 任務(wù)狀態(tài)可以理解為實(shí)時(shí)任務(wù)計(jì)算過(guò)程中,中間產(chǎn)生的數(shù)據(jù)結(jié)果,同時(shí)這些計(jì)算結(jié)果會(huì)在后續(xù)實(shí)時(shí)任務(wù)處理時(shí),能夠繼續(xù)進(jìn)行使用。實(shí)時(shí)任務(wù)的狀態(tài)可以是一個(gè)聚合結(jié)果值,比如 WordCount 統(tǒng)計(jì)的每個(gè)單詞的數(shù)量,也可以是消息流中的明細(xì)數(shù)據(jù)。

Flink 任務(wù)狀態(tài)整體可以劃分兩種:Operator 狀態(tài)和 KeyedState。常見(jiàn)的 Operator 狀態(tài),比如 Kafka Topic 每個(gè)分區(qū)的偏移量。KeyedState 是基于 KeyedStream 來(lái)使用的,所以在使用前,你需要對(duì)你的流通過(guò) keyby 來(lái)進(jìn)行分區(qū),常見(jiàn)的狀態(tài)比如有 MapState、ListState、ValueState 等等。

下面是一個(gè)實(shí)時(shí)計(jì)算奇數(shù)和偶數(shù)的任務(wù)的示例:

在上圖中,假如輸入的流來(lái)自于 Kafka ,那么 Kafka Topic 分區(qū)的偏移量是狀態(tài),所有奇數(shù)的和、所有偶數(shù)的和也都是狀態(tài)。

二、 Flink Checkpoint 流程和原理

2.1 開(kāi)啟 Checkpoint 功能

想要使用 Flink Checkpoint 功能,首先是要在實(shí)時(shí)任務(wù)開(kāi)啟 Checkpoint。Flink 默認(rèn)情況下是關(guān)閉 Checkpoint 功能,下面代碼是開(kāi)啟 Checkpoint :

上述代碼中,設(shè)置了 Flink Checkpoint 的間隔 3 秒,設(shè)置的 Checkpoint 的語(yǔ)義為 EXACTLY_ONCE。Flink 默認(rèn)的 Checkpoint 語(yǔ)義為 EXACTLY_ONCE。上述代碼也使用 RocksDBStateBackend 進(jìn)行狀態(tài)存儲(chǔ)。用戶(hù)也可以自己設(shè)置 Flink Checkpoint 的參數(shù),通過(guò) CheckpointConfig 這個(gè)類(lèi)進(jìn)行設(shè)置,代碼如下:

CheckpointConfig
 chkConfig = env.getCheckpointConfig();
/** 調(diào)用 CheckpointConfig 各種 set 方法 */
chkConfig.setXXX

2.2 Flink 一次 Checkpoint 的參與者

Flink 整體作業(yè)采用主從架構(gòu),Master 為 JobManager,Slave 為 TaskManager,Client 則是負(fù)責(zé)提交用戶(hù)實(shí)時(shí)任務(wù)的代碼邏輯 ,F(xiàn)link 整體框架圖如下圖所示:

JobManager 主要負(fù)責(zé)實(shí)時(shí)任務(wù)的調(diào)度以及對(duì) Checkpoint 的觸發(fā),TaskManager 負(fù)責(zé)真正用戶(hù)的代碼執(zhí)行邏輯,具體表現(xiàn)形式則是 Task 在 TaskManager上面進(jìn)行運(yùn)行,一個(gè) Task 對(duì)應(yīng)一個(gè)線(xiàn)程,它可能運(yùn)行一個(gè)算子的 SubTask,也可能是運(yùn)行多個(gè) Chain 起來(lái)的算子的 SubTask。

Flink 實(shí)時(shí)任務(wù)一次 Checkpoint 的參與者主要包括三塊:JobManager、TaskManager以及 Zookeeper。JobManager 定時(shí)會(huì)觸發(fā)執(zhí)行 Checkpoint,具體則是在 JobManager 中運(yùn)行的 CheckpointCoordinator 中觸發(fā)所有 Source 的 SubTask 向下游廣播 CheckpointBarrier。

TaskManager 收到 CheckpointBarrier 后,根據(jù) Checkpoint 的語(yǔ)義,決定是否在進(jìn)行 CheckpointBarrier 對(duì)齊時(shí),緩沖后續(xù)的數(shù)據(jù)記錄,當(dāng)收到所有上游輸入的 CheckpointBarrier 后,開(kāi)始做 Checkpoint。TaskManager Checkpoint 完成后,會(huì)向 JobManager 發(fā)送確認(rèn)完成的消息。只有當(dāng)所有 Sink 算子完成 Checkpoint 且發(fā)送確認(rèn)消息后,該次 Checkpoint 才算完成。

在高可用模式下,ZooKeeper 主要存儲(chǔ)最新一次 Checkpoint 成功的目錄,當(dāng)Flink 任務(wù)容錯(cuò)恢復(fù)時(shí),會(huì)從最新成功的 Checkpoint 恢復(fù)。Zookeeper 同時(shí)也存儲(chǔ)著 Flink 作業(yè)的元數(shù)據(jù)信息。比如在高可用模式下,F(xiàn)link 會(huì)將 JobGraph 以及相關(guān) Jar 包存儲(chǔ)在 HDFS 上面,Zookeeper 記錄著該信息。再次容錯(cuò)重啟時(shí),讀取這些信息,進(jìn)行任務(wù)啟動(dòng)。

下圖是一次 Checkpoint 的參與者:

2.3 Checkpoint 協(xié)調(diào)者 — CheckpointCoordinator

CheckpointCoordinator,是 Checkpoint 中最重要的類(lèi),協(xié)調(diào)著實(shí)時(shí)任務(wù)整個(gè) Checkpoint 的執(zhí)行。下圖是 CheckpointCoordinator 中的方法:

Flink CheckpointCoordinator 中有幾個(gè)比較重要的方法:

  1. triggerCheckpoint,觸發(fā) Flink 任務(wù)進(jìn)行 Checkpoint 的方法

  2. triggerSavepoint,觸發(fā) Flink 任務(wù) Savepoint 的方法

  3. restoreSavepoint,F(xiàn)link 任務(wù)從 Savepoint 狀態(tài)恢復(fù)

  4. restoreLatestCheckpointedState,從最新一次 Checkpoint 點(diǎn)位狀態(tài)恢復(fù)

  5. receiveAcknowledgeMessage,接受 Operator SubTask Checkpoint 完成的消息并處理

Flink CheckpointCoordinator 類(lèi)是在 ExecutionGraph 形成時(shí)進(jìn)行初始化的,具體則是在 ExecutionGraph 創(chuàng)建之后,調(diào)用 enableCheckpointing 方法,然后在該方法中,CheckpointCoordinator 進(jìn)行創(chuàng)建。以下是 Flink Checkpoint 觸發(fā)的時(shí)序圖:

當(dāng) Flink 作業(yè)狀態(tài)由創(chuàng)建到運(yùn)行時(shí),CheckpointCoordinator 中的 ScheduledThreadPoolExecutor 會(huì)定時(shí)執(zhí)行 ScheduledTrigger 中的邏輯。ScheduledTrigger 本質(zhì)就是一個(gè) Runnable,run 方法中執(zhí)行 triggerCheckpoint 方法。

2.4 Flink Checkpoint 流程與原理

一次 Flink Checkpoint 的流程是從 CheckpointCoordinator 的 triggerCheckpoint 方法開(kāi)始,下面來(lái)看看一次 Flink Checkpoint 涉及到的主要內(nèi)容:

  1. Checkpoint 開(kāi)始之前先進(jìn)行預(yù)檢查,比如檢查最大并發(fā)的 Checkpoint 數(shù),最小的 Checkpoint 之間的時(shí)間間隔。默認(rèn)情況下,最大并發(fā)的 Checkpoint 數(shù)為 1,最小的 Checkpoint 之間的時(shí)間間隔為 0.

  2. 判斷所有 Source 算子的 Subtask (Execution) 是否都處于運(yùn)行狀態(tài),有則直接報(bào)錯(cuò)。同時(shí)檢查所有待確認(rèn)的算子的 SubTask(Execution)是否是運(yùn)行狀態(tài),有則直接報(bào)錯(cuò)。

  3. 創(chuàng)建 PendingCheckpoint,同時(shí)為該次 Checkpoint 創(chuàng)建一個(gè) Runnable,即超時(shí)取消線(xiàn)程,默認(rèn) Checkpoint 十分鐘超時(shí)。

  4. 循環(huán)遍歷所有 Source 算子的 Subtask(Execution),最底層調(diào)用 Task 的triggerCheckpointBarrier, 廣播 CheckBarrier 到下游 ,同時(shí) Checkpoint 其狀態(tài)。

  5. 下游的輸入中有 CheckpointBarrierHandler 類(lèi)來(lái)處理 CheckpoinBarrier,然后會(huì)調(diào)用 notifyCheckpoint 方法,通知 Operator SubTask 進(jìn)行 Checkpoint。

  6. 每當(dāng) Operator SubTask 完成 Checkpoint 時(shí),都會(huì)向 CheckpointCoordoritor 發(fā)送確認(rèn)消息。CheckpointCoordinator 的 receiveAcknowledgeMessage 方法會(huì)進(jìn)行處理。

  7. 在一次 Checkpoint 過(guò)程中,當(dāng)所有從 Source 端到 Sink 端的算子 SubTask 都完成之后,CheckpointCoordoritor 會(huì)通知算子進(jìn)行 notifyCheckpointCompleted 方法,前提是算子的函數(shù)實(shí)現(xiàn) CheckpointListener 接口。

Flink 會(huì)定時(shí)在任務(wù)的 Source 算子的 SubTask 觸發(fā) CheckpointBarrier,CheckpointBarrier 是一種特殊的消息事件,會(huì)隨著消息通道流入到下游的算子中。只有當(dāng)最后 Sink 端的算子接收到 CheckpointBarrier 并確認(rèn)該次 Checkpoint 完成時(shí),該次 Checkpoint 才算完成。所以在某些算子的 Task 有多個(gè)輸入時(shí),會(huì)存在 Barrier 對(duì)齊時(shí)間,我們可以在 Flink Web UI上面看到各個(gè) Task 的 CheckpointBarrier 對(duì)齊時(shí)間。

下圖是一次 Flink Checkpoint 實(shí)例流程示意圖:

Flin Checkpoint 保存的任務(wù)狀態(tài)在程序取消停止時(shí),默認(rèn)會(huì)進(jìn)行清除。Checkpoint 狀態(tài)保留策略主要有兩種:

DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示當(dāng)程序取消時(shí),刪除 Checkpoint 存儲(chǔ)的狀態(tài)文件。RETAIN_ON_CANCELLATION 表示當(dāng)程序取消時(shí),保存之前的 Checkpoint 存儲(chǔ)的狀態(tài)文件 用戶(hù)可以結(jié)合業(yè)務(wù)情況,設(shè)置 Checkpoint 保留模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 開(kāi)啟 checkpoint */
env.enableCheckpointing(10000);
/** 設(shè)置 checkpoint 保留策略,取消程序時(shí),保留 checkpoint 狀態(tài)文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.5 Flink Checkpoint 語(yǔ)義

Flink Checkpoint 支持兩種語(yǔ)義:Exactly_OnceAt_least_Once,默認(rèn)的 Checkpoint 語(yǔ)義是 Exactly_Once。具體語(yǔ)義含義如下:

Exactly_Once 含義是:保證每條數(shù)據(jù)對(duì)于 Flink 任務(wù)的狀態(tài)結(jié)果只影響一次。打個(gè)比方,比如 WordCount 程序,目前實(shí)時(shí)統(tǒng)計(jì)的 "hello" 這個(gè)單詞數(shù)為 5,同時(shí)這個(gè)結(jié)果在這次 Checkpoint 成功后,保存在了 HDFS。在下次 Checkpoint 之前, 又來(lái) 2 個(gè) "hello" 單詞,突然程序遇到外部異常自動(dòng)容錯(cuò)恢復(fù),會(huì)從最近的 Checkpoint 點(diǎn)開(kāi)始恢復(fù),那么會(huì)從單詞數(shù)為 5 的這個(gè)狀態(tài)點(diǎn)開(kāi)始恢復(fù),Kafka 消費(fèi)的數(shù)據(jù)點(diǎn)位也是狀態(tài)為 5 這個(gè)點(diǎn)位開(kāi)始計(jì)算,所以即使程序遇到外部異常自動(dòng)恢復(fù)時(shí),也不會(huì)影響到 Flink 狀態(tài)的結(jié)果計(jì)算。

At_Least_Once 含義是:每條數(shù)據(jù)對(duì)于 Flink 任務(wù)的狀態(tài)計(jì)算至少影響一次。比如在 WordCount 程序中,你統(tǒng)計(jì)到的某個(gè)單詞的單詞數(shù)可能會(huì)比真實(shí)的單詞數(shù)要大,因?yàn)橥粭l消息,當(dāng) Flink 任務(wù)容錯(cuò)恢復(fù)后,可能將其計(jì)算多次。

Flink 中 Exactly_Once 和 At_Least_Once 具體是針對(duì) Flink 任務(wù)狀態(tài)而言的,并不是 Flink 程序?qū)ο⒂涗浿惶幚硪淮?。舉個(gè)例子,當(dāng)前 Flink 任務(wù)正在做 Checkpoint,該次 Checkpoint 還沒(méi)有完成,這次 Checkpoint 時(shí)間段的數(shù)據(jù)其實(shí)已經(jīng)進(jìn)入 Flink 程序處理,只是程序狀態(tài)沒(méi)有最終存儲(chǔ)到遠(yuǎn)程存儲(chǔ)。當(dāng)程序突然遇到異常,進(jìn)行容錯(cuò)恢復(fù)時(shí),那么就會(huì)從最新的 Checkpoint 進(jìn)行狀態(tài)恢復(fù)重啟,上一次 Checkpoint 成功到這次 Checkpoint 失敗的數(shù)據(jù)還會(huì)進(jìn)入 Flink 系統(tǒng)重新處理,具體實(shí)例如下圖:

上圖中表示一個(gè) WordCount 實(shí)時(shí)任務(wù)的 Checkpoint,在進(jìn)行 chk-5 Checkpoint 時(shí),突然遇到程序異常,那么實(shí)時(shí)任務(wù)會(huì)從 chk-4 進(jìn)行恢復(fù),那么之前 chk-5 處理的數(shù)據(jù),F(xiàn)link 系統(tǒng)會(huì)再次進(jìn)行處理。不過(guò)這些數(shù)據(jù)的狀態(tài)沒(méi)有 Checkpoint 成功,所以 Flink 任務(wù)容錯(cuò)恢復(fù)再次運(yùn)行時(shí),對(duì)于狀態(tài)的影響還是只有一次。

Exactly_Once 和 At_Least_Once 具體在底層實(shí)現(xiàn)大致相同,具體差異表現(xiàn)在 CheckpointBarrier 對(duì)齊方式的處理:

如果是 Exactly_Once 模式,某個(gè)算子的 Task 有多個(gè)輸入通道時(shí),當(dāng)其中一個(gè)輸入通道收到 CheckpointBarrier 時(shí),F(xiàn)link Task 會(huì)阻塞該通道,其不會(huì)處理該通道后續(xù)數(shù)據(jù),但是會(huì)將這些數(shù)據(jù)緩存起來(lái),一旦完成了所有輸入通道的 CheckpointBarrier 對(duì)齊,才會(huì)繼續(xù)對(duì)這些數(shù)據(jù)進(jìn)行消費(fèi)處理。

對(duì)于 At_least_Once,同樣針對(duì)某個(gè)算子的 Task 有多個(gè)輸入通道的情況下,當(dāng)某個(gè)輸入通道接收到 CheckpointBarrier 時(shí),它不同于 Exactly Once,即使沒(méi)有完成所有輸入通道 CheckpointBarrier 對(duì)齊,At Least Once 也會(huì)繼續(xù)處理后續(xù)接收到的數(shù)據(jù)。所以使用 At Least Once 不能保證數(shù)據(jù)對(duì)于狀態(tài)計(jì)算只有一次的計(jì)算影響。

三、 Flink Checkpoint 常見(jiàn)失敗原因和注意點(diǎn)

3.1 Flink Checkpoint 常見(jiàn)失敗原因分析

Flink Checkpoint 失敗有很多種原因,常見(jiàn)的失敗原因如下:

  1. 用戶(hù)代碼邏輯沒(méi)有對(duì)于異常處理,讓其直接在運(yùn)行中拋出。比如解析 Json 異常,沒(méi)有捕獲,導(dǎo)致 Checkpoint失敗,或者調(diào)用 Dubbo 超時(shí)異常等等。

  2. 依賴(lài)外部存儲(chǔ)系統(tǒng),在進(jìn)行數(shù)據(jù)交互時(shí),出錯(cuò),異常沒(méi)有處理。比如輸出數(shù)據(jù)到 Kafka、Redis、HBase等,客戶(hù)端拋出了超時(shí)異常,沒(méi)有進(jìn)行捕獲,F(xiàn)link 任務(wù)容錯(cuò)機(jī)制會(huì)再次重啟。

  3. 內(nèi)存不足,頻繁GC,超出了 GC 負(fù)載的限制。比如 OOM 異常

  4. 網(wǎng)絡(luò)問(wèn)題、機(jī)器不可用問(wèn)題等等。

從目前的具體實(shí)踐情況來(lái)看,F(xiàn)link Checkpoint 異常覺(jué)大多數(shù)還是用戶(hù)代碼邏輯的問(wèn)題,對(duì)于程序異常沒(méi)有正確的處理導(dǎo)致。所以在編寫(xiě) Flink 實(shí)時(shí)任務(wù)時(shí),一定要注意處理程序可能出現(xiàn)的各種異常。這樣,也會(huì)讓實(shí)時(shí)任務(wù)的邏輯更加的健壯。

當(dāng)自己的 Flink 實(shí)時(shí)任務(wù) Checkpoint 失敗時(shí),用戶(hù)可以先通過(guò) Flink Web UI 進(jìn)行快速定位 Checkpoint 失敗的原因,如果在 Flink Web UI 上面沒(méi)有看到異常信息,可以去看任務(wù)的具體日志進(jìn)行定位,如下是 Flink Web UI 查看錯(cuò)誤原因示意圖:

3.2 Flink Checkpoint 參數(shù)配置及注意點(diǎn)

下面是設(shè)置 Flink Checkpoint 參數(shù)配置的建議及注意點(diǎn):

  1. 當(dāng) Checkpoint 時(shí)間比設(shè)置的 Checkpoint 間隔時(shí)間要長(zhǎng)時(shí),可以設(shè)置 Checkpoint 間最小時(shí)間間隔。這樣在上次 Checkpoint 完成時(shí),不會(huì)立馬進(jìn)行下一次 Checkpoint,而是會(huì)等待一個(gè)最小時(shí)間間隔,之后再進(jìn)行 Checkpoint。否則,每次 Checkpoint 完成時(shí),就會(huì)立馬開(kāi)始下一次 Checkpoint,系統(tǒng)會(huì)有很多資源消耗 Checkpoint 方面,而真正任務(wù)計(jì)算的資源就會(huì)變少。
  2. 如果Flink狀態(tài)很大,在進(jìn)行恢復(fù)時(shí),需要從遠(yuǎn)程存儲(chǔ)上讀取狀態(tài)進(jìn)行恢復(fù),如果狀態(tài)文件過(guò)大,此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢,大量的時(shí)間浪費(fèi)在網(wǎng)絡(luò)傳輸方面。此時(shí)可以設(shè)置 Flink Task 本地狀態(tài)恢復(fù),任務(wù)狀態(tài)本地恢復(fù)默認(rèn)沒(méi)有開(kāi)啟,可以設(shè)置參數(shù) state.backend.local-recovery 值為 true 進(jìn)行激活。
  3. Checkpoint 保存數(shù),Checkpoint 保存數(shù)默認(rèn)是1,也就是只保存最新的 Checkpoint 的狀態(tài)文件,當(dāng)進(jìn)行狀態(tài)恢復(fù)時(shí),如果最新的 Checkpoint 文件不可用時(shí)(比如 HDFS 文件所有副本都損壞或者其他原因),那么狀態(tài)恢復(fù)就會(huì)失敗,如果設(shè)置 Checkpoint 保存數(shù) 2,即使最新的Checkpoint恢復(fù)失敗,那么Flink 會(huì)回滾到之前那一次 Checkpoint 的狀態(tài)文件進(jìn)行恢復(fù)。考慮到這種情況,用戶(hù)可以增加 Checkpoint 保存數(shù)。
  4. 建議設(shè)置的 Checkpoint 的間隔時(shí)間最好大于 Checkpoint 的完成時(shí)間。

下圖是不設(shè)置 Checkpoint 最小時(shí)間間隔示例圖,可以看到,系統(tǒng)一致在進(jìn)行 Checkpoint,大量的資源使用在 Flink Chekpoint 上,可能對(duì)運(yùn)行的任務(wù)產(chǎn)生一定影響:

還有一種特殊的情況,F(xiàn)link 端到端 Sink 的 EXACTLYONCE 的問(wèn)題,也就是數(shù)據(jù)從 Flink 端到外部消息系統(tǒng)的消息一致性。打個(gè)比方,F(xiàn)link 輸出數(shù)據(jù)到 Kafka 消息系統(tǒng)中,如果使用 Kafka 0.10 的版本,F(xiàn)link 不支持端到端的 EXACTLYONCE,可能存在消息重復(fù)輸入到 Kafka。

如上圖所示,當(dāng)做 chk-5 Checkpoint 的時(shí)候,chk-5 失敗,然后從 chk-4 來(lái)進(jìn)行恢復(fù),但是 chk-5 的部分?jǐn)?shù)據(jù)在 Chekpoint 失敗之前就有部分進(jìn)入到 Kafka 消息系統(tǒng),再次恢復(fù)時(shí),該部分?jǐn)?shù)據(jù)可能再次重放到 Kafka 消息系統(tǒng)中。

Flink 中解決端到端的一致性有兩種方法:做冪等以及事務(wù)寫(xiě),冪等的話(huà),可以使用 KV 存儲(chǔ)系統(tǒng)來(lái)做冪等,因?yàn)?KV 存儲(chǔ)系統(tǒng)的多次操作結(jié)果都是相同的。Flink 內(nèi)部目前支持二階段事務(wù)提交,Kafka 0.11 以上版本支持事務(wù)寫(xiě),所以支持 Flink 端到 Kafka 端的 EXACTLY_ONCE。

四、 優(yōu)化實(shí)踐

實(shí)時(shí)計(jì)算對(duì)于 Flink 任務(wù)的 Checkpoint 和 Savepoint 做了兩個(gè)方面工作,第一個(gè)工作是對(duì)于 Flink Checkpoint 失敗的情況,如果 Checkpoint 失敗過(guò)于頻繁,同時(shí) Flink Checkpoint 失敗次數(shù)如果達(dá)到平臺(tái)默認(rèn)的失敗閾值,平臺(tái)會(huì)及時(shí)給用戶(hù)報(bào)警提示。我們會(huì)每 5 分鐘檢查一次實(shí)時(shí)任務(wù),統(tǒng)計(jì)實(shí)時(shí)任務(wù)近 15 分鐘內(nèi),F(xiàn)link Checkpoint 失敗次數(shù)的最大值和最小值的差值達(dá)到平臺(tái)默認(rèn)的閾值,則會(huì)立馬給用戶(hù)報(bào)警,讓用戶(hù)能夠及時(shí)的處理問(wèn)題。

當(dāng)然,并不是所有的 Flink 實(shí)時(shí)任務(wù) Checkpoint 失敗平臺(tái)都能發(fā)現(xiàn),因?yàn)?Checkpoint 失敗次數(shù)的檢查,首先與用戶(hù)配置的 Checkpoint 的時(shí)間間隔有關(guān)。舉個(gè)例子,如果用戶(hù)配置的 Checkpoint 間隔為 1 小時(shí),其實(shí)平臺(tái)默認(rèn) Checkpoint 邏輯檢查根本就無(wú)法發(fā)現(xiàn)實(shí)時(shí)任務(wù) Checkpoint 失敗。

針對(duì)這種情況,實(shí)時(shí)平臺(tái)也支持用戶(hù)自定義設(shè)置 Checkpoint 失敗閾值,目前支持兩種 Checkpoint 失敗邏輯檢查,一個(gè)是 實(shí)時(shí)任務(wù)的 Checkpoint 失敗次數(shù)的總和達(dá)到閾值,另一個(gè)則是近 10 分鐘內(nèi),F(xiàn)link Checkpoint 次數(shù)的最大值和最小值的差值的計(jì)算邏輯,用戶(hù)可以根據(jù)實(shí)時(shí)任務(wù)的敏感度,設(shè)置具體的參數(shù)。

第二個(gè)方面則是針對(duì) Flink 任務(wù)的狀態(tài)恢復(fù),為了防止實(shí)時(shí)任務(wù)的狀態(tài)丟失,實(shí)時(shí)計(jì)算平臺(tái)會(huì)定期的對(duì)實(shí)時(shí)任務(wù)進(jìn)行 Savepoint 觸發(fā),當(dāng)任務(wù)由于外界因素導(dǎo)致任務(wù)失敗時(shí),這種失敗是任務(wù)直接掛掉,Yarn 任務(wù)的狀態(tài)直接為 Killed,這種情況下,如果用戶(hù)開(kāi)啟自動(dòng)拉起功能,實(shí)時(shí)平臺(tái)自動(dòng)拉起實(shí)時(shí)任務(wù),同時(shí)從最新的 Savepoint 進(jìn)行狀態(tài)恢復(fù),以至于狀態(tài)不丟失。同時(shí),實(shí)時(shí)計(jì)算平臺(tái)也支持用戶(hù)停止任務(wù)時(shí),觸發(fā) Savepoint,再次重啟實(shí)時(shí)任務(wù)時(shí),還是從停止時(shí)的任務(wù)狀態(tài)進(jìn)行恢復(fù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容