Flink Job重啟與Task恢復(fù)策略簡介

前言

流式計算對穩(wěn)定性敏感,所以我們在編寫作業(yè)時一定會做好防御性編程,如各種判空、邊界條件、安全的類型轉(zhuǎn)換、格式判斷、異常捕獲等。但是墨菲定律說得好:

Anything that can go wrong will go wrong.

換言之,我們寫再多的防御性代碼,也無法覆蓋所有非法數(shù)據(jù)的可能性,何況外部環(huán)境(網(wǎng)絡(luò)、磁盤等)也會出現(xiàn)不可預(yù)知的波動,所以作業(yè)在遇到意外情況時最好能自己“復(fù)活”,而不是每次都要靠人工手動拉起來。針對這個問題,F(xiàn)link提供了重啟策略(restart strategy)使Job從最近一次checkpoint自動恢復(fù)現(xiàn)場。本文先簡要介紹一下3種Job重啟策略。

Job重啟策略

固定延時重啟(fixed-delay)

flink-conf.yaml中的配置:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 15s

或者在代碼里對每個Job進行配置,優(yōu)先級比flink-conf.yaml高:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  10,               // attempts
  Time.seconds(15)  // delay
));

在Flink Job失敗時,該策略按照restart-strategy.fixed-delay.delay參數(shù)給出的固定間隔試圖重啟Job。如果重啟次數(shù)達到restart-strategy.fixed-delay.attempts參數(shù)規(guī)定的閾值之后還沒有成功,就停止Job。

若我們的Job中啟用了檢查點機制,并且沒有對重啟策略做任何設(shè)置的話,F(xiàn)link就會fallback到此策略,但是同時會將重啟次數(shù)設(shè)定為Integer.MAX_VALUE,間隔為10秒。帶來的風(fēng)險是如果Job始終無法恢復(fù),就會無限重試,造成長時間不可用以及日志泛濫(之前在Flink社區(qū)群內(nèi)見到過,如下圖)。

所以,當(dāng)啟用檢查點時,最好手動設(shè)定重啟策略的參數(shù)。

按失敗率重啟(failure-rate)

flink-conf.yaml中的配置:

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 10
restart-strategy.failure-rate.failure-rate-interval: 300s
restart-strategy.failure-rate.delay: 15s

或者在代碼里對每個Job進行配置:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  10,                // max-failures-per-interval
  Time.minutes(5),   // failure-rate-interval
  Time.seconds(15)   // delay
));

在Flink Job失敗時,該策略按照restart-strategy.failure-rate.delay參數(shù)給出的固定間隔試圖重啟Job。如果重啟次數(shù)在restart-strategy.failure-rate.failure-rate-interval的時間周期內(nèi)達到restart-strategy.failure-rate.max-failures-per-interval參數(shù)規(guī)定的閾值之后還沒有成功,就停止Job。

如果啟用了failure-rate重啟策略,但沒設(shè)定參數(shù)的話,F(xiàn)link默認(rèn)會將3個參數(shù)的值分別設(shè)定為1次、1分鐘和akka.ask.timeout參數(shù)指定的超時時間。

無重啟(none)

顧名思義,Job出現(xiàn)意外時直接失敗。配置方法分別如下:

restart-strategy: none
env.setRestartStrategy(RestartStrategies.noRestart());

當(dāng)Job內(nèi)沒有啟用檢查點機制并沒有設(shè)置重啟策略的話,默認(rèn)會fallback到此策略。

三種Job重啟策略說完了。當(dāng)然,只有它是不夠的,還得配合適當(dāng)?shù)淖鳂I(yè)監(jiān)控來為我們提供異常告警,以便及時提醒我們進行檢查。筆者不是專業(yè)負(fù)責(zé)監(jiān)控系統(tǒng)的,就不班門弄斧了。

Flink Job的細(xì)粒度組成是Task,Job的失敗與重啟總是可以追溯到Task級別,所以下面我們來看看Task恢復(fù)策略。

Task恢復(fù)策略

官方為了與Job重啟做區(qū)分,將Task的重啟策略叫做故障恢復(fù)策略(failover strategy),簡單的介紹見文檔。它由flink-conf.yaml中的jobmanager.execution.failover-strategy參數(shù)指定,有兩個可選項:

  • full:重啟Job中所有的Task,即重置整個ExecutionGraph,簡單粗暴。
  • region:只重啟ExecutionGraph中對應(yīng)的Region包含的Task,更加智能,降低overhead。

full策略沒什么好說的,下面我們根據(jù)FLIP-1(Fine Grained Recovery from Task Failures)中給出的設(shè)計思想簡單分析一下region策略。

根據(jù)圖論知識,如果我們的ExecutionGraph是一個非連通圖(即可以劃分為多個獨立的依賴pipeline),那么當(dāng)某個Task失敗時,就可以只回溯到該Task所在的連通分量的Source,并重啟該連通分量涉及到的所有Task,而其他Task不受影響,如下圖所示。此時一個連通分量就是一個Region。

這個思路很容易理解,但是對于ExecutionGraph本身就是連通圖的情況就不高效了,因為還是要重啟所有Task,如下圖所示。

所以Flink對這種情況又做了一個優(yōu)化:在發(fā)生一對多依賴的Task后面緩存計算出來的中間結(jié)果(intermediate result)。當(dāng)下游的Task失敗重啟時,就可以不必回溯到Source,而是回溯到中間結(jié)果就行了,重啟的Task數(shù)進一步減少。此時從中間結(jié)果緩存起計的所有下游Task形成一個Region。用語言描述可能有些不直觀,一張圖就能說明白了。

注意B1、B2后面的黑框框

當(dāng)然,如果是靠近Source一端的Task出了問題,或者中間結(jié)果緩存失效,這種方法就行不通了,老老實實從Source重啟吧。

篇幅所限(其實是筆者犯懶),本文就不再分析源碼了。Job重啟策略的相關(guān)源碼在o.a.f.runtime.executiongraph.restart包,Task重啟策略的相關(guān)源碼在o.a.f.runtime.executiongraph.failover包,看官可以自行找來閱讀。

The End

民那晚安咯。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容