Flink 使用介紹相關(guān)文檔目錄
Flink作業(yè)重啟策略
實(shí)際生產(chǎn)作業(yè)中,我們期望Flink作業(yè)遇到錯(cuò)誤的時(shí)候,能夠自動(dòng)重啟恢復(fù)到正常運(yùn)行狀態(tài)。
Flink支持多種作業(yè)重啟策略,但默認(rèn)作業(yè)重啟策略為none,即不會(huì)自動(dòng)重啟。作業(yè)一旦出現(xiàn)異常,會(huì)被標(biāo)記為failed直接退出。
接下來為大家?guī)鞦link支持的重啟策略類型和配置方法。
重啟策略類型
Flink支持的重啟策略類型如下:
- none, off, disable:無重啟策略,作業(yè)遇到問題直接失敗,不會(huì)重啟。
- fixeddelay, fixed-delay:作業(yè)失敗后,延遲一定時(shí)間重啟。但是有最大重啟次數(shù)限制,超過這個(gè)限制后作業(yè)失敗,不再重啟。
- failurerate, failure-rate:作業(yè)失敗后,延遲一定時(shí)間重啟。但是有最大失敗率限制。如果一定時(shí)間內(nèi)作業(yè)失敗次數(shù)超過配置值,則標(biāo)記為真的失敗,不再重啟。
- exponentialdelay, exponential-delay:作業(yè)失敗后重啟延遲時(shí)間隨著失敗次數(shù)指數(shù)遞增。沒有最大重啟次數(shù)限制,無限嘗試重啟作業(yè)。
注意:如果啟用了checkpoint并且沒有顯式配置重啟策略,會(huì)默認(rèn)使用fixeddelay策略,最大重試次數(shù)為
Integer.MAX_VALUE。
全局配置
全局配置影響Flink提交的所有作業(yè)的。修改全局配置需要編輯flink-conf.yaml文件。
配置重啟策略的方式:
restart-strategy: none, off, disable | fixeddelay, fixed-delay | failurerate, failure-rate | exponentialdelay, exponential-delay
接下來分別列出各個(gè)重啟策略專屬的配置參數(shù)和含義。
fixeddelay
# 嘗試重啟次數(shù)
restart-strategy.fixed-delay.attempts: 10
# 兩次連續(xù)重啟的間隔時(shí)間
restart-strategy.fixed-delay.delay: 20 s
failurerate
# 兩次連續(xù)重啟的間隔時(shí)間
restart-strategy.failure-rate.delay: 10 s
# 計(jì)算失敗率的統(tǒng)計(jì)時(shí)間跨度
restart-strategy.failure-rate.failure-rate-interval: 2 min
# 計(jì)算失敗率的統(tǒng)計(jì)時(shí)間內(nèi)的最大失敗次數(shù)
restart-strategy.failure-rate.max-failures-per-interval: 10
exponentialdelay
# 初次失敗后重啟時(shí)間間隔(初始值)
restart-strategy.exponential-delay.initial-backoff: 1 s
# 以后每次失敗,重啟時(shí)間間隔為上一次重啟時(shí)間間隔乘以這個(gè)值
restart-strategy.exponential-delay.backoff-multiplier: 2
# 每次重啟間隔時(shí)間的最大抖動(dòng)值(加或減去該配置項(xiàng)范圍內(nèi)的一個(gè)隨機(jī)數(shù)),防止大量作業(yè)在同一時(shí)刻重啟
restart-strategy.exponential-delay.jitter-factor: 0.1
# 最大重啟時(shí)間間隔,超過這個(gè)最大值后,重啟時(shí)間間隔不再增大
restart-strategy.exponential-delay.max-backoff: 1 min
# 多長(zhǎng)時(shí)間作業(yè)運(yùn)行無失敗后,重啟間隔時(shí)間會(huì)重置為初始值(第一個(gè)配置項(xiàng)的值)
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h
作業(yè)級(jí)別配置
作業(yè)級(jí)別的配置僅僅會(huì)影響到單個(gè)job的行為,通常使用代碼的方式。通過調(diào)用env.setRestartStrategy方法,可以為指定作業(yè)級(jí)別的重啟策略。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(...)
創(chuàng)建重啟策略需要用到RestartStrategies這個(gè)類。接下來講解下如何創(chuàng)建第二節(jié)所述的各種重啟策略。
fixeddelay
// 設(shè)置重啟次數(shù)為10,重啟間隔時(shí)間為1000ms
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))
// 時(shí)間既可以使用long類型(毫秒為單位),也可以使用org.apache.flink.api.common.time.Time類型,更加直觀
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.milliseconds(1000)))
failurerate
// 設(shè)置如果10分鐘內(nèi)失敗10次不再嘗試重啟,每次重啟間隔5秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(10, Time.minutes(10), Time.seconds(5)))
exponentialdelay
// 設(shè)置初始重啟間隔為1秒,最大重啟間隔為5分鐘,每次失敗重啟間隔擴(kuò)大2倍,1小時(shí)內(nèi)作業(yè)無失敗重啟時(shí)間間隔重置,抖動(dòng)值為0.5
env.setRestartStrategy(
RestartStrategies.exponentialDelayRestart(
Time.seconds(1),
Time.minutes(5),
2.0,
Time.hours(1),
0.5
)
)
演示代碼
為了在測(cè)試環(huán)境演示自動(dòng)重啟效果,我們可以使用socketTextStream實(shí)時(shí)接收用戶輸入,通過輸入錯(cuò)誤來模擬程序崩潰的場(chǎng)景。
一段示例代碼如下所示:
def main(args: Array[String]): Unit = {
val conf = new Configuration
// IDE本地運(yùn)行啟用WebUI
conf.setInteger(RestOptions.PORT, 8080)
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
// 設(shè)置重啟策略為fixedDelay
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000))
val stream = env.socketTextStream("127.0.0.1", 9000)
// 這里將輸入轉(zhuǎn)換為整數(shù),潛在的崩潰點(diǎn)
.map(s => s.toInt)
.print()
env.execute()
}
通過這段代碼,我們可以模擬字符串轉(zhuǎn)整數(shù)報(bào)錯(cuò),從而引起作業(yè)重啟。
接下來我們重點(diǎn)觀察下作業(yè)重啟過程的日志輸出。
首先開啟socket端口,啟動(dòng)作業(yè)。作業(yè)啟動(dòng)成功后,我們看到類似如下輸出:
11:16:44,458 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (2/12) (9992bdbd2191dfee866dd6f47c8d7815) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (11/12) (efa872145cbb71236739c2cffc085cc9) switched from INITIALIZING to RUNNING.
11:16:44,458 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (9/12) (dce870ff8e69fcbd2fc9dcdec454d4f6) switched from INITIALIZING to RUNNING.
11:16:44,459 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Print to Std. Out (6/12) (c86cd1b0771887f188bd96acca83e2db) switched from INITIALIZING to RUNNING.
說明作業(yè)狀態(tài)已從INITIALIZING切換為RUNNING,作業(yè)啟動(dòng)成功。
然后故意輸入一個(gè)無法轉(zhuǎn)換為整數(shù)類型的字符,例如s。我們?cè)诔绦驁?bào)錯(cuò)之后看到了如下內(nèi)容:
11:17:48,964 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task 20ba6b65f97481d5570070de90e4e791_7.
11:17:48,964 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 13 tasks should be restarted to recover the failed task 20ba6b65f97481d5570070de90e4e791_7.
11:17:48,966 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (9dd4540acc7f786b009bd8e986d9a9d0) switched from state RUNNING to RESTARTING.
Flink首先計(jì)算需要重啟的task數(shù)量,然后將Job的狀態(tài)從RUNNING切換為RESTARTING。接下來和正常啟動(dòng)的流程基本一致。有興趣的讀者可以嘗試下上述其他重啟策略的具體行為。