問題定位:Flink水位線不觸發(fā)問題

Flink水位線不觸發(fā)問題

窗口計(jì)算時(shí)遇到好幾次水位線不觸發(fā)的情況,簡(jiǎn)單總結(jié)下。

首先,介紹下Flink的事件時(shí)間(EventTime)和水位線(Watermarks)的概念。

一、處理時(shí)間

如果要構(gòu)造一個(gè)實(shí)時(shí)的流式應(yīng)用,或早或晚都會(huì)接觸到EventTime這個(gè)概念?,F(xiàn)實(shí)場(chǎng)景中也會(huì)遇到消息亂序到達(dá),這里會(huì)介紹到為什么需要事件時(shí)間和如何去處理亂序到達(dá)的數(shù)據(jù)。
ProcessingTime是Flink系統(tǒng)處理這條消息的時(shí)間,EventTime可以理解成是這條消息真實(shí)發(fā)生的時(shí)間。
舉個(gè)例子,創(chuàng)建一個(gè)SlidingWindow,窗口大小為10秒,步長(zhǎng)為5秒。關(guān)于窗口的更多概念,可以參考Flink官方文檔——Windows。

案例1:消息都按時(shí)到達(dá)

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .sum(1)
counts.print
senv.execute("ProcessingTime processing example")

如果source中有三條消息,對(duì)應(yīng)的事件時(shí)間分別為13秒、13秒和16秒:
[站外圖片上傳中...(image-1e2304-1550219427900)]
它們會(huì)落到正確的窗口上,如下圖所示。13秒產(chǎn)生的兩條消息會(huì)落到窗口1[5s-15s]和窗口2[10s-20s]上,16秒產(chǎn)生的消息會(huì)落到窗口2[10s-20s]和窗口3[15s-25s]上。最后窗口fire掉時(shí),三個(gè)窗口的count值分別為:(a,2), (a,3) and (a,1) ,和預(yù)期一致。

案例2:消息delay

其中一條13秒產(chǎn)生的消息晚到了6s,那按上面的代碼邏輯,這些消息會(huì)落到下面的窗口中:


延遲的消息會(huì)落到窗口2[10s-20s]和窗口3[15s-25s]上。這看起來對(duì)窗口2沒有影響,因?yàn)榻Y(jié)果都是3,但窗口3的結(jié)果卻不一致了。

二、事件時(shí)間

因此此處我們采用事件時(shí)間,這里水位線的事件時(shí)間為當(dāng)前系統(tǒng)的時(shí)間,當(dāng)然你可以改成數(shù)據(jù)中的某個(gè)時(shí)間。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }
  override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis)
  }
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

結(jié)果如下圖所示:
[站外圖片上傳中...(image-b2feb1-1550219427900)]
結(jié)果看起來好很多,窗口2和3都正確了,但窗口1卻丟了一條數(shù)據(jù)。
Flink沒有將delay的數(shù)據(jù)分配給窗口3是因?yàn)楝F(xiàn)在是檢查消息的事件時(shí)間,因此不會(huì)放入窗口三中。而沒有分配給窗口1的原因是delayed的消息到達(dá)系統(tǒng)的時(shí)間是19秒,窗口1已經(jīng)fire掉了。此處就需要watermarks了。

三、水位線(水?。?/h2>

我認(rèn)為水位線是很重要和有趣的一個(gè)概念,我這里會(huì)大概描述下,如果想了解更多可以看Google一場(chǎng)精彩的talk,也可以看這個(gè)dataArtisans的blog。水位線簡(jiǎn)單理解就是一個(gè)timestamp,當(dāng)Flink收到這個(gè)水印時(shí),F(xiàn)link理解會(huì)收到來自這個(gè)時(shí)間點(diǎn)之后的消息,也可以理解成告訴Flink運(yùn)行到哪個(gè)事件時(shí)間了。
在這個(gè)案例中,其實(shí)就是告訴Flink一條消息可以遲到多久。
我現(xiàn)在設(shè)置水位線為現(xiàn)在的事件提前5秒,相當(dāng)于告訴Flink我的消息可以遲到五秒。

override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis - 5000)
  }

結(jié)果變成下圖所示:

[站外圖片上傳中...(image-6dfb6e-1550219427900)]

四、允許延遲(Lateness)

如果采用“watermark - delay”,如果水位線不超過window_length + delay是不會(huì)被fire掉的,所以此刻可以采用allowedLateness方法。在window_end_time + allowed lateness之前,F(xiàn)link都不會(huì)丟棄這條數(shù)據(jù)。
當(dāng)消息到達(dá)時(shí),F(xiàn)link會(huì)提取它的時(shí)間,然后判斷它是否在有效的延遲時(shí)間內(nèi),然后去判斷是否fire掉窗口。
但是通過這種途徑,一個(gè)窗口有可能被fire掉多次,如果需要exactly once processing的話,需要保證sink是冪等的。

五、水位線怎么不觸發(fā)?

數(shù)據(jù)一直有序得進(jìn)來,為什么沒有窗口被fire掉?沒有結(jié)果產(chǎn)出?

case1:提取時(shí)間失敗

筆者和上游約定好了數(shù)據(jù)格式,extractTimestamp中提取的是某個(gè)字段為事件時(shí)間。研究數(shù)據(jù)發(fā)現(xiàn)約定好的字段突然不發(fā)了。

case2:提取時(shí)間有了,但是照樣失敗

上游按約定發(fā)了該字段后,系統(tǒng)在測(cè)試環(huán)境運(yùn)行了一段時(shí)間,又沒有結(jié)果產(chǎn)出了。
調(diào)試發(fā)現(xiàn)提取時(shí)間正常,checkAndGetNextWatermark也正常,但是為什么窗口沒被fire掉呢。
因?yàn)闀r(shí)間的format變了,由到毫秒的timestamp變成了yyyymmddHHmmss,需要轉(zhuǎn)成timestamp。

case3:一切正常,窗口不fire

EventTimeTrigger.java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}

看了下EventTimeTrigger的源碼,只有當(dāng)當(dāng)前的水位線越過窗口,即時(shí)間大于窗口的endTime才會(huì)觸發(fā)Fire的操作。我們的處理流程沒有觸發(fā),那就說明我們的水位線沒有更新到合適的值。調(diào)試后發(fā)現(xiàn)當(dāng)前的水位線一直停留在初始的最小的long值。

BoundedOutOfOrdernessTimestampExtractor.java
@Override
public final Watermark getCurrentWatermark() {
    // this guarantees that the watermark never goes backwards.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}

debug發(fā)現(xiàn)lastEmittedWatermark確實(shí)有更新,這說明這個(gè)地方是觸發(fā)了Watermark的值。但是debug的過程中,發(fā)現(xiàn)時(shí)不時(shí)會(huì)出現(xiàn)初始值的水位線。

SystemProcessingTimeService.java
TimestampsAndPeriodicWatermarksOperator.java
@Override
public void onProcessingTime(long timestamp) throws Exception {
    // register next timer
    Watermark newWatermark = userFunction.getCurrentWatermark();
    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = newWatermark.getTimestamp();
        // emit watermark
        output.emitWatermark(newWatermark);
    }
    long now = getProcessingTimeService().getCurrentProcessingTime();
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}

TimestampsAndPeriodicWatermarksOperator會(huì)做判斷:如果新的水位線小于當(dāng)前的水位線,就不會(huì)更新了。

終于,順著StreamInputProcessor–>StatusWatermarkValve理了下來,看見這樣的處理邏輯:

StreamInputProcessor.java
StatusWatermarkValve.java
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }
    // we acknowledge and output the new overall watermark if it is larger than the last output watermark
    if (newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}

這里會(huì)將所有的channel status的水位線做個(gè)匯總:取最小的水位線。那是不是問題出在這里?后面debug了下看看,確實(shí),這個(gè)地方有的channel status下的水位線一直是最小的long值那個(gè)不正常的水位線,進(jìn)而導(dǎo)致整體的水位線發(fā)送不出去。

那么為什么會(huì)出現(xiàn)這種情況呢,百思不得其解。

當(dāng) [window_start_time,window_end_time) 有數(shù)據(jù),watermark Time大于等于window_end_time時(shí),會(huì)觸發(fā)window trigger。

因?yàn)橹斑\(yùn)行都是正常的,檢查了數(shù)據(jù)也沒問題。去翻改動(dòng),有影響的可能就是改了一些算子的并行度。

assigntimestampandwatermarks和map的并行度一樣了就不能生成水位線了?

于是修改了assigntimestampandwatermarks的并行度,window正常fire掉了。

分析原因:
Flink source用到了FlinkKafkaConsumer010,沒有指定KafkaPartitioner的話,會(huì)通過FixedPartitioner來給出默認(rèn)的partitioner方法:

public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        return partitions[this.parallelInstanceId % partitions.length];
    }

parallelInstanceId代表著Flink consumer程序的并行度ID,假如FlinkKafkaConsumer010的并行度是12,那么這12個(gè)線程的ID分別是1-12.

parallelInstances代表著總的并行度,即12。

partitions是一個(gè)kafka partition的數(shù)組,kafka的topic的partitions是4(因?yàn)樾阅軠y(cè)試,換到只有一個(gè)節(jié)點(diǎn)的kafka)。

Flink partition的規(guī)則,就是Flink的并行度ID除以kafka partition length取余。

因此kafka編號(hào)為1-4的partition分別對(duì)應(yīng)source node的1-4的partition,那么source node5-12的partition就為空了。

默認(rèn)的partition策略是按照Flink的并行度ID與kafka中partition的數(shù)量取余的方法分配的,而與數(shù)據(jù)本身沒有關(guān)系。

source node的partition為5-12的接收不到數(shù)據(jù),

Watermark的生成是數(shù)據(jù)驅(qū)動(dòng)的,只有當(dāng)TimestampAndWatermarkAssigner”看到”數(shù)據(jù)時(shí),watermark才會(huì)生效。

而map和assigntimestampandwatermarks并發(fā)度一樣的話,這八個(gè)partition的watermark不會(huì)修改,所以會(huì)出現(xiàn)watermark的初始值一直存在的情況。

當(dāng)assigntimestampandwatermarks的并行度修改后,事件會(huì)被shuffled across(洗牌),因此到了TimestampAndWatermarkAssigner不會(huì)有空的partition存在了。

以上。

五 實(shí)際定位

  1. 問題:發(fā)現(xiàn)程序沒有輸出,沒有任何報(bào)錯(cuò)。
  2. 可以先去任務(wù)的Appmaster上看watermark運(yùn)行情況.來定位是否是watermarke的問題
image-20181219103115949
  1. 如果是顯示no watermark ,按照上述的case 進(jìn)行診斷。
  2. 在assignTimestampsAndWatermarks 后setParallelism(2) 。具體并行度根據(jù)實(shí)際需求設(shè)置,在這里kafkatopic只有2個(gè)分區(qū),并行度設(shè)置為2
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容