Savepoint
Savepoint 和 Checkpoint 的區(qū)別
Savepoint 是命令觸發(fā)的 Checkpoint,對流式程序做一次完整的快照并將結(jié)果寫到 State backend,可用于停止、恢復(fù)或更新 Flink 程序。整個(gè)過程依賴于 Checkpoint 機(jī)制。另一個(gè)不同之處是,Savepoint 不會自動清除。
分配 Operator IDs
Savepoint 中會以 Operator ID 作為 key 保存每個(gè)有狀態(tài)算子的狀態(tài):
| Operator ID | State |
|---|---|
| source-id | State of StatefulSource |
| mapper-id | State of StatefulMapper |
Operator ID 用于確定每個(gè)算子的狀態(tài),只要ID不變,就可以從 Savepoint 中恢復(fù),Operator ID 如果不顯示指定會自動生成,生成的ID取決于程序的結(jié)構(gòu),并且對程序更改很敏感。因此,建議手動分配這些ID:
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
Savepoint 操作
觸發(fā) Savepoint 時(shí),會創(chuàng)建一個(gè)新的 Savepoint 目錄,其中將存儲數(shù)據(jù)和元數(shù)據(jù)。可以通過配置默認(rèn) targetDirectory 或指定自定義 targetDirectory:
state.savepoints.dir: hdfs:///flink/savepoints
如果既未配置缺省值也未指定自定義目錄,Savepoint 將失敗。
觸發(fā) Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]
生成 Savepoint(以 jobId 作為唯一ID),并返回創(chuàng)建的 Savepoint 的路徑,恢復(fù)時(shí)需要使用。
在 Yarn 集群觸發(fā) Savepoint
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
要指定 jobId 和 yarnAppId(YARN應(yīng)用程序ID),并返回創(chuàng)建的 Savepoint 的路徑。
取消作業(yè)時(shí)生成 Savepoint
$ bin/flink cancel -s [:targetDirectory] :jobId
以原子方式觸發(fā)具有 jobId 的 Savepoint,并取消作業(yè)。
恢復(fù) Savepoint
$ bin/flink run -s :savepointPath [:runArgs]
提交作業(yè),并指定要恢復(fù)的 Savepoint路徑。
允許啟動有未恢復(fù) State
$ bin/flink run -s :savepointPath -n [:runArgs]
默認(rèn)情況下,恢復(fù)操作將嘗試將 Savepoint 的所有 State 恢復(fù)。如果刪除了運(yùn)算符,則可以通過 –allowNonRestoredState(簡寫為 -n) 選項(xiàng)跳過無法映射到新程序的狀態(tài)。
刪除 Savepoint
$ bin/flink savepoint -d :savepointPath
通過指定路徑刪除 Savepoint,也可以通過文件系統(tǒng)手動刪除 Savepoint 數(shù)據(jù),而不會影響其他 Savepoint 或 Checkpoint。
常見問題
應(yīng)該為所有算子分配ID嗎?
根據(jù)經(jīng)驗(yàn),是的。嚴(yán)格地說,只需要通過該uid()方法將ID分配給作業(yè)中的有狀態(tài) 算子。Savepoint 僅包含這些算子的 State,無狀態(tài)算子不是保存點(diǎn)的一部分。
如果在作業(yè)中新添加一個(gè)有狀態(tài)算子,會發(fā)生什么?
新算子將在沒有任何狀態(tài)的情況下進(jìn)行初始化,類似于無狀態(tài)算子。
如果在作業(yè)刪除一個(gè)有狀態(tài)的算子,會發(fā)生什么?
如果沒有指定允許啟動有未恢復(fù) State(–allowNonRestoredState / -n),啟動會失敗。
如果在作業(yè)中重新排列有狀態(tài)算子,會發(fā)生什么?
如果手動這些算子分配了ID,作業(yè)將照?;謴?fù)。否則,重新排序后,有狀態(tài)算子的自動生成ID很可能會更改,將導(dǎo)致無法從 Savepoint 恢復(fù)。
如果在作業(yè)中添加,刪除或重新排序沒有狀態(tài)的算子,會發(fā)生什么?
如果為有狀態(tài)算子手動分配了ID,作業(yè)將照?;謴?fù),則無狀態(tài)算子的改變不會影響。否則,重新排序后,有狀態(tài)算子的自動生成ID很可能會更改,將導(dǎo)致無法從 Savepoint 恢復(fù)。
如果作業(yè)的并行性發(fā)生改變,會發(fā)生什么?
如果 Savepoint 的生成是使用 Flink 1.2.0 以及之后的版本,并且沒有使用棄用狀態(tài)API,可以正?;謴?fù)作業(yè)。
如果 Savepoint 的生成比 Flink 1.2.0 更早的版本,或者使用棄用狀態(tài)API,則首先必須將作業(yè)和 Savepoint 升級到1.2.0以及之后的版本,然后才能更改并行度。請參考官方 升級指南。
Restart
Flink 支持多種不同的重啟策略,控制著作業(yè)失敗后如何重啟。集群可以設(shè)置默認(rèn)的重啟策略,作業(yè)提交的時(shí)候也可以指定重啟策略,覆蓋默認(rèn)的重啟策略。
默認(rèn)的重啟策略配置在 conf/flink-conf.yaml,參數(shù) restart-strategy 定義了采用什么策略。如果 checkpoint 未啟用,就會采用 "no restart" 策略,如果啟用了 checkpoint 機(jī)制,但是未指定重啟策略的話,就會采用 "fixed-delay" 策略。每個(gè)重啟策略都有自己的參數(shù)來控制它的行為,這些值也可以在配置文件中設(shè)置,每個(gè)重啟策略的描述都包含著各自的配置值信息。
以下是支持的三種重啟策略的可配置項(xiàng)
| 重啟策略 | 重啟策略值 |
|---|---|
| Fixed delay | fixed-delay |
| Failure rate | failure-rate |
| No restart | None |
除了定義一個(gè)默認(rèn)的重啟策略之外,你還可以為每一個(gè)Job指定它自己的重啟策略,這個(gè)重啟策略可以在ExecutionEnvironment中調(diào)用setRestartStrategy()方法來程序化地調(diào)用,這種方式同樣適用于StreamExecutionEnvironment。
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
固定延遲重啟策略(Fixed Delay Restart Strategy)
嘗試一個(gè)給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會等待一個(gè)固定的時(shí)間。
| 參數(shù)配置 | 描述 | 默認(rèn)值 |
|---|---|---|
| restart-strategy.fixed-delay.attempts | Flink嘗試執(zhí)行的次數(shù) | 1,如果啟用checkpoint的話是Integer.MAX_VALUE |
| restart-strategy.fixed-delay.delay | 兩次重啟之間等待的時(shí)間 | akka.ask.timeout,如果啟用checkpoint的話是1s |
flink-conf.yaml 參數(shù)配置:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
失敗率重啟策略(Failure Rate Restart Strategy)
Job失敗后會重啟次數(shù)如果超過失敗率,Job會最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會等待一個(gè)固定的時(shí)間。
| 配置參數(shù) | 描述 | 默認(rèn)值 |
|---|---|---|
| restart-strategy.failure-rate.max-failures-per-interval | Flink嘗試執(zhí)行的次數(shù) | 1 |
| restart-strategy.failure-rate.failure-rate-interval | 計(jì)算失敗率的時(shí)間間隔 | 1 min |
| restart-strategy.failure-rate.delay | 兩次重啟之間等待的時(shí)間 | akka.ask.timeout |
flink-conf.yaml 參數(shù)配置:
restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每個(gè)測量時(shí)間間隔最大失敗次數(shù)
Time.of(5, TimeUnit.MINUTES), //失敗率測量的時(shí)間間隔
Time.of(10, TimeUnit.SECONDS) // 兩次連續(xù)重啟嘗試的時(shí)間間隔
))
無重啟策略(No Restart Strategy)
Job直接失敗,不會嘗試進(jìn)行重啟
flink-conf.yaml 參數(shù)配置:
restart-strategy: none
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/restart_strategies.html