【Flink 精選】闡述 Watermark 機(jī)制,剖析 Watermark 的產(chǎn)生和傳遞流程

本文闡述 Flink 的事件時間和 Watermark 機(jī)制,剖析 Watermark 產(chǎn)生和傳遞的流程。


1 Event time 和 Watermark 的關(guān)系

1.1 Event time 和 Processing time介紹

Event time 事件時間和Processing time 處理時間主要區(qū)別是產(chǎn)生時間不同,前者是事件的實際發(fā)生時間,后者是機(jī)器的系統(tǒng)處理時間,如下圖所示。


EventTime和ProcessingTime.JPG

① Event time 事件時間事件在其設(shè)備上發(fā)生的時間。

Event time 是事件在進(jìn)入 Flink 之前已經(jīng)嵌入到記錄的時間,其大小取決于事件本身,與網(wǎng)絡(luò)延時、系統(tǒng)時區(qū)等因素?zé)o關(guān)。

② Processing time 處理時間:作業(yè)正在執(zhí)行相應(yīng)操作機(jī)器系統(tǒng)時間。

Processing time 提供了最佳的性能和最低的延遲,但是不能提供確定性,即計算結(jié)果是不確定的
例如,時間窗口為5min的求和統(tǒng)計,應(yīng)用程序在 9:00 開始運(yùn)行,則第一個時間窗口處理 [9:00, 9:05) 的事件,下一個窗口處理 [9:05, 9:10) 的事件,依此類推。通信延遲、作業(yè)故障重啟等問題,可能導(dǎo)致窗口的計算結(jié)果是不一樣的。如下圖所示,假設(shè)事件(事件時間, 數(shù)值) 遇到上述問題,場景一:事件 B 有網(wǎng)絡(luò)延遲落在[9:10, 9:15),場景二:作業(yè)故障重啟導(dǎo)致事件 A 和事件 B落在[9:10, 9:15)。

ProcessingTime不確定性.JPG

1.2 Event time 和 Watermark

問題:Flink 支持事件時間,如何測量事件時間的進(jìn)度?例如,5min 的事件時間窗口,當(dāng)事件時間超過 5min 時,需要通知 Flink 觸發(fā)窗口計算。

解答:Watermark 機(jī)制。

Watermark 本質(zhì)是時間戳,與業(yè)務(wù)數(shù)據(jù)一樣無差別地傳遞下去,目的是衡量事件時間的進(jìn)度(通知 Flink 觸發(fā)事件時間相關(guān)的操作,例如窗口)。

說明: Watermark(T) 表示目前系統(tǒng)的時間事件是 T,即系統(tǒng)后續(xù)沒有 T'<T 的事件即 Event(T'<T)

/**
 * 1.Watermark 是一個時間戳, 它表示小于該時間戳的事件都已經(jīng)到達(dá)了。
 * 2.Watermark 一般情況在源位置產(chǎn)生(也可以在流圖中的其它節(jié)點產(chǎn)生), 通過流圖節(jié)點傳播。
 * 3.Watermark 也是 StreamElement, 和普通數(shù)據(jù)一起在算子之間傳遞。
 * 4.Watermark 可以觸發(fā)窗口計算, 時間戳為 Long.MAX_VALUE 表示算子后續(xù)沒有任何數(shù)據(jù)。
 */
public final class Watermark extends StreamElement {
    // 省略...

    /**
     * The timestamp of the watermark in milliseconds.
     */
    private final long timestamp;

    /**
     * Creates a new watermark with the given timestamp in milliseconds.
     */
    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Returns the timestamp associated with this {@link Watermark} in milliseconds.
     */
    public long getTimestamp() {
        return timestamp;
    }
       // 省略...
}

如下圖所示,事件 Event 是按照事件時間 EventTime 順序上報的。


順序的事件.JPG

如下圖所示,事件 Event 是不按照事件時間 EventTime 亂序上報的。


亂序的事件.JPG

2 Watermark 的產(chǎn)生

2.1 Watermark 類型

說明:flink-1.12 支持 WatermarkStrategy 和 WatermarkGenerator

flink 采用 WatermarkStrategy 設(shè)置自定義 Watermark 類型,WatermarkGenerator 是 Watermark 的基類。flink 實現(xiàn)了 Punctuated Watermarks 從事件獲取事件的時間戳、Periodic Watermarks 周期獲取事件的時間戳。

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 從事件獲取事件的時間戳
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期獲取事件的時間戳
     */
    void onPeriodicEmit(WatermarkOutput output);
}

使用 WatermarkStrategy 的樣例,如下代碼。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("data");

        // 使用 WatermarkStrategy 設(shè)置 Watermark 類型
        input.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMillis(10)));

2.2 Watermark 的產(chǎn)生

Watermark 是算子 TimestampsAndWatermarksOperator 產(chǎn)生的,WatermarkStrategy 相當(dāng)于 UDFFunction(封裝于TimestampsAndWatermarksOperator 內(nèi)部)。processElement 方法實現(xiàn)事件產(chǎn)生 Watermark,processWatermark 方法阻斷上游傳過來的 Watermark,onProcessingTime 方法實現(xiàn)周期產(chǎn)生 Watermark。

public class TimestampsAndWatermarksOperator<T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
// 省略...
    @Override
    public void processElement(final StreamRecord<T> element) throws Exception {
        final T event = element.getValue();
        final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
        final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

        element.setTimestamp(newTimestamp);
        output.collect(element);
        // 事件產(chǎn)生 Watermark
        watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
    }

    // 阻斷上游傳過來的 watermark
    @Override
    public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE) {
            wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // 采用定時器, 周期產(chǎn)生 Watermark
        watermarkGenerator.onPeriodicEmit(wmOutput);

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        // 更新定時器
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
// 省略...
}

(1)Watermark 周期產(chǎn)生

public class TimePeriodicWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

結(jié)合算子 TimestampsAndWatermarksOperator 和 TimePeriodicWatermarkGenerator,分析 Watermark 的產(chǎn)生流程。如下圖所示,橫軸表示 processing time,圓形表示事件,圓形中的時間 t 表示事件時間,圓形落在橫軸表示事件在算子中的處理,其中 Watermark 的產(chǎn)生周期為 60s 和允許延遲時間為 10s。以第一個周期 [0,60) 為例,獲取事件中的最大事件時間 max,向下游發(fā)送 watermark(最大事件時間 - 允許延遲時間 - 1)

watermark產(chǎn)生流程.JPG

(2)Watermark 事件產(chǎn)生

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}

3 Watermark 的傳遞

Watermark 的傳遞方式是廣播,即廣播方式發(fā)送到下游。Watermark 與業(yè)務(wù)數(shù)據(jù)一樣,無差別地傳遞下去。

Watermark傳遞.JPG

例子:多并發(fā)的場景下,Watermark 是 source task 產(chǎn)生,經(jīng)過 keyby 分組后觸發(fā)窗口計算。
說明:① Watermark 要單調(diào)遞增。② 如果算子有多個上游(廣播)即輸入多個 Watermark(T),則該算子取最小 Watermark 即 min(Watermark(T1), Watermark(T2))。

多并行下的Watermark.JPG

從 WindowOperator 源碼分析窗口是如何傳遞 Watermark。
首先分析 WindowOperator 類圖,可知 WindowOperator 間接繼承AbstractStreamOperator,而 AbstractStreamOperator 實現(xiàn)了接口 Input 的 processWatermark 方法、接口 TwoInputStreamOperator 的 processWatermark1 方法 和 processWatermark2 方法。


WindowOperator類圖.jpg

接著分析一下 AbstractStreamOperator 實現(xiàn)的 processWatermark 、processWatermark1 和 processWatermark2。

// 省略 ....
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        // 發(fā)送 watermark
        output.emitWatermark(mark);
    }

    /**
     * 2個上游的watermark
     * 計算最小watermark, 并設(shè)置為當(dāng)前算子的watermark
     */
    public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    /**
     * 2個上游的watermark
     * 計算最小watermark, 并設(shè)置為當(dāng)前算子的watermark
     */
    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }
// 省略 ....
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容