Streaming -- State & Fault Tolerance -- Checkpointing

Flink中的每個(gè)函數(shù)和運(yùn)算符都可以是有狀態(tài)的(有關(guān)詳細(xì)信息,請(qǐng)參見(jiàn)使用狀態(tài))。 有狀態(tài)功能在處理單個(gè)元素/事件的過(guò)程中存儲(chǔ)數(shù)據(jù),使?fàn)顟B(tài)成為任何類(lèi)型的更復(fù)雜操作的關(guān)鍵構(gòu)建塊。

為了使?fàn)顟B(tài)容錯(cuò),F(xiàn)link需要對(duì)狀態(tài)進(jìn)行檢查點(diǎn)。檢查點(diǎn)允許Flink恢復(fù)流中的狀態(tài)和位置,從而為應(yīng)用程序提供與無(wú)故障執(zhí)行相同的語(yǔ)義。

documentation on streaming fault tolerance詳細(xì)描述了Flink流容錯(cuò)機(jī)制背后的技術(shù)。

Prerequisites

Flink的檢查點(diǎn)機(jī)制與流和狀態(tài)的持久存儲(chǔ)交互。一般來(lái)說(shuō),它需要:

  • 可以在一定時(shí)間內(nèi)重播記錄的持續(xù)的(或長(zhǎng)久的)數(shù)據(jù)源。這類(lèi)源的例子有持久的消息隊(duì)列(例如,Apache Kafka, RabbitMQ, Amazon Kinesis,谷歌PubSub)或文件系統(tǒng)(例如,HDFS, S3, GFS, NFS, Ceph)。
  • 狀態(tài)的持久存儲(chǔ),通常是分布式文件系統(tǒng)(例如,HDFS、S3、GFS、NFS、Ceph)
Enabling and Configuring Checkpointing

默認(rèn)情況下,檢查點(diǎn)是禁用的。要啟用檢查點(diǎn),在StreamExecutionEnvironment上調(diào)用enableCheckpointing(n),其中n是檢查點(diǎn)間隔(以毫秒為單位)。

檢查點(diǎn)的其他參數(shù)包括:

  • 精確一次vs.至少一次:您可以選擇將模式傳遞給enableCheckpointing(n)方法,以便在兩個(gè)保證級(jí)別之間進(jìn)行選擇。對(duì)大多數(shù)應(yīng)用程序來(lái)說(shuō),精確一次是最好的。對(duì)于某些超低延遲(始終只有幾毫秒)的應(yīng)用程序,至少需要一次。

  • 檢查點(diǎn)超時(shí):如果正在進(jìn)行的檢查點(diǎn)在此之前沒(méi)有完成,則在此之后中止的時(shí)間。

  • 檢查點(diǎn)之間的最小時(shí)間:為了確保流應(yīng)用程序在檢查點(diǎn)之間取得一定的進(jìn)展,可以定義檢查點(diǎn)之間需要經(jīng)過(guò)多少時(shí)間。例如,如果將該值設(shè)置為5000,則下一個(gè)檢查點(diǎn)將在上一個(gè)檢查點(diǎn)完成后不早于5秒啟動(dòng),無(wú)論檢查點(diǎn)持續(xù)時(shí)間和間隔如何。注意,這意味著檢查點(diǎn)間隔永遠(yuǎn)不會(huì)小于此參數(shù)。
    通過(guò)定義檢查點(diǎn)之間的時(shí)間通常比定義檢查點(diǎn)間隔更容易配置應(yīng)用程序,因?yàn)闄z查點(diǎn)之間的時(shí)間不受檢查點(diǎn)有時(shí)花費(fèi)的時(shí)間超過(guò)平均時(shí)間這一事實(shí)的影響(例如,如果目標(biāo)存儲(chǔ)系統(tǒng)暫時(shí)變慢)。

  • 并發(fā)檢查點(diǎn)的數(shù)量:默認(rèn)情況下,當(dāng)一個(gè)檢查點(diǎn)仍在運(yùn)行時(shí),系統(tǒng)不會(huì)觸發(fā)另一個(gè)檢查點(diǎn)。這確保拓?fù)洳粫?huì)在檢查點(diǎn)上花費(fèi)太多時(shí)間,也不會(huì)在處理流方面取得進(jìn)步。允許多個(gè)重疊的檢查點(diǎn)是可能的,這對(duì)于具有一定處理延遲(例如,因?yàn)楹瘮?shù)調(diào)用了需要一些時(shí)間來(lái)響應(yīng)的外部服務(wù))但仍然希望執(zhí)行非常頻繁的檢查點(diǎn)(100毫秒)以在失敗時(shí)很少重新處理的管道來(lái)說(shuō)是很有趣的。
    當(dāng)定義了檢查點(diǎn)之間的最短時(shí)間時(shí),不能使用此選項(xiàng)。

  • 外部化檢查點(diǎn):可以配置定期檢查點(diǎn),以在外部持久保存。外部化的檢查點(diǎn)將其元數(shù)據(jù)寫(xiě)入持久存儲(chǔ),并且在作業(yè)失敗時(shí)不會(huì)自動(dòng)清除。這樣,如果作業(yè)失敗,您的周?chē)蜁?huì)有一個(gè)檢查點(diǎn)來(lái)恢復(fù)。在 deployment notes on externalized checkpoints中有更多細(xì)節(jié)。

  • 檢查點(diǎn)錯(cuò)誤的失敗/繼續(xù)任務(wù):如果執(zhí)行任務(wù)的檢查點(diǎn)過(guò)程時(shí)發(fā)生錯(cuò)誤,這將確定任務(wù)是否失敗。 這是默認(rèn)行為。 或者,禁用此選項(xiàng)后,任務(wù)將簡(jiǎn)單地將檢查點(diǎn)拒絕給檢查點(diǎn)協(xié)調(diào)器并繼續(xù)運(yùn)行。

  • 寧愿使用檢查點(diǎn)進(jìn)行恢復(fù):這決定了一個(gè)作業(yè)是否將回退到最新的檢查點(diǎn),即使有更多的最近保存點(diǎn)可用來(lái)減少恢復(fù)時(shí)間。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

Related Config Options

可以通過(guò)conf / flink-conf.yaml設(shè)置更多的參數(shù)和/或默認(rèn)值(請(qǐng)參閱configuration以獲取完整指南):

Key Default Type Description
state.backend (none) String 用于存儲(chǔ)和檢查點(diǎn)狀態(tài)的狀態(tài)后端。
state.backend.async true Boolean 選項(xiàng)狀態(tài)后端是否應(yīng)在可能且可配置的情況下使用異步快照方法。一些狀態(tài)后端可能不支持異步快照,或者只支持異步快照,并忽略此選項(xiàng)。
state.backend.fs.memory-threshold 1024 Integer 狀態(tài)數(shù)據(jù)文件的最小大小。所有小于此值的狀態(tài)塊都內(nèi)聯(lián)存儲(chǔ)在根檢查點(diǎn)元數(shù)據(jù)文件中。
state.backend.fs.write-buffer-size 4096 Integer 寫(xiě)到文件系統(tǒng)的檢查點(diǎn)流的寫(xiě)緩沖區(qū)的默認(rèn)大小。實(shí)際的寫(xiě)緩沖區(qū)大小被確定為這個(gè)選項(xiàng)和選項(xiàng)'state.backend.fs.memory-threshold'的最大值。
state.backend.incremental false Boolean 選項(xiàng)狀態(tài)后端是否應(yīng)該創(chuàng)建增量檢查點(diǎn)(如果可能)。對(duì)于增量檢查點(diǎn),只存儲(chǔ)與前一個(gè)檢查點(diǎn)的差異,而不是完整的檢查點(diǎn)狀態(tài)。一些狀態(tài)后端可能不支持增量檢查點(diǎn)并忽略此選項(xiàng)。
state.backend.local-recovery false Boolean 此選項(xiàng)配置此狀態(tài)后端的本地恢復(fù)。默認(rèn)情況下,本地恢復(fù)是停用的。本地恢復(fù)目前只覆蓋鍵控狀態(tài)后端。目前,MemoryStateBackend不支持本地恢復(fù)并忽略此選項(xiàng)。
state.checkpoints.dir (none) String 用于在Flink支持的文件系統(tǒng)中存儲(chǔ)檢查點(diǎn)的數(shù)據(jù)文件和元數(shù)據(jù)的默認(rèn)目錄。 必須從所有參與的進(jìn)程/節(jié)點(diǎn)(即所有TaskManager和JobManager)訪問(wèn)存儲(chǔ)路徑。
state.checkpoints.num-retained 1 Integer 要保留的已完成檢查點(diǎn)的最大數(shù)量。
state.savepoints.dir (none) String 保存點(diǎn)的默認(rèn)目錄。由將保存點(diǎn)寫(xiě)入文件系統(tǒng)的狀態(tài)后端使用(MemoryStateBackend, FsStateBackend, RocksDBStateBackend)。
taskmanager.state.local.root-dirs (none) String 定義根目錄的配置參數(shù),用于存儲(chǔ)用于本地恢復(fù)的基于文件的狀態(tài)。本地恢復(fù)目前只覆蓋鍵控狀態(tài)后端。目前,MemoryStateBackend不支持本地恢復(fù)并忽略此選項(xiàng)

Selecting a State Backend

Flink的checkpointing mechanism在計(jì)時(shí)器和有狀態(tài)操作符中存儲(chǔ)所有狀態(tài)的一致快照,包括連接器、窗口和任何用戶定義的狀態(tài)。檢查點(diǎn)存儲(chǔ)的位置(例如,JobManager內(nèi)存、文件系統(tǒng)、數(shù)據(jù)庫(kù))取決于配置的狀態(tài)后端。

默認(rèn)情況下,狀態(tài)保存在taskmanager的內(nèi)存中,檢查點(diǎn)存儲(chǔ)在JobManager的內(nèi)存中。為了正確地持久化大狀態(tài),F(xiàn)link支持在其他狀態(tài)后端存儲(chǔ)和檢查點(diǎn)狀態(tài)的各種方法。狀態(tài)后端可以通過(guò)StreamExecutionEnvironment.setStateBackend()配置。

有關(guān)可用狀態(tài)后端以及作業(yè)范圍和集群范圍配置選項(xiàng)的詳細(xì)信息,請(qǐng)參閱狀態(tài)后端。

State Checkpoints in Iterative Jobs

Flink目前僅為沒(méi)有迭代的作業(yè)提供處理保證。在迭代作業(yè)上啟用檢查點(diǎn)會(huì)導(dǎo)致異常。為了在迭代程序上強(qiáng)制檢查點(diǎn),用戶需要在啟用檢查點(diǎn)時(shí)設(shè)置一個(gè)特殊的標(biāo)志:env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,force = true)。

請(qǐng)注意,在失敗期間,循環(huán)邊緣中的運(yùn)行記錄(以及與它們相關(guān)聯(lián)的狀態(tài)更改)將丟失。

Restart Strategies

Flink支持不同的重新啟動(dòng)策略,這些策略控制在出現(xiàn)故障時(shí)如何重新啟動(dòng)作業(yè)。有關(guān)更多信息,請(qǐng)參見(jiàn) Restart Strategies

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容