概述
Flink支持不同的重啟策略,以在故障發(fā)生時(shí)控制作業(yè)如何重啟
集群在啟動(dòng)時(shí)會(huì)伴隨一個(gè)默認(rèn)的重啟策略,在沒有定義具體重啟策略時(shí)會(huì)使用該默認(rèn)策略。
如果在工作提交時(shí)指定了一個(gè)重啟策略,該策略會(huì)覆蓋集群的默認(rèn)策略默認(rèn)的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數(shù) restart-strategy 定義了哪個(gè)策略被使用。
-
常用的重啟:
策略固定間隔 (Fixed delay) 2.失敗率 (Failure rate) 3.無重啟 (No restart)
如果沒有啟用 checkpointing,則使用無重啟 (no restart) 策略。如果啟用了 checkpointing,但沒有配置重啟策略,則使用固定間隔 (fixed-delay) 策略
重啟策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在應(yīng)用代碼中動(dòng)態(tài)指定,會(huì)覆蓋全局配置
固定間隔
第一種:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二種:應(yīng)用代碼設(shè)置:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,// 嘗試重啟的次數(shù)
Time.of(10, TimeUnit.SECONDS) // 間隔 ));
失敗率
- 失敗率重啟策略在Job失敗后會(huì)重啟,但是超過失敗率后,Job會(huì)最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間
下面配置是5分鐘內(nèi)若失敗了3次則認(rèn)為該job失敗,重試間隔為10s
第一種:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二種:應(yīng)用代碼設(shè)置
env.setRestartStrategy(RestartStrategies.failureRateRestart( 3,//一個(gè)時(shí)間段內(nèi)的最大失敗次數(shù)
Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數(shù)的是時(shí)間段 Time.of(10, TimeUnit.SECONDS) // 間隔 ));
無重啟策略
第一種:全局配置 flink-conf.yaml
restart-strategy: none
第二種:應(yīng)用代碼設(shè)置
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart());
代碼實(shí)例
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class RestartDemo {
public static void main(String[] args) {
//獲取flink的運(yùn)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms進(jìn)行啟動(dòng)一個(gè)檢查點(diǎn)【設(shè)置checkpoint的周期】
env.enableCheckpointing(1000);
// 間隔10秒 重啟3次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
//5分鐘內(nèi)若失敗了3次則認(rèn)為該job失敗,重試間隔為10s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));
//不重試
env.setRestartStrategy(RestartStrategies.noRestart());
}//
}