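I previously wrote an article about registerProcessingTimeTimer; take a look if you are interested. This article covers registerEventTimeTimer.
Background
First, the difference between processing time and event time.
Processing time is the current wall-clock time of the machine running the operator.
Event time is the timestamp carried in the data itself.
Difference between registerProcessingTimeTimer and registerEventTimeTimer
The previous article discussed how timers fire when several timers are registered for the same processing time.
This article discusses how timers fire when they are registered for the same event time, both when the elements carry the same timestamp and when they carry different timestamps.
Test example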
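import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public static void eventTimeWindow() throws Exception {
    // Fixed timestamp: carried by the first 10 elements and used for every registered timer.
    final long ct = System.currentTimeMillis();
    StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
    e.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    e.getConfig().setAutoWatermarkInterval(1000); // periodic watermark interval in ms
    DataStreamSource<Long> source = e
            .addSource(new SourceFunction<Long>() {
                private volatile boolean stop = false;

                @Override
                public void run(SourceContext<Long> ctx) throws Exception {
                    // Phase 1: 10 elements that all carry the same timestamp ct.
                    for (int j = 0; j < 10 && !stop; j++) {
                        ctx.collectWithTimestamp((long) j, ct);
                        Thread.sleep(500);
                    }
                    // Phase 2: 10 elements stamped with the current time.
                    for (int j = 0; j < 10 && !stop; j++) {
                        ctx.collectWithTimestamp((long) j, System.currentTimeMillis());
                        Thread.sleep(500);
                    }
                }

                @Override
                public void cancel() {
                    stop = true; // checked by both loops in run()
                }
            }).setParallelism(1);
    source.assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(1)))
            .keyBy(v -> v / 1000) // values 0..9 all map to key 0
            .process(new KeyedProcessFunction<Long, Long, Long>() {
                private ValueState<Integer> itemState; // number of elements seen for the key

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    ValueStateDescriptor<Integer> itemsStateDesc = new ValueStateDescriptor<>(
                            "itemState-state",
                            Integer.class);
                    itemState = getRuntimeContext().getState(itemsStateDesc);
                }

                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                    int val = (itemState.value() == null) ? 0 : itemState.value();
                    itemState.update(val + 1);
                    // Print the current watermark and the element's timestamp.
                    System.out.println(ctx.timerService().currentWatermark() + "," + ctx.timestamp());
                    // Every element registers a timer for the same event time ct.
                    ctx.timerService().registerEventTimeTimer(ct);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
                    super.onTimer(timestamp, ctx, out);
                    // Print the element count, the timer timestamp, and the current key.
                    System.out.println(itemState.value() + "---" + timestamp + "——" + ctx.getCurrentKey());
                }

                @Override
                public void close() throws Exception {
                    super.close();
                }
            }).setParallelism(1);
    e.execute();
}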
Code walkthrough. The test example does the following:
- Source: 10 elements with timestamp ct, followed by 10 elements stamped with the current time.
- Registered timer time: always ct.
Results:
-9223372036854775808,1623310821756 // current watermark, element timestamp (ctx.timestamp())
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310829459
11---1623310821756——0 // val (element count), timer timestamp, current key
1623310829358,1623310829961
12---1623310821756——0
1623310829860,1623310830466
13---1623310821756——0
1623310830365,1623310830970
14---1623310821756——0
1623310830869,1623310831474
15---1623310821756——0
1623310831373,1623310831977
16---1623310821756——0
1623310831876,1623310832481
17---1623310821756——0
1623310832380,1623310832986
18---1623310821756——0
1623310832885,1623310833490
19---1623310821756——0
1623310833389,1623310833993
20---1623310821756——0
Source code analysis:
1. How the watermark is generated
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
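As shown above, watermark = maxTimestamp - outOfOrdernessMillis - 1. With the 1 s bound in the test code, that is ct - 1000 ms - 1 ms, so up to 1 s of lateness is tolerated. Note that in the sample output the watermark trails the element timestamp by 101 ms (for example 1623310821756 - 1623310821655), which suggests that particular run used a 100 ms bound rather than 1 s.
Because the watermark is emitted from onPeriodicEmit, it is refreshed periodically; the test code sets the interval to 1 s with setAutoWatermarkInterval(1000).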
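To make the arithmetic concrete, here is a minimal plain-Java sketch of the same bookkeeping. It is a simplified stand-in, not Flink's class: the name BoundedWatermarkSketch and its methods are made up for illustration, and periodic emission is replaced by calling currentWatermark() by hand.

```java
import java.time.Duration;

/** Simplified stand-in for a bounded-out-of-orderness generator (hypothetical, not Flink's class). */
final class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark equals Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Per element: only remember the largest timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** What a periodic emit would send: max timestamp minus the bound minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        long ct = 1_623_310_821_756L; // timestamp taken from the sample output
        BoundedWatermarkSketch gen = new BoundedWatermarkSketch(Duration.ofSeconds(1));
        System.out.println(gen.currentWatermark()); // Long.MIN_VALUE before any element
        gen.onEvent(ct);
        System.out.println(gen.currentWatermark() == ct - 1001); // true
        System.out.println(gen.currentWatermark() >= ct);        // false: a timer at ct cannot fire yet
    }
}
```

The last line is the key point for this article: elements stamped ct can never push the watermark up to ct, so a timer registered at ct has to wait for a later element.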
2. Registration
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
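Registration only adds the timer to the queue. If a timer with the same time (and the same key and namespace) is already queued, the add does not succeed, so it does not lead to an extra firing. See the previous article for details.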
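The de-duplication can be modelled with a priority queue guarded by a hash set. This is only a sketch of the idea under simplifying assumptions: TimerDedupSketch and its Timer class are hypothetical names, the namespace is left out, and Flink's real queue implementation is different.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Simplified model of a de-duplicating timer queue (hypothetical, not Flink's data structure). */
final class TimerDedupSketch {
    /** Timer identity in this sketch: timestamp plus key. */
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    private final PriorityQueue<Timer> queue = new PriorityQueue<>();
    private final Set<Timer> index = new HashSet<>();

    /** Returns true only if an equal timer was not already queued. */
    boolean register(long timestamp, String key) {
        Timer timer = new Timer(timestamp, key);
        if (!index.add(timer)) {
            return false; // duplicate: the queue is left unchanged
        }
        queue.add(timer);
        return true;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        TimerDedupSketch timers = new TimerDedupSketch();
        long ct = 1_623_310_821_756L;
        for (int i = 0; i < 10; i++) {
            timers.register(ct, "0"); // same time, same key, as in the test job
        }
        System.out.println(timers.size()); // 1
        timers.register(ct, "1");          // same time, different key
        System.out.println(timers.size()); // 2
    }
}
```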
3. Firing
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
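Unlike processing-time timers, event-time timer firing is driven by the watermark. While the event-time timer queue is non-empty and the timer at its head has a timestamp less than or equal to the current watermark, that timer is removed from the queue and fired.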
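The loop can be tried out in isolation with a small plain-Java model. The sketch below assumes a single key and uses a TreeSet of timestamps as the queue; the class name AdvanceWatermarkSketch and the returned list of fired timestamps are illustrative choices, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Simplified single-key model of watermark-driven timer firing (hypothetical, not Flink code). */
final class AdvanceWatermarkSketch {
    // Sorted and de-duplicated timer timestamps for one key.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        timers.add(time); // adding an existing timestamp is a no-op
    }

    /** Fires, in timestamp order, every timer whose time is <= the new watermark. */
    List<Long> advanceWatermark(long time) {
        currentWatermark = time;
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= time) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        AdvanceWatermarkSketch service = new AdvanceWatermarkSketch();
        service.registerEventTimeTimer(100L);
        service.registerEventTimeTimer(100L); // duplicate, ignored
        System.out.println(service.advanceWatermark(99L));  // []
        System.out.println(service.advanceWatermark(100L)); // [100]
        // A timer registered at or below the current watermark fires on the next advance.
        service.registerEventTimeTimer(50L);
        System.out.println(service.advanceWatermark(100L)); // [50]
    }
}
```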
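Conclusion:
In summary: 1. Watermarks are generated periodically (here once every 1000 ms), so a timer fires only once a watermark arrives that satisfies timer <= watermark.
2. Identical timers are de-duplicated, so when the condition is met a given timer fires only once.
3. The next firing happens only when another watermark arrives and timer <= watermark holds again. In the test, each later element registers ct again after the previous timer has already fired, which is why onTimer runs once per element for counts 11 through 20.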
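The three points can be checked together by replaying the test scenario without Flink. The sketch below makes two loud assumptions: the watermark is advanced after every element instead of periodically, and the second batch of timestamps is invented (ct + 5000 ms, then 500 ms steps). TimerReplaySketch and replay are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java replay of the test scenario (a simplified model, not the Flink job itself). */
final class TimerReplaySketch {
    /** Returns the element count that each onTimer call would print. */
    static List<Integer> replay(long ct, long boundMillis) {
        TreeSet<Long> timers = new TreeSet<>();      // de-duplicated timers for the single key
        List<Integer> firedCounts = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        int count = 0;
        for (int i = 0; i < 20; i++) {
            // First 10 elements carry ct; the rest carry later, made-up timestamps.
            long ts = (i < 10) ? ct : ct + 5_000L + (i - 10) * 500L;
            // processElement: bump the counter and register a timer at ct.
            count++;
            timers.add(ct);
            // Assumption: the watermark advances right after every element.
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - boundMillis - 1;
            while (!timers.isEmpty() && timers.first() <= watermark) {
                timers.pollFirst();
                firedCounts.add(count); // onTimer would print this count
            }
        }
        return firedCounts;
    }

    public static void main(String[] args) {
        System.out.println(replay(1_623_310_821_756L, 1_000L));
        // prints [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    }
}
```

No timer fires during the first 10 elements because the watermark stays below ct; from the 11th element on, every element re-registers ct and the next watermark fires it, matching the counts in the sample output.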
Reference: watermark generation