上一篇分享中介紹了Flink完成數(shù)據(jù)統(tǒng)計(jì)的例子,在最后提到了自定義的統(tǒng)計(jì)觸發(fā)器,這一篇分享主要介紹一下自定義的觸發(fā)器如何來(lái)實(shí)現(xiàn)。
一、觸發(fā)器的作用
觸發(fā)器的作用就是我們?cè)诖翱谥?,什么時(shí)候來(lái)觸發(fā)我們的聚合方法。主要涉及到的就是聚合計(jì)算(AggregateFunction)中的
OUT getResult(ACC var1);
這兩個(gè)方法
比如我們想要在1個(gè)小時(shí)為單位的時(shí)間窗口里,達(dá)到每分鐘來(lái)刷新數(shù)據(jù)的目的,那我們就必須每分鐘都要觸發(fā)一次getResult方法,來(lái)把數(shù)據(jù)發(fā)送到下一個(gè)處理節(jié)點(diǎn)(一般來(lái)說(shuō)都是Sink-->保存數(shù)據(jù)的節(jié)點(diǎn))
二、觸發(fā)器的的實(shí)現(xiàn)
實(shí)現(xiàn)很簡(jiǎn)單,只需要繼承Trigger<Object, W>類,實(shí)現(xiàn)它的方法即可
例如,我們需要一個(gè)帶步長(zhǎng)的觸發(fā)器:
class ProcessTimeTrigger<W extends Window> extends Trigger<Object, W>
private final long interval;
private ProcessTimeTrigger(long interval) {
this.interval = interval;
}
方法調(diào)用時(shí)機(jī)如下:
onElement()方法,每個(gè)元素被添加到窗口時(shí)調(diào)用
onEventTime()方法,當(dāng)一個(gè)已注冊(cè)的事件時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用
onProcessingTime()方法,當(dāng)一個(gè)已注冊(cè)的處理時(shí)間計(jì)時(shí)器啟動(dòng)時(shí)調(diào)用
onMerge()方法,與狀態(tài)性觸發(fā)器相關(guān),當(dāng)使用會(huì)話窗口時(shí),兩個(gè)觸發(fā)器對(duì)應(yīng)的窗口合并時(shí),合并兩個(gè)觸發(fā)器的狀態(tài)。
*最后一個(gè)clear()方法執(zhí)行任何需要清除的相應(yīng)窗口
上面的方法中有兩個(gè)需要注意的地方:
1)第一、三通過(guò)返回一個(gè)TriggerResult來(lái)決定如何操作調(diào)用他們的事件,這些操作可以是下面操作中的一個(gè);
CONTINUE:什么也不做
FIRE:觸發(fā)計(jì)算
PURGE:清除窗口中的數(shù)據(jù)
FIRE_AND_PURGE:觸發(fā)計(jì)算并清除窗口中的數(shù)據(jù)
三、自定義注冊(cè)定時(shí)觸發(fā)器
我們?cè)谛枰趏nElement中注冊(cè)一個(gè)定時(shí)觸發(fā)的任務(wù)
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
timestamp = ctx.getCurrentProcessingTime();
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
根據(jù)步長(zhǎng)來(lái)注冊(cè)下次執(zhí)行的時(shí)間
然后
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.get().equals(time)) {
fireTimestamp.clear();
fireTimestamp.add(time + interval);
ctx.registerProcessingTimeTimer(time + interval);
return TriggerResult.FIRE;
} else if(window.maxTimestamp() == time) {
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
在onProcessingTime的時(shí)候如果步長(zhǎng)和當(dāng)前的執(zhí)行時(shí)間一致,則觸發(fā)計(jì)算
并再注冊(cè)下一次的觸發(fā)時(shí)間,直到窗口結(jié)束。