Flink Source Code Reading: How Checkpoints Are Triggered Periodically

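I won't cover how Flink checkpoints work in principle; the official documentation and many blog posts explain it, and interested readers can look those up.
This article analyzes, at the source-code level, how checkpoints are triggered periodically.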

Analysis

First, enable checkpointing with the following configuration:

env.enableCheckpointing(1000);

If no interval is set, the checkpoint interval defaults to -1, which means checkpointing is disabled:

/** Periodic checkpoint triggering interval. */
private long checkpointInterval = -1; // disabled

If no interval is set, checkpointInterval is assigned Long.MAX_VALUE when the JobGraph is built:
StreamingJobGraphGenerator#configureCheckpointing

long interval = cfg.getCheckpointInterval();
if (interval < MINIMAL_CHECKPOINT_TIME) {
    // interval of max value means disable periodic checkpoint
    interval = Long.MAX_VALUE;
}
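To make the normalization rule concrete, here is a minimal stand-alone sketch. It is not Flink code: the class IntervalNormalizer, its method names, and the threshold value of 10 ms are assumptions chosen for illustration, so check MINIMAL_CHECKPOINT_TIME in your Flink version.

```java
// Illustrative sketch only; IntervalNormalizer is a hypothetical class, not part of Flink.
final class IntervalNormalizer {
    // Assumed threshold in milliseconds, used here only for illustration.
    static final long MINIMAL_CHECKPOINT_TIME = 10;

    // Mirrors the rule quoted above: any interval below the minimum
    // (including the default -1) is mapped to Long.MAX_VALUE.
    static long normalize(long configuredInterval) {
        if (configuredInterval < MINIMAL_CHECKPOINT_TIME) {
            return Long.MAX_VALUE;
        }
        return configuredInterval;
    }

    // Long.MAX_VALUE acts as the "periodic checkpointing disabled" marker.
    static boolean isPeriodicCheckpointingEnabled(long normalizedInterval) {
        return normalizedInterval != Long.MAX_VALUE;
    }
}
```

With this rule, the default of -1 and any very small interval both end up as "disabled", while env.enableCheckpointing(1000) keeps its value of 1000.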

Three lists are initialized at the same time:

// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<>();

// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());

// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

Here, triggerVertices contains only the vertices that act as sources, while ackVertices and commitVertices both contain all vertices.
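The way the three lists are filled can be sketched as follows. This is a simplified stand-in rather than the Flink implementation: the Vertex class, its isInputVertex flag, and the use of String ids instead of JobVertexID are all assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; VertexClassifier and Vertex are hypothetical types.
final class VertexClassifier {
    /** Hypothetical stand-in for a job vertex: an id plus a "has no inputs" flag. */
    static final class Vertex {
        final String id;
        final boolean isInputVertex;

        Vertex(String id, boolean isInputVertex) {
            this.id = id;
            this.isInputVertex = isInputVertex;
        }
    }

    final List<String> triggerVertices = new ArrayList<>();
    final List<String> ackVertices = new ArrayList<>();
    final List<String> commitVertices = new ArrayList<>();

    VertexClassifier(List<Vertex> jobVertices) {
        for (Vertex vertex : jobVertices) {
            // Only sources receive "trigger checkpoint" messages.
            if (vertex.isInputVertex) {
                triggerVertices.add(vertex.id);
            }
            // Every vertex acknowledges and is notified of completion.
            ackVertices.add(vertex.id);
            commitVertices.add(vertex.id);
        }
    }
}
```

For a source -> map -> sink pipeline, only the source lands in triggerVertices, and all three vertices land in the other two lists.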

Checkpoints are initiated by CheckpointCoordinator. In ExecutionGraphBuilder#buildGraph, if checkpointing is enabled for the job, ExecutionGraph.enableCheckpointing() is called. This creates the CheckpointCoordinator object and registers a job status listener, CheckpointCoordinatorDeActivator, which is notified whenever the job status changes.

ExecutionGraph#enableCheckpointing
checkpointCoordinator = new CheckpointCoordinator(...);

// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (interval != Long.MAX_VALUE) {
   // the periodic checkpoint scheduler is activated and deactivated as a result of
   // job status changes (running -> on, all other states -> off)
   registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

When the job status changes, notifyJobStatusChange runs and calls jobStatusChanges on every registered listener, including CheckpointCoordinatorDeActivator:

//ExecutionGraph.java
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
   if (jobStatusListeners.size() > 0) {
      final long timestamp = System.currentTimeMillis();
      final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

      for (JobStatusListener listener : jobStatusListeners) {
         try {
            listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
         } catch (Throwable t) {
            LOG.warn("Error while notifying JobStatusListener", t);
         }
      }
   }
}

//CheckpointCoordinatorDeActivator.java
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
   if (newJobStatus == JobStatus.RUNNING) {
      // start the checkpoint scheduler
      coordinator.startCheckpointScheduler();
   } else {
      // anything else should stop the trigger for now
      coordinator.stopCheckpointScheduler();
   }
}
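The listener mechanism above can be reduced to a small, self-contained sketch. Everything in it is hypothetical (the Status enum, StatusListener, FakeScheduler, and Job are simplified stand-ins, not Flink classes); it only shows the pattern of "RUNNING turns the scheduler on, any other status turns it off".

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; none of these types exist in Flink.
final class JobStatusDemo {
    enum Status { CREATED, RUNNING, FAILING, FINISHED }

    interface StatusListener {
        void jobStatusChanges(Status newStatus);
    }

    /** Hypothetical scheduler that only records whether it is active. */
    static final class FakeScheduler {
        boolean active;

        void start() { active = true; }

        void stop() { active = false; }
    }

    /** Hypothetical job that notifies all listeners on every status change. */
    static final class Job {
        private final List<StatusListener> listeners = new ArrayList<>();

        void register(StatusListener listener) {
            listeners.add(listener);
        }

        void transitionTo(Status newStatus) {
            for (StatusListener listener : listeners) {
                listener.jobStatusChanges(newStatus);
            }
        }
    }

    // Same decision as in jobStatusChanges above: RUNNING -> on, anything else -> off.
    static StatusListener activatorFor(FakeScheduler scheduler) {
        return newStatus -> {
            if (newStatus == Status.RUNNING) {
                scheduler.start();
            } else {
                scheduler.stop();
            }
        };
    }
}
```

One consequence of this design is that a job leaving RUNNING (for example, to FAILING) stops periodic triggering automatically, and returning to RUNNING starts it again.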

This starts the checkpoint scheduler:

// --------------------------------------------------------------------------------------------
//  Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
    }
}

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
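The start/stop pattern shown above can be tried out in isolation with the JDK alone. The following is a minimal sketch, not the coordinator itself: PeriodicTriggerDemo and firesAtLeast are hypothetical names, and the random initial delay in the range [0, interval] is an assumption standing in for getRandomInitDelay(), whose body is not shown in this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; PeriodicTriggerDemo is a hypothetical class.
final class PeriodicTriggerDemo {
    private final ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    private final Object lock = new Object();
    private final long baseIntervalMillis;
    private final Runnable trigger;
    private ScheduledFuture<?> currentPeriodicTrigger;

    PeriodicTriggerDemo(long baseIntervalMillis, Runnable trigger) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.trigger = trigger;
    }

    void start() {
        synchronized (lock) {
            // Cancel any earlier timer first so that only one is ever active.
            stop();
            // Assumed delay policy: a random value in [0, baseIntervalMillis].
            long initDelay = ThreadLocalRandom.current().nextLong(0, baseIntervalMillis + 1);
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                trigger, initDelay, baseIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
        }
    }

    void shutdown() {
        stop();
        timer.shutdownNow();
    }

    /** Returns true if the trigger fired at least n times within five seconds. */
    static boolean firesAtLeast(int n, long intervalMillis) {
        CountDownLatch latch = new CountDownLatch(n);
        PeriodicTriggerDemo demo = new PeriodicTriggerDemo(intervalMillis, latch::countDown);
        demo.start();
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            demo.shutdown();
        }
    }
}
```

Calling PeriodicTriggerDemo.firesAtLeast(3, 20) schedules a trigger every 20 ms and waits until it has run three times. Calling stop() inside start() is safe because Java monitors are reentrant, which is also why the original can call stopCheckpointScheduler() while holding the lock.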

new ScheduledTrigger() is the task that gets scheduled. As with periodic watermark generation, a ScheduledThreadPoolExecutor thread pool runs it. Its run method is as follows:

private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
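The try/catch in run() matters for periodic scheduling in general: per the JDK documentation of scheduleAtFixedRate, if any execution of the task throws, subsequent executions are suppressed. The sketch below demonstrates that behavior with a task that always fails. GuardedTriggerDemo and countRuns are hypothetical names, and the 10 ms period and 300 ms observation window are arbitrary choices for the demo.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; GuardedTriggerDemo is a hypothetical class.
final class GuardedTriggerDemo {
    /**
     * Schedules an always-failing task every 10 ms for about 300 ms
     * and returns how many times the task body was entered.
     */
    static int countRuns(boolean guard) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();

        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated trigger failure");
        };

        // With the guard, the exception is swallowed inside the task,
        // so the executor keeps rescheduling it.
        Runnable task = guard
            ? () -> {
                try {
                    failing.run();
                } catch (Exception e) {
                    // a real implementation would log here and carry on
                }
            }
            : failing;

        timer.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return runs.get();
    }
}
```

Without the guard the task runs once and is never rescheduled; with the guard it keeps running for the whole window. This is presumably why ScheduledTrigger only logs the exception instead of letting it escape.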

The timer triggers the checkpoint periodically; the actual checkpoint logic lives in:

public CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic)

How a triggered checkpoint is actually executed will be analyzed in a later article.

Summary

The overall process consists of the following steps:

  1. Set the checkpoint interval through env, which enables checkpointing.
  2. Apply the checkpoint-related configuration when the JobGraph is built.
  3. When the ExecutionGraph is built, create the CheckpointCoordinator object and register the CheckpointCoordinatorDeActivator listener.
  4. When the job status changes to RUNNING, start the checkpoint scheduler; any other status stops it.