Flink 使用之重啟策略

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

Flink作業(yè)重啟策略

實(shí)際生產(chǎn)作業(yè)中,我們期望Flink作業(yè)遇到錯(cuò)誤的時(shí)候,能夠自動(dòng)重啟恢復(fù)到正常運(yùn)行狀態(tài)。

Flink支持多種作業(yè)重啟策略,但默認(rèn)作業(yè)重啟策略為none,即不會(huì)自動(dòng)重啟。作業(yè)一旦出現(xiàn)異常,會(huì)被標(biāo)記為failed直接退出。

接下來為大家?guī)鞦link支持的重啟策略類型和配置方法。

重啟策略類型

Flink支持的重啟策略類型如下:

  • none, off, disable:無重啟策略,作業(yè)遇到問題直接失敗,不會(huì)重啟。
  • fixeddelay, fixed-delay:作業(yè)失敗后,延遲一定時(shí)間重啟。但是有最大重啟次數(shù)限制,超過這個(gè)限制后作業(yè)失敗,不再重啟。
  • failurerate, failure-rate:作業(yè)失敗后,延遲一定時(shí)間重啟。但是有最大失敗率限制。如果一定時(shí)間內(nèi)作業(yè)失敗次數(shù)超過配置值,則標(biāo)記為真的失敗,不再重啟。
  • exponentialdelay, exponential-delay:作業(yè)失敗后重啟延遲時(shí)間隨著失敗次數(shù)指數(shù)遞增。沒有最大重啟次數(shù)限制,無限嘗試重啟作業(yè)。

注意:如果啟用了checkpoint并且沒有顯式配置重啟策略,會(huì)默認(rèn)使用fixeddelay策略,最大重試次數(shù)為Integer.MAX_VALUE

全局配置

全局配置影響Flink提交的所有作業(yè)的。修改全局配置需要編輯flink-conf.yaml文件。

配置重啟策略的方式:

restart-strategy: none, off, disable | fixeddelay, fixed-delay | failurerate, failure-rate | exponentialdelay, exponential-delay

接下來分別列出各個(gè)重啟策略專屬的配置參數(shù)和含義。

fixeddelay

# 嘗試重啟次數(shù)
restart-strategy.fixed-delay.attempts: 10
# 兩次連續(xù)重啟的間隔時(shí)間
restart-strategy.fixed-delay.delay: 20 s

failurerate

# 兩次連續(xù)重啟的間隔時(shí)間
restart-strategy.failure-rate.delay: 10 s
# 計(jì)算失敗率的統(tǒng)計(jì)時(shí)間跨度
restart-strategy.failure-rate.failure-rate-interval: 2 min
# 計(jì)算失敗率的統(tǒng)計(jì)時(shí)間內(nèi)的最大失敗次數(shù)
restart-strategy.failure-rate.max-failures-per-interval: 10

exponentialdelay

# 初次失敗后重啟時(shí)間間隔(初始值)
restart-strategy.exponential-delay.initial-backoff: 1 s
# 以后每次失敗,重啟時(shí)間間隔為上一次重啟時(shí)間間隔乘以這個(gè)值
restart-strategy.exponential-delay.backoff-multiplier: 2
# 每次重啟間隔時(shí)間的最大抖動(dòng)值(加或減去該配置項(xiàng)范圍內(nèi)的一個(gè)隨機(jī)數(shù)),防止大量作業(yè)在同一時(shí)刻重啟
restart-strategy.exponential-delay.jitter-factor: 0.1
# 最大重啟時(shí)間間隔,超過這個(gè)最大值后,重啟時(shí)間間隔不再增大
restart-strategy.exponential-delay.max-backoff: 1 min
# 多長(zhǎng)時(shí)間作業(yè)運(yùn)行無失敗后,重啟間隔時(shí)間會(huì)重置為初始值(第一個(gè)配置項(xiàng)的值)
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h

作業(yè)級(jí)別配置

作業(yè)級(jí)別的配置僅僅會(huì)影響到單個(gè)job的行為,通常使用代碼的方式。通過調(diào)用env.setRestartStrategy方法,可以為指定作業(yè)級(jí)別的重啟策略。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(...)

創(chuàng)建重啟策略需要用到RestartStrategies這個(gè)類。接下來講解下如何創(chuàng)建第二節(jié)所述的各種重啟策略。

fixeddelay

// 設(shè)置重啟次數(shù)為10,重啟間隔時(shí)間為1000ms
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))
// 時(shí)間既可以使用long類型(毫秒為單位),也可以使用org.apache.flink.api.common.time.Time類型,更加直觀
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.milliseconds(1000)))

failurerate

// 設(shè)置如果10分鐘內(nèi)失敗10次不再嘗試重啟,每次重啟間隔5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(10, Time.minutes(10), Time.seconds(5)))

exponentialdelay

// 設(shè)置初始重啟間隔為1秒,最大重啟間隔為5分鐘,每次失敗重啟間隔擴(kuò)大2倍,1小時(shí)內(nèi)作業(yè)無失敗重啟時(shí)間間隔重置,抖動(dòng)值為0.5
env.setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
      Time.seconds(1),
      Time.minutes(5),
      2.0,
      Time.hours(1),
      0.5
    )
)

演示代碼

為了在測(cè)試環(huán)境演示自動(dòng)重啟效果,我們可以使用socketTextStream實(shí)時(shí)接收用戶輸入,通過輸入錯(cuò)誤來模擬程序崩潰的場(chǎng)景。

一段示例代碼如下所示:

def main(args: Array[String]): Unit = {
    val conf = new Configuration
    // IDE本地運(yùn)行啟用WebUI
    conf.setInteger(RestOptions.PORT, 8080)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 設(shè)置重啟策略為fixedDelay
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))

    val stream = env.socketTextStream("127.0.0.1", 9000)
    // 這里將輸入轉(zhuǎn)換為整數(shù),潛在的崩潰點(diǎn)
    .map(s => s.toInt)
    .print()

    env.execute()
}

通過這段代碼,我們可以模擬字符串轉(zhuǎn)整數(shù)報(bào)錯(cuò),從而引起作業(yè)重啟。

接下來我們重點(diǎn)觀察下作業(yè)重啟過程的日志輸出。

首先開啟socket端口,啟動(dòng)作業(yè)。作業(yè)啟動(dòng)成功后,我們看到類似如下輸出:

11:16:44,458 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (2/12) (9992bdbd2191dfee866dd6f47c8d7815) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (11/12) (efa872145cbb71236739c2cffc085cc9) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (9/12) (dce870ff8e69fcbd2fc9dcdec454d4f6) switched from INITIALIZING to RUNNING.
11:16:44,459 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map -> Sink: Print to Std. Out (6/12) (c86cd1b0771887f188bd96acca83e2db) switched from INITIALIZING to RUNNING.

說明作業(yè)狀態(tài)已從INITIALIZING切換為RUNNING,作業(yè)啟動(dòng)成功。

然后故意輸入一個(gè)無法轉(zhuǎn)換為整數(shù)類型的字符,例如s。我們?cè)诔绦驁?bào)錯(cuò)之后看到了如下內(nèi)容:

11:17:48,964 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy  - Calculating tasks to restart to recover the failed task 20ba6b65f97481d5570070de90e4e791_7.
11:17:48,964 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy  - 13 tasks should be restarted to recover the failed task 20ba6b65f97481d5570070de90e4e791_7. 
11:17:48,966 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (9dd4540acc7f786b009bd8e986d9a9d0) switched from state RUNNING to RESTARTING.

Flink首先計(jì)算需要重啟的task數(shù)量,然后將Job的狀態(tài)從RUNNING切換為RESTARTING。接下來和正常啟動(dòng)的流程基本一致。有興趣的讀者可以嘗試下上述其他重啟策略的具體行為。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容