Flink狀態(tài)機制

什么是狀態(tài)

首先要知道,狀態(tài)指的是算子的狀態(tài)。為什么算子需要狀態(tài),狀態(tài)的用處無非兩點:

  1. 實現(xiàn)算子的邏輯(作為一種中間狀態(tài))
  2. 錯誤恢復(fù)
實現(xiàn)算子的邏輯

用官網(wǎng)的例子,假設(shè)一段數(shù)據(jù)流格式長這樣<1,3><1,2><1,3><2,3><2,5>
那么我想對相同第一個元素所有tuple,求第二個元素的平均值。該如何實現(xiàn)?

你可能會想到使用Flink自帶的聚合函數(shù),其中該函數(shù)緩存所有的相同key的元素,在函數(shù)里做遍歷累加求值的操作。這很正確。但有一個不好的點,需要緩存所有數(shù)據(jù)。

如果現(xiàn)在就讓你用map操作實現(xiàn)呢?而且不緩存所以數(shù)據(jù)

這就需要用到狀態(tài)了。試想一下,如果在map算子里面維護這樣一個變量<a,b>。a是該算子的key的次數(shù),上面數(shù)據(jù)key為1的次數(shù)便是3(a=3),b是所有第二個元素之和。

那么上面數(shù)據(jù)流在每個map算子中維護了<3,8>,<2,8>的狀態(tài)。好了,平均值就出來了。而且,這個狀態(tài),來一次數(shù)據(jù)更新一次,不需要緩存。

貼下代碼:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

錯誤恢復(fù)

試想這樣一個場景:
需要將數(shù)據(jù)流的每個數(shù)據(jù)存入數(shù)據(jù)庫,而且任務(wù)失敗后重啟能保證不將數(shù)據(jù)不重復(fù)落盤。怎么實現(xiàn)?

首先對于落盤,肯定不能來一條存一條,考慮到性能問題,我們設(shè)定一個閾值,達到這個閾值觸發(fā)落盤操作。

那么任務(wù)一旦失敗了,從哪開始恢復(fù)呢。這就肯定需要知道上一次落盤在哪發(fā)生的。

這就又需要在落盤算子(SinkFunction)中保存一個狀態(tài),用來記錄在上次任務(wù)失敗時所緩存的還沒有落盤的數(shù)據(jù),只要把這批數(shù)據(jù)存數(shù)據(jù)庫。后面的操作繼續(xù)執(zhí)行就可以了。

代碼如下:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 3.2 彈性分布式數(shù)據(jù)集 本節(jié)簡單介紹RDD,并介紹RDD與分布式共享內(nèi)存的異同。 3.2.1 RDD簡介 在集群...
    Albert陳凱閱讀 1,725評論 0 0
  • Spark的算子的分類 從大方向來說,Spark 算子大致可以分為以下兩類: 1)Transformation 變...
    達微閱讀 944評論 0 6
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹 對...
    cosWriter閱讀 11,641評論 1 32
  • Spark的算子的分類 從大方向來說,Spark 算子大致可以分為以下兩類: 1)Transformation 變...
    姚興泉閱讀 1,462評論 0 6
  • 正骨的高手少,練成太極拳高手的也少,不過還是有。 小金的老師就是一位練成太極拳的高手。小金是溫州人,年輕、聰明、有...
    一代鬃獅閱讀 1,033評論 0 1

友情鏈接更多精彩內(nèi)容