flink時間語義指北

新舊版本對比

早在flink1.10以前的版本,有三種時間語義:

  • processing time,即當(dāng)前集群的處理時間
  • event time,即消息中附帶的消息產(chǎn)生時間
  • ingestion time,即消息進(jìn)入集群的攝取時間

彼時,時間語義可以通過如下方式設(shè)置

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

2020年年中,flink1.11發(fā)布,其中,F(xiàn)LIP-126: Unify (and separate) Watermark Assigners。水位線的產(chǎn)生之前有兩個接口(AssignerWithPunctuatedWatermarksAssignerWithPeriodicWatermarks),改為由WatermarkGenerator接口實現(xiàn),降低了開發(fā)的復(fù)雜度,并且脫離了TimestampAssigner

2020年年底,flink1.12發(fā)布,流批統(tǒng)一,另外flink將上述時間語義設(shè)置方式標(biāo)記過期:env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

水印生成策略

// version flink-1.12.1
//官方策略-無水印生成
assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks())
//官方策略-單調(diào)遞增的水印
assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
//官方策略-延遲20秒的水印
assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))

//自定義策略-從消息中提取水印
class MyWatermark implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long timestamp, WatermarkOutput output) {
        LocalDateTime dateTime = LocalDateTime.parse(event, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
        long eventTimestamp = dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        log.info("event[{}] timestamp[{}] eventTimestamp[{}]" ,event , timestamp, eventTimestamp);
        output.emitWatermark(new Watermark(eventTimestamp));
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}

void onPeriodicEmit(WatermarkOutput output):周期性觸發(fā)watermark生成,默認(rèn)200ms??梢酝ㄟ^env.getConfig().setAutoWatermarkInterval(200)配置。

void onEvent(String event, long eventTimestamp, WatermarkOutput output):事件觸發(fā)watermark生成。

withIdleness(Duration.ofMinutes(1)):此方法為閑置watermark生成。如果1分鐘內(nèi)沒有事件產(chǎn)生,則下發(fā)一個watermark。

兩條時間線

  • processing time:flink處理消息時的時間,窗口開啟的依據(jù)時間

  • watermark:消息中的消息產(chǎn)生時間,窗口關(guān)閉的依據(jù)時間

    note:如果當(dāng)前watermark大于窗口關(guān)閉的時間,你會發(fā)現(xiàn),數(shù)據(jù)無法在窗口收集

源碼分析

    //窗口分配器參數(shù)中 timestamp為當(dāng)前處理事件
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > -9223372036854775808L) {
            if (this.staggerOffset == null) {
                this.staggerOffset = this.windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), this.size);
            }

            long start = TimeWindow.getWindowStartWithOffset(timestamp, (this.globalOffset + this.staggerOffset) % this.size, this.size);
            return Collections.singletonList(new TimeWindow(start, start + this.size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
    //窗口觸發(fā)器 TriggerResult.FIRE表示窗口關(guān)閉,觸發(fā)計算
    //onElement表示窗口收集動作
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }
    //onEventTime表示窗口計算動作
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

注意事項

  1. 單調(diào)遞增的watermark的下發(fā)一般越貼近數(shù)據(jù)源越好
  2. 每個算子默認(rèn)的watermark是Long.MIN_VALUE
  3. 算子的watermark時間不會出現(xiàn)回溯(下一個水印如果小于當(dāng)前水印則無效)
  4. 如果并行度大于1,則watermark依據(jù)木桶原理,同一算子的每個slot的watermark時間都大于關(guān)窗時間才觸發(fā)計算
  5. 上述均為watermark的生成及下發(fā),還有關(guān)于執(zhí)行時間的生成及下發(fā),雖然他很少用。具體可以了解了解BoundedOutOfOrdernessTimestampExtractor這個類

參考資料

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容