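Old vs. new versions
In Flink 1.10 and earlier, there were three notions of time:
- processing time: the current wall-clock time of the cluster doing the processing
- event time: the creation time carried inside the message
- ingestion time: the time at which the message enters the cluster
Back then, the time semantics were set as follows: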
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
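Flink 1.11 was released in mid-2020 and included FLIP-126: Unify (and separate) Watermark Assigners. Watermarks used to be produced through two interfaces (AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks); both were replaced by the single WatermarkGenerator interface, which reduces development complexity and is decoupled from TimestampAssigner.
Flink 1.12 was released at the end of 2020 and unified stream and batch processing. It also deprecated the way of setting the time semantics shown above (event time is the default from 1.12 onward): env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);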
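Watermark generation strategies
// version flink-1.12.1
// Built-in strategy: no watermarks are generated
assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks())
// Built-in strategy: watermarks for monotonically increasing timestamps
assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
// Built-in strategy: watermarks that trail the largest timestamp by 20 seconds
assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))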
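// Custom strategy: derive the watermark from the message itself.
// Assumes the java.time and org.apache.flink.api.common.eventtime imports, plus an SLF4J-style `log` field defined elsewhere.
class MyWatermark implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long timestamp, WatermarkOutput output) {
        // The message body is a timestamp string, interpreted as UTC+8
        LocalDateTime dateTime = LocalDateTime.parse(event, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
        long eventTimestamp = dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        log.info("event[{}] timestamp[{}] eventTimestamp[{}]", event, timestamp, eventTimestamp);
        // Punctuated style: emit a watermark for every event
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty: watermarks are emitted per event in onEvent
    }
}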
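To make the conversion inside onEvent concrete, here is a minimal standalone sketch (plain Java, no Flink dependency) that parses a message in the same format and turns it into epoch milliseconds at UTC+8. The class name EventTimeParser and the sample inputs are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: mirrors the parsing done in MyWatermark.onEvent.
final class EventTimeParser {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Interprets the text as local time in UTC+8 and returns epoch milliseconds.
    static long toEpochMilli(String event) {
        LocalDateTime dateTime = LocalDateTime.parse(event, FORMAT);
        return dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    public static void main(String[] args) {
        // 08:00 at UTC+8 on 1970-01-01 is the epoch itself
        System.out.println(toEpochMilli("1970-01-01 08:00:00.000")); // prints 0
        System.out.println(toEpochMilli("1970-01-01 08:00:01.500")); // prints 1500
    }
}
```

A message that does not match the pattern makes LocalDateTime.parse throw a DateTimeParseException, so a real generator would probably want to decide how to handle malformed input.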
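void onPeriodicEmit(WatermarkOutput output): called periodically to generate a watermark. The default interval is 200 ms and can be configured with env.getConfig().setAutoWatermarkInterval(200).
void onEvent(String event, long eventTimestamp, WatermarkOutput output): called for every event, so a watermark can be generated per event.
withIdleness(Duration.ofMinutes(1)): handles idle sources. If no event arrives within 1 minute, the stream is marked as idle so that downstream operators stop waiting for its watermark and the overall watermark can keep advancing.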
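How onEvent and onPeriodicEmit can divide the work in a bounded-out-of-orderness strategy is sketched below in plain Java: onEvent only tracks the largest timestamp seen, and the periodic callback emits that maximum minus the allowed lag. This is a simplified model for illustration, not Flink's code. BoundedLagTracker and its method signatures are hypothetical, and the extra "- 1" is an assumption based on the convention that a watermark t means no more records with timestamp <= t are expected.

```java
// Simplified, Flink-free model of a bounded-out-of-orderness watermark generator.
final class BoundedLagTracker {
    private final long maxLagMillis;
    private long maxTimestamp;

    BoundedLagTracker(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
        // Start low enough that the first periodic emit yields Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + maxLagMillis + 1;
    }

    // Per-event hook: remember the largest timestamp, emit nothing
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Periodic hook: the watermark trails the largest timestamp by the allowed lag
    long onPeriodicEmit() {
        return maxTimestamp - maxLagMillis - 1;
    }

    public static void main(String[] args) {
        BoundedLagTracker tracker = new BoundedLagTracker(20_000L); // 20 seconds
        tracker.onEvent(100_000L);
        tracker.onEvent(90_000L); // out of order, does not move the maximum
        System.out.println(tracker.onPeriodicEmit()); // prints 79999
    }
}
```

Compared with the per-event MyWatermark generator, this style emits at most one watermark per interval, which is usually cheaper when the event rate is high.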
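Two timelines
record timestamp: the timestamp of the record being processed; it decides which window the record is assigned to, and therefore when a window opens
watermark: derived from the event time carried in the messages; it decides when a window closes
Note: once the watermark has passed a window's closing time, records that belong to that window are late and are no longer collected into it.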
Source code analysis
// Window assigner: the timestamp parameter is the timestamp of the event currently being processed
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        if (this.staggerOffset == null) {
            this.staggerOffset = this.windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), this.size);
        }
        long start = TimeWindow.getWindowStartWithOffset(timestamp, (this.globalOffset + this.staggerOffset) % this.size, this.size);
        return Collections.singletonList(new TimeWindow(start, start + this.size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
// Window trigger: TriggerResult.FIRE means the window closes and its computation is triggered
// onElement is called when an element is collected into the window
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
// onEventTime is called when the event-time timer fires, i.e. when the window is computed
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
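The arithmetic behind assignWindows and onElement can be tried out without a cluster. The sketch below is plain Java, not Flink code. windowStart aligns a timestamp to its tumbling window using Math.floorMod, which is assumed to match TimeWindow.getWindowStartWithOffset for non-negative timestamps. shouldFire mirrors the comparison in onElement, where maxTimestamp() is the window end minus 1 ms. The class name TumblingWindowMath is hypothetical.

```java
// Hypothetical helper for illustration only; it models the window math, not Flink's classes.
final class TumblingWindowMath {
    // Start of the tumbling window of length size (ms) that contains timestamp
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Largest timestamp that still belongs to the window [start, start + size)
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Same comparison as the trigger's onElement: fire once the watermark reaches the max timestamp
    static boolean shouldFire(long windowMaxTimestamp, long currentWatermark) {
        return windowMaxTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long start = windowStart(12_345L, 0L, size);
        long max = maxTimestamp(start, size);
        System.out.println(start);                    // prints 10000
        System.out.println(max);                      // prints 14999
        System.out.println(shouldFire(max, 14_998L)); // prints false
        System.out.println(shouldFire(max, 14_999L)); // prints true
    }
}
```

A record with timestamp 12345 therefore lands in the window [10000, 15000), and that window fires only once the watermark reaches 14999.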
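Notes
- Monotonically increasing watermarks should generally be emitted as close to the data source as possible
- Every operator's watermark starts at Long.MIN_VALUE
- An operator's watermark never moves backwards (a new watermark smaller than the current one is ignored)
- With a parallelism greater than 1, watermarks follow the shortest-stave (bucket) principle: an operator takes the minimum watermark across its parallel inputs, so the computation is triggered only once the watermark of every slot of that operator has passed the window's closing time
- Everything above concerns generating and emitting watermarks. Record timestamps are generated and emitted as well, although this is rarely customized; see the BoundedOutOfOrdernessTimestampExtractor class for details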
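The second, third, and fourth notes can be modelled together in a few lines of plain Java. This is a simplified illustration rather than Flink's implementation: each parallel input channel starts at Long.MIN_VALUE, a channel ignores a watermark that would move it backwards, and the operator's own watermark is the minimum over all channels. OperatorWatermark and its methods are hypothetical names, and at least one channel is assumed.

```java
import java.util.Arrays;

// Simplified, Flink-free model of how an operator combines watermarks from parallel inputs.
final class OperatorWatermark {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    OperatorWatermark(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // default before any watermark arrives
    }

    // Records a watermark from one channel and returns the operator's resulting watermark
    long onWatermark(int channel, long watermark) {
        // A smaller watermark on the same channel is ignored
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The slowest channel decides (shortest-stave principle)
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        OperatorWatermark op = new OperatorWatermark(2);
        System.out.println(op.onWatermark(0, 15_000L) == Long.MIN_VALUE); // prints true
        System.out.println(op.onWatermark(1, 12_000L)); // prints 12000
        System.out.println(op.onWatermark(1, 11_000L)); // prints 12000
        System.out.println(op.onWatermark(1, 20_000L)); // prints 15000
    }
}
```

In this model a single silent channel keeps the operator at Long.MIN_VALUE forever, which is the situation withIdleness is meant to address.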
References
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html