四、Flink重啟策略

概述

  • Flink支持不同的重啟策略,以在故障發(fā)生時(shí)控制作業(yè)如何重啟

  • 集群在啟動(dòng)時(shí)會(huì)伴隨一個(gè)默認(rèn)的重啟策略,在沒有定義具體重啟策略時(shí)會(huì)使用該默認(rèn)策略。

  • 如果在工作提交時(shí)指定了一個(gè)重啟策略,該策略會(huì)覆蓋集群的默認(rèn)策略默認(rèn)的重啟策略可以通過 Flink 的配置文件 flink-conf.yaml 指定。配置參數(shù) restart-strategy 定義了哪個(gè)策略被使用。

  • 常用的重啟:

    策略固定間隔 (Fixed delay) 2.失敗率 (Failure rate) 3.無重啟 (No restart)

  • 如果沒有啟用 checkpointing,則使用無重啟 (no restart) 策略。如果啟用了 checkpointing,但沒有配置重啟策略,則使用固定間隔 (fixed-delay) 策略

  • 重啟策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在應(yīng)用代碼中動(dòng)態(tài)指定,會(huì)覆蓋全局配置

固定間隔

第一種:全局配置 flink-conf.yaml

restart-strategy: fixed-delay 
restart-strategy.fixed-delay.attempts: 3 
restart-strategy.fixed-delay.delay: 10 s

第二種:應(yīng)用代碼設(shè)置:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,// 嘗試重啟的次數(shù) 
    Time.of(10, TimeUnit.SECONDS) // 間隔 ));

失敗率

  • 失敗率重啟策略在Job失敗后會(huì)重啟,但是超過失敗率后,Job會(huì)最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間

下面配置是5分鐘內(nèi)若失敗了3次則認(rèn)為該job失敗,重試間隔為10s

第一種:全局配置 flink-conf.yaml

restart-strategy: failure-rate  
restart-strategy.failure-rate.max-failures-per-interval: 3  
restart-strategy.failure-rate.failure-rate-interval: 5 min  
restart-strategy.failure-rate.delay: 10 s

第二種:應(yīng)用代碼設(shè)置

env.setRestartStrategy(RestartStrategies.failureRateRestart(  3,//一個(gè)時(shí)間段內(nèi)的最大失敗次數(shù)  
Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數(shù)的是時(shí)間段  Time.of(10, TimeUnit.SECONDS) // 間隔  ));

無重啟策略

第一種:全局配置 flink-conf.yaml

restart-strategy: none

第二種:應(yīng)用代碼設(shè)置

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();  env.setRestartStrategy(RestartStrategies.noRestart());

代碼實(shí)例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartDemo {
    public static void main(String[] args) {
        //獲取flink的運(yùn)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 每隔1000 ms進(jìn)行啟動(dòng)一個(gè)檢查點(diǎn)【設(shè)置checkpoint的周期】
        env.enableCheckpointing(1000);

        // 間隔10秒 重啟3次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        //5分鐘內(nèi)若失敗了3次則認(rèn)為該job失敗,重試間隔為10s
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));

        //不重試
        env.setRestartStrategy(RestartStrategies.noRestart());
    }//
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容