Checkpointing

原文鏈接


Flink中的每個函數(shù)和操作符都可以是有狀態(tài)的(詳細信息請參閱使用狀態(tài))。有狀態(tài)的函數(shù)在單個元素/事件的處理過程中存儲數(shù)據(jù),使狀態(tài)成為任何類型的復雜操作的關鍵部分。
為了使狀態(tài)容錯,F(xiàn)link需要checkpoint狀態(tài)。checkpoint允許Flink恢復流中的狀態(tài)和位置,以使應用獲得和無故障運行相同的語義。
文檔《Data Streaming Fault Tolerance》描述了Flink的流容錯機制背后的技術。

先決條件

Flink的checkpoint機制與流和狀態(tài)的持久化存儲交互。一般來說,它要求:

  • 一個可以在一定時間內(nèi)重放記錄的持久化數(shù)據(jù)源。這些數(shù)據(jù)源的一些例子是持久化消息隊列(例如,Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub)或者文件系統(tǒng)(例如,HDFS, S3, GFS, NFS, Ceph, …)。
  • 狀態(tài)的持久化存儲,通常是一個分布式文件系統(tǒng)(例如,HDFS, S3, GFS, NFS, Ceph, …)。

啟用和配置checkpoint

默認情況下,checkpoint是禁用的。調(diào)用StreamExecutionEnvironment的enableCheckpointing(n)方法開啟checkpoint,參數(shù)n是相鄰checkpoint之間間隔的毫秒數(shù)。
checkpoint的其它參數(shù)包括:

  • exactly-once vs. at-least-once: 可以選擇這兩種保證級別中的一個模式傳遞到enableCheckpointing(n)方法中。對于大多數(shù)程序來說,Exactly-once更可取。一些超低延遲(幾毫秒)的應用可能更關注At-least-once。
  • checkpoint timeout: checkpoint完成的超時時間,超過該時間后,checkpoint進程會被終止。
  • minimum time between checkpoints: 為了確保流應用在相鄰checkpoint之間完成了一定的業(yè)務邏輯,可以定義相鄰checkpoint之間必須間隔的時間。如果將此值設置為5000,下一個checkpoint不會在前一個checkpoint完成之后的5秒內(nèi)啟動,不論checkpoint持續(xù)時間和checkpoint間隔是多少。注意,這意味著checkpoint間隔永遠不會小于這個參數(shù)。
    通常通過定義“checkpoint之間的時間”而不是checkpoint間隔來配置應用,因為,“checkpoint之間的時間”不容易受到執(zhí)行checkpoint消費比平時更長的時間(例如,如果目標存儲系統(tǒng)突然變慢)的影響。
    注意這個值也意味著checkpoint的并發(fā)度為1。
  • number of concurrent checkpoints: 默認情況下,在一個checkpoint正在執(zhí)行的過程中,系統(tǒng)不會觸發(fā)另一個checkpoint。這確保拓補結構不會在checkpoint上花費太多的時間,以及在處理流方面沒有進展??梢栽试S多個重疊的checkpoint,這比較適合有一定處理延遲(例如,因為函數(shù)調(diào)用外部服務而需要一些時間響應)的管道,但是它仍然想做非常頻繁的checkpoint(100毫秒)來重新處理小概率的失敗。
    當定義了相鄰checkpoint最小間隔時間時,不能使用此選項。
  • externalized checkpoints: 可以配置周期性的checkpoint,以便在外部持久化。外部的checkpoint將它們的元數(shù)據(jù)寫入到持久化介質(zhì)中,并且在作業(yè)失敗時不會自動的清除。這樣,如果你的作業(yè)失敗時,你就會有一個checkpoint來恢復。更多的細節(jié)見Checkpoints。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

相關配置選項

更多的參數(shù)和默認值可以通過conf/flink-conf.yaml(請參閱完整配置)設置。

  • state.backend: 如果開啟了checkpoint,將用于存儲操作符狀態(tài)checkpoint的后端。支持的后端有:
    • jobmanager: 內(nèi)存狀態(tài),備份到JobManager/ZooKeeper的內(nèi)存中。應該只用于最小狀態(tài)(Kafka偏移)或者測試和本地調(diào)試。
    • filesystem: 狀態(tài)在TaskManager的內(nèi)存中,狀態(tài)快照存儲在文件系統(tǒng)中。支持所有Flink支持的文件系統(tǒng),例如HDFS, S3, …
  • state.backend.fs.checkpointdir: Flink支持的文件系統(tǒng)中存儲checkpoint的目錄。例如:狀態(tài)后端必須由JobManager訪問,使用file://僅限于本地設置。
  • state.backend.rocksdb.checkpointdir: 存儲RocksDb文件的本地目錄,或由系統(tǒng)目錄分隔符分割的目錄列表(例如,在Linux/Unix上是":")。默認值是taskmanager.tmp.dirs。
  • state.checkpoints.dir: 外部checkpoint的元數(shù)據(jù)的目標目錄。
  • state.checkpoints.num-retained: 要保留的完整checkpoint的數(shù)量。如果最近的checkpoint已經(jīng)損壞,那么擁有不止一個允許恢復之前狀態(tài)的checkpoint。默認值是1。

選擇一個狀態(tài)后端

Flink的checkpoint機制存儲計時器和有狀態(tài)的操作符中的所有狀態(tài)(包括連接器,窗口和任何用戶定義的狀態(tài))的一致性快照。checkpoint存儲在何處(例如,JobManager memory, file system, database)取決于配置的后端狀態(tài)。
默認情況下,狀態(tài)保存在TaskManager的內(nèi)存中,checkpoint存儲在JobManager的內(nèi)存中。為了適配大狀態(tài)的持久化,F(xiàn)link支持在其它后端狀態(tài)中存儲和checkpoint狀態(tài)??梢酝ㄟ^StreamExecutionEnvironment.setStateBackend(…)方法配置后端狀態(tài)。
更多關于可用后端狀態(tài)和作業(yè)范圍和集群范圍的配置選項請參閱后端狀態(tài)。

迭代作業(yè)中的checkpoint狀態(tài)

Flink當前僅為沒有迭代的作業(yè)提供處理保證。在迭代作業(yè)中啟用checkpoint會導致異常。為了在迭代程序上強制checkpoint,當開啟checkpoint時用戶需要設置一個特殊的標識env.enableCheckpointing(interval, force = true)。
請注意,當失敗時,已經(jīng)通過循環(huán)的記錄(和跟它們相關的狀態(tài)修改)會丟失。

重啟策略

Flink支持不同的重啟策略,該策略控制在失敗時如何重新啟動作業(yè)。更多的信息,見重啟策略。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容