【Flink 精選】如何排查 Checkpoint 異常問題?

本文詳解 Checkpoint 機制,分享 Checkpoint 問題排查的手段,包括失敗、延期等問題。


1.Checkpoint 機制

1.1 Checkpoint 概念

Checkpoint 檢查點,F(xiàn)link 定期把 state 緩存數(shù)據(jù)持久化保存下來的過程。它的目的是容錯和 exactly-once 語義功能。

1.2 Checkpoint 設(shè)計和執(zhí)行流程

(1)Checkpoint 的設(shè)計

分布式系統(tǒng)實現(xiàn)一個全局狀態(tài)保留的功能。
① 傳統(tǒng)方案使用一個統(tǒng)一時鐘,通過 master 節(jié)點廣播到每個 slaves 節(jié)點。當 slaves 接收到后,記錄其狀態(tài)。缺點:單點故障、數(shù)據(jù)不一致(延遲/失?。⑾到y(tǒng)不穩(wěn)定
② Flink 采用柵欄 Barrier 作為 Checkpoint 的傳遞信號,與業(yè)務(wù)數(shù)據(jù)一樣,無差別的傳遞下去。

屏障傳遞.png

(2)Checkpoint 的執(zhí)行流程

checkpoint的執(zhí)行流程.png

每一個 Flink 作業(yè)都會有一個 JobManager ,JobManager 里面的 checkpoint coordinator 管理整個作業(yè)的 checkpoint 過程。用戶通過 env 設(shè)置 checkpoint 的時間間隔,使得 checkpoint coordinator 定時將 checkpoint 的 barrier 發(fā)送給每個 source subtask。

當 source 算子實例收到一個 barrier 時,它會暫停自身的數(shù)據(jù)處理,然后將自己的當前 緩存數(shù)據(jù) state 保存為快照 snapshot,并且持久化到指定的存儲,最后算子實例向 checkpoint coordinator 異步發(fā)送一個確認信號 ack,同時向所有下游算子廣播該 barrier 和恢復(fù)自身的數(shù)據(jù)處理。

以此類推,每個算子不斷制作 snapshot 并向下游廣播 barrier,直到 barrier 傳遞到 sink 算子實例,此時確定全局快照完成。

1.3 Checkpoint 和 StateBackend的使用


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);   // 設(shè)置Checkpoint的時間間隔為10s
env.setStateBackend(new RocksDBStateBackend(filebackend, true));   // 采用RocksDB作為state的存儲后端
env.getCheckpointConfig().setCheckpointTimeout(60000);   // 設(shè)置checkpoint的超時時間為60s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);   // 開啟exactly-once語義
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);   // 設(shè)置checkpoint的清除策略

2.Checkpoint 問題排查

Flink Web UI 有 Checkpoint 監(jiān)控信息,包括統(tǒng)計信息和每個Checkpoint的詳情。如下圖所示,紅框里面可以看到一共觸發(fā)了 569K 次 Checkpoint,然后全部都成功完成,沒有 fail 的。

Checkpoint統(tǒng)計信息.png

如下圖所示,點擊某次 Checkpoint “+”,可知該Checkpoint 的詳情。

Checkpoint 詳情.JPG

Acknowledged 表示有多少個 subtask 對這個 Checkpoint 進行了 ack,從圖中可知,共有3個 operator 分為2個 subtask,這2個 subtask 都完成 ack。
Latest Acknowledgement 表示所有 subtask 的最后 ack 的時間;

End to End Duration 表示所有 subtask 中完成 snapshot 的最長時間;

State Size 表示當前 Checkpoint 的 state 大?。ㄈ绻窃隽?checkpoint,則表示增量大?。?;

Buffered During Alignment 表示在 barrier 對齊階段累計多少數(shù)據(jù)(如果這個數(shù)據(jù)過大,則間接表示對齊比較慢);

2.1 Checkpoint 失敗

如下圖所示,F(xiàn)link Web UI 的 Checkpoint 界面顯示 Checkpoint 10432 失敗。點擊 Checkpoint 10423 的詳情“+”,可知 Acknowledged、Latest Acknowledgement等信息。

Checkpoint 失敗.png

2.1.1 Checkpoint Decline 拒絕

查看 JobManager 的日志 jobmanager.log,其中關(guān)鍵日志,如下


Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178

解析:10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 task execution id 即 subtask id,85d268e6fbc19411185f7e4868a44178 是 job id。
從上述的 jobmanager.log 日志中,可知 subtask id 和 job id,可以確定 taskmanager 和 slot。


2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - XXXXXXXXXXX (100/289) (85d268e6fbc19411185f7e4868a44178 ) switched from SCHEDULED to DEPLOYING.
2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

從上面的日志,可知 subtask 被調(diào)度到節(jié)點 hostnameABCDEcontainer_e24_1566836790522_8088_04_013155_1 slot,接著到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失敗的具體原因。

2.1.2 Checkpoint Cancel 取消

如果較小的 Checkpoint 沒有對齊的情況,F(xiàn)link 收到了更大的 Checkpoint,則會把較小的 Checkpoint 給取消,其關(guān)鍵日志如下。


$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

該日志表示當前 Checkpoint 19 還在對齊階段,同時收到了 Checkpoint 20 的 barrier,接著通知到下游的 task checkpoint 19 被取消了,同時也會通知 JM 當前 Checkpoint 被 decline 掉了。
當下游 task 收到被 cancel barrier 的時候,打印如下的關(guān)鍵日志,表示當前 task 接收到上游發(fā)送過來的 barrier cancel 消息,從而取消了對應(yīng)的 Checkpoint。

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.

WARN
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

說明:如果日志是 debug 級別,會標記為 DEBUG。

2.1.3 Checkpoint Expire 過期

如果 Checkpoint 做的非常慢,超過了 timeout 還沒有完成,則整個 Checkpoint 也會失敗。例如,如果 Checkpoint 21 由于超時而失敗是,jobmanager.log 的關(guān)鍵日志如下。

// 表示 Chekpoint 21 由于超時而沒有完成
Checkpoint 21 of job 85d268e6fbc19411185f7e4868a44178  expired before completing.

// 表示 超時 Checkpoint 是來自 job id 為 85d268e6fbc19411185f7e4868a44178, subtask 為 0b60f08bf8984085b59f8d9bc74ce2e1 
Received late message for now expired checkpoint attempt 21 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

接著打開 debug 級別的日志, taskmananger.log 的 snapshot 分為三個階段,開始 snapshot 前,同步階段,異步階段:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
// 該日志表示 TM 端 barrier 對齊后,準備開始做 Checkpoint,其中6751是checkpoint id,CHECKPOINT是類型,taskNameWithSubtasks是subtask name

DEBUG
2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx
_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.
// 該日志表示當前這個 backend 的同步階段完成,共使用了 0 ms
// 說明: fink-config.yaml的state.backend.async配置異步/同步snapshot,默認是異步snapshot

DEBUG
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx,  metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms
// 該日志表示異步階段完成,異步階段使用了 369 ms

通過上述3個日志,定位 snapshot 是開始晚,同步階段做的慢,還是異步階段做的慢,然后再繼續(xù)進一步排查問題。

2.2 Checkpoint 慢

Checkpoint 慢的場景,例如 Checkpoint interval 1 分鐘,超時 10 分鐘,Checkpoint 經(jīng)常需要做 9 分鐘,而且實際 state size 比預(yù)期的大很多。

2.2.1 作業(yè)存在反壓或者數(shù)據(jù)傾斜

簡單介紹 Checkpoint Barrier 對齊機制:算子 Operator 從輸入流接收到 barrier n 后,它就不能處理來自該流的任何數(shù)據(jù)記錄,直到它從其他所有輸入接收到 barrier n。如下圖所示,Operator 從數(shù)字流中接收到 barrier n 后,接著數(shù)字流的數(shù)據(jù)不會被處理而是放入輸入緩沖區(qū)。直到字母流的 barrier n 達到 Operator 后,Operator 向下游發(fā)送 barrier n 和 緩沖區(qū)的數(shù)據(jù),同時進行自身的 snapshot。

checkpoint barrier對齊機制.JPG

由于 barrier 對齊機制,算子需要接收到上游全部 barrier n 后,才會進行 snapshot。如果作業(yè)存在反壓或者數(shù)據(jù)傾斜,則會導致全部的 channel 或者某些 channel 的 barrier 發(fā)送慢,從而整體影響 Checkpoint 的時間。如下圖所示,通過Flink Web UI 監(jiān)控 subtask 數(shù)據(jù)量 和反壓 BackPressure。

Web UI數(shù)據(jù)量監(jiān)控.JPG

web反壓監(jiān)控.jpg

參考:
【Flink 精選】如何分析及處理反壓?
【Flink 精選】如何處理作業(yè)的數(shù)據(jù)傾斜?

2.2.2 Barrier 對齊慢

介紹Checkpoint Barrier 對齊機制,算子 Operator 收齊上游的 barrier n 才能觸發(fā) snapshot。例如,StateBackend 是 RocksDB,snapshot 開始的時候保存數(shù)據(jù)到 RocksDB,然后 RocksDB 異步持久化到 FS。如果 barrier n 一直對不齊的話,就不會開始做 snapshot。

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
// 該日志表示barrier 對齊后開始checkpoint
// 定位: 如果沒有該日志,即表示barrier一直沒有對齊,接下來需要了解哪些上游的 barrier 沒有發(fā)送下來。

// 建議: 使用 At-Least-Once,可以觀察下面的日志
DEBUG
Received barrier for checkpoint 96508 from channel 5
// 該日志表示該task收到了channel 5來的 barrier,然后看對應(yīng) checkpoint
// Exactly-Once暫時沒有類似的日志,可以考慮自己添加,或者 jmap 查看。

2.2.3 全量 Checkpoint 導致 snapshot 持久化慢

Checkpoint 有兩種模式:全量 Checkpoint 和 增量 Checkpoint。全量 Checkpoint 會把當前的 state 全部備份一次到持久化存儲,而增量 Checkpoint,則只備份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上傳的內(nèi)容會相對更好,在速度上會有更大的優(yōu)勢。

目前僅 RocksDBStateBackend 支持增量 Checkpoint。

2.2.4 Checkpoint 同步階段慢

如果通過日志發(fā)現(xiàn)同步階段比較慢,對于非 RocksDBBackend,可以考慮開啟異步 snapshot。如果開啟了異步 snapshot 還是慢,需要使用 AsyncProfile 查看整個JVM。
對于 RocksDBBackend,使用 iostate 查看磁盤的壓力,同時查看 TaskMananger 的 RocksDB log日志,查看其中 snapshot 時間總開銷。

// RocksDB 開始 snapshot 的日志
2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

// RocksDB 開始 snapshot 的日志
2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good

2.2.5 Checkpoint 異步階段慢

異步階段,TaskManager 主要將 state 備份到持久化存儲 HDFS。對于非 RocksDBBackend,主要瓶頸來自于網(wǎng)絡(luò),可以考慮觀察網(wǎng)絡(luò)的 metric,或者使用 iftop 觀察對應(yīng)機器上的網(wǎng)絡(luò)流量情況。
對于 RocksDB,則需要從本地讀取文件,寫入到遠程的持久化存儲上 HDFS,所以不僅需要考慮網(wǎng)絡(luò)的瓶頸,還需要考慮本地磁盤的性能。

2.2.6 Source Trigger Checkpoint 慢

該場景出現(xiàn)的概率比較小,source 做 snapshot 并往下游發(fā)送 barrier 的時候,需要搶鎖。如果一直搶不到鎖的話,則可能導致 Checkpoint 一直得不到機會進行。如果在 Source 所在的 taskmanager.log 中找不到開始做 Checkpoint 的 log,則可以考慮是否屬于這種情況,可以通過 jstack 進行進一步確認鎖的持有情況。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容