本文闡述 Flink 的事件時間和 Watermark 機(jī)制,剖析 Watermark 產(chǎn)生和傳遞的流程。
1 Event time 和 Watermark 的關(guān)系
1.1 Event time 和 Processing time介紹
Event time 事件時間和Processing time 處理時間主要區(qū)別是產(chǎn)生時間不同,前者是事件的實際發(fā)生時間,后者是機(jī)器的系統(tǒng)處理時間,如下圖所示。
① Event time 事件時間:事件在其設(shè)備上發(fā)生的時間。
Event time 是事件在進(jìn)入 Flink 之前已經(jīng)嵌入到記錄的時間,其大小取決于事件本身,與網(wǎng)絡(luò)延時、系統(tǒng)時區(qū)等因素?zé)o關(guān)。
② Processing time 處理時間:作業(yè)正在執(zhí)行相應(yīng)操作的機(jī)器系統(tǒng)時間。
Processing time 提供了最佳的性能和最低的延遲,但是不能提供確定性,即計算結(jié)果是不確定的。
例如,時間窗口為5min的求和統(tǒng)計,應(yīng)用程序在 9:00 開始運(yùn)行,則第一個時間窗口處理 [9:00, 9:05) 的事件,下一個窗口處理 [9:05, 9:10) 的事件,依此類推。通信延遲、作業(yè)故障重啟等問題,可能導(dǎo)致窗口的計算結(jié)果是不一樣的。如下圖所示,假設(shè)事件(事件時間, 數(shù)值) 遇到上述問題,場景一:事件 B 有網(wǎng)絡(luò)延遲落在[9:10, 9:15),場景二:作業(yè)故障重啟導(dǎo)致事件 A 和事件 B落在[9:10, 9:15)。
ProcessingTime不確定性.JPG
1.2 Event time 和 Watermark
問題:Flink 支持事件時間,如何測量事件時間的進(jìn)度?例如,5min 的事件時間窗口,當(dāng)事件時間超過 5min 時,需要通知 Flink 觸發(fā)窗口計算。
解答:Watermark 機(jī)制。
Watermark 本質(zhì)是時間戳,與業(yè)務(wù)數(shù)據(jù)一樣無差別地傳遞下去,目的是衡量事件時間的進(jìn)度(通知 Flink 觸發(fā)事件時間相關(guān)的操作,例如窗口)。
說明: Watermark(T) 表示目前系統(tǒng)的時間事件是 T,即系統(tǒng)后續(xù)沒有 T'<T 的事件即 Event(T'<T)
/**
* 1.Watermark 是一個時間戳, 它表示小于該時間戳的事件都已經(jīng)到達(dá)了。
* 2.Watermark 一般情況在源位置產(chǎn)生(也可以在流圖中的其它節(jié)點產(chǎn)生), 通過流圖節(jié)點傳播。
* 3.Watermark 也是 StreamElement, 和普通數(shù)據(jù)一起在算子之間傳遞。
* 4.Watermark 可以觸發(fā)窗口計算, 時間戳為 Long.MAX_VALUE 表示算子后續(xù)沒有任何數(shù)據(jù)。
*/
public final class Watermark extends StreamElement {
// 省略...
/**
* The timestamp of the watermark in milliseconds.
*/
private final long timestamp;
/**
* Creates a new watermark with the given timestamp in milliseconds.
*/
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
/**
* Returns the timestamp associated with this {@link Watermark} in milliseconds.
*/
public long getTimestamp() {
return timestamp;
}
// 省略...
}
如下圖所示,事件 Event 是按照事件時間 EventTime 順序上報的。
如下圖所示,事件 Event 是不按照事件時間 EventTime 亂序上報的。
2 Watermark 的產(chǎn)生
2.1 Watermark 類型
說明:flink-1.12 支持 WatermarkStrategy 和 WatermarkGenerator
flink 采用 WatermarkStrategy 設(shè)置自定義 Watermark 類型,WatermarkGenerator 是 Watermark 的基類。flink 實現(xiàn)了 Punctuated Watermarks 從事件獲取事件的時間戳、Periodic Watermarks 周期獲取事件的時間戳。
/**
* The {@code WatermarkGenerator} generates watermarks either based on events or
* periodically (in a fixed interval).
*
* <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
*/
@Public
public interface WatermarkGenerator<T> {
/**
* 從事件獲取事件的時間戳
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* 周期獲取事件的時間戳
*/
void onPeriodicEmit(WatermarkOutput output);
}
使用 WatermarkStrategy 的樣例,如下代碼。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.fromElements("data");
// 使用 WatermarkStrategy 設(shè)置 Watermark 類型
input.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMillis(10)));
2.2 Watermark 的產(chǎn)生
Watermark 是算子 TimestampsAndWatermarksOperator 產(chǎn)生的,WatermarkStrategy 相當(dāng)于 UDFFunction(封裝于TimestampsAndWatermarksOperator 內(nèi)部)。processElement 方法實現(xiàn)事件產(chǎn)生 Watermark,processWatermark 方法阻斷上游傳過來的 Watermark,onProcessingTime 方法實現(xiàn)周期產(chǎn)生 Watermark。
public class TimestampsAndWatermarksOperator<T>
extends AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
// 省略...
@Override
public void processElement(final StreamRecord<T> element) throws Exception {
final T event = element.getValue();
final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
element.setTimestamp(newTimestamp);
output.collect(element);
// 事件產(chǎn)生 Watermark
watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
}
// 阻斷上游傳過來的 watermark
@Override
public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
// if we receive a Long.MAX_VALUE watermark we forward it since it is used
// to signal the end of input and to not block watermark progress downstream
if (mark.getTimestamp() == Long.MAX_VALUE) {
wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
}
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
// 采用定時器, 周期產(chǎn)生 Watermark
watermarkGenerator.onPeriodicEmit(wmOutput);
final long now = getProcessingTimeService().getCurrentProcessingTime();
// 更新定時器
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
// 省略...
}
(1)Watermark 周期產(chǎn)生
public class TimePeriodicWatermarkGenerator implements WatermarkGenerator<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
// don't need to do anything because we work on processing time
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
}
}
結(jié)合算子 TimestampsAndWatermarksOperator 和 TimePeriodicWatermarkGenerator,分析 Watermark 的產(chǎn)生流程。如下圖所示,橫軸表示 processing time,圓形表示事件,圓形中的時間 t 表示事件時間,圓形落在橫軸表示事件在算子中的處理,其中 Watermark 的產(chǎn)生周期為 60s 和允許延遲時間為 10s。以第一個周期 [0,60) 為例,獲取事件中的最大事件時間 max,向下游發(fā)送 watermark(最大事件時間 - 允許延遲時間 - 1)。
watermark產(chǎn)生流程.JPG
(2)Watermark 事件產(chǎn)生
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
@Override
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// don't need to do anything because we emit in reaction to events above
}
}
3 Watermark 的傳遞
Watermark 的傳遞方式是廣播,即廣播方式發(fā)送到下游。Watermark 與業(yè)務(wù)數(shù)據(jù)一樣,無差別地傳遞下去。
例子:多并發(fā)的場景下,Watermark 是 source task 產(chǎn)生,經(jīng)過 keyby 分組后觸發(fā)窗口計算。
說明:① Watermark 要單調(diào)遞增。② 如果算子有多個上游(廣播)即輸入多個 Watermark(T),則該算子取最小 Watermark 即 min(Watermark(T1), Watermark(T2))。
多并行下的Watermark.JPG
從 WindowOperator 源碼分析窗口是如何傳遞 Watermark。
首先分析 WindowOperator 類圖,可知 WindowOperator 間接繼承AbstractStreamOperator,而 AbstractStreamOperator 實現(xiàn)了接口 Input 的 processWatermark 方法、接口 TwoInputStreamOperator 的 processWatermark1 方法 和 processWatermark2 方法。

接著分析一下 AbstractStreamOperator 實現(xiàn)的 processWatermark 、processWatermark1 和 processWatermark2。
// 省略 ....
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
// 發(fā)送 watermark
output.emitWatermark(mark);
}
/**
* 2個上游的watermark
* 計算最小watermark, 并設(shè)置為當(dāng)前算子的watermark
*/
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}
/**
* 2個上游的watermark
* 計算最小watermark, 并設(shè)置為當(dāng)前算子的watermark
*/
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}
// 省略 ....