Watermarks in Parallel Streams


Walkthrough of the documentation

Documentation path

/Application Development/Streaming (DataStream API)/Event Time/Watermarks in Parallel Streams

Watermarks are generated at, or directly after, source functions. Each parallel subtask of a source function usually generates its watermarks independently. These watermarks define the event time at that particular parallel source.

As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.

Some operators consume multiple input streams; a union, for example, or operators following a keyBy(…) or partition(…) function. Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.

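The passage above says that each subtask computes its own watermark independently and then propagates it downstream. When the watermark reaches an operator that receives data through a shuffle, such as the window operator in the figure above, that operator takes the minimum of the watermarks sent by all upstream subtasks as its own watermark.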
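To make the minimum rule concrete, here is a small Java sketch. It is not Flink code. The class name `MinOfInputsOperator` and its methods are hypothetical, and it assumes every input starts at `Long.MIN_VALUE`. The operator keeps the latest watermark per input, takes the minimum, and forwards a watermark downstream only when that minimum advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not a Flink API): an operator with N inputs whose event time
// is the minimum of the latest watermark received on each input.
class MinOfInputsOperator {
    private final long[] inputWatermarks;
    private long currentWatermark = Long.MIN_VALUE;
    // Watermarks this operator has forwarded downstream, in order.
    final List<Long> emitted = new ArrayList<>();

    MinOfInputsOperator(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    void onWatermark(int input, long timestamp) {
        // Keep each input's watermark monotonic: a smaller value is ignored.
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);

        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // Emit downstream only when the operator's own event time advances.
        if (min > currentWatermark) {
            currentWatermark = min;
            emitted.add(min);
        }
    }

    long getCurrentWatermark() {
        return currentWatermark;
    }
}
```

With three inputs, reporting 10 on input 0 and 5 on input 1 emits nothing yet, because input 2 is still at `Long.MIN_VALUE`. Once input 2 reports 7, the operator emits 5, which is the value of the slowest input.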

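Take Kafka as the source. The "multiple" sources in the figure above can be read either as different partitions of the same topic or as different topics. The second case is what is usually called multiple streams. In both cases, whenever watermarks have to be aligned, the minimum is taken. Only the implementation in the source code differs.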

Within a single stream

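When the inputs are different partitions of the same stream, each upstream subtask is connected to the downstream subtask through a channel. The code path goes through StatusWatermarkValve#inputWatermark, which calls findAndOutputNewMinWatermarkAcrossAlignedChannels. That method takes the minimum watermark over all watermark-aligned channels and emits it as the new watermark, but only if it is larger than the last watermark emitted: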

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
  long newMinWatermark = Long.MAX_VALUE;
  boolean hasAlignedChannels = false;

  // determine new overall watermark by considering only watermark-aligned channels across all channels
  for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
      hasAlignedChannels = true;
      newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
  }

  // we acknowledge and output the new overall watermark if it really is aggregated
  // from some remaining aligned channel, and is also larger than the last output watermark
  if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
  }
}
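The snippet above only considers channels whose `isWatermarkAligned` flag is set. The following sketch models when a channel might drop out of, or rejoin, that set. This model is an assumption based on a reading of Flink's valve. It treats a channel as leaving alignment when it goes idle and as rejoining only once its watermark catches up with the last emitted one. The class `SimpleWatermarkValve` and its methods are hypothetical and are not Flink's `StatusWatermarkValve`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the aligned-channel idea (not Flink's StatusWatermarkValve).
class SimpleWatermarkValve {
    private final long[] watermarks;
    private final boolean[] active;
    private final boolean[] aligned;
    private long lastOutputWatermark = Long.MIN_VALUE;
    final List<Long> output = new ArrayList<>();

    SimpleWatermarkValve(int numChannels) {
        watermarks = new long[numChannels];
        active = new boolean[numChannels];
        aligned = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(active, true);
        Arrays.fill(aligned, true);
    }

    void inputWatermark(int ch, long ts) {
        // Ignore watermarks from idle channels and watermarks that go backwards.
        if (!active[ch] || ts <= watermarks[ch]) return;
        watermarks[ch] = ts;
        // A channel that fell behind rejoins the minimum once it catches up.
        if (!aligned[ch] && ts >= lastOutputWatermark) aligned[ch] = true;
        findAndOutputNewMin();
    }

    void markIdle(int ch) {
        active[ch] = false;
        aligned[ch] = false;
        // Without the idle channel, the remaining channels may let the watermark advance.
        findAndOutputNewMin();
    }

    void markActive(int ch) {
        active[ch] = true;
        // Rejoin immediately only if this channel is not behind what was already emitted.
        if (watermarks[ch] >= lastOutputWatermark) aligned[ch] = true;
    }

    private void findAndOutputNewMin() {
        long newMin = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (aligned[i]) {
                hasAligned = true;
                newMin = Math.min(newMin, watermarks[i]);
            }
        }
        if (hasAligned && newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
            output.add(newMin);
        }
    }
}
```

The effect is that an idle partition does not hold the whole operator back. Also, a partition that wakes up with old data cannot drag an already-emitted watermark backwards.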

Two streams

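At the time of writing, Flink's operators that combine two distinct streams (two-input operators) accept exactly two inputs. AbstractStreamOperator.java states the rule for producing a new watermark quite clearly: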

// ---------------- two-input operator watermarks ------------------

// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;

public void processWatermark1(Watermark mark) throws Exception {
  input1Watermark = mark.getTimestamp();
  long newMin = Math.min(input1Watermark, input2Watermark);
  if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
  }
}

public void processWatermark2(Watermark mark) throws Exception {
  input2Watermark = mark.getTimestamp();
  long newMin = Math.min(input1Watermark, input2Watermark);
  if (newMin > combinedWatermark) {
    combinedWatermark = newMin;
    processWatermark(new Watermark(combinedWatermark));
  }
}
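The two methods above can be exercised outside Flink with a standalone copy of the same rule. In this sketch, `TwoInputWatermarkTracker` is a hypothetical name. Emitted watermarks are recorded in a list instead of being passed to `processWatermark`, so the effect of a lagging input is easy to see.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone re-implementation of the two-input rule quoted above, so it runs without
// Flink. Emitted watermarks are recorded instead of being forwarded downstream.
class TwoInputWatermarkTracker {
    private long combinedWatermark = Long.MIN_VALUE;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long ts) {
        input1Watermark = ts;
        maybeEmit();
    }

    void processWatermark2(long ts) {
        input2Watermark = ts;
        maybeEmit();
    }

    private void maybeEmit() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        // Only a strictly larger minimum is forwarded downstream.
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(newMin);
        }
    }
}
```

If input 1 advances to 1000 and then 2000 while input 2 has sent nothing, no watermark is emitted. When input 2 reports 500, the combined watermark becomes 500. When input 2 later reports 3000, the combined watermark becomes 2000, which is the value of the slower input 1.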

Further reading

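Everything above concerns "multiple inputs". These may be different partitions of one data source or several data sources, and in every such case the watermark is the minimum. Where there are multiple inputs, there is also the "single input" case. It corresponds to what the official documentation calls "each parallel subtask of a source function", and is briefly described here. Look at how BoundedOutOfOrdernessTimestampExtractor#getCurrentWatermark generates the watermark. Each call computes potentialWM = currentMaxTimestamp - maxOutOfOrderness and adopts it only if it is not smaller than the last emitted watermark. The watermark is therefore effectively the running maximum of potentialWM and never goes backwards.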

public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}
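To see the "running maximum" behaviour with concrete numbers, here is a standalone sketch of the same logic. `BoundedOutOfOrdernessSketch` and `onEvent` are hypothetical names. The sketch starts `currentMaxTimestamp` at `Long.MIN_VALUE + maxOutOfOrderness` so that the first subtraction cannot underflow. It tracks the largest event timestamp seen and subtracts the out-of-orderness bound.

```java
// Hypothetical standalone sketch of the bounded-out-of-orderness logic above
// (not Flink's class): track the max timestamp seen and subtract the bound.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that (currentMaxTimestamp - maxOutOfOrderness) cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called for every record with its event timestamp.
    void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        // Never let the watermark go backwards.
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }
}
```

With a bound of 5, the events 100, 90 and 120 give watermarks of 95, 95 and 115. The late event 90 does not pull the watermark back.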

In short, for watermarks: a single input takes the maximum, multiple inputs take the minimum.
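The one-line summary can be checked end to end with a toy pipeline: two sources feed one downstream operator. This is an illustration under simplifying assumptions, not Flink code. `WatermarkRuleDemo`, `Source` and `combine` are hypothetical names. Each source keeps the running maximum of (event time minus a bound), and the downstream side simply takes the minimum of the latest source watermarks.

```java
// Hypothetical end-to-end illustration of "single input: max, multiple inputs: min".
// Not Flink code: sources and the downstream combine step are plain Java.
class WatermarkRuleDemo {
    // Single input: watermark = running max of (event timestamp - out-of-orderness bound).
    static final class Source {
        private final long bound;
        private long watermark = Long.MIN_VALUE;

        Source(long bound) {
            this.bound = bound;
        }

        long onEvent(long timestamp) {
            watermark = Math.max(watermark, timestamp - bound);
            return watermark;
        }
    }

    // Multiple inputs: watermark = minimum over the latest watermark of each input.
    static long combine(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        Source a = new Source(5);
        Source b = new Source(5);
        a.onEvent(100);
        a.onEvent(90);              // out of order: A's watermark stays at 95
        long wa = a.onEvent(130);
        b.onEvent(60);
        long wb = b.onEvent(70);
        // prints "A=125 B=65 combined=65"
        System.out.println("A=" + wa + " B=" + wb + " combined=" + combine(wa, wb));
    }
}
```

Source A is far ahead, but the downstream watermark stays at 65 because source B is slower.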
