Flink Window Internals for Busy People

Window is one of the indispensable operators in Flink and does a remarkable job in many scenarios. Today, let's look at how window is actually implemented.

window types

Tumbling Window



Sliding Window


Session Window



Global Window


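To make the difference between the first two window types concrete, here is a minimal plain-Java sketch of how a timestamp could be mapped to tumbling and sliding windows. It does not use Flink's classes; the class and method names are made up for illustration, and a window offset of 0 is assumed. Session windows have no fixed size and are merged as elements arrive, and a global window puts all elements of a key into one single window, so neither is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-window assignment; these are not Flink's own classes.
class WindowAssignSketch {

    // Start of the window of the given size that contains the timestamp (offset 0 assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Tumbling: exactly one window [start, start + size) per timestamp.
    static long[] tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, size);
        return new long[] {start, start + size};
    }

    // Sliding: every window of length `size`, starting at a multiple of `slide`,
    // that covers the timestamp. Windows overlap when slide < size.
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 7_500L;
        long[] t = tumbling(ts, 2_000L);
        System.out.println("tumbling: [" + t[0] + ", " + t[1] + ")");
        for (long[] w : sliding(ts, 4_000L, 2_000L)) {
            System.out.println("sliding:  [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a timestamp of 7500 ms, the 2-second tumbling window is [6000, 8000), while the 4-second window sliding every 2 seconds yields two overlapping windows, [6000, 10000) and [4000, 8000).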

window operator

evictor
An evictor applies custom operations to the data in a window (typically removing elements), either before or after the user code runs. For details, see the evictBefore and evictAfter methods of org.apache.flink.streaming.api.windowing.evictors.Evictor.
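As a rough illustration of the before/after idea, the sketch below models an evictor in plain Java. SimpleEvictor and KeepLastN are hypothetical names invented for this example; the real Evictor interface also receives the window and a context object, which are left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of the evictor idea; not the real Flink interface.
class EvictorSketch {

    // Hypothetical interface: the real Evictor methods take more parameters.
    interface SimpleEvictor<T> {
        void evictBefore(List<T> elements);
        void evictAfter(List<T> elements);
    }

    // Keeps only the newest maxCount elements before the window function runs.
    static final class KeepLastN<T> implements SimpleEvictor<T> {
        private final int maxCount;

        KeepLastN(int maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public void evictBefore(List<T> elements) {
            int toEvict = elements.size() - maxCount;
            Iterator<T> it = elements.iterator();
            while (toEvict > 0 && it.hasNext()) {
                it.next();
                it.remove(); // drop the oldest elements first
                toEvict--;
            }
        }

        @Override
        public void evictAfter(List<T> elements) {
            // nothing to remove after the window function in this sketch
        }
    }

    // Models one window firing: evictBefore, user function, evictAfter.
    static <T> List<T> fire(List<T> windowContents, SimpleEvictor<T> evictor) {
        List<T> elements = new ArrayList<>(windowContents);
        evictor.evictBefore(elements);
        List<T> seenByUserFunction = new ArrayList<>(elements); // user code would run here
        evictor.evictAfter(elements);
        return seenByUserFunction;
    }

    public static void main(String[] args) {
        List<Integer> seen = fire(Arrays.asList(1, 2, 3, 4, 5), new KeepLastN<Integer>(2));
        System.out.println(seen); // prints [4, 5]
    }
}
```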

trigger
A trigger decides whether a window should fire. Every WindowAssigner comes with a default trigger; if the default does not meet your needs, you can write your own class that extends Trigger. Here are the Trigger methods and what they mean:

  • onElement() is called for every element added to a window

  • onEventTime() is called when an event-time timer fires

  • onProcessingTime() is called when a processing-time timer fires

  • onMerge() merges the state of two triggers when their windows are merged

  • clear() is called when the window is destroyed

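The first three methods return a TriggerResult, which takes one of the following values:

  • CONTINUE do nothing
  • FIRE fire the window: its contents are handed to the window function and emitted, and the elements are kept
  • PURGE clear all elements of the window without firing (in the operator this is windowState.clear(); the window itself is only destroyed when its cleanup timer runs)
  • FIRE_AND_PURGE fire the window, then clear its elements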
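The four results boil down to two flags, "fire" and "purge". The sketch below models that in plain Java together with a simple count-based trigger. Both types are simplified stand-ins written for this article, not Flink's classes, and the counter is kept in a field, whereas a real trigger would keep it in per-key, per-window state.

```java
// Simplified stand-ins for TriggerResult and a count-based trigger; illustrative only.
class TriggerResultSketch {

    enum TriggerResult {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() {
            return fire;
        }

        boolean isPurge() {
            return purge;
        }
    }

    // Hypothetical trigger: fires once for every maxCount elements.
    static final class CountTrigger {
        private final long maxCount;
        private long count; // assumption: a plain field instead of partitioned state

        CountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        TriggerResult onElement() {
            count++;
            if (count >= maxCount) {
                count = 0;
                return TriggerResult.FIRE; // fire but keep the window contents
            }
            return TriggerResult.CONTINUE;
        }
    }

    public static void main(String[] args) {
        CountTrigger trigger = new CountTrigger(2L);
        for (int i = 1; i <= 4; i++) {
            TriggerResult r = trigger.onElement();
            System.out.println("element " + i + " -> " + r
                    + " fire=" + r.isFire() + " purge=" + r.isPurge());
        }
    }
}
```

The operator only ever asks the two questions isFire() and isPurge(), which is why FIRE_AND_PURGE needs no special handling of its own.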

window code

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class Main {
    protected final static org.slf4j.Logger logger = LoggerFactory.getLogger(Main.class);
    
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(5);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        
        env.setParallelism(1);
        
        StateBackend backend =
            new RocksDBStateBackend("file:////Users/iss/sourceCode/spark/flink/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/checkpoints", true);
        env.setStateBackend(backend);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata-dev-mq:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "1000");
        
        
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        
        env.addSource(consumer).uid("orderAndRegisterUserIdSource")
                .rebalance()
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .timeWindow(Time.seconds(2))
                .trigger(new CountAndTimeTrigger(2L))
                .process(new ProcessWindowFunctionImp()).uid("process");
        
        
        // execute program
        env.execute("realTimeDataWareHouse");
    }
}

For the CountAndTimeTrigger used above, see the article "Implementing countAndTimeTrigger, a custom Flink trigger with a timeout".

window internals

First, when the program starts consuming messages (see "The full flow of how Flink consumes messages"), execution enters the processElement method of WindowOperator.

// processElement of the window operator
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                    "event-time window cannot become earlier than the current watermark " +
                                    "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                    " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                    " window: " + mergeResult);
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    // e.g. RocksDBListState or RocksDBReducingState
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                // An incoming element is first stored in windowState and stays there until the window fires
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                // Call the user-defined onElement code
                TriggerResult triggerResult = triggerContext.onElement(element);
                // When the window fires, read the data from windowState; in this example windowState is a RocksDBListState
                if (triggerResult.isFire()) {
                    // e.g. RocksDBListState or RocksDBReducingState
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    // When the window fires, its contents are sent downstream and the user's process method is called.
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                // Register a timer, which is really just a scheduled task. Under the hood it is implemented with ScheduledThreadPoolExecutor.schedule(...)
                // Each key in each window has one and only one timer (the duplicate check is partly done with a map)
                registerCleanupTimer(window);
            }
        }
        // the rest of the method (handling of skipped late elements) is not shown in this excerpt
    }

For the ordering of elements inside a window, see "Understanding the order of elements in a Flink window".
When a registered timer expires, onProcessingTime is called.

// This method is invoked by a timer;
    // registerCleanupTimer(window) in processElement creates the corresponding timer
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }

        TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

        if (triggerResult.isFire()) {
            ACC contents = windowState.get();
            if (contents != null) {
                emitWindowContents(triggerContext.window, contents);
            }
        }

        if (triggerResult.isPurge()) {
            windowState.clear();
        }

        if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
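            // Clears all state:
            // first windowState.clear(), then the user-defined clear method is called, and then the internal state of windowContext is cleared.
            // clearAllState can only be reached when the window is fired through onProcessingTime or onEventTime;
            // otherwise the window can be considered still alive even though it has already fired.
            // So a window may first fire incrementally and then fire with its full contents (the fire caused by onProcessingTime or onEventTime, with no purge specified)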
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }
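The "incremental fire first, full fire later" behaviour described in the comments can be modeled in a few lines. The following is a plain-Java sketch of one key's window under two assumptions of mine: the trigger returns FIRE (never PURGE) every countThreshold elements, and it also returns FIRE when the cleanup timer runs. The class and field names are invented for the example and are not Flink's.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a single (key, window) pair; names are illustrative, not Flink's.
class FireThenCleanupSketch {
    private final List<String> windowState = new ArrayList<>();
    final List<List<String>> emitted = new ArrayList<>();
    private final long cleanupTime;
    private final int countThreshold;
    private int count;

    FireThenCleanupSketch(long cleanupTime, int countThreshold) {
        this.cleanupTime = cleanupTime;
        this.countThreshold = countThreshold;
    }

    // Mirrors processElement: store first, then ask the (count) trigger.
    void processElement(String value) {
        windowState.add(value);
        count++;
        if (count >= countThreshold) {
            count = 0;
            emit(); // FIRE without PURGE: the contents stay in windowState
        }
    }

    // Mirrors onProcessingTime: fire once more, then clear everything.
    void onProcessingTime(long timestamp) {
        if (timestamp != cleanupTime) {
            return; // not the cleanup timer of this window
        }
        if (!windowState.isEmpty()) {
            emit(); // assumption: the trigger returns FIRE at the end of the window
        }
        windowState.clear(); // stands in for clearAllState
        count = 0;
    }

    boolean isEmpty() {
        return windowState.isEmpty();
    }

    private void emit() {
        emitted.add(new ArrayList<>(windowState));
    }

    public static void main(String[] args) {
        FireThenCleanupSketch w = new FireThenCleanupSketch(1_999L, 2);
        w.processElement("a");
        w.processElement("b"); // count reached: incremental fire
        w.processElement("c");
        w.onProcessingTime(1_999L); // cleanup timer: full fire, then clear
        System.out.println(w.emitted); // prints [[a, b], [a, b, c]]
    }
}
```

Because the count-based fire does not purge, the elements "a" and "b" are emitted twice. If that is not wanted, the trigger would have to return FIRE_AND_PURGE instead.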

Note that a window is tied to a key: window state and timers are kept per key.
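One way to picture this is to treat the pair (key, window) as the namespace for both the window contents and the cleanup timer. The sketch below is a plain-Java model under that assumption, using tumbling windows with offset 0; the class name and the use of a map entry as the namespace are choices made for this example, not Flink's internal data structures.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of per-key window state; not Flink's internal structures.
class KeyedWindowSketch {
    private final long size;
    // Window contents, namespaced by (key, window start).
    private final Map<SimpleImmutableEntry<String, Long>, List<String>> state = new HashMap<>();
    // One cleanup timer per (key, window start); the set drops duplicate registrations.
    private final Set<SimpleImmutableEntry<String, Long>> cleanupTimers = new HashSet<>();

    KeyedWindowSketch(long size) {
        this.size = size;
    }

    void processElement(String key, String value, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        SimpleImmutableEntry<String, Long> namespace = new SimpleImmutableEntry<>(key, start);
        state.computeIfAbsent(namespace, k -> new ArrayList<>()).add(value);
        cleanupTimers.add(namespace); // registering again for the same pair is a no-op
    }

    List<String> contents(String key, long windowStart) {
        return state.getOrDefault(new SimpleImmutableEntry<>(key, windowStart), new ArrayList<>());
    }

    int timerCount() {
        return cleanupTimers.size();
    }

    public static void main(String[] args) {
        KeyedWindowSketch op = new KeyedWindowSketch(2_000L);
        op.processElement("u1", "a", 100L);
        op.processElement("u2", "b", 200L);
        op.processElement("u1", "c", 300L);
        System.out.println(op.contents("u1", 0L)); // prints [a, c]
        System.out.println(op.timerCount());       // prints 2
    }
}
```

All three elements fall into the same time range, yet there are two independent windows and two timers, one for each key.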

Summary

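The whole window flow: an element is assigned to its windows and added to windowState, then the trigger's onElement is called. On FIRE the window contents are emitted to the user's process method, and on PURGE windowState is cleared. A cleanup timer is registered per key and window; when it expires, onProcessingTime (or onEventTime) runs, may fire the window once more, and finally clears all state.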

