1.前言
前一篇文章聊了聊Flink中JDBCSink的使用方法,今天就說一說Flink中最具有特色的“狀態(tài)”的保存和檢查點機(jī)制。雖然我嘴上說著“狀態(tài)”是Flink的一大特色,但是實際上我也是聽別人提起過,或許是書籍上看到的、亦或者是文章中讀到了,有些記不起來了。
但從我目前對Flink使用過程中所領(lǐng)悟到的淺薄心得來說,我認(rèn)為狀態(tài)的使用還是非常有特點的,合理的使用能夠讓代碼邏輯更“輕薄”。但這篇文章我就先不介紹狀態(tài)都是怎么使用的了,因為與對狀態(tài)的使用相比,它的保存思路是更加值得學(xué)習(xí)的。接下來我就對Flink中狀態(tài)的保存和檢查點機(jī)制進(jìn)行比較詳細(xì)的描述。尤其是檢查點機(jī)制,它是能夠?qū)崿F(xiàn)精準(zhǔn)一次性的重要技術(shù)支持。如果有人看到了我寫的文章中的錯誤,非常希望大家能及時指出,我會非常感謝。
檢查點機(jī)制的官方鏈接:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/datastream/fault-tolerance/checkpointing/
2.Flink中狀態(tài)的保存
對Flink有簡單使用經(jīng)驗的人都會知道,在Flink中有一個機(jī)制叫做“狀態(tài)”,利用這個狀態(tài)進(jìn)行代碼編寫的時候,也可以說是在進(jìn)行狀態(tài)編程。
這個狀態(tài)在我看來實際上就是用來輔助主邏輯程序進(jìn)行計算來獲得想要的計算結(jié)果的一個“物件”,說物件未免有些太過通俗,還是說它是一個“數(shù)據(jù)內(nèi)容”吧。不同的狀態(tài)類型也就代表著這個“數(shù)據(jù)內(nèi)容”中的東西不一樣。
既然計算結(jié)果需要使用到這個“數(shù)據(jù)內(nèi)容”,也就是要使用狀態(tài)。那么這個狀態(tài)就需要有一個保存的地方,能夠讓流動中的每一條數(shù)據(jù)都能夠使用到這個狀態(tài)完成計算,就像是高速路上的收費(fèi)站一樣。那在對這個狀態(tài)進(jìn)行保存的時候,就需要使用到一個相對應(yīng)的機(jī)制,也就是狀態(tài)后端。
這個狀態(tài)后端實際上就是用來保存每一個使用到狀態(tài)編程的程序在運(yùn)行過程中所產(chǎn)生的狀態(tài)的一個機(jī)制。試想一下,一個狀態(tài)誕生之后,就一定是在內(nèi)存中或者是磁盤中出現(xiàn)了的,不然數(shù)據(jù)流中的數(shù)據(jù)也不會訪問到它。那它到底是在內(nèi)存中還是在磁盤上就需要“狀態(tài)后端”機(jī)制來指定了。
在Flink中,狀態(tài)后端機(jī)制為開發(fā)者們提供了兩種不同的狀態(tài)保存點,分別是基于內(nèi)存的“Hash表狀態(tài)后端”和基于本地磁盤維護(hù)的“RocksDB”狀態(tài)后端。
2.1 狀態(tài)后端的介紹
整個介紹的步驟主要是以:狀態(tài)后端介紹->狀態(tài)后端使用來進(jìn)行的。切記:狀態(tài)后端指的是程序運(yùn)行時狀態(tài)的保存點!??!
2.1.1 RocksDB狀態(tài)后端
先來講講工作中比較常用到的RocksDB數(shù)據(jù)庫狀態(tài)后端,這種狀態(tài)后端會將程序運(yùn)行時的狀態(tài)保存在Flink內(nèi)置的RocksDB數(shù)據(jù)庫中。它是很強(qiáng)的一個狀態(tài)后端,雖然它要把狀態(tài)保存在磁盤上需要經(jīng)歷序列化和反序列化,但它不會占用正在運(yùn)行的TaskManager的JVM堆內(nèi)存,對于長時間的狀態(tài)來說,該狀態(tài)后端對計算系統(tǒng)會寬容一些。
如果想要在代碼中開啟這個狀態(tài)后端,需要在Pom文件中引入對應(yīng)的依賴,具體內(nèi)容如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>1.13.0</version>
</dependency>
引入依賴之后,才能在代碼中開啟
setStateBackend(new EmbeddedRocksDBStateBackend());
2.1.2 HashMapStateBackend
這種狀態(tài)后端突出的一個特點就是快,因為它會將所有的狀態(tài)保存在TaskManager的JVM堆內(nèi)存上,也正是由于處在堆內(nèi)存中,所以也不需要序列化和反序列化了。
至于它為什么叫做HashMap,是因為在這個狀態(tài)后端的內(nèi)部,是將程序中所有的狀態(tài)當(dāng)成了對象,在連帶著窗口收集到的所有數(shù)據(jù)、窗口的觸發(fā)器一起,都以鍵值對的形式存儲了起來,構(gòu)成了一張哈希表,所以也就叫這個名字了。
在調(diào)用方面,也不需要引入什么依賴,直接就設(shè)置開啟就能夠使用了:
env.setStateBackend(new HashMapStateBackend());
那到這里為止也就說完了狀態(tài)后端是干嘛的,是怎么使用的了,那下一章節(jié)就聊聊另一個維護(hù)狀態(tài)的機(jī)制——“CheckPoint”。
3.檢查點機(jī)制
在第二章節(jié)中講述了Flink狀態(tài)后端相關(guān)的內(nèi)容,如果你仔細(xì)閱讀了之后應(yīng)該能夠?qū)ξ依斫獾臓顟B(tài)后端機(jī)制有一個小小的認(rèn)識。接下來就要對Flink中與狀態(tài)相關(guān)的另外一個機(jī)制“checkPoint”進(jìn)行簡單描述。
前面說了,狀態(tài)后端是能夠為程序運(yùn)行時的狀態(tài)提供“寄存點”的一個機(jī)制,但是如果程序突然出現(xiàn)了問題,那這些“寄存點”中的數(shù)據(jù)就會隨著程序的停止而消失,即使故障修復(fù)之后再重新運(yùn)行程序,也會因為之前狀態(tài)的消失導(dǎo)致計算失敗。這對于流式計算是很致命的。
所以為了能夠讓程序運(yùn)行時的所有狀態(tài)都能夠在故障恢復(fù)之后重新?lián)碛泄收锨暗乃小凹拇纥c”中的狀態(tài),就引入了一個檢查點機(jī)制,通過這個檢查點機(jī)制就能夠讓程序恢復(fù)到故障前的形態(tài),仿若無事發(fā)生一樣繼續(xù)正確的處理數(shù)據(jù)。
雖然我說起來輕描淡寫,但是實際上這個機(jī)制還是很牛的,用它就能夠輕松的實現(xiàn)數(shù)據(jù)源到Flink程序之間的精準(zhǔn)一次性,前提是數(shù)據(jù)源需要具備數(shù)據(jù)重放的能力。所謂的數(shù)據(jù)重放就是,它能夠讓Source源算子讀取到每條數(shù)據(jù)的偏移量做狀態(tài)保存,然后還能夠在狀態(tài)恢復(fù)之后,重新用這個記錄的偏移量來讀取已經(jīng)被Source處理過的數(shù)據(jù)。
接下來就通過對檢查點機(jī)制是如何使用->流程->原理,這三個階段來展開描述。
3.1 檢查點的介紹
3.1.1 如何使用
檢查點機(jī)制使用起來還比較方便,大致的思路就是先開啟,再配置。具體的調(diào)用方法如下。
//獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//開啟檢查點,30秒保存一次檢查點.語義就是精準(zhǔn)一次性
env.enableCheckpointing(30 * 1000L,CheckpointingMode.EXACTLY_ONCE);
//設(shè)置檢查點的超時時間,如果超過這個時間檢查點還沒保存完成,那么這次檢查點保存動作就不要了
env.getCheckpointConfig().setCheckpointTimeout(60000);
//兩次檢查點保存之間的間隔,不能超過的最小時間。
//舉個例子,如果5秒進(jìn)行一次檢查點,但是上一次用了4.5秒才保存完成,而這個最小值設(shè)置的1秒,那就要等到這一秒到了才能開啟下一次保存
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//檢查點中狀態(tài)的管理機(jī)制,有兩種。一種是程序壞了就刪除,另外一種是程序壞了仍然保存。工作中肯定是需要手動維護(hù)的
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
//指定檢查點故障重啟的配置
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1L),Time.minutes(1L)));
//設(shè)置檢查點外部保存的地址
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
3.1.2使用流程
使用流程就和3.1.1 寫的一樣,只是具體的參數(shù)需要跟你們的業(yè)務(wù)場景進(jìn)行自己的判斷了。
3.1.3 原理
看到這里,檢查點的作用、狀態(tài)后端的作用其實應(yīng)該明白了,它們二者存在的必要就是共同維護(hù)程序的健康運(yùn)行,即使出現(xiàn)錯誤也能夠快速的恢復(fù)。但是你可以想一下,不同的狀態(tài)后端在與檢查點之間進(jìn)行交互處理保存狀態(tài)的時候,是怎么個過程的呢?
不同的狀態(tài)后端對狀態(tài)的持久化方式也是不一樣的,這里也正是Rocks狀態(tài)后端比HashMap狀態(tài)后端出色的地方,因為Rocks狀態(tài)后端在進(jìn)行檢查點保存的時候是異步快照,就算檢查點保存的過程中出現(xiàn)了問題,也對其狀態(tài)的使用沒影響。其次就是Rocks在進(jìn)行檢查點快照時,是增量保存的。而哈希表狀態(tài)后端是全量保存(每一次都是全量,也就代表著狀態(tài)積累的越多,保存的內(nèi)容就越多,造成的壓力也會越多)。但是計算Rocks狀太后端看起來花里胡哨的,它還是沒有哈希表狀態(tài)后端來的快。
3.2 檢查點保存流程
在進(jìn)行這一部分內(nèi)容的描述之前,必須要明確一件事情。那就是保存點在對所有狀態(tài)進(jìn)行保存的時候,是等到一條數(shù)據(jù)被程序中所有任務(wù)處理完成之后,在對所有的狀態(tài)進(jìn)行保存操作。這句話神重要?。?!
如果一共有5條數(shù)據(jù)要交由map->key by -> sum這個執(zhí)行流程處理。當(dāng)?shù)谌龡l數(shù)據(jù)已經(jīng)完成了所有的處理操作之后,要進(jìn)行檢查點保存,所保存的這個快照就是第三條數(shù)據(jù)被處理完成之后的狀態(tài)。那如果此時第四條數(shù)據(jù)也在程序中的話,那豈不是會有數(shù)據(jù)丟失的風(fēng)險?
實際不然,在檢查點觸發(fā)的時候,Source算子保存的有關(guān)于數(shù)據(jù)的偏移量信息也止步于第三條數(shù)據(jù)的偏移量,當(dāng)程序恢復(fù)計算之后,就會需要讓數(shù)據(jù)源實行數(shù)據(jù)回放的操作,從保存的偏移量的位置讀取數(shù)據(jù),這也就保證的數(shù)據(jù)的精準(zhǔn)一次性消費(fèi)。
3.3 保存點中狀態(tài)的恢復(fù)
上述內(nèi)容看起來很溫馨的保證了因故障出現(xiàn)而發(fā)生的不良情況,但是實際上還有一個問題。Flink狀態(tài)編程有一個問題,那就是它是按照數(shù)據(jù)流是否通過keyed計算來區(qū)別狀態(tài)類型的。經(jīng)歷過keyby計算的狀態(tài)叫鍵控狀態(tài),沒有經(jīng)歷過keyby處理的數(shù)據(jù)流中的狀態(tài)是算子狀態(tài),兩種不同的狀態(tài)也就表明了狀態(tài)的恢復(fù)方式也是不一樣的。
key by處理之后,流中的數(shù)據(jù)會按照鍵值選擇器中所指定的key進(jìn)行分區(qū),所有鍵值一樣的數(shù)據(jù)都會進(jìn)入到一個分區(qū)里,但是值得注意的是,這個分區(qū)并不是物理意義上的分區(qū),僅僅只是邏輯上的分區(qū),但對于檢查點的恢復(fù)也足夠了。因為keyed流中有key作為標(biāo)記,所以即使是發(fā)生故障進(jìn)行恢復(fù)之后,檢查點中保存的狀態(tài)也能夠按照各自的key進(jìn)行恢復(fù),數(shù)據(jù)流中的數(shù)據(jù)仍然會按照檢查點中保存的對應(yīng)key的狀態(tài)真正意義上的恢復(fù)到故障前的數(shù)據(jù)處理形態(tài)上,是真正意義上的恢復(fù),猶如神龍許愿一般。
但非鍵控狀態(tài)就不太行了,它沒有key作為“燈塔”,即使是進(jìn)行了狀態(tài)恢復(fù),也沒有辦法恢復(fù)到原來的樣子,所以就需要咱們這群倒霉的開發(fā)者來進(jìn)行接口的重寫,來幫助它達(dá)到我們想要的狀態(tài)。(ps:這部分內(nèi)容我在工組中沒有用到,所以就當(dāng)是提供了一個小小的思路吧)。
在進(jìn)行流程圖的繪制之前,首先要明確幾個概念,在CheckpointFunction接口中,有兩個方法分別是在程序第一次啟動時或者重啟時被調(diào)用的initializeState方法,和做狀態(tài)快照的snapshotState方法。這兩個方法都具有調(diào)用當(dāng)前程序狀態(tài)的能力,只不過快照方法獲取狀態(tài)時沒有辦法拿到狀態(tài)的句柄信息,而initializeState是可以的。
接下來就模擬一個場景,在數(shù)據(jù)寫出時實現(xiàn)一個自定義的檢查點方法,對狀態(tài)進(jìn)行保存。
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
public class MyCheckpointAndSinkFunction implements CheckpointedFunction,SinkFunction<String> {
//給SinkFunction定義一個緩沖的大小
private final int bufferSize;
//給SinkFunction定義一個緩沖區(qū),這個緩沖區(qū)的大小就是BufferSize的大小
private List<String> bufferElements;
//用構(gòu)造方法給這個緩沖地帶賦值
public MyCheckpointAndSinkFunction(int bufferSize) {
this.bufferSize = bufferSize;
bufferElements = new ArrayList<>();
}
//創(chuàng)建一個暫時的列表狀態(tài),我在想是不是因為觸發(fā)檢查點的時候不需要保存這個狀態(tài),所以要定義成臨時的呢?
private transient ListState<String> listState;
//快照方法
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
listState.clear();
for (String bufferElement : bufferElements) {
listState.add(bufferElement);
}
}
//初始化、重啟時調(diào)用的方法
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<String> buffered = new ListStateDescriptor<>("buffered", String.class);
listState = context.getOperatorStateStore().getListState(buffered);
//如果是狀態(tài)恢復(fù),那么就將恢復(fù)得到的狀態(tài)保存到緩沖區(qū)中
if (context.isRestored()){
while (listState.get().iterator().hasNext()){
bufferElements.add(listState.get().iterator().next());
}
}
}
//數(shù)據(jù)寫出時調(diào)用的方法
@Override
public void invoke(String value, Context context) throws Exception {
SinkFunction.super.invoke(value, context);
bufferElements.add(value);
if (bufferElements.size() == bufferSize){
//數(shù)據(jù)個數(shù)已經(jīng)達(dá)到緩沖區(qū)的上限了,這里可以做數(shù)據(jù)的寫出動作
}
//數(shù)據(jù)寫完了,自然要進(jìn)行一次清理動作
bufferElements.clear();
}
}
4.結(jié)語
狀態(tài)編程對于Flink來說是很重要的,所以維護(hù)這些狀態(tài)更是一件值得認(rèn)真對待的一件事情,在運(yùn)行時Flink通過狀態(tài)后端來完成狀態(tài)的保存、使用,并且可以通過開啟檢查點的方式,將程序中所產(chǎn)生、使用到的狀態(tài)周期性的保存到外部文件系統(tǒng)上(dfs),從某種意義上來說,它是全方位的為狀態(tài)提供了保障。下一節(jié),我們再一起聊聊Flink是如何通過檢查點機(jī)制來保證數(shù)據(jù)的精準(zhǔn)一次性語義的。