flink狀態(tài)管理及容錯(cuò)機(jī)制

[TOC]

一. 狀態(tài)管理的基本概念

image.png

首先舉一個(gè)無(wú)狀態(tài)計(jì)算的例子:消費(fèi)延遲計(jì)算。假設(shè)現(xiàn)在有一個(gè)消息隊(duì)列,消息隊(duì)列中有一個(gè)生產(chǎn)者持續(xù)往消費(fèi)隊(duì)列寫(xiě)入消息,多個(gè)消費(fèi)者分別從消息隊(duì)列中讀取消息。從圖上可以看出,生產(chǎn)者已經(jīng)寫(xiě)入 16 條消息,Offset 停留在 15 ;有 3 個(gè)消費(fèi)者,有的消費(fèi)快,而有的消費(fèi)慢。消費(fèi)快的已經(jīng)消費(fèi)了 13 條數(shù)據(jù),消費(fèi)者慢的才消費(fèi)了 7、8 條數(shù)據(jù)。

如何實(shí)時(shí)統(tǒng)計(jì)每個(gè)消費(fèi)者落后多少條數(shù)據(jù),如圖給出了輸入輸出的示例。可以了解到輸入的時(shí)間點(diǎn)有一個(gè)時(shí)間戳,生產(chǎn)者將消息寫(xiě)到了某個(gè)時(shí)間點(diǎn)的位置,每個(gè)消費(fèi)者同一時(shí)間點(diǎn)分別讀到了什么位置。剛才也提到了生產(chǎn)者寫(xiě)入了 15 條,消費(fèi)者分別讀取了 10、7、12 條。那么問(wèn)題來(lái)了,怎么將生產(chǎn)者、消費(fèi)者的進(jìn)度轉(zhuǎn)換為右側(cè)示意圖信息呢?

consumer 0 落后了 5 條,consumer 1 落后了 8 條,consumer 2 落后了 3 條,根據(jù) Flink 的原理,此處需進(jìn)行 Map 操作。Map 首先把消息讀取進(jìn)來(lái),然后分別相減,即可知道每個(gè) consumer 分別落后了幾條。Map 一直往下發(fā),則會(huì)得出最終結(jié)果。

大家會(huì)發(fā)現(xiàn),在這種模式的計(jì)算中,無(wú)論這條輸入進(jìn)來(lái)多少次,輸出的結(jié)果都是一樣的,因?yàn)閱螚l輸入中已經(jīng)包含了所需的所有信息。消費(fèi)落后等于生產(chǎn)者減去消費(fèi)者。生產(chǎn)者的消費(fèi)在單條數(shù)據(jù)中可以得到,消費(fèi)者的數(shù)據(jù)也可以在單條數(shù)據(jù)中得到,所以相同輸入可以得到相同輸出,這就是一個(gè)無(wú)狀態(tài)的計(jì)算。

image.png

相應(yīng)的什么是有狀態(tài)的計(jì)算?
以訪問(wèn)日志統(tǒng)計(jì)量的例子進(jìn)行說(shuō)明,比如當(dāng)前拿到一個(gè) Nginx 訪問(wèn)日志,一條日志表示一個(gè)請(qǐng)求,記錄該請(qǐng)求從哪里來(lái),訪問(wèn)的哪個(gè)地址,需要實(shí)時(shí)統(tǒng)計(jì)每個(gè)地址總共被訪問(wèn)了多少次,也即每個(gè) API 被調(diào)用了多少次??梢钥吹较旅婧?jiǎn)化的輸入和輸出,輸入第一條是在某個(gè)時(shí)間點(diǎn)請(qǐng)求 GET 了 /api/a;第二條日志記錄了某個(gè)時(shí)間點(diǎn) Post /api/b ;第三條是在某個(gè)時(shí)間點(diǎn) GET了一個(gè) /api/a,總共有 3 個(gè) Nginx 日志。從這 3 條 Nginx 日志可以看出,第一條進(jìn)來(lái)輸出 /api/a 被訪問(wèn)了一次,第二條進(jìn)來(lái)輸出 /api/b 被訪問(wèn)了一次,緊接著又進(jìn)來(lái)一條訪問(wèn) api/a,所以 api/a 被訪問(wèn)了 2 次。不同的是,兩條 /api/a 的 Nginx 日志進(jìn)來(lái)的數(shù)據(jù)是一樣的,但輸出的時(shí)候結(jié)果可能不同,第一次輸出 count=1 ,第二次輸出 count=2,說(shuō)明相同輸入可能得到不同輸出。輸出的結(jié)果取決于當(dāng)前請(qǐng)求的 API 地址之前累計(jì)被訪問(wèn)過(guò)多少次。第一條過(guò)來(lái)累計(jì)是 0 次,count = 1,第二條過(guò)來(lái) API 的訪問(wèn)已經(jīng)有一次了,所以 /api/a 訪問(wèn)累計(jì)次數(shù) count=2。單條數(shù)據(jù)其實(shí)僅包含當(dāng)前這次訪問(wèn)的信息,而不包含所有的信息。要得到這個(gè)結(jié)果,還需要依賴(lài) API 累計(jì)訪問(wèn)的量,即狀態(tài)。

這個(gè)計(jì)算模式是將數(shù)據(jù)輸入算子中,用來(lái)進(jìn)行各種復(fù)雜的計(jì)算并輸出數(shù)據(jù)。這個(gè)過(guò)程中算子會(huì)去訪問(wèn)之前存儲(chǔ)在里面的狀態(tài)。另外一方面,它還會(huì)把現(xiàn)在的數(shù)據(jù)對(duì)狀態(tài)的影響實(shí)時(shí)更新,如果輸入 200 條數(shù)據(jù),最后輸出就是 200 條結(jié)果。

image.png

什么場(chǎng)景會(huì)用到狀態(tài)呢?下面列舉了常見(jiàn)的 4 種:

  • 去重:比如上游的系統(tǒng)數(shù)據(jù)可能會(huì)有重復(fù),落到下游系統(tǒng)時(shí)希望把重復(fù)的數(shù)據(jù)都去掉。去重需要先了解哪些數(shù)據(jù)來(lái)過(guò),哪些數(shù)據(jù)還沒(méi)有來(lái),也就是把所有的主鍵都記錄下來(lái),當(dāng)一條數(shù)據(jù)到來(lái)后,能夠看到在主鍵當(dāng)中是否存在。
  • 窗口計(jì)算:比如統(tǒng)計(jì)每分鐘 Nginx 日志 API 被訪問(wèn)了多少次。窗口是一分鐘計(jì)算一次,在窗口觸發(fā)前,如 08:00 ~ 08:01 這個(gè)窗口,前59秒的數(shù)據(jù)來(lái)了需要先放入內(nèi)存,即需要把這個(gè)窗口之內(nèi)的數(shù)據(jù)先保留下來(lái),等到 8:01 時(shí)一分鐘后,再將整個(gè)窗口內(nèi)觸發(fā)的數(shù)據(jù)輸出。未觸發(fā)的窗口數(shù)據(jù)也是一種狀態(tài)。
  • 機(jī)器學(xué)習(xí)/深度學(xué)習(xí):如訓(xùn)練的模型以及當(dāng)前模型的參數(shù)也是一種狀態(tài),機(jī)器學(xué)習(xí)可能每次都用有一個(gè)數(shù)據(jù)集,需要在數(shù)據(jù)集上進(jìn)行學(xué)習(xí),對(duì)模型進(jìn)行一個(gè)反饋。
  • 訪問(wèn)歷史數(shù)據(jù):比如與昨天的數(shù)據(jù)進(jìn)行對(duì)比,需要訪問(wèn)一些歷史數(shù)據(jù)。如果每次從外部去讀,對(duì)資源的消耗可能比較大,所以也希望把這些歷史數(shù)據(jù)也放入狀態(tài)中做對(duì)比。
2. 為什么要管理狀態(tài)
image.png

管理狀態(tài)最直接的方式就是將數(shù)據(jù)都放到內(nèi)存中,這也是很常見(jiàn)的做法。比如在做 WordCount 時(shí),Word 作為輸入,Count 作為輸出。在計(jì)算的過(guò)程中把輸入不斷累加到 Count。

但對(duì)于流式作業(yè)有以下要求:

  • 7*24小時(shí)運(yùn)行,高可靠;

  • 數(shù)據(jù)不丟不重,恰好計(jì)算一次;

  • 數(shù)據(jù)實(shí)時(shí)產(chǎn)出,不延遲;

基于以上要求,內(nèi)存的管理就會(huì)出現(xiàn)一些問(wèn)題。由于內(nèi)存的容量是有限制的。如果要做 24 小時(shí)的窗口計(jì)算,將 24 小時(shí)的數(shù)據(jù)都放到內(nèi)存,可能會(huì)出現(xiàn)內(nèi)存不足;另外,作業(yè)是 7*24,需要保障高可用,機(jī)器若出現(xiàn)故障或者宕機(jī),需要考慮如何備份及從備份中去恢復(fù),保證運(yùn)行的作業(yè)不受影響;此外,考慮橫向擴(kuò)展,假如網(wǎng)站的訪問(wèn)量不高,統(tǒng)計(jì)每個(gè) API 訪問(wèn)次數(shù)的程序可以用單線程去運(yùn)行,但如果網(wǎng)站訪問(wèn)量突然增加,單節(jié)點(diǎn)無(wú)法處理全部訪問(wèn)數(shù)據(jù),此時(shí)需要增加幾個(gè)節(jié)點(diǎn)進(jìn)行橫向擴(kuò)展,這時(shí)數(shù)據(jù)的狀態(tài)如何平均分配到新增加的節(jié)點(diǎn)也問(wèn)題之一。因此,將數(shù)據(jù)都放到內(nèi)存中,并不是最合適的一種狀態(tài)管理方式。

3. 理想的狀態(tài)管理

image.png

最理想的狀態(tài)管理需要滿(mǎn)足易用、高效、可靠三點(diǎn)需求:

  • 易用,F(xiàn)link 提供了豐富的數(shù)據(jù)結(jié)構(gòu)、多樣的狀態(tài)組織形式以及簡(jiǎn)潔的擴(kuò)展接口,讓狀態(tài)管理更加易用;
  • 高效,實(shí)時(shí)作業(yè)一般需要更低的延遲,一旦出現(xiàn)故障,恢復(fù)速度也需要更快;當(dāng)處理能力不夠時(shí),可以橫向擴(kuò)展,同時(shí)在處理備份時(shí),不影響作業(yè)本身處理性能;
  • 可靠,F(xiàn)link 提供了狀態(tài)持久化,包括不丟不重的語(yǔ)義以及具備自動(dòng)的容錯(cuò)能力,比如 HA,當(dāng)節(jié)點(diǎn)掛掉后會(huì)自動(dòng)拉起,不需要人工介入。

二. Flink 狀態(tài)的類(lèi)型與使用示例

image.png

Managed State 是 Flink 自動(dòng)管理的 State,而 Raw State 是原生態(tài) State,兩者的區(qū)別如下:

  1. 從狀態(tài)管理方式的方式來(lái)說(shuō),Managed State 由 Flink Runtime 管理,自動(dòng)存儲(chǔ),自動(dòng)恢復(fù),在內(nèi)存管理上有優(yōu)化;而 Raw State 需要用戶(hù)自己管理,需要自己序列化,F(xiàn)link 不知道 State 中存入的數(shù)據(jù)是什么結(jié)構(gòu),只有用戶(hù)自己知道,需要最終序列化為可存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu)。
  2. 從狀態(tài)數(shù)據(jù)結(jié)構(gòu)來(lái)說(shuō),Managed State 支持已知的數(shù)據(jù)結(jié)構(gòu),如 Value、List、Map 等。而 Raw State只支持字節(jié)數(shù)組 ,所有狀態(tài)都要轉(zhuǎn)換為二進(jìn)制字節(jié)數(shù)組才可以。
  3. 從推薦使用場(chǎng)景來(lái)說(shuō),Managed State 大多數(shù)情況下均可使用,而 Raw State 是當(dāng) Managed State 不夠用時(shí),比如需要自定義 Operator 時(shí),推薦使用 Raw State。

2. Keyed State & Operator State

image.png

Managed State 分為兩種,一種是 Keyed State;另外一種是 Operator State。在Flink Stream模型中,Datastream 經(jīng)過(guò) keyBy 的操作可以變?yōu)?KeyedStream 。

每個(gè) Key 對(duì)應(yīng)一個(gè) State,即一個(gè) Operator 實(shí)例處理多個(gè) Key,訪問(wèn)相應(yīng)的多個(gè) State,并由此就衍生了 Keyed State。Keyed State 只能用在 KeyedStream 的算子中,即在整個(gè)程序中沒(méi)有 keyBy 的過(guò)程就沒(méi)有辦法使用 KeyedStream。

相比較而言,Operator State 可以用于所有算子,相對(duì)于數(shù)據(jù)源有一個(gè)更好的匹配方式,常用于 Source,例如 FlinkKafkaConsumer。相比 Keyed State,一個(gè) Operator 實(shí)例對(duì)應(yīng)一個(gè) State,隨著并發(fā)的改變,Keyed State 中,State 隨著 Key 在實(shí)例間遷移,比如原來(lái)有 1 個(gè)并發(fā),對(duì)應(yīng)的 API 請(qǐng)求過(guò)來(lái),/api/a 和 /api/b 都存放在這個(gè)實(shí)例當(dāng)中;如果請(qǐng)求量變大,需要擴(kuò)容,就會(huì)把 /api/a 的狀態(tài)和 /api/b 的狀態(tài)分別放在不同的節(jié)點(diǎn)。由于 Operator State 沒(méi)有 Key,并發(fā)改變時(shí)需要選擇狀態(tài)如何重新分配。其中內(nèi)置了 2 種分配方式:一種是均勻分配,另外一種是將所有 State 合并為全量 State 再分發(fā)給每個(gè)實(shí)例。

在訪問(wèn)上,Keyed State 通過(guò) RuntimeContext 訪問(wèn),這需要 Operator 是一個(gè)Rich Function。Operator State 需要自己實(shí)現(xiàn) CheckpointedFunction 或 ListCheckpointed 接口。在數(shù)據(jù)結(jié)構(gòu)上,Keyed State 支持的數(shù)據(jù)結(jié)構(gòu),比如 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的數(shù)據(jù)結(jié)構(gòu)相對(duì)較少,如 ListState。

3. Keyed State 使用示例

image.png

Keyed State 有很多種,如圖為幾種 Keyed State 之間的關(guān)系。首先 State 的子類(lèi)中一級(jí)子類(lèi)有 ValueState、MapState、AppendingState。AppendingState 又有一個(gè)子類(lèi) MergingState。MergingState 又分為 3 個(gè)子類(lèi)分別是ListState、ReducingState、AggregatingState。這個(gè)繼承關(guān)系使它們的訪問(wèn)方式、數(shù)據(jù)結(jié)構(gòu)也存在差異。

image.png

幾種 Keyed State 的差異具體體現(xiàn)在:

  • ValueState 存儲(chǔ)單個(gè)值,比如 Wordcount,用 Word 當(dāng) Key,State 就是它的 Count。這里面的單個(gè)值可能是數(shù)值或者字符串,作為單個(gè)值,訪問(wèn)接口可能有兩種,get 和 set。在 State 上體現(xiàn)的是 update(T) / T value()。
  • MapState 的狀態(tài)數(shù)據(jù)類(lèi)型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一個(gè)。
  • ListState 狀態(tài)數(shù)據(jù)類(lèi)型是 List,訪問(wèn)接口如 add、update 等。
  • ReducingState 和 AggregatingState 與 ListState 都是同一個(gè)父類(lèi),但狀態(tài)數(shù)據(jù)類(lèi)型上是單個(gè)值,原因在于其中的 add 方法不是把當(dāng)前的元素追加到列表中,而是把當(dāng)前元素直接更新進(jìn)了 Reducing 的結(jié)果中。
  • AggregatingState 的區(qū)別是在訪問(wèn)接口,ReducingState 中 add(T)和 T get() 進(jìn)去和出來(lái)的元素都是同一個(gè)類(lèi)型,但在 AggregatingState 輸入的 IN,輸出的是 OUT。
image.png

如圖為 Flink 作業(yè)的主方法與主函數(shù)中的內(nèi)容,前面的輸入、后面的輸出以及一些個(gè)性化的配置項(xiàng)都已去掉,僅保留了主干。

首先 events 是一個(gè) DataStream,通過(guò) env.addSource 加載數(shù)據(jù)進(jìn)來(lái),接下來(lái)有一個(gè) DataStream 叫 alerts,先 keyby 一個(gè) sourceAddress,然后在 flatMap 一個(gè)StateMachineMapper。StateMachineMapper 就是一個(gè)狀態(tài)機(jī),狀態(tài)機(jī)指有不同的狀態(tài)與狀態(tài)間有不同的轉(zhuǎn)換關(guān)系的結(jié)合,以買(mǎi)東西的過(guò)程簡(jiǎn)單舉例。首先下訂單,訂單生成后狀態(tài)為待付款,當(dāng)再來(lái)一個(gè)事件狀態(tài)付款成功,則事件的狀態(tài)將會(huì)從待付款變?yōu)橐迅犊?,待發(fā)貨。已付款,待發(fā)貨的狀態(tài)再來(lái)一個(gè)事件發(fā)貨,訂單狀態(tài)將會(huì)變?yōu)榕渌椭?,配送中的狀態(tài)再來(lái)一個(gè)事件簽收,則該訂單的狀態(tài)就變?yōu)橐押炇?。在整個(gè)過(guò)程中,隨時(shí)都可以來(lái)一個(gè)事件,取消訂單,無(wú)論哪個(gè)狀態(tài),一旦觸發(fā)了取消訂單事件最終就會(huì)將狀態(tài)轉(zhuǎn)移到已取消,至此狀態(tài)就結(jié)束了

Flink 寫(xiě)狀態(tài)機(jī)是如何實(shí)現(xiàn)的?首先這是一個(gè) RichFlatMapFunction,要用 Keyed State getRuntimeContext,getRuntimeContext 的過(guò)程中需要 RichFunction,所以需要在 open 方法中獲取 currentState ,然后 getState,currentState 保存的是當(dāng)前狀態(tài)機(jī)上的狀態(tài)。

如果剛下訂單,那么 currentState 就是待付款狀態(tài),初始化后,currentState 就代表訂單完成。訂單來(lái)了后,就會(huì)走 flatMap 這個(gè)方法,在 flatMap 方法中,首先定義一個(gè) State,從 currentState 取出,即 Value,Value 取值后先判斷值是否為空,如果 sourceAddress state 是空,則說(shuō)明沒(méi)有被使用過(guò),那么此狀態(tài)應(yīng)該為剛創(chuàng)建訂單的初始狀態(tài),即待付款。然后賦值 state = State.Initial,注意此處的 State 是本地的變量,而不是 Flink 中管理的狀態(tài),將它的值從狀態(tài)中取出。接下來(lái)在本地又會(huì)來(lái)一個(gè)變量,然后 transition,將事件對(duì)它的影響加上,剛才待付款的訂單收到付款成功的事件,就會(huì)變成已付款,待發(fā)貨,然后 nextState 即可算出。此外,還需要判斷 State 是否合法,比如一個(gè)已簽收的訂單,又來(lái)一個(gè)狀態(tài)叫取消訂單,會(huì)發(fā)現(xiàn)已簽收訂單不能被取消,此時(shí)這個(gè)狀態(tài)就會(huì)下發(fā),訂單狀態(tài)為非法狀態(tài)。

如果不是非法的狀態(tài),還要看該狀態(tài)是否已經(jīng)無(wú)法轉(zhuǎn)換,比如這個(gè)狀態(tài)變?yōu)橐讶∠麜r(shí),就不會(huì)在有其他的狀態(tài)再發(fā)生了,此時(shí)就會(huì)從 state 中 clear。clear 是所有的 Flink 管理 keyed state 都有的公共方法,意味著將信息刪除,如果既不是一個(gè)非法狀態(tài)也不是一個(gè)結(jié)束狀態(tài),后面可能還會(huì)有更多的轉(zhuǎn)換,此時(shí)需要將訂單的當(dāng)前狀態(tài) update ,這樣就完成了 ValueState 的初始化、取值、更新以及清零,在整個(gè)過(guò)程中狀態(tài)機(jī)的作用就是將非法的狀態(tài)進(jìn)行下發(fā),方便下游進(jìn)行處理。其他的狀態(tài)也是類(lèi)似的使用方式。

三. 容錯(cuò)機(jī)制與故障恢復(fù)

  1. 狀態(tài)如何保存及恢復(fù)


    image.png

Flink 狀態(tài)保存主要依靠 Checkpoint 機(jī)制,Checkpoint 會(huì)定時(shí)制作分布式快照,對(duì)程序中的狀態(tài)進(jìn)行備份。分布式快照是如何實(shí)現(xiàn)的可以參考【第二課時(shí)】的內(nèi)容,這里就不再闡述分布式快照具體是如何實(shí)現(xiàn)的。分布式快照 Checkpoint 完成后,當(dāng)作業(yè)發(fā)生故障了如何去恢復(fù)?假如作業(yè)分布跑在 3 臺(tái)機(jī)器上,其中一臺(tái)掛了。這個(gè)時(shí)候需要把進(jìn)程或者線程移到 active 的 2 臺(tái)機(jī)器上,此時(shí)還需要將整個(gè)作業(yè)的所有 Task 都回滾到最后一次成功 Checkpoint 中的狀態(tài),然后從該點(diǎn)開(kāi)始繼續(xù)處理。

如果要從 Checkpoint 恢復(fù),必要條件是數(shù)據(jù)源需要支持?jǐn)?shù)據(jù)重新發(fā)送。Checkpoint恢復(fù)后, Flink 提供兩種一致性語(yǔ)義,一種是恰好一次,一種是至少一次。在做 Checkpoint時(shí),可根據(jù) Barries 對(duì)齊來(lái)判斷是恰好一次還是至少一次,如果對(duì)齊,則為恰好一次,否則沒(méi)有對(duì)齊即為至少一次。如果只有一個(gè)上游,也就是說(shuō) Barries 是不需要對(duì)齊的的;如果只有一個(gè) Checkpoint 在做,不管什么時(shí)候從 Checkpoint 恢復(fù),都會(huì)恢復(fù)到剛才的狀態(tài);如果有多個(gè)上游,假如一個(gè)上游的 Barries 到了,另一個(gè) Barries 還沒(méi)有來(lái),如果這個(gè)時(shí)候?qū)顟B(tài)進(jìn)行快照,那么從這個(gè)快照恢復(fù)的時(shí)候其中一個(gè)上游的數(shù)據(jù)可能會(huì)有重復(fù)。

Checkpoint 通過(guò)代碼的實(shí)現(xiàn)方法如下:

  • 首先從作業(yè)的運(yùn)行環(huán)境 env.enableCheckpointing 傳入 1000,意思是做 2 個(gè) Checkpoint 的事件間隔為 1 秒。Checkpoint 做的越頻繁,恢復(fù)時(shí)追數(shù)據(jù)就會(huì)相對(duì)減少,同時(shí) Checkpoint 相應(yīng)的也會(huì)有一些 IO 消耗。

  • 接下來(lái)是設(shè)置 Checkpoint 的 model,即設(shè)置了 Exactly_Once 語(yǔ)義,表示需要 Barrier 對(duì)齊,這樣可以保證消息不會(huì)丟失也不會(huì)重復(fù)。

  • setMinPauseBetweenCheckpoints 是 2 個(gè) Checkpoint 之間最少是要等 500ms,也就是剛做完一個(gè) Checkpoint。比如某個(gè) Checkpoint 做了700ms,按照原則過(guò) 300ms 應(yīng)該是做下一個(gè) Checkpoint,因?yàn)樵O(shè)置了 1000ms 做一次 Checkpoint 的,但是中間的等待時(shí)間比較短,不足 500ms 了,需要多等 200ms,因此以這樣的方式防止 Checkpoint 太過(guò)于頻繁而導(dǎo)致業(yè)務(wù)處理的速度下降。

  • setCheckpointTimeout 表示做 Checkpoint 多久超時(shí),如果 Checkpoint 在 1min 之內(nèi)尚未完成,說(shuō)明 Checkpoint 超時(shí)失敗。setMaxConcurrentCheckpoints 表示同時(shí)有多少個(gè) Checkpoint 在做快照,這個(gè)可以根據(jù)具體需求去做設(shè)置。

一方面 Flink 在 Cancel 時(shí)允許在外部介質(zhì)保留 Checkpoint ;另一方面,F(xiàn)link 還有另外一個(gè)機(jī)制是 SavePoint。

image.png

Savepoint 與 Checkpoint 類(lèi)似,同樣是把狀態(tài)存儲(chǔ)到外部介質(zhì)。當(dāng)作業(yè)失敗時(shí),可以從外部恢復(fù)。Savepoint 與 Checkpoint 有什么區(qū)別呢?

  • 從觸發(fā)管理方式來(lái)講,Checkpoint 由 Flink 自動(dòng)觸發(fā)并管理,而 Savepoint 由用戶(hù)手動(dòng)觸發(fā)并人肉管理;
  • 從用途來(lái)講,Checkpoint 在 Task 發(fā)生異常時(shí)快速恢復(fù),例如網(wǎng)絡(luò)抖動(dòng)或超時(shí)異常,而 Savepoint 有計(jì)劃地進(jìn)行備份,使作業(yè)能停止后再恢復(fù),例如修改代碼、調(diào)整并發(fā);
  • 最后從特點(diǎn)來(lái)講,Checkpoint 比較輕量級(jí),作業(yè)出現(xiàn)問(wèn)題會(huì)自動(dòng)從故障中恢復(fù),在作業(yè)停止后默認(rèn)清除;而 Savepoint 比較持久,以標(biāo)準(zhǔn)格式存儲(chǔ),允許代碼或配置發(fā)生改變,恢復(fù)需要啟動(dòng)作業(yè)手動(dòng)指定一個(gè)路徑恢復(fù)。

2. 可選的狀態(tài)存儲(chǔ)方式

image.png

Checkpoint 的存儲(chǔ),第一種是內(nèi)存存儲(chǔ),即 MemoryStateBackend,構(gòu)造方法是設(shè)置最大的StateSize,選擇是否做異步快照,這種存儲(chǔ)狀態(tài)本身存儲(chǔ)在 TaskManager 節(jié)點(diǎn)也就是執(zhí)行節(jié)點(diǎn)內(nèi)存中的,因?yàn)閮?nèi)存有容量限制,所以單個(gè) State maxStateSize 默認(rèn) 5 M,且需要注意 maxStateSize <= akka.framesize 默認(rèn) 10 M。Checkpoint 存儲(chǔ)在 JobManager 內(nèi)存中,因此總大小不超過(guò) JobManager 的內(nèi)存。推薦使用的場(chǎng)景為:本地測(cè)試、幾乎無(wú)狀態(tài)的作業(yè),比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。不推薦在生產(chǎn)場(chǎng)景使用。

image.png

另一種就是在文件系統(tǒng)上的 FsStateBackend ,構(gòu)建方法是需要傳一個(gè)文件路徑和是否異步快照。State 依然在 TaskManager 內(nèi)存中,但不會(huì)像 MemoryStateBackend 有 5 M 的設(shè)置上限,Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或 HDFS),打破了總大小 Jobmanager 內(nèi)存的限制。容量限制上,單 TaskManager 上 State 總量不超過(guò)它的內(nèi)存,總大小不超過(guò)配置的文件系統(tǒng)容量。推薦使用的場(chǎng)景、常規(guī)使用狀態(tài)的作業(yè)、例如分鐘級(jí)窗口聚合或 join、需要開(kāi)啟HA的作業(yè)。

image.png

還有一種存儲(chǔ)為 RocksDBStateBackend ,RocksDB 是一個(gè) key/value 的內(nèi)存存儲(chǔ)系統(tǒng),和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中,如果內(nèi)存快滿(mǎn)時(shí),則寫(xiě)入到磁盤(pán)中,但需要注意 RocksDB 不支持同步的 Checkpoint,構(gòu)造方法中沒(méi)有同步快照這個(gè)選項(xiàng)。不過(guò) RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或HDFS),其容量限制只要單個(gè) TaskManager 上 State 總量不超過(guò)它的內(nèi)存+磁盤(pán),單 Key最大 2G,總大小不超過(guò)配置的文件系統(tǒng)容量即可。推薦使用的場(chǎng)景為:超大狀態(tài)的作業(yè),例如天級(jí)窗口聚合、需要開(kāi)啟 HA 的作業(yè)、最好是對(duì)狀態(tài)讀寫(xiě)性能要求不高的作業(yè)。

四. 總結(jié)

  1. 為什么要使用狀態(tài)?

前面提到有狀態(tài)的作業(yè)要有有狀態(tài)的邏輯,有狀態(tài)的邏輯是因?yàn)閿?shù)據(jù)之間存在關(guān)聯(lián),單條數(shù)據(jù)是沒(méi)有辦法把所有的信息給表現(xiàn)出來(lái)。所以需要通過(guò)狀態(tài)來(lái)滿(mǎn)足業(yè)務(wù)邏輯。

2.為什么要管理狀態(tài)?

使用了狀態(tài),為什么要管理狀態(tài)?因?yàn)閷?shí)時(shí)作業(yè)需要7*24不間斷的運(yùn)行,需要應(yīng)對(duì)不可靠的因素而帶來(lái)的影響。

3.如何選擇狀態(tài)的類(lèi)型和存儲(chǔ)方式?

那如何選擇狀態(tài)的類(lèi)型和存儲(chǔ)方式?結(jié)合前面的內(nèi)容,可以看到,首先是要分析清楚業(yè)務(wù)場(chǎng)景;比如想要做什么,狀態(tài)到底大不大。比較各個(gè)方案的利弊,選擇根據(jù)需求合適的狀態(tài)類(lèi)型和存儲(chǔ)方式即可。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Apache Flink? — Stateful Computations over Data Streams(數(shù)...
    少校1222閱讀 267評(píng)論 0 1
  • 0.問(wèn)題 1、什么是狀態(tài)?2、Flink狀態(tài)類(lèi)型有哪幾種?3、狀態(tài)有什么作用?4、如何使用狀態(tài),實(shí)現(xiàn)什么樣的API...
    LZhan閱讀 8,342評(píng)論 0 5
  • 一、狀態(tài)分類(lèi) 相對(duì)于其他流計(jì)算框架,F(xiàn)link 一個(gè)比較重要的特性就是其支持有狀態(tài)計(jì)算。即你可以將中間的計(jì)算結(jié)果進(jìn)...
    程序男保姆閱讀 711評(píng)論 0 0
  • [TOC] 一、前言 有狀態(tài)的計(jì)算是流處理框架要實(shí)現(xiàn)的重要功能,因?yàn)樯詮?fù)雜的流處理場(chǎng)景都需要記錄狀態(tài),然后在新流入...
    w1992wishes閱讀 7,140評(píng)論 0 6
  • 推薦指數(shù): 6.0 書(shū)籍主旨關(guān)鍵詞:特權(quán)、焦點(diǎn)、注意力、語(yǔ)言聯(lián)想、情景聯(lián)想 觀點(diǎn): 1.統(tǒng)計(jì)學(xué)現(xiàn)在叫數(shù)據(jù)分析,社會(huì)...
    Jenaral閱讀 5,967評(píng)論 0 5

友情鏈接更多精彩內(nèi)容