Flink State Management and Recovery Mechanisms

0. Questions

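1. What is state?
2. What types of state does Flink have?
3. What is state used for?
4. How do you use state, and which APIs do you implement?
5. What are checkpoints and savepoints?
6. How do you use checkpoints and savepoints?
7. How does checkpointing work?
8. What does it mean to store a checkpoint on HDFS?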

1. State

1.0 Purpose

<1> Incremental computation
Aggregations, keeping the current model while a machine learning training job iterates, and so on
<2> Fault tolerance
Restarting a job after a failure, upgrading a job

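1.1 Basic introduction

Definition: state is the in-memory state of a task or operator at a given moment.
A checkpoint is a snapshot of that intermediate result.
Purpose: state can be recorded, so it can be restored after a failure.
A checkpoint represents a global state snapshot of a Flink job at a specific moment, that is, it contains the state of every task/operator in the job at that moment.

For example, when a job crashes or is stopped manually, it can resume consuming from the point where it stopped.
Basic types: Operator State, Keyed State
Special type: Broadcast State
Use cases:
Incremental computation:
<1> Aggregations
<2> Keeping the current model while a machine learning training job iterates
and so on
Fault tolerance:
Restarting a job after a failure

To use keyed state you must use a RichFunction, because keyed state is accessed through the RuntimeContext, which is only available inside a RichFunction. (Operator state is obtained through the CheckpointedFunction interface instead, as shown in 1.3.1.)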

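1.2 Example

Suppose the input source produces records of the form (EventID, Value).
If we only need to output the records, a plain flatMap is enough, and it is stateless.
If we need to output the maximum/minimum value per EventID, would a HashMap do?
And once the program crashes, how would it recover?

Answer: Flink provides its own mechanism for saving state, so no third-party storage system is needed to solve the state storage problem.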
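To make the HashMap question concrete, here is a minimal plain-Java sketch (it does not use the Flink API). The class MaxPerEventSketch and its snapshot/restore methods are hypothetical names introduced only for illustration; they stand in for what Flink's managed state and checkpoints do on your behalf.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink API): a running maximum per EventID kept in a HashMap.
class MaxPerEventSketch {
    private Map<Long, Long> maxByEvent = new HashMap<>();

    // Update the running maximum for one (eventId, value) record and return it.
    long process(long eventId, long value) {
        return maxByEvent.merge(eventId, value, Long::max);
    }

    // Copy the map; this stands in for what a checkpoint would persist.
    Map<Long, Long> snapshot() {
        return new HashMap<>(maxByEvent);
    }

    // Rebuild the in-memory map from a previously taken snapshot.
    void restore(Map<Long, Long> saved) {
        maxByEvent = new HashMap<>(saved);
    }

    public static void main(String[] args) {
        MaxPerEventSketch task = new MaxPerEventSketch();
        task.process(1L, 4L);
        task.process(1L, 7L);
        Map<Long, Long> saved = task.snapshot();

        // Simulated crash: a fresh instance starts with an empty map.
        MaxPerEventSketch restarted = new MaxPerEventSketch();
        System.out.println(restarted.process(1L, 2L)); // prints 2, the earlier maximum is lost

        // Same restart, but the snapshot is loaded first.
        MaxPerEventSketch recovered = new MaxPerEventSketch();
        recovered.restore(saved);
        System.out.println(recovered.process(1L, 2L)); // prints 7, the earlier maximum survives
    }
}
```

A bare HashMap gives correct results only while the process stays alive; the snapshot/restore pair is the part that Flink state plus checkpointing takes over.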

1.3 State types

1.3.1 Operator State

Operator State is bound to one parallel instance of a specific operator: each operator instance has exactly one state. By contrast, a single operator may see many keys and therefore hold many keyed states.
So a source with parallelism 4 has 4 instances, and therefore 4 states.

Example: the Kafka connector in Flink uses operator state. There are as many connector instances as the parallelism, each consuming different partitions, and every connector instance stores the (partition, offset) mapping for all the topic partitions it consumes.
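The "one state per parallel instance" idea can be modelled in plain Java (no Flink API). In this sketch the class OffsetStateSketch and the modulo assignment rule are assumptions made purely for illustration; the real connector uses its own partition assignment logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink API): each parallel instance owns its own partition -> offset map.
class OffsetStateSketch {

    // Build one offset map per parallel instance and spread the partitions across them.
    static List<Map<Integer, Long>> assign(int partitions, int parallelism) {
        List<Map<Integer, Long>> statePerInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            statePerInstance.add(new TreeMap<>());
        }
        for (int p = 0; p < partitions; p++) {
            // Hypothetical assignment rule, chosen only to keep the example short.
            statePerInstance.get(p % parallelism).put(p, 0L);
        }
        return statePerInstance;
    }

    public static void main(String[] args) {
        // 6 partitions read by a source with parallelism 4: four separate states.
        System.out.println(assign(6, 4)); // prints [{0=0, 4=0}, {1=0, 5=0}, {2=0}, {3=0}]
    }
}
```

Each inner map is what one instance would snapshot; no instance sees the offsets held by another.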



Data structure: ListState<T>

Typical coding pattern: implement the CheckpointedFunction interface, which requires two methods:
initializeState and snapshotState

How is the state saved?
Usually by declaring a field such as private transient ListState<Long> checkPointList;

Note: it is best not to use Operator State after a keyBy, and do not put very large state into it.

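import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class CountWithOperatorState extends RichFlatMapFunction<Long, String> implements CheckpointedFunction {

    // Handle to the operator state; assigned in initializeState
    private transient ListState<Long> checkPointCountList;
    // In-memory buffer; initialized here so flatMap and initializeState never see null
    private final List<Long> listBufferElements = new ArrayList<>();

    @Override
    public void flatMap(Long r, Collector<String> collector) throws Exception {
        if (r == 1) {
            // A 1 flushes the buffered elements as one space-separated string
            if (listBufferElements.size() > 0) {
                StringBuilder buffer = new StringBuilder();
                for (int i = 0; i < listBufferElements.size(); i++) {
                    buffer.append(listBufferElements.get(i)).append(" ");
                }
                collector.collect(buffer.toString());
                listBufferElements.clear();
            }
        } else {
            listBufferElements.add(r);
        }
    }

    // Called each time a checkpoint is taken
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // Clear first: what the state currently holds was already recorded by the previous checkpoint
        checkPointCountList.clear();
        for (int i = 0; i < listBufferElements.size(); i++) {
            checkPointCountList.add(listBufferElements.get(i));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {

        // 1. Describe the storage type of the ListState by defining a ListStateDescriptor
        ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>("listForThree", TypeInformation.of(new TypeHint<Long>() {}));

        // 2. Use the context and the descriptor above to obtain the corresponding ListState
        checkPointCountList = functionInitializationContext.getOperatorStateStore().getListState(listStateDescriptor);

        // 3. If the job is being restored from a checkpoint
        if (functionInitializationContext.isRestored()) {
            // Copy any restored elements back into the in-memory buffer
            for (Long element : checkPointCountList.get()) {
                listBufferElements.add(element);
            }
        }
    }
}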

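1.3.2 Keyed State

Keyed State is state defined on a KeyedStream, that is, the state of operators that run after a keyBy.
So how many states does a keyed operator with parallelism 3 have? Not necessarily 3: the number of states is determined by how many distinct keys there are after the keyBy.

Example: given an event stream of Tuple2[eventId, val], compute the average of every 3 consecutive val values for each eventId. The event stream is:
(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)
Event 1: (4+2+2)/3 = 8/3 = 2 (integer division)
Event 2: (3+2+9)/3 = 14/3 = 4 (integer division)
Event 3 has only two values so far, so nothing is emitted for it.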
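The arithmetic of this example can be checked with a small plain-Java sketch that keeps one (count, sum) pair per key, mirroring what a keyed ValueState would hold. It does not use the Flink API; the class KeyedAverageSketch and the HashMap standing in for keyed state are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink API): per-key count and sum, emitted and cleared every 3 values.
class KeyedAverageSketch {

    // Returns "eventId=average" strings, one per completed group of three values.
    static List<String> averages(long[][] events) {
        Map<Long, long[]> stateByKey = new HashMap<>(); // key -> {count, sum}
        List<String> out = new ArrayList<>();
        for (long[] e : events) {
            long[] acc = stateByKey.computeIfAbsent(e[0], k -> new long[2]);
            acc[0] += 1;      // update the count
            acc[1] += e[1];   // add the value to the running sum
            if (acc[0] >= 3) {
                out.add(e[0] + "=" + (acc[1] / acc[0])); // integer division, as in the text
                stateByKey.remove(e[0]);                 // clear the state for this key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] events = {{1, 4}, {2, 3}, {3, 1}, {1, 2}, {3, 2}, {1, 2}, {2, 2}, {2, 9}};
        System.out.println(averages(events)); // prints [1=2, 2=4]
    }
}
```

Event 1 completes its group first, so its average is emitted before that of event 2; event 3 never reaches three values and produces no output.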

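The Keyed State data structures include:
ValueState<T>: value(), update(T)
ListState<T>: add(T), get() and clear()
ReducingState<T>: add(T), get(); the ReduceFunction is supplied through the ReducingStateDescriptor
MapState<UK,UV>: put(UK,UV), putAll(Map<UK,UV>), get(UK)

A plain FlatMapFunction has no access to the RuntimeContext and therefore cannot use keyed state; a RichFlatMapFunction can, which is why stateful functions extend it.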

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value; it is null the first time a key is seen or after clear()
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 3, emit the average and clear the state
        if (currentSum.f0 >= 3) {
            out.collect(new Tuple2<Long, Long>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }


    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<Tuple2<Long, Long>>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); // type information of the state value
        sum = getRuntimeContext().getState(descriptor);
    }
}

The CheckpointedFunction interface is not implemented here. Instead, the function calls getRuntimeContext() directly and then uses getState to obtain the state handle.

1.3.3 Managed Key State

image.png

1.3.4 Repartition Key State

image.png

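2. Broadcast State (very handy in practice)

Special scenario: some data from one stream needs to be broadcast to all downstream tasks, where it is stored locally and used to process every element of another stream. For example: a low-throughput stream carries a set of rules, and we want to evaluate all elements of another stream against those rules.

Typical usage: regularEventStream.connect(ruleStream)
regularEventStream.connect(configStream)

2.1 Usage pattern

<1> Create the regular event stream, either a DataStream or a KeyedStream
<2> Create the BroadcastStream: create the rule stream/config stream (low throughput) and broadcast it
<3> Connect the two streams and implement the processing logic
process(...) takes either a BroadcastProcessFunction or a KeyedBroadcastProcessFunction

BroadcastProcessFunction: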

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    private static final long serialVersionUID = 8352559162119034453L;

    /**
     * This method is called for each element in the (non-broadcast)
     * {@link org.apache.flink.streaming.api.datastream.DataStream data stream}.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter,
     * query the current processing/event time, and also query and update the local keyed state.
     * Finally, it has <b>read-only</b> access to the broadcast state.
     * The context is only valid during the invocation of this method, do not store it.
     *
     * @param value The stream element.
     * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
     *            querying the current processing/event time and updating the broadcast state.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector to emit resulting elements to
     * @throws Exception The function may throw exceptions which cause the streaming program
     *                   to fail and go into recovery.
     */
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;

    /**
     * This method is called for each element in the
     * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter,
     * query the current processing/event time, and also query and update the internal
     * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. These can be done
     * through the provided {@link Context}.
     * The context is only valid during the invocation of this method, do not store it.
     *
     * @param value The stream element.
     * @param ctx A {@link Context} that allows querying the timestamp of the element,
     *            querying the current processing/event time and updating the broadcast state.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector to emit resulting elements to
     * @throws Exception The function may throw exceptions which cause the streaming program
     *                   to fail and go into recovery.
     */
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;

    /**
     * A {@link BaseBroadcastProcessFunction.Context context} available to the broadcast side of
     * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
     */
    public abstract class Context extends BaseBroadcastProcessFunction.Context {}

    /**
     * A {@link BaseBroadcastProcessFunction.Context context} available to the non-keyed side of
     * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
     */
    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
}

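processElement(...): handles incoming elements of the non-broadcast stream

processBroadcastElement(...): handles incoming elements of the broadcast stream (such as rules). Typically the broadcast elements are added to the state for later use, so that processElement can consult them when it handles business data

ReadOnlyContext and Context:
ReadOnlyContext has read-only access to the Broadcast State; Context has write access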
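The division of labour between the two methods can be modelled in plain Java (no Flink API). In the sketch below the class BroadcastRuleSketch, the rule names and the "value greater than threshold" matching rule are all hypothetical; only the read/write split mirrors the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink API) of one parallel task holding its local copy of the rules.
class BroadcastRuleSketch {
    // Local copy of the broadcast state: rule name -> threshold.
    private final Map<String, Long> rules = new TreeMap<>();

    // Broadcast side: the only place that writes to the rule state.
    void processBroadcastElement(String ruleName, long threshold) {
        rules.put(ruleName, threshold);
    }

    // Non-broadcast side: only reads the rule state and emits the names of matching rules.
    List<String> processElement(long value) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Long> rule : rules.entrySet()) {
            if (value > rule.getValue()) {
                matched.add(rule.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        BroadcastRuleSketch task = new BroadcastRuleSketch();
        task.processBroadcastElement("high", 100L);
        task.processBroadcastElement("low", 10L);
        System.out.println(task.processElement(50L));  // prints [low]
        System.out.println(task.processElement(500L)); // prints [high, low]
    }
}
```

Because every task applies the same broadcast updates to its own copy, all tasks end up evaluating events against the same rules without talking to each other.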

KeyedBroadcastProcessFunction:

image.png

Notes:
<1> There is no cross-task communication in Flink
<2> The order of the broadcast state elements may differ from task to task
<3> Broadcast State is kept in memory (not in RocksDB)
