Flink1.10—ProcessFunction使用的問題及分析

ProcessFunction是Flink的low-level Stream Api,我們?cè)谑褂肍link的API進(jìn)行作業(yè)時(shí),不外乎關(guān)心以下三種東西:

  • events(也就是流數(shù)據(jù),streaming elements)
  • state(狀態(tài))
  • timer(時(shí)間)

ProccessFunction就是這樣一種API,它提供了所有能訪問以上三種東西的接口。需要注意的是,如果要訪問keyed state,則必須使用keyedstream:

stream.keyBy(...).process(new MyProcessFunction())

ProccessFunction就像是一個(gè)RichMapFunction + Timer的組合體,但功能更強(qiáng)大。比如當(dāng)你用RichMapFunction+Timer時(shí),state的訪問就是一個(gè)問題,你無(wú)法在除map以外的其它方法訪問keyed state。
在我剛剛開始用ProccessFunction的時(shí)候,遇到些問題,這里記錄下。

為什么定時(shí)任務(wù)不觸發(fā)

相信大家剛開始使用ProccessFunction的時(shí)候,應(yīng)該都先看了Flink官網(wǎng)的Demo,不知道大家有沒有運(yùn)行成功,我反正沒有運(yùn)行成功過,定時(shí)任務(wù)也不觸發(fā)。我測(cè)試用到的kafka數(shù)據(jù)非常簡(jiǎn)單就兩字段:userId和viewTime。ProccessFunction里的主要邏輯如下:

public void processElement(UserAction value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    // retrieve the current count
    CountWithTimestamp current = state.value();
    if (current == null) {
      current = new CountWithTimestamp();
    }
    // update the state's count
    current.count++;
    // ctx.timestamp()方法返回你的事件時(shí)間
    current.lastModified = ctx.timestamp();
    // 更新狀態(tài)
    state.update(current);
    // 一分鐘后調(diào)度計(jì)算(執(zhí)行onTimer方法)
    ctx.timerService().registerEventTimeTimer(currentWarter + 60000);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
    CountWithTimestamp result = state.value();
    if (timestamp == result.lastModified + 60000) {
      out.collect(new Tuple2<>(key.f0, result.count));
    }
  }

那為什么沒有觸發(fā)onTimer呢?沒辦法,只能看源代碼了,因應(yīng)之前有閱讀過Flink1.6各個(gè)算子之前的數(shù)據(jù)傳輸?shù)拇a的經(jīng)驗(yàn),直覺告訴我,應(yīng)該在StreamInputProcessor.java類里找原因,不過現(xiàn)在我用的是flink1.10,所以我還得去Flink1.10里找,代碼差不多,也是從StreamInputProcessor里找,只不過在Flink1.10里它變成了一個(gè)接口類,不過沒關(guān)系,大同小異。在StreamTaskNetworkInput.java類中的一個(gè)關(guān)鍵方法processElement:

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()){
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

從上面可以看到,F(xiàn)link在處理元素?cái)?shù)據(jù)的時(shí)候,有4個(gè)分支判斷,即是判斷StreamElement屬于哪種類型:

  • StreamRecord - 代表一條流數(shù)據(jù)
  • WaterMark - 水位線,本身就是一個(gè)時(shí)間戳,指示元素timestamp小等于watermark的值都已經(jīng)到了,算子通過調(diào)用。
  • StreamStatus - 流狀態(tài),包含IDLE和ACTIVE兩種狀態(tài)。
  • LatencyMarker - 一個(gè)特殊的mark,用于判斷數(shù)據(jù)的延遲情況。

其中recordOrMark.isWatermark()分支就是用于判斷是否為一個(gè)wartermark, 當(dāng)我們?cè)诔绦蛑薪o元素打上watermark的時(shí)候,程序就會(huì)進(jìn)入這個(gè)判斷,處理新的watermark,當(dāng)新的watermark大于舊的watermark時(shí),會(huì)覆蓋舊的watermark,并且如果有定時(shí)任務(wù),則觸發(fā)onTimer(新的watermark大于等于定時(shí)的timer),如下:

//AbstractStreamOperator.java
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    output.emitWatermark(mark);
}

//InternalTimerServiceImpl.java
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;
       //從隊(duì)列里取出定時(shí)任務(wù),并且判斷時(shí)間是否小于等于當(dāng)前的watermark
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}

給程序加上WaterMark

因?yàn)槲襊roccessFunctio里使用的是ctx.timerService().registerEventTimeTimer,基于事件時(shí)間的timer,所以要使ProccessFunction里的定時(shí)任務(wù)能夠觸發(fā),我們還需要如下兩步:

  1. 把Time Characteristic設(shè)置為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  1. 給數(shù)據(jù)加上watermark
stream
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<UserAction>() {
        private long currentMaxTimestamp = 0L;
        private long maxOutOfOrderness = 10000L;//最大允許的亂序時(shí)間是10s
        private Watermark watermark = null;
        @Override
        public long extractTimestamp(UserAction element, long previousElementTimestamp) {
                long timestamp = element.viewTime;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        }
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
        }
})
.keyBy("userId")
.process(new CountWithTimeoutFunction())
.print();

再次運(yùn)行程序,把斷點(diǎn)放在onTimer()里,依次發(fā)送數(shù)據(jù):

shizc,2020-06-04 16:25:00
shizc,2020-06-04 16:26:00

當(dāng)發(fā)送完第二條數(shù)據(jù)的時(shí)候,斷點(diǎn)走到了onTimer(),但當(dāng)繼續(xù)執(zhí)行時(shí),由于我們程序里有這樣一個(gè)判斷:

// timestamp是觸發(fā)timer時(shí)的那個(gè)點(diǎn),
if (timestamp == result.lastModified + 60000) {
      // emit the state on timeout
      out.collect(new Tuple2<>(key.f0, result.count));
}

我們發(fā)現(xiàn)程序永遠(yuǎn)也進(jìn)入不了這個(gè)判斷,具體原因是:

  1. 基于event-time的定時(shí)任務(wù)需要下一個(gè)元素的watermark高于當(dāng)前的定時(shí)任務(wù),比如數(shù)據(jù)當(dāng)前的timestamp是2020-06-04 16:25:00,設(shè)置60s后觸發(fā)onTimer,那么定時(shí)任務(wù)就是2020-06-04 16:26:00,如果想要onTimer觸發(fā),就必須要有一條數(shù)的timestamp是大于等于2020-06-04 16:26:00。
  2. 當(dāng)滿足上面 1 的條件時(shí),由于以下代碼的原因?qū)е聇imestamp == result.lastModified + 6000永遠(yuǎn)不成立:
......
// 
current.lastModified = ctx.timestamp();
// 更新狀態(tài)
state.update(current);
// 一分鐘后調(diào)度計(jì)算(執(zhí)行onTimer方法)
ctx.timerService().registerEventTimeTimer(currentWarter + 60000);

比如:我們第一條數(shù)據(jù)是(shizc,2020-06-04 16:25:00),那么current.lastModified = ctx.timestamp() == 2020-06-04 16:25:00,執(zhí)行registerEventTimeTimer后,我期望在2020-06-04 16:26:00執(zhí)行onTimer,所以我發(fā)送了第二條數(shù)據(jù)(shizc,2020-06-04 16:26:00),斷點(diǎn)走到了onTimer():
此時(shí)onTimer方法里的timestamp=2020-06-04 16:26:00,而result.lastModified=2020-06-04 16:26:00,所以if (timestamp == result.lastModified + 60000),永遠(yuǎn)不會(huì)成立。這里應(yīng)該是官網(wǎng)demo的bug,不應(yīng)該去數(shù)據(jù)里的timestamp,應(yīng)該取的是用前一個(gè)watermark來(lái)注冊(cè)timer。

使用WaterMark注冊(cè)Timer

我們修改下CountWithTimeoutFunction里的processElement代碼,并加上些日志輸出,代碼如下:

@Override
  public void processElement(UserAction value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
   ...
   // 取前一個(gè)watermark,基于前一個(gè)watermark的60s后,觸發(fā)onTimer計(jì)算。而不是基于timestamp。
    long prevWaterMark = ctx.timerService().currentWatermark();
    LOG.info("前一個(gè)watermark: {}({})", prevWaterMark, DateFormatUtils.format(prevWaterMark, "yyyy-MM-dd HH:mm:ss"));
    current.lastModified = prevWaterMark;
    // write the state back
    state.update(current);
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
    CountWithTimestamp result = state.value();
    long prevWaterMark = result.lastModified;
    LOG.info("onTimer觸發(fā)時(shí)間: {}({}), 前一個(gè)watermark: {}({})", timestamp, DateFormatUtils.format(timestamp, "yyyy-MM-dd HH:mm:ss"),  prevWaterMark, DateFormatUtils.format(prevWaterMark, "yyyy-MM-dd HH:mm:ss"));
    Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
    if (timestamp == result.lastModified + 60000) {
      out.collect(new Tuple2<>(key.f0, result.count));
    }
  }

代碼里關(guān)鍵的部分是long prevWaterMark = ctx.timerService().currentWatermark();,我們應(yīng)該取前一個(gè)watermark,基于前一個(gè)watermark的60s后,觸發(fā)onTimer計(jì)算。而不是基于timestamp。
然后我們依次發(fā)送測(cè)試數(shù)據(jù):

shizc,2020-06-04 16:25:00
shizc,2020-06-04 16:26:00
shizc,2020-06-04 16:28:00

得到的輸出如下:

前一個(gè)watermark: -10000(1970-01-01 07:59:50)
onTimer觸發(fā)時(shí)間: 50000(1970-01-01 08:00:50), 前一個(gè)watermark: -10000(1970-01-01 07:59:50)
(shizc,1)
前一個(gè)watermark: 1591259090000(2020-06-04 16:24:50)
onTimer觸發(fā)時(shí)間: 1591259150000(2020-06-04 16:25:50), 前一個(gè)watermark: 1591259090000(2020-06-04 16:24:50)
(shizc,2)
前一個(gè)watermark: 1591259150000(2020-06-04 16:25:50)
onTimer觸發(fā)時(shí)間: 1591259210000(2020-06-04 16:26:50), 前一個(gè)watermark: 1591259150000(2020-06-04 16:25:50)
(shizc,3)

通過日志可以看出,數(shù)據(jù)已經(jīng)是正常輸出了。

使用ProccessTime注冊(cè)Timer

接著來(lái)看下基于ProccessTimer注冊(cè)Timer和基于EventTime的Timer有什么不同:

public void registerProcessingTimeTimer(N namespace, long time) {
        //取出head,
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        //如果head不為空,則取head的timestamp作為下一個(gè)triggertime
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // 查檢當(dāng)前的time是否小于nextTrigger,如果小于則直接把當(dāng)前的時(shí)間設(shè)置定時(shí)任務(wù)。
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                // 真正設(shè)置定時(shí)任務(wù)
                nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
            }
        }
    }

    public void registerEventTimeTimer(N namespace, long time) {
        //直接加入到隊(duì)列,等待下一個(gè)watermark到達(dá)后,再取出判斷執(zhí)行。 
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

可以看出proccessTime和eventTime的定時(shí)任務(wù)還是有很大差別的,proccessTime是先與隊(duì)首判斷,如果小于隊(duì)首,則直接設(shè)置定時(shí)任務(wù),否則加入隊(duì)列(里面具體沒看,不過應(yīng)該是有排序的);而eventTime的是直接加入到隊(duì)列,等待下一個(gè)watermark的觸發(fā),再取出判斷執(zhí)行。

proccessTimeTimer的執(zhí)行邏輯是從頭開始執(zhí)行的,當(dāng)你注冊(cè)了三個(gè)timer,并不是已經(jīng)都為三個(gè)timer設(shè)置了定時(shí)任務(wù),而是先為時(shí)間最小的那個(gè)設(shè)置了定時(shí)任務(wù),等待時(shí)間觸發(fā)后,再設(shè)置第二個(gè)定時(shí)任務(wù),如下:

private void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            // 真正設(shè)置定時(shí)任務(wù)
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
        }
}

所以我們不擔(dān)心timer多,而是擔(dān)心timer多而間隔小。所以基于proccessTime的timer我們可以這樣改進(jìn):

long currentProccessTime = ctx.timerService().currentProcessingTime();
    if(current.lastModified == 0 || current.lastModified + 60000 <= currentProccessTime) {
      current.lastModified = currentProccessTime;
      // write the state back
      state.update(current);
      ctx.timerService().registerProcessingTimeTimer(current.lastModified + 60000);
    } else {
      state.update(current);
}

每隔一分鐘設(shè)置一次timer,onTimer()里就不需要在有if (timestamp == result.lastModified + 60000)的判斷了,因?yàn)槲覀円呀?jīng)控制了timer的個(gè)數(shù),同樣基于eventTime的timer也可以這樣改進(jìn)。

運(yùn)行代碼,每隔60s輸出數(shù)據(jù):

onTimer觸發(fā)時(shí)間: 1591344468660(2020-06-05 16:07:48), processingTime: 1591344408660(2020-06-05 16:06:48)
(shizc,2)

onTimer觸發(fā)時(shí)間: 1591344499806(2020-06-05 16:08:19), processingTime: 1591344439806(2020-06-05 16:07:19)
(flink,2)

以上就是我使用ProccessFunction遇到的一些問題,希望對(duì)大家有所幫助,若有不正確的地方歡迎糾正 ~~.

新的發(fā)現(xiàn)

就以上的問題,我在flink jira上提了個(gè)issue: FLINK-19167,我把我遇到的問題描述后,F(xiàn)link大神們也都參與了討論:

discuss.png

最后Dawid Wysakowicz 大神指出了問題的根源: 這個(gè)demo只有在多個(gè)key(多個(gè)window)的情況下才能正確運(yùn)行,單個(gè)key的情況下,會(huì)存在lastModified的值被覆蓋的情況。舉個(gè)例子:
有3條數(shù)據(jù):(key1, 1), (key1,2),(key1,60001), 當(dāng)window收到第一條數(shù)據(jù)時(shí),注冊(cè)一個(gè)基于eventtime的timer,watermark為1 + 6000, 當(dāng)?shù)谌龡l數(shù)據(jù)進(jìn)來(lái)時(shí),先把lastModified更新為60001,此時(shí)觸發(fā)onTimer, timestamp == lastModified,都同為60001,所以 if 的判斷不成立。
如果把(key1, 60001)的key改成key2, 那么前面的key1的所有timer都會(huì)觸發(fā)。是因?yàn)閗ey1的lastModified不會(huì)被更新,但是watermark大于它所注冊(cè)的wartermark,所以會(huì)觸發(fā)。

timestamp: 60001, lastModified: 1
timestamp: 60002, lastModified: 2
(key1,2)

所以由以上得出的結(jié)論是: 基于eventTime的 每一個(gè)key的timer,它們是同一個(gè)流,同一個(gè)流也就會(huì)受到watermark的增長(zhǎng)而影響到(可以想象成 他們?cè)谕粭l時(shí)間軸上)

Flink的大神們也都說(shuō)了,官網(wǎng)的例子的確有沒有考慮到的地方,只是一個(gè)很簡(jiǎn)單的例子,多個(gè)key沒問題,單個(gè)key不生效。

The end !

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容