Reading the documentation
Documentation path
/Application Development/Streaming (DataStream API)/Event Time/Watermarks in Parallel Streams
Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.
As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.
Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.

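The passage above mainly says that each subtask computes its own watermark independently and then propagates it downstream. When the watermark reaches an operator that requires a shuffle, such as the window operator in the documentation's figure, that operator takes the minimum of the watermarks sent by its upstream subtasks as its own current watermark.
Taking Kafka as the source, the "multiple" sources in the figure can be read either as different partitions of the same topic or as different topics. Different topics are what is usually called multiple streams. In both cases, whenever watermarks have to be aligned, the minimum is taken; only the source-code implementation differs.
Same stream
When the inputs are different partitions of the same stream, the connection from each upstream subtask is treated as a channel. The code path goes through StatusWatermarkValve#inputWatermark, which calls findAndOutputNewMinWatermarkAcrossAlignedChannels to take the minimum watermark across all channels as the new watermark.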
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    boolean hasAlignedChannels = false;
    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }
    // we acknowledge and output the new overall watermark if it really is aggregated
    // from some remaining aligned channel, and is also larger than the last output watermark
    if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}
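To make the effect of this method easier to follow, here is a minimal, self-contained sketch of the same "minimum across channels" rule. The class ChannelMinWatermarkSketch and its method onWatermark are hypothetical names invented for this illustration and are not part of Flink. The sketch also assumes every channel is always aligned, so the isWatermarkAligned handling from the excerpt is left out.

```java
import java.util.Arrays;

// Hypothetical stand-in for the per-channel bookkeeping; this is not Flink's StatusWatermarkValve.
class ChannelMinWatermarkSketch {
    private final long[] channelWatermarks; // latest watermark seen on each input channel
    private long lastOutputWatermark = Long.MIN_VALUE;

    ChannelMinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no channel has reported yet
    }

    // Records a watermark from one channel and returns the operator's watermark afterwards.
    long onWatermark(int channel, long watermark) {
        // A single channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        long newMin = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            newMin = Math.min(newMin, w);
        }
        // Only advance when the minimum across all channels has actually grown.
        if (newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
        }
        return lastOutputWatermark;
    }

    public static void main(String[] args) {
        ChannelMinWatermarkSketch op = new ChannelMinWatermarkSketch(3);
        System.out.println(op.onWatermark(0, 10)); // Long.MIN_VALUE: channels 1 and 2 are silent
        System.out.println(op.onWatermark(1, 7));  // Long.MIN_VALUE: channel 2 is still silent
        System.out.println(op.onWatermark(2, 12)); // 7: min(10, 7, 12)
        System.out.println(op.onWatermark(1, 15)); // 10: min(10, 15, 12)
    }
}
```

The trace shows that the operator's watermark stays at its initial value until every channel has reported, and that afterwards the slowest channel decides how far event time can advance.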
Two streams
Flink currently supports only two input streams for this case. AbstractStreamOperator.java states the rule for producing the new watermark very clearly.
// ---------------- two-input operator watermarks ------------------
// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;

public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}

public void processWatermark2(Watermark mark) throws Exception {
    input2Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}
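A short trace can help show how the two-input rule behaves when one input runs ahead of the other. The sketch below copies only the arithmetic from the excerpt into a standalone class. TwoInputWatermarkSketch, onWatermark1, onWatermark2, and advance are hypothetical names chosen for this illustration; plain long values stand in for Flink's Watermark objects, and the return value stands in for the watermark the operator would hold after the call.

```java
// Hypothetical trace harness mirroring the two-input rule; this is not the Flink class itself.
class TwoInputWatermarkSketch {
    private long combinedWatermark = Long.MIN_VALUE;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;

    // Watermark arriving on the first input; returns the combined watermark afterwards.
    long onWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    // Watermark arriving on the second input; returns the combined watermark afterwards.
    long onWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    // The combined watermark is the minimum of both inputs and only ever moves forward.
    private long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
        }
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        System.out.println(op.onWatermark1(5)); // Long.MIN_VALUE: input 2 has not reported yet
        System.out.println(op.onWatermark2(3)); // 3: min(5, 3)
        System.out.println(op.onWatermark1(9)); // 3: input 2 still holds the minimum
        System.out.println(op.onWatermark2(8)); // 8: min(9, 8)
    }
}
```

In the third step input 1 jumps to 9, yet the combined watermark stays at 3 because input 2 has not moved. A lagging stream therefore delays event-time progress for the whole operator.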
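Further reading
Everything above concerns "multiple inputs". The inputs can be different partitions of one data source or several separate data sources; either way, the minimum watermark is taken. Where there is a multi-input case there is also a single-input case, which corresponds to what the official documentation calls "Each parallel subtask of a source function". A brief note on it: see BoundedOutOfOrdernessTimestampExtractor#getCurrentWatermark, the method that generates the watermark. Each call effectively uses the largest potentialWM computed so far as the watermark.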
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}
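The single-input behaviour can be tried out with a small standalone sketch. BoundedOutOfOrdernessSketch, onEvent, and currentWatermark are hypothetical names for this illustration; onEvent is assumed to play the role of the extractor's timestamp handling by remembering the largest timestamp seen so far, and the 5 ms bound in main is an arbitrary example value.

```java
// Hypothetical single-input sketch; it mirrors the excerpt's arithmetic but is not the Flink class.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Offset the start value so the subtraction in currentWatermark() cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Remembers the largest event timestamp seen so far; late events do not lower it.
    void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // The watermark trails the largest timestamp by the allowed lateness and never goes backwards.
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch source = new BoundedOutOfOrdernessSketch(5);
        source.onEvent(10);
        System.out.println(source.currentWatermark()); // 5: 10 - 5
        source.onEvent(8);                             // out-of-order event
        System.out.println(source.currentWatermark()); // 5: the maximum timestamp is still 10
        source.onEvent(20);
        System.out.println(source.currentWatermark()); // 15: 20 - 5
    }
}
```

The out-of-order event with timestamp 8 leaves the watermark at 5, which shows the "take the maximum" behaviour for a single input.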
So, to sum up in one sentence: for watermarks, a single input takes the maximum and multiple inputs take the minimum.