Flink window窗口機(jī)制探究--以tumbling windows為例(一)

零、序言

本篇文章探究Flink Window窗口機(jī)制,首先介紹窗口機(jī)制使用的總綱,涉及的所有組件進(jìn)行介紹,心中有一個大體的藍(lán)圖和認(rèn)識。之后基于keyBy方法返回的Keyed Window入手,分析window方法,并依次進(jìn)行WindowAssigner、Trigger類介紹。篇幅所限,計劃在其他文章中繼續(xù)介紹evictor、reduce/aggregate等聚合方法,以及allowedLateness方法等使用。

一、背景&目標(biāo)

為了實現(xiàn)項目場景中自定義窗口功能,還是要先把目前Flink提供的窗口機(jī)制剖析一下,先從簡單好理解的入手,以tumbling windows為例,sliding windows思路也相似。

二、窗口機(jī)制

考慮keyed Windows ,從官網(wǎng)介紹可以通過窗口機(jī)制整個使用方法,提綱挈領(lǐng)了解所涉及的組件,雖然有些組件使用的是時候是可以使用默認(rèn)而不用指定。

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
組件 作用
keyBy 數(shù)據(jù)流按照key分流
window 需要傳入WindowAssigner類,用來進(jìn)行Event元素時間窗口分配。滾動窗口和session窗口一個Event對應(yīng)一個時間窗口,滑動窗口一個Eevent可以對應(yīng)多個時間窗口。
trigger 用來決定觸發(fā)針對特定時間窗口進(jìn)行運(yùn)算的window function執(zhí)行。
evictor 用來在trigger觸發(fā)后、window function執(zhí)行之前進(jìn)行event過濾。
allowedLatteness 允許event延遲時間。
sideOutputLateData 設(shè)置遲到的event 標(biāo)簽
getSideOutput 獲取遲到event
reduce/aggregate/fold/apply window function窗口計算函數(shù),對時間窗口中的event 元素進(jìn)行計算。

先從最基礎(chǔ)的WindowAssigner類開始,本篇重點以tumbling windows為例。

三、探究剖析--溯源

Flink1.7.2版本Java代碼。

一切都是始于stream.keyBy().window()
stream.keyBy()返回的是一個DataStream類子類:KeyedStream類對象

DataStream類的相關(guān)知識已經(jīng)在通過Flink 程序模板來學(xué)習(xí)StreamExecutionEnvironment 、DataStream 、StreamTransformation類文章中探究過了。

來看看KeyedStream類對象中:window()

public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }

返回的是一個WindowedStream類:

public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;
    // 其他省略
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        function = input.getExecutionEnvironment().clean(function);
        return apply(new InternalIterableWindowFunction<>(function), resultType, function);
    }
        

第一反應(yīng),WindowedStream雖然名字叫WindowedStream但是他不是DataStream類(雖然源代碼中也在datastream包中)!但是呢,他提供了一系列計算操作function,返回的可都是DataStream類的子類:SingleOutputStreamOperator類。

之后呢,看到了WindowedStream的成員和方法,可以看到窗口機(jī)制的組件 windowAssigner、trigger、evicto和窗口計算函數(shù)都在,開心啊,按圖索驥即可!

根據(jù)實際運(yùn)行時的dataflow來看,最終Flink拓?fù)鋾晦D(zhuǎn)換為一個有一個包含算子的處理結(jié)構(gòu)。Flink怎么把窗口機(jī)制所有的組件都調(diào)動起來呢?通過觀察窗口計算函數(shù)返回值都是DataStream類,整個拓?fù)渚痛饋砹?,對?yīng)有就有相應(yīng)的Transformation,也就有相應(yīng)的operator(StreamOperator真正在底層處理一個一個元素的操作類)。WindowStream 的apply方法對應(yīng)調(diào)用一個private apply方法:

    private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
        KeySelector<T, K> keySel = input.getKeySelector();

        WindowOperator<K, T, Iterable<T>, R, W> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            ListStateDescriptor<StreamRecord<T>> stateDesc =
                    new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);

        } else {
            ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

全程都在構(gòu)建operator?。?最后通過 return input.transform(opName, resultType, operator); 也就是說還是串在 KeyedStream上的哦!所以說WindowStream看似是一個 Stream其實只是為了構(gòu)建Window機(jī)制而提供的API,到真正Flink 運(yùn)行的時候,所有在KeyedStream定義的時間窗口,最終都會因為window function的調(diào)用返回一個DataStream,一個新的 Transformation被創(chuàng)建,窗口中的各種組件 windowAssigner 、trigger、evictor都會被打包在EvictingWindowOperator或者WindowOperator傳給這個Transformation,Transformation 為王??!

我們來看看Transformation的operator對象,以WindowOperator類(AbstractUdfStreamOperator的子類)為例,看看他的processElement方法,代碼很長100多行,先看骨架:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
        // 代碼塊1,windowAssigner可以merge
        } else {
        // 代碼塊2,windowAssigner不可以merge
        }
        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

第一步,調(diào)用windowAssigner.assignWindows給當(dāng)前event element分配Window。
第二步,核心,會針對第一步window集合,一個window一個window一次處理,調(diào)用trigger.onElement方法,如果fire的話就會調(diào)用window function進(jìn)行計算。
第三部,處理side ouput,遲到的元素。

四、探究剖析--WindowAssigner類

Flink源碼中,WindowAssigner類對應(yīng)滾動窗口的類有TumblingEventTimeWindows和TumblingProcessingTimeWindows,我們以TumblingEventTimeWindows為例,二者區(qū)別主要是窗口時間使用Event Time還是Process Time。

先看一下WindowAssigner類源碼,可以看出主要包含四個抽象方法。

/**
 *  WindowAssigner可以分配 0個或者多個 Windows 給 Event 元素.
 * @param <T> Event 元素類別.
 * @param <W> Window類別.
 */
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * 返回Event  element 應(yīng)該被分配的 window的集合
     * @param timestamp :event 的時間戳.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    /**
        * 返回WindowAssigner默認(rèn)的trigger
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    /**
        * 返回Window 的 TypeSerializer
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
       /**
        * 是否是 event time
     */
    public abstract boolean isEventTime();
      /**
        *其他省略
     */
}

方法名稱 方法用途
assignWindows 返回Event element 應(yīng)該被分配的 window的集合
getDefaultTrigger 返回WindowAssigner默認(rèn)的trigger
getWindowSerializer 返回Window 的 TypeSerializer
isEventTime 是否是 event time

接下來 我們來看 繼承WindowAssigner類 的 TumblingEventTimeWindows類的具體實現(xiàn)。

/**
 *   示例:keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
  // 窗口大小
    private final long size;
  // 偏移量
    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
                        // 根據(jù)滾動窗口機(jī)制,按照當(dāng)前timestamp,計算對應(yīng)窗口的start時間,并返回對應(yīng)窗口                
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
// 其他省略
    @Override
    public boolean isEventTime() {
        return true;
    }
}

我們可以看到,主要區(qū)別就是包含了size 和 offset兩個變量,使得 assignWindows 實現(xiàn)的時候可以返回想要的翻滾窗口。另外上面代碼可以看出getDefaultTrigger返回的是 EventTimeTrigger類。

接下來我們看一下Trigger類。

Trigger方法名稱 方法用途
onElement 每當(dāng)有event element 被加到window中,會觸發(fā)。結(jié)果返回事件元素對應(yīng)的window是否可以進(jìn)行window function計算。
onProcessingTime timer 計時器觸發(fā)調(diào)用,使用的是process time 。
onEventTime 同上,使用的是 event time。
canMerge 是否支持 窗口合并,如果返回true,必須實現(xiàn)onMerge方法
onMerge 當(dāng)多個window被WindowAssigner合并的時候的調(diào)用。
clear 清理相關(guān)window 的state
TriggerContext TriggerContext接口,給Trigger提供state 處理和注冊Timer callback
OnMergeContext TriggerContext子接口,onMerge方法使用,增加了mergePartitionedState方法。

單獨整理TriggerContext 接口方法

TriggerContext方法名稱 方法用途
getCurrentProcessingTime 返回當(dāng)前processing time
getMetricGroup 返回MetricGroup類對象
getCurrentWatermark 返回當(dāng)前Watermark time
registerProcessingTimeTimer 注冊time callback ,一旦到達(dá)time,Trigger的onProcessingTime會被調(diào)用
registerEventTimeTimer 同上,當(dāng)watermark 達(dá)到time,會觸發(fā)Trigger的onEventTime方法。
deleteProcessingTimeTimer 刪除指定時間的processing time trigger
deleteEventTimeTimer 刪除指定時間的event time trigger
getPartitionedState 返回 State對象
getKeyValueState 返回ValueState對象
/**
 * @param <T> Event 元素類別.
 * @param <W> Window類別.
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);

        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }

    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}

接下來我們看繼承Trigger的 EventTimeTrigger類的實現(xiàn):

/**
 * EventTime使用watermark,一旦 watermark 超過 the end of the window,EventTimeTrigger觸發(fā)
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}
    /**
     * 判斷window的最大時間戳是否小于目前的watermark,小于的話返回TriggerResult.FIRE,否則的話為這個window注冊一個trimer,返回TriggerResult.CONTINUE。TriggerResult是個枚舉類型
     */    
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

順便看一下枚舉類TriggerResult:
TriggerResult決定window將會發(fā)生什么,如window function是否會調(diào)用,或者window是否被丟棄。當(dāng)然如果window里面沒有任何數(shù)據(jù),什么都不會發(fā)生。

TriggerResult 值 解釋
CONTINUE 對于window來說什么都不會發(fā)生
FIRE_AND_PURGE 觸發(fā)window function ,而且 purge
FIRE 觸發(fā)window function, window不會被purged
PURGE window會被丟棄,里面所有的元素都被清理
public enum TriggerResult {

    CONTINUE(false, false),

    FIRE_AND_PURGE(true, true),

    FIRE(true, false),

    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容