Flink 狀態(tài)管理與檢查點(diǎn)機(jī)制

一、狀態(tài)分類

相對(duì)于其他流計(jì)算框架,F(xiàn)link 一個(gè)比較重要的特性就是其支持有狀態(tài)計(jì)算。即你可以將中間的計(jì)算結(jié)果進(jìn)行保存,并提供給后續(xù)的計(jì)算使用:

[圖片上傳失敗...(image-a7a290-1596010933687)]

<figcaption></figcaption>

具體而言,F(xiàn)link 又將狀態(tài) (State) 分為 Keyed State 與 Operator State:

2.1 算子狀態(tài)

算子狀態(tài) (Operator State):顧名思義,狀態(tài)是和算子進(jìn)行綁定的,一個(gè)算子的狀態(tài)不能被其他算子所訪問到。官方文檔上對(duì) Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的,即假設(shè)算子的并行度是 2,那么其應(yīng)有兩個(gè)對(duì)應(yīng)的算子狀態(tài):

[圖片上傳失敗...(image-88cb40-1596010933686)]

<figcaption></figcaption>

2.2 鍵控狀態(tài)

鍵控狀態(tài) (Keyed State) :是一種特殊的算子狀態(tài),即狀態(tài)是根據(jù) key 值進(jìn)行區(qū)分的,F(xiàn)link 會(huì)為每類鍵值維護(hù)一個(gè)狀態(tài)實(shí)例。如下圖所示,每個(gè)顏色代表不同 key 值,對(duì)應(yīng)四個(gè)不同的狀態(tài)實(shí)例。需要注意的是鍵控狀態(tài)只能在 KeyedStream 上進(jìn)行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream 。

[圖片上傳失敗...(image-2dc7e-1596010933686)]

<figcaption></figcaption>

二、狀態(tài)編程

2.1 鍵控狀態(tài)

Flink 提供了以下數(shù)據(jù)格式來管理和存儲(chǔ)鍵控狀態(tài) (Keyed State):

  • ValueState:存儲(chǔ)單值類型的狀態(tài)??梢允褂?update(T) 進(jìn)行更新,并通過 T value() 進(jìn)行檢索。
  • ListState:存儲(chǔ)列表類型的狀態(tài)??梢允褂?add(T)addAll(List) 添加元素;并通過 get() 獲得整個(gè)列表。
  • ReducingState:用于存儲(chǔ)經(jīng)過 ReduceFunction 計(jì)算后的結(jié)果,使用 add(T) 增加元素。
  • AggregatingState:用于存儲(chǔ)經(jīng)過 AggregatingState 計(jì)算后的結(jié)果,使用 add(IN) 添加元素。
  • FoldingState:已被標(biāo)識(shí)為廢棄,會(huì)在未來版本中移除,官方推薦使用 AggregatingState 代替。
  • MapState:維護(hù) Map 類型的狀態(tài)。

以上所有增刪改查方法不必硬記,在使用時(shí)通過語法提示來調(diào)用即可。這里給出一個(gè)具體的使用示例:假設(shè)我們正在開發(fā)一個(gè)監(jiān)控系統(tǒng),當(dāng)監(jiān)控?cái)?shù)據(jù)超過閾值一定次數(shù)后,需要發(fā)出報(bào)警信息。這里之所以要達(dá)到一定次數(shù),是因?yàn)橛捎谂及l(fā)原因,偶爾一次超過閾值并不能代表什么,故需要達(dá)到一定次數(shù)后才觸發(fā)報(bào)警,這就需要使用到 Flink 的狀態(tài)編程。相關(guān)代碼如下:

public class ThresholdWarning extends 
    RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    // 通過ListState來存儲(chǔ)非正常數(shù)據(jù)的狀態(tài)
    private transient ListState<Long> abnormalData;
    // 需要監(jiān)控的閾值
    private Long threshold;
    // 觸發(fā)報(bào)警的次數(shù)
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) {
        // 通過狀態(tài)名稱(句柄)獲取狀態(tài)實(shí)例,如果不存在則會(huì)自動(dòng)創(chuàng)建
        abnormalData = getRuntimeContext().getListState(
            new ListStateDescriptor<>("abnormalData", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)
        throws Exception {
        Long inputValue = value.f1;
        // 如果輸入值超過閾值,則記錄該次不正常的數(shù)據(jù)信息
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
        // 如果不正常的數(shù)據(jù)出現(xiàn)達(dá)到一定次數(shù),則輸出報(bào)警信息
        if (list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));
            // 報(bào)警信息輸出后,清空狀態(tài)
            abnormalData.clear();
        }
    }
}
復(fù)制代碼

調(diào)用自定義的狀態(tài)監(jiān)控,這里我們使用 a,b 來代表不同類型的監(jiān)控?cái)?shù)據(jù),分別對(duì)其數(shù)據(jù)進(jìn)行監(jiān)控:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
    .keyBy(0)
    .flatMap(new ThresholdWarning(100L, 3))  // 超過100的閾值3次后就進(jìn)行報(bào)警
    .printToErr();
env.execute("Managed Keyed State");
復(fù)制代碼

輸出如下結(jié)果如下:

[圖片上傳失敗...(image-e0f6a8-1596010933685)]

<figcaption></figcaption>

2.2 狀態(tài)有效期

以上任何類型的 keyed state 都支持配置有效期 (TTL) ,示例如下:

StateTtlConfig ttlConfig = StateTtlConfig
    // 設(shè)置有效期為 10 秒
    .newBuilder(Time.seconds(10))  
    // 設(shè)置有效期更新規(guī)則,這里設(shè)置為當(dāng)創(chuàng)建和寫入時(shí),都重置其有效期到規(guī)定的10秒
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    /*設(shè)置只要值過期就不可見,另外一個(gè)可選值是ReturnExpiredIfNotCleanedUp,
     代表即使值過期了,但如果還沒有被物理刪除,就是可見的*/
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);
復(fù)制代碼

2.3 算子狀態(tài)

相比于鍵控狀態(tài),算子狀態(tài)目前支持的存儲(chǔ)類型只有以下三種:

  • ListState:存儲(chǔ)列表類型的狀態(tài)。
  • UnionListState:存儲(chǔ)列表類型的狀態(tài),與 ListState 的區(qū)別在于:如果并行度發(fā)生變化,ListState 會(huì)將該算子的所有并發(fā)的狀態(tài)實(shí)例進(jìn)行匯總,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實(shí)例匯總起來,具體的劃分行為則由用戶進(jìn)行定義。
  • BroadcastState:用于廣播的算子狀態(tài)。

這里我們繼續(xù)沿用上面的例子,假設(shè)此時(shí)我們不需要區(qū)分監(jiān)控?cái)?shù)據(jù)的類型,只要有監(jiān)控?cái)?shù)據(jù)超過閾值并達(dá)到指定的次數(shù)后,就進(jìn)行報(bào)警,代碼如下:

public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, 
Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {

    // 非正常數(shù)據(jù)
    private List<Tuple2<String, Long>> bufferedData;
    // checkPointedState
    private transient ListState<Tuple2<String, Long>> checkPointedState;
    // 需要監(jiān)控的閾值
    private Long threshold;
    // 次數(shù)
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意這里獲取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
            getListState(new ListStateDescriptor<>("abnormalData",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                })));
        // 如果發(fā)生重啟,則需要從快照中將狀態(tài)進(jìn)行恢復(fù)
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, 
                        Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
        Long inputValue = value.f1;
        // 超過閾值則進(jìn)行記錄
        if (inputValue >= threshold) {
            bufferedData.add(value);
        }
        // 超過指定次數(shù)則輸出報(bào)警信息
        if (bufferedData.size() >= numberOfTimes) {
             // 順便輸出狀態(tài)實(shí)例的hashcode
             out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值警報(bào)!", bufferedData));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在進(jìn)行快照時(shí),將數(shù)據(jù)存儲(chǔ)到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }
}
復(fù)制代碼

調(diào)用自定義算子狀態(tài),這里需要將并行度設(shè)置為 1:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟檢查點(diǎn)機(jī)制
env.enableCheckpointing(1000);
// 設(shè)置并行度為1
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
    .flatMap(new ThresholdWarning(100L, 3))
    .printToErr();
env.execute("Managed Keyed State");
}
復(fù)制代碼

此時(shí)輸出如下:

[圖片上傳失敗...(image-d9aa0c-1596010933685)]

<figcaption></figcaption>

在上面的調(diào)用代碼中,我們將程序的并行度設(shè)置為 1,可以看到三次輸出中狀態(tài)實(shí)例的 hashcode 全是一致的,證明它們都同一個(gè)狀態(tài)實(shí)例。假設(shè)將并行度設(shè)置為 2,此時(shí)輸出如下:

[圖片上傳失敗...(image-757922-1596010933684)]

<figcaption></figcaption>

可以看到此時(shí)兩次輸出中狀態(tài)實(shí)例的 hashcode 是不一致的,代表它們不是同一個(gè)狀態(tài)實(shí)例,這也就是上文提到的,一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的。同時(shí)這里只輸出兩次,是因?yàn)樵诓l(fā)處理的情況下,線程 1 可能拿到 5 個(gè)非正常值,線程 2 可能拿到 4 個(gè)非正常值,因?yàn)橐笥?3 次才能輸出,所以在這種情況下就會(huì)出現(xiàn)只輸出兩條記錄的情況,所以需要將程序的并行度設(shè)置為 1。

三、檢查點(diǎn)機(jī)制

3.1 CheckPoints

為了使 Flink 的狀態(tài)具有良好的容錯(cuò)性,F(xiàn)link 提供了檢查點(diǎn)機(jī)制 (CheckPoints) 。通過檢查點(diǎn)機(jī)制,F(xiàn)link 定期在數(shù)據(jù)流上生成 checkpoint barrier ,當(dāng)某個(gè)算子收到 barrier 時(shí),即會(huì)基于當(dāng)前狀態(tài)生成一份快照,然后再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 后,也基于當(dāng)前狀態(tài)生成一份快照,依次傳遞直至到最后的 Sink 算子上。當(dāng)出現(xiàn)異常后,F(xiàn)link 就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)。

[圖片上傳失敗...(image-8b1582-1596010933684)]

<figcaption></figcaption>

3.2 開啟檢查點(diǎn)

默認(rèn)情況下,檢查點(diǎn)機(jī)制是關(guān)閉的,需要在程序中進(jìn)行開啟:

// 開啟檢查點(diǎn)機(jī)制,并指定狀態(tài)檢查點(diǎn)之間的時(shí)間間隔
env.enableCheckpointing(1000); 

// 其他可選配置如下:
// 設(shè)置語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設(shè)置兩個(gè)檢查點(diǎn)之間的最小時(shí)間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 設(shè)置執(zhí)行Checkpoint操作時(shí)的超時(shí)時(shí)間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 設(shè)置最大并發(fā)執(zhí)行的檢查點(diǎn)的數(shù)量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 將檢查點(diǎn)持久化到外部存儲(chǔ)
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存點(diǎn)時(shí),是否將作業(yè)回退到該檢查點(diǎn)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
復(fù)制代碼

3.3 保存點(diǎn)機(jī)制

保存點(diǎn)機(jī)制 (Savepoints) 是檢查點(diǎn)機(jī)制的一種特殊的實(shí)現(xiàn),它允許你通過手工的方式來觸發(fā) Checkpoint,并將結(jié)果持久化存儲(chǔ)到指定路徑中,主要用于避免 Flink 集群在重啟或升級(jí)時(shí)導(dǎo)致狀態(tài)丟失。示例如下:

# 觸發(fā)指定id的作業(yè)的Savepoint,并將結(jié)果存儲(chǔ)到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
復(fù)制代碼

更多命令和配置可以參考官方文檔:savepoints

四、狀態(tài)后端

4.1 狀態(tài)管理器分類

默認(rèn)情況下,所有的狀態(tài)都存儲(chǔ)在 JVM 的堆內(nèi)存中,在狀態(tài)數(shù)據(jù)過多的情況下,這種方式很有可能導(dǎo)致內(nèi)存溢出,因此 Flink 該提供了其它方式來存儲(chǔ)狀態(tài)數(shù)據(jù),這些存儲(chǔ)方式統(tǒng)一稱為狀態(tài)后端 (或狀態(tài)管理器):

[圖片上傳失敗...(image-84289c-1596010933684)]

<figcaption></figcaption>

主要有以下三種:

1. MemoryStateBackend

默認(rèn)的方式,即基于 JVM 的堆內(nèi)存進(jìn)行存儲(chǔ),主要適用于本地開發(fā)和調(diào)試。

2. FsStateBackend

基于文件系統(tǒng)進(jìn)行存儲(chǔ),可以是本地文件系統(tǒng),也可以是 HDFS 等分布式文件系統(tǒng)。 需要注意而是雖然選擇使用了 FsStateBackend ,但正在進(jìn)行的數(shù)據(jù)仍然是存儲(chǔ)在 TaskManager 的內(nèi)存中的,只有在 checkpoint 時(shí),才會(huì)將狀態(tài)快照寫入到指定文件系統(tǒng)上。

3. RocksDBStateBackend

RocksDBStateBackend 是 Flink 內(nèi)置的第三方狀態(tài)管理器,采用嵌入式的 key-value 型數(shù)據(jù)庫 RocksDB 來存儲(chǔ)正在進(jìn)行的數(shù)據(jù)。等到 checkpoint 時(shí),再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中,所以采用 RocksDBStateBackend 時(shí)也需要配置持久化存儲(chǔ)的文件系統(tǒng)。之所以這樣做是因?yàn)?RocksDB 作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式,其讀取速率更快;比起全內(nèi)存的方式,其存儲(chǔ)空間更大,因此它是一種比較均衡的方案。

4.2 配置方式

Flink 支持使用兩種方式來配置后端管理器:

第一種方式:基于代碼方式進(jìn)行配置,只對(duì)當(dāng)前作業(yè)生效:

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
復(fù)制代碼

配置 RocksDBStateBackend 時(shí),需要額外導(dǎo)入下面的依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
復(fù)制代碼

第二種方式:基于 flink-conf.yaml 配置文件的方式進(jìn)行配置,對(duì)所有部署在該集群上的作業(yè)都生效:

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
復(fù)制代碼

注:本篇文章所有示例代碼下載地址:flink-state-management

參考資料

作者:heibaiying
鏈接:https://juejin.im/post/5dd2661cf265da0bf175d5bb
來源:掘金
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容