Flink自定義觸發(fā)器

上一篇分享中介紹了Flink完成數(shù)據(jù)統(tǒng)計(jì)的例子,在最后提到了自定義的統(tǒng)計(jì)觸發(fā)器,這一篇分享主要介紹一下自定義的觸發(fā)器如何來(lái)實(shí)現(xiàn)。

一、觸發(fā)器的作用
觸發(fā)器的作用就是我們?cè)诖翱谥?,什么時(shí)候來(lái)觸發(fā)我們的聚合方法。主要涉及到的就是聚合計(jì)算(AggregateFunction)中的
OUT getResult(ACC var1);
這兩個(gè)方法

比如我們想要在1個(gè)小時(shí)為單位的時(shí)間窗口里,達(dá)到每分鐘來(lái)刷新數(shù)據(jù)的目的,那我們就必須每分鐘都要觸發(fā)一次getResult方法,來(lái)把數(shù)據(jù)發(fā)送到下一個(gè)處理節(jié)點(diǎn)(一般來(lái)說(shuō)都是Sink-->保存數(shù)據(jù)的節(jié)點(diǎn))

二、觸發(fā)器的的實(shí)現(xiàn)
實(shí)現(xiàn)很簡(jiǎn)單,只需要繼承Trigger<Object, W>類,實(shí)現(xiàn)它的方法即可
例如,我們需要一個(gè)帶步長(zhǎng)的觸發(fā)器:

class ProcessTimeTrigger<W extends Window> extends Trigger<Object, W>
private final long interval;
private ProcessTimeTrigger(long interval) {
        this.interval = interval;
    }

方法調(diào)用時(shí)機(jī)如下:
onElement()方法,每個(gè)元素被添加到窗口時(shí)調(diào)用
  
onEventTime()方法,當(dāng)一個(gè)已注冊(cè)的事件時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用
  onProcessingTime()方法,當(dāng)一個(gè)已注冊(cè)的處理時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用
  
onMerge()方法,與狀態(tài)性觸發(fā)器相關(guān),當(dāng)使用會(huì)話窗口時(shí),兩個(gè)觸發(fā)器對(duì)應(yīng)的窗口合并時(shí),合并兩個(gè)觸發(fā)器的狀態(tài)。
  *最后一個(gè)clear()方法執(zhí)行任何需要清除的相應(yīng)窗口
上面的方法中有兩個(gè)需要注意的地方:
1)第一、三通過(guò)返回一個(gè)TriggerResult來(lái)決定如何操作調(diào)用他們的事件,這些操作可以是下面操作中的一個(gè);
CONTINUE:什么也不做
FIRE:觸發(fā)計(jì)算
PURGE:清除窗口中的數(shù)據(jù)
FIRE_AND_PURGE:觸發(fā)計(jì)算并清除窗口中的數(shù)據(jù)

三、自定義注冊(cè)定時(shí)觸發(fā)器
我們?cè)谛枰趏nElement中注冊(cè)一個(gè)定時(shí)觸發(fā)的任務(wù)

@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

根據(jù)步長(zhǎng)來(lái)注冊(cè)下次執(zhí)行的時(shí)間

然后

@Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        } else if(window.maxTimestamp() == time) {
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

在onProcessingTime的時(shí)候如果步長(zhǎng)和當(dāng)前的執(zhí)行時(shí)間一致,則觸發(fā)計(jì)算
并再注冊(cè)下一次的觸發(fā)時(shí)間,直到窗口結(jié)束。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評(píng)論 19 139
  • Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的”buckets”桶,我們...
    尼小摩閱讀 3,556評(píng)論 0 13
  • 原文連接 https://ci.apache.org/projects/flink/flink-docs-rele...
    Alex90閱讀 3,553評(píng)論 0 5
  • 在圖書(shū)館看著無(wú)聊的習(xí)題,昏昏欲睡的我看到對(duì)面突然坐下一個(gè)身穿熒光綠外套的小女孩,眼神純真清澈,是未經(jīng)世事的...
    薩爾斯堡的鹽樹(shù)枝閱讀 436評(píng)論 0 0
  • ——(我的青春我做主) 火遍大江南北的那首歌--時(shí)間都去哪兒了,究其原因,可能是歌...
    交織閱讀 289評(píng)論 0 0

友情鏈接更多精彩內(nèi)容