Flink Timer Mechanism and Its Implementation

Introduction to Timers

Timers are the mechanism the Flink Streaming API provides for sensing and reacting to changes in processing time and event time. The Ververica blog describes them as follows:

Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.

For ordinary users, the most common place to use Timers explicitly is KeyedProcessFunction. We register a Timer in its processElement() method and override its onTimer() method as the callback that runs when the Timer fires. Depending on the time characteristic:

  • Processing time: register with Context.timerService().registerProcessingTimeTimer(); onTimer() fires when the system clock reaches the Timer's timestamp.
  • Event time: register with Context.timerService().registerEventTimeTimer(); onTimer() fires when Flink's internal watermark reaches or passes the Timer's timestamp.

For example, to compute a metric per day in real time, keep it in state, and clear that state at midnight to start over, register a Timer in processElement():

ctx.timerService().registerProcessingTimeTimer(
  tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1
);

public static long tomorrowZeroTimestampMs(long now, int timeZone) {
  return now - (now + timeZone * 3600000) % 86400000 + 86400000;
}

Then call state.clear() in onTimer(). So easy. (The 8 passed as timeZone is the UTC offset in hours.)
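
To make the arithmetic concrete, here is a small standalone check of the same formula. It is only a sketch: the class name MidnightTimerExample and the sample timestamp are made up for illustration, and it assumes a positive epoch timestamp and a fixed whole-hour UTC offset (no daylight saving).

```java
import java.time.Instant;
import java.time.ZoneOffset;

final class MidnightTimerExample {
    private static final long HOUR_MS = 3_600_000L;
    private static final long DAY_MS = 86_400_000L;

    /** Same arithmetic as tomorrowZeroTimestampMs() above, split into named steps. */
    static long tomorrowZeroTimestampMs(long now, int timeZone) {
        // Milliseconds elapsed since the most recent local midnight.
        long msSinceLocalMidnight = (now + timeZone * HOUR_MS) % DAY_MS;
        // Step back to that midnight, then forward by one day.
        return now - msSinceLocalMidnight + DAY_MS;
    }

    public static void main(String[] args) {
        // 2023-11-14T22:13:20Z, which is 06:13:20 on Nov 15 at UTC+8.
        long now = 1_700_000_000_000L;
        long next = tomorrowZeroTimestampMs(now, 8);
        // Render the result at UTC+8 to confirm that it falls on a local midnight.
        System.out.println(Instant.ofEpochMilli(next).atOffset(ZoneOffset.ofHours(8)));
    }
}
```

Every element of a given key that arrives on the same day computes the same timestamp, so calling the registration on each element is harmless as long as identical Timers are deduplicated, which is one of the characteristics discussed later in this post.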

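Besides KeyedProcessFunction, Timers also play an important role in the windowing mechanism. Windows naturally bring Triggers to mind. Here is part of the code of Flink's built-in EventTimeTrigger, the default trigger for event-time windows.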

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

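As shown, while the watermark has not yet reached the window's right edge (window.maxTimestamp()), the Trigger registers a Timer at that timestamp. When the Timer fires, onEventTime() is called and returns FIRE, which triggers the computation of that window.

The blog quoted at the beginning lists four characteristics of Flink Timers from the user's perspective, summarized in a figure in the original post.

This gives us two entry points (KeyedProcessFunction and Trigger) for analyzing the details of Timers. We start with the former; let's get our hands dirty.

TimerService and InternalTimerService

The operator that actually executes a KeyedProcessFunction is KeyedProcessOperator. It implements the Context class that KeyedProcessFunction needs as an inner class, shown below.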

    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
        private final TimerService timerService;
        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
            function.super();
            this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(element != null);
            if (element.hasTimestamp()) {
                return element.getTimestamp();
            } else {
                return null;
            }
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        // remainder omitted...
    }

So timerService() returns the TimerService instance passed in from outside. Let's go back to KeyedProcessOperator to see where it comes from (the original post also shows a class diagram here).

public class KeyedProcessOperator<K, IN, OUT>
    extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
    private static final long serialVersionUID = 1L;
    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;
    private transient OnTimerContextImpl onTimerContext;

    public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

    // remainder omitted...
}

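From the code above we can conclude:

  • The implementation of the TimerService interface is SimpleTimerService, which is just a very thin proxy around InternalTimerService (so thin that its code is omitted here).
  • The InternalTimerService instance is obtained from getInternalTimerService(), a method defined in AbstractStreamOperator, the base class of all operators. It is important and we return to it below.
  • KeyedProcessOperator.processElement() calls the user function's processElement() and passes in the ContextImpl instance, which is how the user obtains the TimerService to register Timers.
  • In the code, a Timer is called InternalTimer (an interface).
  • When a Timer fires, onProcessingTime() or onEventTime() is called according to the time domain (both methods come from the Triggerable interface), which in turn invokes the user function's onTimer() callback. We will meet these two methods again later.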
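
Since the proxy code is skipped, the following sketch shows what such a thin proxy could look like. It is a simplified model, not Flink's source: VoidNs, InternalService, UserService, SimpleProxy and RecordingService are hypothetical stand-ins for VoidNamespace, InternalTimerService, TimerService, SimpleTimerService and a real timer service, and only the event-time methods are modeled.

```java
import java.util.ArrayList;
import java.util.List;

final class TimerServiceProxyExample {
    /** Stand-in for a "no namespace" marker: a single shared value. */
    enum VoidNs { INSTANCE }

    /** Namespace-aware service, in the shape of an internal timer service. */
    interface InternalService<N> {
        void registerEventTimeTimer(N namespace, long time);
        void deleteEventTimeTimer(N namespace, long time);
    }

    /** User-facing service: the same operations without a namespace. */
    interface UserService {
        void registerEventTimeTimer(long time);
        void deleteEventTimeTimer(long time);
    }

    /** The proxy: every call is forwarded with the fixed void namespace. */
    static final class SimpleProxy implements UserService {
        private final InternalService<VoidNs> delegate;

        SimpleProxy(InternalService<VoidNs> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void registerEventTimeTimer(long time) {
            delegate.registerEventTimeTimer(VoidNs.INSTANCE, time);
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            delegate.deleteEventTimeTimer(VoidNs.INSTANCE, time);
        }
    }

    /** Records the forwarded calls so that the delegation can be observed. */
    static final class RecordingService implements InternalService<VoidNs> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void registerEventTimeTimer(VoidNs namespace, long time) {
            calls.add("register:" + namespace + "@" + time);
        }

        @Override
        public void deleteEventTimeTimer(VoidNs namespace, long time) {
            calls.add("delete:" + namespace + "@" + time);
        }
    }
}
```

The point of the design is that user code never has to mention a namespace, while the internal service keeps one API for both user Timers and window Timers.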

Next, let's see how the InternalTimerService is obtained.

    /**
     * Returns a {@link InternalTimerService} that can be used to query current processing time
     * and event time and to set timers. An operator can have several timer services, where
     * each has its own namespace serializer. Timer services are differentiated by the string
     * key that is given when requesting them, if you call this method with the same key
     * multiple times you will get the same timer service instance in subsequent requests.
     *
     * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
     * When a timer fires, this key will also be set as the currently active key.
     *
     * <p>Each timer has attached metadata, the namespace. Different timer services
     * can have a different namespace type. If you don't need namespace differentiation you
     * can use {@link VoidNamespaceSerializer} as the namespace serializer.
     *
     * @param name                The name of the requested timer service. If no service exists under the given
     *                            name a new one will be created and returned.
     * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
     * @param triggerable         The {@link Triggerable} that should be invoked when timers fire
     * @param <N>                 The type of the timer namespace.
     */
    public <K, N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {
        checkTimerServiceInitialization();

        KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
        TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
        InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
        TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
        return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
    }

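The Javadoc of this method is very clear, so it is pasted along with the code. In short:

  • Each operator can have one or more InternalTimerService instances.
  • An InternalTimerService is defined by four things: a name, the namespace type N (and its serializer), the key type K (and its serializer), and an implementation of the Triggerable interface mentioned above.
  • The InternalTimerService is obtained through InternalTimeServiceManager.getInternalTimerService().

For example, the InternalTimerService initialized by KeyedProcessOperator above is named "user-timers", its namespace type is empty (VoidNamespace), and the Triggerable implementation is the operator itself. For WindowOperator, the InternalTimerService is named "window-timers" and the namespace type is Window.

InternalTimerService is itself still an interface in the code, shown below. Apart from the extra namespace parameter (which is invisible to users), its method signatures match those of TimerService.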

public interface InternalTimerService<N> {
    long currentProcessingTime();
    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);
    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);
    void deleteEventTimeTimer(N namespace, long time);

    // ...
}

Going one step further, let's look at how InternalTimeServiceManager is implemented.

InternalTimeServiceManager and TimerHeapInternalTimer

As the name suggests, InternalTimeServiceManager manages the individual InternalTimerService instances. Part of its code:

public class InternalTimeServiceManager<K> {
    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;
    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
    private final boolean useLegacySynchronousSnapshots;

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {
        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);
        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
            timerServices.put(name, timerService);
        }
        return timerService;
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    // remainder omitted...
}

From the code above we learn:

  • The concrete implementation of InternalTimerService in Flink is the InternalTimerServiceImpl class, and the concrete implementation of InternalTimer is the TimerHeapInternalTimer class.
  • InternalTimeServiceManager uses a HashMap to map the name of every InternalTimerService for a given key type K to its instance. If the name already exists, the existing instance is returned instead of creating a new one.
  • When an InternalTimerServiceImpl is initialized, two priority queues of TimerHeapInternalTimer are created along with it (the priority queue is Flink's own implementation), one for event-time Timers and one for processing-time Timers.
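
The get-or-create behaviour of the name-to-service map can be sketched in a few lines. This is an illustration only: TimerServiceRegistryExample and NamedService are hypothetical names, and the real service of course carries queues and serializers rather than just a name.

```java
import java.util.HashMap;
import java.util.Map;

final class TimerServiceRegistryExample {
    /** Placeholder for a timer service; only its name is modeled here. */
    static final class NamedService {
        final String name;

        NamedService(String name) {
            this.name = name;
        }
    }

    private final Map<String, NamedService> services = new HashMap<>();

    /** Returns the service registered under the name, creating it on first request. */
    NamedService getOrCreate(String name) {
        return services.computeIfAbsent(name, NamedService::new);
    }

    int size() {
        return services.size();
    }

    public static void main(String[] args) {
        TimerServiceRegistryExample registry = new TimerServiceRegistryExample();
        NamedService first = registry.getOrCreate("user-timers");
        NamedService second = registry.getOrCreate("user-timers");
        // Both requests yield the very same instance.
        System.out.println(first == second);
    }
}
```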

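After all this, the point to remember is that with this heap-based implementation Timers live in JVM heap memory, so registering a large number of Timers frequently, or firing a large number at once, carries a real cost.

TimerHeapInternalTimer itself is simple: essentially four fields and one method. To save some typing, the comments are copied along as well.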

    /**
     * The key for which the timer is scoped.
     */
    @Nonnull
    private final K key;
    /**
     * The namespace for which the timer is scoped.
     */
    @Nonnull
    private final N namespace;
    /**
     * The expiration timestamp.
     */
    private final long timestamp;
    /**
     * This field holds the current physical index of this timer when it is managed by a timer heap so that we can
     * support fast deletes.
     */
    private transient int timerHeapIndex;

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}

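So a Timer has two scopes: the key of the data and the namespace. Users never see the namespace, though, so we can simply treat Timers as being registered per key (characteristic 1 of the four). A good estimate of the number of keys therefore helps keep the number of Timers under control.

timerHeapIndex is the index at which this Timer is stored in the priority queue. Priority queues are usually implemented as binary heaps, and a binary heap can be stored directly in an array, so letting each Timer hold its own index makes removing it from the queue fast.

comparePriorityTo() determines a Timer's priority; evidently the Timer priority queue is a min-heap ordered by Timer timestamp. Let's take a brief look at how this min-heap is implemented.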
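
Why a stored index speeds up deletion is easiest to see in code. The sketch below is a generic array-backed min-heap whose elements remember their own slot; it is not Flink's queue (the names IndexedTimerHeapExample, Timer and heapIndex are illustrative, and the 0-based ArrayList layout is a simplification chosen for this example).

```java
import java.util.ArrayList;
import java.util.List;

final class IndexedTimerHeapExample {
    static final class Timer {
        final long timestamp;
        int heapIndex = -1; // current slot in the heap array, -1 when not queued

        Timer(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    private final List<Timer> heap = new ArrayList<>();

    void add(Timer t) {
        heap.add(t);
        t.heapIndex = heap.size() - 1;
        siftUp(t.heapIndex);
    }

    Timer peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    Timer poll() {
        Timer head = peek();
        if (head != null) {
            remove(head);
        }
        return head;
    }

    /** Removes a timer without searching: its own index says where it sits. */
    boolean remove(Timer t) {
        int i = t.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != t) {
            return false;
        }
        int last = heap.size() - 1;
        swap(i, last);
        heap.remove(last);
        t.heapIndex = -1;
        if (i < last) { // restore heap order for the element moved into slot i
            siftUp(i);
            siftDown(i);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).timestamp <= heap.get(i).timestamp) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).timestamp < heap.get(smallest).timestamp) {
                smallest = left;
            }
            if (right < n && heap.get(right).timestamp < heap.get(smallest).timestamp) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    /** Swaps two slots and keeps both elements' stored indices in sync. */
    private void swap(int a, int b) {
        Timer ta = heap.get(a);
        Timer tb = heap.get(b);
        heap.set(a, tb);
        tb.heapIndex = a;
        heap.set(b, ta);
        ta.heapIndex = b;
    }
}
```

Without the stored index, removing an arbitrary element would first require a linear scan to find it; with it, removal costs only the usual logarithmic sift.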

HeapPriorityQueueSet

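The priority queue created by PriorityQueueSetFactory.create() in the code above is actually a HeapPriorityQueueSet. Its basic idea is the same as Java's built-in PriorityQueue, with deduplication added on top: an identical Timer (same key, namespace and timestamp) is stored at most once (characteristic 2 of the four). Part of its code: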

    private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
    private final KeyGroupRange keyGroupRange;

    @Override
    @Nullable
    public T poll() {
        final T toRemove = super.poll();
        return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
    }

    @Override
    public boolean add(@Nonnull T element) {
        return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
    }

    @Override
    public boolean remove(@Nonnull T toRemove) {
        T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
        return storedElement != null && super.remove(storedElement);
    }

    private HashMap<T, T> getDedupMapForKeyGroup(
        @Nonnegative int keyGroupId) {
        return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
    }

    private HashMap<T, T> getDedupMapForElement(T element) {
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
            keyExtractor.extractKeyFromElement(element),
            totalNumberOfKeyGroups);
        return getDedupMapForKeyGroup(keyGroup);
    }

    private int globalKeyGroupToLocalIndex(int keyGroup) {
        checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
        return keyGroup - keyGroupRange.getStartKeyGroup();
    }

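To understand it, we first need to explain KeyGroup and KeyGroupRange. A KeyGroup is the atomic unit of keyed state inside Flink, that is, a group of keys. The number of KeyGroups in a Flink application equals its maximum parallelism, and a key is assigned to a KeyGroup by the classic hashCode-then-modulo operation (Flink additionally runs the hashCode through a murmur hash before the modulo). A KeyGroupRange is a range of consecutive KeyGroups, and each Flink sub-task holds exactly one KeyGroupRange. In other words, a KeyGroupRange can be viewed as all the keys the current sub-task maintains locally.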
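
As a rough illustration of these two notions, the sketch below assigns keys to key groups and splits the key groups into contiguous ranges per sub-task. It rests on stated assumptions: the hash step is reduced to plain hashCode plus modulo, the even contiguous split is one plausible formula rather than a quotation of Flink's code, and all names (KeyGroupExample, rangeStart, rangeEnd, localIndex) are hypothetical.

```java
final class KeyGroupExample {
    /** Maps a key to a key group; simplified to hashCode followed by modulo. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** First key group owned by a sub-task, assuming an even contiguous split. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by a sub-task under the same split. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Offset of a key group inside a range, usable as an index into a per-range array. */
    static int localIndex(int keyGroup, int start, int end) {
        if (keyGroup < start || keyGroup > end) {
            throw new IllegalArgumentException(
                "range [" + start + ", " + end + "] does not contain key group " + keyGroup);
        }
        return keyGroup - start;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("sub-task " + subtask + " owns key groups "
                + rangeStart(maxParallelism, parallelism, subtask) + ".."
                + rangeEnd(maxParallelism, parallelism, subtask));
        }
    }
}
```

With a maximum parallelism of 128 and a parallelism of 4, each sub-task owns 32 consecutive key groups, and an Integer key of 200 lands in key group 72, which is offset 8 within the third sub-task's range.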

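With that explained, it is easy to see that the HashMap<T, T>[] array in the code above is the container that deduplicates Timers per KeyGroup, with one array element per KeyGroup. Take inserting a Timer as an example:

  • Extract the key from the Timer and compute which KeyGroup the key belongs to;
  • Compute the offset of that KeyGroup within the KeyGroupRange and use it as the index into the HashMap<T, T>[] array;
  • By the semantics of putIfAbsent(), the Timer is inserted into the min-heap only if the corresponding HashMap does not already contain an equal Timer.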
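
The three steps can be mimicked with standard library classes. This is a simplified sketch, not the real HeapPriorityQueueSet: it uses java.util.PriorityQueue and one HashSet per key group instead of Flink's indexed heap and HashMap<T, T>, reduces the hash step to hashCode plus modulo, leaves out the namespace, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

final class DedupTimerQueueExample {
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Timer
                && ((Timer) o).timestamp == timestamp
                && ((Timer) o).key.equals(key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }
    }

    private final PriorityQueue<Timer> heap = new PriorityQueue<>();
    private final List<Set<Timer>> dedupByKeyGroup = new ArrayList<>();
    private final int firstKeyGroup;
    private final int totalKeyGroups;

    DedupTimerQueueExample(int firstKeyGroup, int lastKeyGroup, int totalKeyGroups) {
        this.firstKeyGroup = firstKeyGroup;
        this.totalKeyGroups = totalKeyGroups;
        for (int i = firstKeyGroup; i <= lastKeyGroup; i++) {
            dedupByKeyGroup.add(new HashSet<>());
        }
    }

    /** Adds the timer to the heap only if no equal timer is already present. */
    boolean add(Timer t) {
        return dedupFor(t).add(t) && heap.add(t);
    }

    boolean remove(Timer t) {
        return dedupFor(t).remove(t) && heap.remove(t);
    }

    Timer poll() {
        Timer head = heap.poll();
        if (head != null) {
            dedupFor(head).remove(head);
        }
        return head;
    }

    int size() {
        return heap.size();
    }

    /** Steps 1 and 2: key -> key group -> offset within the local range. */
    private Set<Timer> dedupFor(Timer t) {
        int keyGroup = Math.floorMod(t.key.hashCode(), totalKeyGroups);
        int local = keyGroup - firstKeyGroup;
        if (local < 0 || local >= dedupByKeyGroup.size()) {
            throw new IllegalArgumentException("key group " + keyGroup + " is not local");
        }
        return dedupByKeyGroup.get(local);
    }
}
```

Note that two Timers with the same timestamp but different keys are both kept; only a Timer that is equal in every field is rejected.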

Now back to the main flow: InternalTimerServiceImpl.

InternalTimerServiceImpl

Here we finally see the lowest-level implementation of registering and removing Timers. Note that ProcessingTimeService is Flink's internal service that produces processing-time timestamps.

    private final ProcessingTimeService processingTimeService;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
    private ScheduledFuture<?> nextTimer;

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

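So registering a Timer simply means giving it a timestamp, key and namespace and adding it to the corresponding priority queue. In particular, when a processing-time Timer is registered, its timestamp is compared with that of the Timer currently at the top of the min-heap. If the new Timer is earlier, the pending scheduled callback (nextTimer) is cancelled and rescheduled for the new, earlier timestamp, because processing time only ever moves forward and the earliest Timer must fire first.

Once Timers are registered, how do they fire? Let's look at processing time first.

InternalTimerServiceImpl implements the ProcessingTimeCallback interface, meaning it can receive processing-time callbacks. The interface requires a single method, shown below.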

    private Triggerable<K, N> triggerTarget;

    @Override
    public void onProcessingTime(long time) throws Exception {
        nextTimer = null;
        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

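So when onProcessingTime() is called back, it takes from the queue, in order, every Timer whose timestamp is less than or equal to time and calls Triggerable.onProcessingTime() on each, which is the method of the same name in KeyedProcessOperator above, so the user's onTimer() logic runs. If Timers remain afterwards, a callback is scheduled for the new head of the queue.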
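
The interplay between registration and firing can be simulated without any threads. The sketch below is a toy model under stated assumptions: timers are bare timestamps in a TreeSet (which gives both ordering and deduplication), the scheduler is reduced to a single scheduledWakeUp field, and the clock is whatever value the caller passes in. None of these names come from Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ProcessingTimeTimersExample {
    private final TreeSet<Long> timers = new TreeSet<>();
    /** Timestamps of fired timers, in firing order. */
    final List<Long> fired = new ArrayList<>();
    /** Timestamp of the single pending wake-up, or null if none is scheduled. */
    Long scheduledWakeUp;

    void register(long time) {
        Long oldHead = timers.isEmpty() ? null : timers.first();
        if (timers.add(time)) { // false for a duplicate, which changes nothing
            long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                // The new timer is the earliest: replace the pending wake-up.
                scheduledWakeUp = time;
            }
        }
    }

    /** Invoked by the (simulated) scheduler when the wake-up is due. */
    void onProcessingTime(long now) {
        scheduledWakeUp = null;
        // Fire every timer that is due, earliest first.
        while (!timers.isEmpty() && timers.first() <= now) {
            fired.add(timers.pollFirst());
        }
        // Schedule a wake-up for the next remaining timer, if any.
        if (!timers.isEmpty()) {
            scheduledWakeUp = timers.first();
        }
    }
}
```

Only one wake-up is pending at any moment, no matter how many Timers are queued, which keeps the load on the scheduler independent of the number of registered Timers.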

Finally we reach SystemProcessingTimeService, the implementation of ProcessingTimeService, which uses a scheduled thread pool for the callbacks. The relevant code:

    private final ScheduledThreadPoolExecutor timerService;

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        try {
            return timerService.schedule(
                new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            } else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            } else {
                throw e;
            }
        }
    }

    // Note: this is the run() method of TriggerTask
    @Override
    public void run() {
        synchronized (lock) {
            try {
                if (serviceStatus.get() == STATUS_ALIVE) {
                    target.onProcessingTime(timestamp);
                }
            } catch (Throwable t) {
                TimerException asyncException = new TimerException(t);
                exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
            }
        }
    }

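So onProcessingTime() is called back from a TriggerTask, and TriggerTasks are scheduled on the thread pool according to the Timer's timestamp. That completes the processing-time case.

Now for event time. Event time has nothing to do with the system clock; it is driven by watermarks. Here is the code of InternalTimerServiceImpl.advanceWatermark().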

    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

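The logic is similar to the processing-time case, except that the callback is onEventTime() instead of onProcessingTime(). Following the call chain upward, we arrive at the method of the same name in InternalTimeServiceManager.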

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

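Tracing further up, we reach the end: processWatermark(), the method in the operator base class AbstractStreamOperator that handles watermarks. When a watermark arrives, it flows down this call chain into InternalTimerServiceImpl and fires every Timer whose timestamp is at or before the watermark.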

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }
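
The whole event-time chain (operator, manager, individual services) can be condensed into a small simulation. It is a sketch with made-up names (WatermarkAdvanceExample, Service, firedLog); timers are plain timestamps, and keys, namespaces and the actual callbacks are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class WatermarkAdvanceExample {
    /** One named timer service holding only event-time timers. */
    static final class Service {
        final String name;
        final TreeSet<Long> eventTimers = new TreeSet<>();
        long currentWatermark = Long.MIN_VALUE;

        Service(String name) {
            this.name = name;
        }

        /** Fires every timer whose timestamp is at or before the watermark. */
        void advanceWatermark(long time, List<String> firedLog) {
            currentWatermark = time;
            while (!eventTimers.isEmpty() && eventTimers.first() <= time) {
                firedLog.add(name + "@" + eventTimers.pollFirst());
            }
        }
    }

    private final Map<String, Service> services = new LinkedHashMap<>();
    /** Fired timers as "serviceName@timestamp", in firing order. */
    final List<String> firedLog = new ArrayList<>();

    Service service(String name) {
        return services.computeIfAbsent(name, Service::new);
    }

    /** Operator-level entry point: fan the watermark out to every service. */
    void processWatermark(long watermark) {
        for (Service s : services.values()) {
            s.advanceWatermark(watermark, firedLog);
        }
    }
}
```

A watermark of 1500 fires the timers at 1000 and 1500 but leaves one at 2000 queued until a later watermark reaches it, which mirrors the less-than-or-equal comparison in advanceWatermark() above.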

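With that, we have basically worked through the implementation details of Flink's Timer mechanism. Well done.

The End

Only after finishing did I realize that the checkpoint logic of Timers (characteristic 3 of the four) was never covered. This post is already quite long, though, so that will have to wait until I get around to writing about checkpoints.

Good night, everyone~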
