Flink RichFunction題目一則

前言

祝廣大女性節(jié)日快樂~

快問快答

  • Flink DataStream API中的RichFunction有哪些用途/特點?
  • RichFunction中獲取到的RuntimeContext是干什么用的?
  • 所有Function都有對應的RichFunction實現(xiàn)嗎?
  • 所有Flink流處理的算子都可以傳入RichFunction嗎?

前兩個問題實際上可以合并成一個問題。RichFunction的特點是比Function多出了生命周期管理(open()close()方法),以及能夠獲取其運行時上下文RuntimeContext。RuntimeContext與Function的每個并行實例(即一個Sub-task)相關聯(lián),通過它還能進一步得到如下信息:

  • 運行時靜態(tài)信息,如Task的名稱、并行度、最大并行度、當前Sub-task的編號、當前類加載器等;
  • 全局數(shù)據(jù)結(jié)構(gòu),即累加器(Accumulators)、廣播變量(Broadcast variables)和分布式緩存(Distributed cache);
  • 創(chuàng)建各種狀態(tài)句柄,即我們熟知的get***State(StateDescriptor)方法。

第三個問題,yes;第四個問題,no

RichFunction不適用的場景

簡單的開窗聚合場景:

dataStream.keyBy(x -> x.getKey())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .reduce(new MyRichReduceFunction<>())

這段代碼能編譯通過,但執(zhí)行時會拋出UnsupportedOperationException,提示ReduceFunction of reduce can not be a RichFunction。如果換成aggregate()方法和RichAggregateFunction會有同樣的問題,提示This aggregation function cannot be a RichFunction。在WindowedStream的對應實現(xiàn)中,可以看到此路不通:

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of reduce can not be a RichFunction. "
                            + "Please use reduce(ReduceFunction, WindowFunction) instead.");
        }

        // clean the closure
        function = input.getExecutionEnvironment().clean(function);
        return reduce(function, new PassThroughWindowFunction<>());
    }

    public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
        checkNotNull(function, "function");

        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }

        TypeInformation<ACC> accumulatorType =
                TypeExtractor.getAggregateFunctionAccumulatorType(
                        function, input.getType(), null, false);

        TypeInformation<R> resultType =
                TypeExtractor.getAggregateFunctionReturnType(
                        function, input.getType(), null, false);

        return aggregate(function, accumulatorType, resultType);
    }

為什么不能用Rich[Reduce / Aggregate]Function?

答案并不難:與FlatMap、Filter等算子不同,Reduce和Aggregate本身就是自帶確定的狀態(tài)語義的算子,不需要用戶手動操作狀態(tài)(如果用戶能干預的話大概率會出問題),也不需要生命期管理的特性(它們的生命期總是始于第一條數(shù)據(jù),終于最后一條數(shù)據(jù))。

以Reduce邏輯為例(Aggregate同理),不妨進一步看下對應的窗口算子是如何構(gòu)造的。

    public <R> WindowOperator<K, T, ?, R, W> reduce(
            ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");

        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of apply can not be a RichFunction.");
        }

        if (evictor != null) {
            return buildEvictingWindowOperator(
                    new InternalIterableWindowFunction<>(
                            new ReduceApplyWindowFunction<>(reduceFunction, function)));
        } else {
            ReducingStateDescriptor<T> stateDesc =
                    new ReducingStateDescriptor<>(
                            WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));

            return buildWindowOperator(
                    stateDesc, new InternalSingleValueWindowFunction<>(function));
        }
    }

注意到這里創(chuàng)建了ReducingStateDescriptor(ReduceFunction恰好是它的一個入?yún)ⅲ?,并最終獲取了內(nèi)置的ReducingState句柄。其實就DataStream API用戶的日常編程習慣而言,很少會主動用到ReducingState(以及AggregateState)。即使這樣,在它們的描述符構(gòu)造方法中,也加了同樣的強制校驗,防止傳入RichFunction,以保護狀態(tài)的確定性。

    public ReducingStateDescriptor(
            String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
        super(name, typeClass, null);
        this.reduceFunction = checkNotNull(reduceFunction);

        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of ReducingState can not be a RichFunction.");
        }
    }

話說回來,Rich[Reduce / Aggregate]Function在Flink工程內(nèi)部以及示例中都沒有有效的使用過,所以我們大概可以判定這是Flink發(fā)展過程中的遺產(chǎn)吧(笑

The End

晚安晚安。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容