Flink-1.10 源碼筆記 process && 調(diào)用過程

我們知道flink已經(jīng)封裝了很多高級的api供用戶訪問使用,但是有時候我們可能根據(jù)不同的需求,發(fā)現(xiàn)提供的高級api不能滿足我們的需求,這個時候flink也為我們提供了low-level層面的api,比如processFunction,通過processFunction函數(shù),我們可以訪問state,進(jìn)行注冊process ,event time定時器來幫助我們完成一項(xiàng)復(fù)雜的操作。在我們使用process 函數(shù)的時候,有一個前提就是要求我們必須使用在keyedStream上,有兩個原因,一個是getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的訪問權(quán)限,所以只能訪問keyd state,另外一個是我們在注冊定時器的時候,需要有三個維度,namespace,key,time,所以要求我們有key,這就是在ProcessFunction中只能在keyedStream做定時器注冊,在flink1.8.0版本中,有ProcessFunction 和KeyedProcessFunction 這個類面向用戶的api,但是在ProcessFunction 類我們無法注冊定時器,在ProcessOperator源碼中我們發(fā)現(xiàn)注冊是拋出異常

為什么KeyedProcessFunction可以調(diào)用RuntimeContext對象,通過源碼看一下
KeyedProcessFunction是一個抽象類,繼承了AbstractRichFunction抽象類

@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction

進(jìn)入AbstractRichFunction類,可以看到,該類實(shí)現(xiàn)了實(shí)現(xiàn)了RichFunction,和Serializable接口
RichFunction中定義了getRuntimeContext方法,在AbstractRichFunction中實(shí)現(xiàn)了該方法

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable

我們調(diào)用getRuntimeContext方法時,便可以獲取RuntimeContext對象,對狀態(tài)等進(jìn)行操作

    private transient RuntimeContext runtimeContext;

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

現(xiàn)在開始看process算子的實(shí)現(xiàn)

process算子需要傳入的值,傳入值分為兩種processFunc和KeyedProcessFunc,但不建議使用ProcessFunction了,建議使用KeyedProcessFunction,所以主要看KeyedProcessFunction


image.png

數(shù)據(jù)流在經(jīng)過keyBy之后會轉(zhuǎn)換成KeyedStream,先看一下KeyStream中的procss方法
KeyedStream是DataStream的實(shí)現(xiàn)

public class KeyedStream<T, KEY> extends DataStream<T>

可以看到process需要傳入一個keyedProcessFunction (用編寫的),如果用戶不指定輸出類型,會獲取默認(rèn)類型

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

可以看出將函數(shù)封裝成了一個KeyedProcessOperator類型,這個類繼承了AbstractUdfStreamOperator類和實(shí)現(xiàn)了OneInputStreamOperato接口和Triggerable接口

public class KeyedProcessOperator<K, IN, OUT>
    extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace>

該類重寫了 父類的open方法,實(shí)現(xiàn)了AbstractUdfStreamOperator的processElement方法和Triggerable的onEventTime和onProcessingTime方法, 現(xiàn)在看一下實(shí)現(xiàn)的邏輯

open方法
在方法中,首先調(diào)用父類Open方法進(jìn)行初始化操作, 然后初始化本類服務(wù),

    @Override
    public void open() throws Exception {
        //調(diào)用父類open方法 進(jìn)行初始化
        super.open();
        //創(chuàng)建一個 timestampedCollector 來給定Flink output             ----英翻  時間戳收集器
        collector = new TimestampedCollector<>(output);

        //定義內(nèi)部定時服務(wù)      
        InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
            //internalTimerService 封裝到 TimeService中
                //獲取timerSErvice    SimpleTimerService 內(nèi)部使用了 internalTimerService
        TimerService timerService = new SimpleTimerService(internalTimerService);

        //傳入 userFun 和 定時器  返回context對象
        context = new ContextImpl(userFunction, timerService);
        //同上 返回定時器onTimerContext對象
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

這里重要的是這行代碼context = new ContextImpl(userFunction, timerService);
現(xiàn)在看下他的內(nèi)部實(shí)現(xiàn), 這個是內(nèi)部類,他繼承了KeyedProcessFunction的Context類
在該類中實(shí)現(xiàn)了Countext對象,對我們提供上下文服務(wù)

    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

        private final TimerService timerService;

        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
            function.super();
            this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(element != null);

            if (element.hasTimestamp()) {
                return element.getTimestamp();
            } else {
                return null;
            }
        }
            
        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }

            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        @SuppressWarnings("unchecked")
        public K getCurrentKey() {
            return (K) KeyedProcessOperator.this.getCurrentKey();
        }
    }

在看一下 processElement 方法,主要調(diào)用用戶邏輯
這里userFunc調(diào)用processElement方法,該方法為用戶定義的內(nèi)容

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);

        //賦值element
        context.element = element;
        //將context對象 和collector 傳入 userFunc中
        //為用戶層級提供了訪問時間和注冊定時器的入口
        userFunction.processElement(element.getValue(), context, collector);

        //賦值調(diào)用完后 清空
        context.element = null;
    }

當(dāng)用戶通過ctx.timerService().registerProcessingTimeTimer(); 設(shè)置定時器后,定時器觸發(fā)會走KeyedProcessOperator的onEventTime或onProcessingTime方法 這里看下onEventTime的實(shí)現(xiàn)
在EventTime計時器觸發(fā)時調(diào)用,在方法中 調(diào)用了 invokeUserFunction方法

  @Override
  public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
      collector.setAbsoluteTimestamp(timer.getTimestamp());
      invokeUserFunction(TimeDomain.EVENT_TIME, timer);
  }

我們跟隨invokeUserFunction進(jìn)入方法 看下實(shí)現(xiàn),這個方法會調(diào)用 用戶的onTime方法,執(zhí)行里面邏輯

    private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        //這個時候也就是調(diào)用了我們自定義類K\eyedProcessFunction中的onTimer,
        //調(diào)用時傳入了OnTimerContextImpl對象,其持有IntervalTimeService服務(wù),也可以注冊定時器操作。
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

最終 將用戶的Func 包裝成KeyedProcessOperator對象 調(diào)用transform方法,最終返回轉(zhuǎn)換后的DataStream

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

現(xiàn)在我們追蹤進(jìn)去看,最終調(diào)用了doTransform方法,經(jīng)過一系列的轉(zhuǎn)換,將將operator添加到拓補(bǔ)圖中,最終將operator轉(zhuǎn)換成SingleOutputStreamOperator對象,該類繼承DataStream,進(jìn)行返回

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 檢查輸出類型是否為MissingTypeInfo,如果是拋出異常,
        transformation.getOutputType();

        //創(chuàng)建OneInputTransformation
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            transformation,          //input   --上游的 transformation
                operatorName,
                operatorFactory,     //需要進(jìn)行轉(zhuǎn)換操作的
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //多個級聯(lián)的map和filter操作會被transform成為一連串的OneInputTransformation。
        // 后一個transformation的input指向前一個transformation
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

到此整個process算子調(diào)用完成

如有錯誤,歡迎指正!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容