registerEventTimeTimer

之前寫了一篇文章介紹registerProcessingTimeTimer,有興趣可以看下之前的文章。這篇文章介紹一下registerEventTimeTimer。

背景

  • 首先介紹一下processingtime和eventtime的區(qū)別。
    processingtime 指的時間是當(dāng)前時間
    eventtime 指的是數(shù)據(jù)里的時間。

  • registerProcessingTimeTimer與registerEventTimeTimer 區(qū)別
    上邊文章討論的是,注冊相同的當(dāng)前時間的timer,那么應(yīng)該如何觸發(fā)?
    本片則要討論,如果注冊相同事件時間(eventtime)的timer,那么在數(shù)據(jù)時間相同和數(shù)據(jù)時間不同時,如何觸發(fā)?

測試栗子

public static void eventTimeWindow() throws Exception {
        long ct=System.currentTimeMillis();
        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
        e.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        e.getConfig().setAutoWatermarkInterval(1000);
        DataStreamSource<Long> source = e
                .addSource(new SourceFunction<Long>() {
                    private volatile boolean stop = false;
                    @Override
                    public void run(SourceContext<Long> ctx) throws Exception {
                            for(int j=0;j<10;j++) {
                                ctx.collectWithTimestamp(
                                        (long) j,
                                        ct);
                                Thread.sleep(500);
                            }
                        for(int j=0;j<10;j++) {
                            ctx.collectWithTimestamp(
                                    (long) j,
                                    System.currentTimeMillis());
                            Thread.sleep(500);
                        }
                    }
                    @Override
                    public void cancel() {
                        stop = true;
                    }
                }).setParallelism(1);
        source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(1)))
                .keyBy(v->v/1000).process(new KeyedProcessFunction<Long, Long, Long>() {
            private ValueState<Integer> itemState;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<Integer> itemsStateDesc = new ValueStateDescriptor<>(
                        "itemState-state",
                        Integer.class);
                itemState = getRuntimeContext().getState(itemsStateDesc);
            }
            @Override
            public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                int val=(itemState.value()==null)?0:itemState.value();
                itemState.update(val+1);
             System.out.println(ctx.timerService().currentWatermark()+","+ctx.timestamp());
                ctx.timerService().registerEventTimeTimer(ct);
            }
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println(itemState.value()+"---"+timestamp+"——"+ctx.getCurrentKey());
            }
            @Override
            public void close() throws Exception {
                super.close();
            }
        }).setParallelism(1);
        e.execute();
    }

代碼講解:這個測試里的栗子是:

  • 數(shù)據(jù)源: 10條ct的數(shù)據(jù),10條當(dāng)前時間戳的數(shù)據(jù)。
  • registertime 是ct。

結(jié)果:

-9223372036854775808,1623310821756     //current watermark,nowct
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310829459
11---1623310821756——0                            //val,timer的時間戳
1623310829358,1623310829961
12---1623310821756——0
1623310829860,1623310830466
13---1623310821756——0
1623310830365,1623310830970
14---1623310821756——0
1623310830869,1623310831474
15---1623310821756——0
1623310831373,1623310831977
16---1623310821756——0
1623310831876,1623310832481
17---1623310821756——0
1623310832380,1623310832986
18---1623310821756——0
1623310832885,1623310833490
19---1623310821756——0
1623310833389,1623310833993
20---1623310821756——0

源碼解析:

1. watermark生成方式

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

如上可知,當(dāng)前watermark=maxTimestamp - outOfOrdernessMillis - 1,也就是ct-1000s-1,允許延遲1s。
由于使用的onPeriodicEmit ,watermark會定時1s更新一次。

2. 注冊

@Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

只是添加到隊列里。如果time相同就不會添加成功,那么也就不會觸發(fā)Timer 。具體參考上一篇

3. 觸發(fā)

public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

與processingtime不同,eventtime觸發(fā)適合watermark有關(guān)的。當(dāng)eventtimetimer隊列不為空,且當(dāng)前隊列timer小于等于當(dāng)前watermark就會觸發(fā)。

結(jié)論:

綜上:1. watermark是定時生成的,當(dāng)前時間,間隔1000s生成一個,所以當(dāng)有watermark且符合time<=watermark,才會觸發(fā)timer。
2.因為相同timer會去重,所以當(dāng)符合條件時,相同timer只會觸發(fā)一次timer。
3.下次觸發(fā)會在,watermark到來,且符合timer<=watermark時才會觸發(fā)。

參考:watermark生成

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容