Flink DataStream 狀態(tài)和容錯(cuò) 三:Savepoint 和 Restart

Savepoint

Savepoint 和 Checkpoint 的區(qū)別

Savepoint 是命令觸發(fā)的 Checkpoint,對流式程序做一次完整的快照并將結(jié)果寫到 State backend,可用于停止、恢復(fù)或更新 Flink 程序。整個(gè)過程依賴于 Checkpoint 機(jī)制。另一個(gè)不同之處是,Savepoint 不會自動清除。

分配 Operator IDs

Savepoint 中會以 Operator ID 作為 key 保存每個(gè)有狀態(tài)算子的狀態(tài):

Operator ID State
source-id State of StatefulSource
mapper-id State of StatefulMapper

Operator ID 用于確定每個(gè)算子的狀態(tài),只要ID不變,就可以從 Savepoint 中恢復(fù),Operator ID 如果不顯示指定會自動生成,生成的ID取決于程序的結(jié)構(gòu),并且對程序更改很敏感。因此,建議手動分配這些ID:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

Savepoint 操作

觸發(fā) Savepoint 時(shí),會創(chuàng)建一個(gè)新的 Savepoint 目錄,其中將存儲數(shù)據(jù)和元數(shù)據(jù)。可以通過配置默認(rèn) targetDirectory 或指定自定義 targetDirectory:

state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定義目錄,Savepoint 將失敗。

觸發(fā) Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

生成 Savepoint(以 jobId 作為唯一ID),并返回創(chuàng)建的 Savepoint 的路徑,恢復(fù)時(shí)需要使用。

在 Yarn 集群觸發(fā) Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

要指定 jobId 和 yarnAppId(YARN應(yīng)用程序ID),并返回創(chuàng)建的 Savepoint 的路徑。

取消作業(yè)時(shí)生成 Savepoint

$ bin/flink cancel -s [:targetDirectory] :jobId

以原子方式觸發(fā)具有 jobId 的 Savepoint,并取消作業(yè)。

恢復(fù) Savepoint

$ bin/flink run -s :savepointPath [:runArgs]

提交作業(yè),并指定要恢復(fù)的 Savepoint路徑。

允許啟動有未恢復(fù) State

$ bin/flink run -s :savepointPath -n [:runArgs]

默認(rèn)情況下,恢復(fù)操作將嘗試將 Savepoint 的所有 State 恢復(fù)。如果刪除了運(yùn)算符,則可以通過 –allowNonRestoredState(簡寫為 -n) 選項(xiàng)跳過無法映射到新程序的狀態(tài)。

刪除 Savepoint

$ bin/flink savepoint -d :savepointPath

通過指定路徑刪除 Savepoint,也可以通過文件系統(tǒng)手動刪除 Savepoint 數(shù)據(jù),而不會影響其他 Savepoint 或 Checkpoint。

常見問題

應(yīng)該為所有算子分配ID嗎?
根據(jù)經(jīng)驗(yàn),是的。嚴(yán)格地說,只需要通過該uid()方法將ID分配給作業(yè)中的有狀態(tài) 算子。Savepoint 僅包含這些算子的 State,無狀態(tài)算子不是保存點(diǎn)的一部分。

如果在作業(yè)中新添加一個(gè)有狀態(tài)算子,會發(fā)生什么?
新算子將在沒有任何狀態(tài)的情況下進(jìn)行初始化,類似于無狀態(tài)算子。

如果在作業(yè)刪除一個(gè)有狀態(tài)的算子,會發(fā)生什么?
如果沒有指定允許啟動有未恢復(fù) State(–allowNonRestoredState / -n),啟動會失敗。

如果在作業(yè)中重新排列有狀態(tài)算子,會發(fā)生什么?
如果手動這些算子分配了ID,作業(yè)將照?;謴?fù)。否則,重新排序后,有狀態(tài)算子的自動生成ID很可能會更改,將導(dǎo)致無法從 Savepoint 恢復(fù)。

如果在作業(yè)中添加,刪除或重新排序沒有狀態(tài)的算子,會發(fā)生什么?
如果為有狀態(tài)算子手動分配了ID,作業(yè)將照?;謴?fù),則無狀態(tài)算子的改變不會影響。否則,重新排序后,有狀態(tài)算子的自動生成ID很可能會更改,將導(dǎo)致無法從 Savepoint 恢復(fù)。

如果作業(yè)的并行性發(fā)生改變,會發(fā)生什么?
如果 Savepoint 的生成是使用 Flink 1.2.0 以及之后的版本,并且沒有使用棄用狀態(tài)API,可以正?;謴?fù)作業(yè)。

如果 Savepoint 的生成比 Flink 1.2.0 更早的版本,或者使用棄用狀態(tài)API,則首先必須將作業(yè)和 Savepoint 升級到1.2.0以及之后的版本,然后才能更改并行度。請參考官方 升級指南。

Restart

Flink 支持多種不同的重啟策略,控制著作業(yè)失敗后如何重啟。集群可以設(shè)置默認(rèn)的重啟策略,作業(yè)提交的時(shí)候也可以指定重啟策略,覆蓋默認(rèn)的重啟策略。

默認(rèn)的重啟策略配置在 conf/flink-conf.yaml,參數(shù) restart-strategy 定義了采用什么策略。如果 checkpoint 未啟用,就會采用 "no restart" 策略,如果啟用了 checkpoint 機(jī)制,但是未指定重啟策略的話,就會采用 "fixed-delay" 策略。每個(gè)重啟策略都有自己的參數(shù)來控制它的行為,這些值也可以在配置文件中設(shè)置,每個(gè)重啟策略的描述都包含著各自的配置值信息。

以下是支持的三種重啟策略的可配置項(xiàng)

重啟策略 重啟策略值
Fixed delay fixed-delay
Failure rate failure-rate
No restart None

除了定義一個(gè)默認(rèn)的重啟策略之外,你還可以為每一個(gè)Job指定它自己的重啟策略,這個(gè)重啟策略可以在ExecutionEnvironment中調(diào)用setRestartStrategy()方法來程序化地調(diào)用,這種方式同樣適用于StreamExecutionEnvironment。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

固定延遲重啟策略(Fixed Delay Restart Strategy)

嘗試一個(gè)給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會等待一個(gè)固定的時(shí)間。

參數(shù)配置 描述 默認(rèn)值
restart-strategy.fixed-delay.attempts Flink嘗試執(zhí)行的次數(shù) 1,如果啟用checkpoint的話是Integer.MAX_VALUE
restart-strategy.fixed-delay.delay 兩次重啟之間等待的時(shí)間 akka.ask.timeout,如果啟用checkpoint的話是1s

flink-conf.yaml 參數(shù)配置:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

失敗率重啟策略(Failure Rate Restart Strategy)

Job失敗后會重啟次數(shù)如果超過失敗率,Job會最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會等待一個(gè)固定的時(shí)間。

配置參數(shù) 描述 默認(rèn)值
restart-strategy.failure-rate.max-failures-per-interval Flink嘗試執(zhí)行的次數(shù) 1
restart-strategy.failure-rate.failure-rate-interval 計(jì)算失敗率的時(shí)間間隔 1 min
restart-strategy.failure-rate.delay 兩次重啟之間等待的時(shí)間 akka.ask.timeout

flink-conf.yaml 參數(shù)配置:

restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3 
restart-strategy.failure-rate.failure-rate-interval: 5 min 
restart-strategy.failure-rate.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment() 
env.setRestartStrategy(RestartStrategies.failureRateRestart( 
  3, // 每個(gè)測量時(shí)間間隔最大失敗次數(shù) 
  Time.of(5, TimeUnit.MINUTES), //失敗率測量的時(shí)間間隔 
  Time.of(10, TimeUnit.SECONDS) // 兩次連續(xù)重啟嘗試的時(shí)間間隔 
))

無重啟策略(No Restart Strategy)

Job直接失敗,不會嘗試進(jìn)行重啟

flink-conf.yaml 參數(shù)配置:

restart-strategy: none
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/restart_strategies.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容