Flink Checkpoint Mechanism Explained - A Code Walkthrough

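Flink's checkpoint mechanism is the basic guarantee behind Flink's fault tolerance. It saves the state of a streaming job while the job runs, so that when a failure occurs the job can be restored from the saved state. For example, when Flink reads from Kafka, it stores the consumed Kafka offsets; if the job fails, it can resume consuming right after the offsets recorded in the last checkpoint.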
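To make the offset example concrete, here is a minimal, hypothetical simulation in plain Java (no Flink APIs; `OffsetCheckpointDemo`, `Snapshot` and `run` are illustrative names). It assumes a replayable log standing in for a Kafka partition and a running sum standing in for operator state. The point it shows is that the offset and the state are saved together, so replaying from the checkpointed offset after a failure does not count any record twice.

```java
import java.util.List;

// Hypothetical simulation, not Flink code: a replayable log stands in for a Kafka
// partition, and a running sum stands in for operator state.
class OffsetCheckpointDemo {
    // Everything needed to resume: the next offset to read plus the operator state.
    static final class Snapshot {
        final int nextOffset;
        final long sum;

        Snapshot(int nextOffset, long sum) {
            this.nextOffset = nextOffset;
            this.sum = sum;
        }
    }

    /**
     * Sums the log, taking a checkpoint every `interval` records. If `failAt` >= 0,
     * a single failure is simulated when the reader reaches that offset.
     */
    static long run(List<Integer> log, int interval, int failAt) {
        Snapshot lastCheckpoint = new Snapshot(0, 0L); // empty initial checkpoint
        int offset = 0;
        long sum = 0L;
        boolean failed = false;
        while (offset < log.size()) {
            if (!failed && offset == failAt) {
                // Failure: in-memory progress is lost; restore offset AND state together.
                failed = true;
                offset = lastCheckpoint.nextOffset;
                sum = lastCheckpoint.sum;
                continue;
            }
            sum += log.get(offset);
            offset++;
            if (offset % interval == 0) {
                lastCheckpoint = new Snapshot(offset, sum); // offset and state saved as one unit
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> log = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(run(log, 3, -1)); // prints 55
        System.out.println(run(log, 3, 7));  // prints 55: state rolled back, replay from offset 6
    }
}
```

Both runs end with the same result: on the simulated failure the sum is rolled back to the value saved with offset 6, and the record at offset 6 and everything after it are read again.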

Flink's checkpointing can be understood from the following aspects.

Barrier

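A barrier is a lightweight record that is inserted into the original data stream according to certain rules (scheduling). It does not affect the performance of normal data processing and does not change the order of the original data. Barriers split the stream into segments, somewhat like the batches in Spark Streaming's micro-batch model.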
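A barrier can be modelled as just another element in the stream. The sketch below is hypothetical (`BarrierSegments` and `Element` are illustrative names, not Flink classes) and only shows the segmentation idea: because barriers travel in-band, the records between two barriers form one segment, and their order is unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink's classes: a stream element is either a data record
// or a barrier carrying a checkpoint ID. Barriers travel in-band, so they never
// reorder the records around them.
class BarrierSegments {
    static final class Element {
        final String value;       // payload of a data record, null for a barrier
        final long checkpointId;  // checkpoint ID of a barrier, 0 for a data record

        private Element(String value, long checkpointId) {
            this.value = value;
            this.checkpointId = checkpointId;
        }

        static Element ofRecord(String value) { return new Element(value, 0L); }
        static Element ofBarrier(long checkpointId) { return new Element(null, checkpointId); }
        boolean isBarrier() { return value == null; }
    }

    /** Splits the stream into the segments delimited by barriers, preserving order. */
    static List<List<String>> segments(List<Element> stream) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Element e : stream) {
            if (e.isBarrier()) {
                result.add(current);          // segment closed by barrier e.checkpointId
                current = new ArrayList<>();
            } else {
                current.add(e.value);
            }
        }
        result.add(current);                  // records after the last barrier
        return result;
    }

    public static void main(String[] args) {
        List<Element> stream = List.of(
            Element.ofRecord("a"), Element.ofRecord("b"), Element.ofBarrier(1),
            Element.ofRecord("c"), Element.ofBarrier(2), Element.ofRecord("d"));
        System.out.println(segments(stream)); // prints [[a, b], [c], [d]]
    }
}
```

In this model checkpoint 1 covers a and b, checkpoint 2 additionally covers c, and d is only covered by a later checkpoint.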


[Figure: Barrier]

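Barriers flow through the tasks together with the data. When an operator receives a barrier, all data before that barrier is considered processed, and at this point the operator triggers a checkpoint.
If an operator has multiple parallel input streams, then after receiving a checkpoint's barrier on one input it must wait until the barriers of the same checkpoint have arrived on all other inputs before continuing. This barrier alignment works as follows.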

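  • As soon as an operator receives barrier n from one upstream input, it stops processing data that arrives on that input after barrier n, until it has received barrier n from all other upstream inputs.

  • In the meantime, the inputs that have already delivered barrier n are put on hold: data read from them is not processed but cached in an input buffer.

  • When the last upstream barrier n arrives, the operator takes its checkpoint and sends barrier n downstream.

  • Afterwards, the operator resumes processing data from all upstream inputs, first draining the data cached in the input buffer before processing new data.

The whole process is shown in the figure below.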


[Figure: barrier alignment]
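The four alignment steps can be sketched for a single in-flight checkpoint as follows. This is a simplified, hypothetical model (`BarrierAligner` is an illustrative name, not Flink's BarrierBuffer): it ignores overlapping checkpoints, cancellation markers and end of stream, and it represents "take the snapshot and forward the barrier" as a single output entry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified barrier alignment for one operator with several input
// channels. `output` records what the operator processes or emits, in order.
class BarrierAligner {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();         // channels that delivered barrier n
    private final Deque<String> inputBuffer = new ArrayDeque<>(); // data held back during alignment
    final List<String> output = new ArrayList<>();

    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            inputBuffer.add(record);      // step 2: cache data from channels that sent barrier n
        } else {
            output.add(record);           // unblocked channels are processed normally
        }
    }

    void onBarrier(int channel, long checkpointId) {
        blocked.add(channel);             // step 1: stop processing this channel after barrier n
        if (blocked.size() == numChannels) {
            // step 3: last barrier n arrived: snapshot, then forward the barrier
            output.add("barrier-" + checkpointId);
            blocked.clear();
            while (!inputBuffer.isEmpty()) {
                output.add(inputBuffer.poll()); // step 4: drain cached data first
            }
        }
    }

    public static void main(String[] args) {
        BarrierAligner op = new BarrierAligner(2);
        op.onRecord(0, "a1");
        op.onBarrier(0, 1);   // channel 0 is now blocked
        op.onRecord(0, "a2"); // cached
        op.onRecord(1, "b1"); // still processed
        op.onBarrier(1, 1);   // alignment complete
        op.onRecord(0, "a3");
        System.out.println(op.output); // prints [a1, b1, barrier-1, a2, a3]
    }
}
```

Record b1 is processed before the barrier because it arrived on channel 1 ahead of barrier 1, while a2, which arrived on the already-blocked channel 0, is held back until alignment completes.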

Barrier generation

Flink checkpoints are initiated by the JobMaster, which periodically triggers the Source Tasks to generate barriers.

The periodic triggering is set up in the startCheckpointScheduler() method of the CheckpointCoordinator class:

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();
            // trigger checkpoints periodically, with period baseInterval
            periodicScheduling = true;
            long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
        }
    }
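The scheduling pattern in startCheckpointScheduler() can be reproduced with the JDK's `ScheduledExecutorService`. This is a hedged sketch, assuming times in milliseconds; `PeriodicTriggerSketch` is a hypothetical name, and its trigger only increments a counter instead of calling triggerCheckpoint. As in the code above, `ThreadLocalRandom.nextLong(origin, bound)` excludes the bound (hence the `+ 1`) and requires the minimum pause to be no larger than the interval.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the scheduling pattern in startCheckpointScheduler(); the
// trigger only counts invocations instead of calling triggerCheckpoint.
class PeriodicTriggerSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong triggered = new AtomicLong();
    private ScheduledFuture<?> currentPeriodicTrigger;

    /** Random delay in [minPauseMillis, baseIntervalMillis]; nextLong's bound is exclusive. */
    static long initialDelayMillis(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    synchronized void start(long minPauseMillis, long baseIntervalMillis) {
        stop(); // make sure any prior trigger is cancelled
        long initialDelay = initialDelayMillis(minPauseMillis, baseIntervalMillis);
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
            triggered::incrementAndGet, // stands in for new ScheduledTrigger()
            initialDelay, baseIntervalMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
    }

    long triggerCount() { return triggered.get(); }

    void shutdown() {
        stop();
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicTriggerSketch scheduler = new PeriodicTriggerSketch();
        scheduler.start(10, 50); // min pause 10 ms, interval 50 ms
        Thread.sleep(300);
        scheduler.shutdown();
        System.out.println("checkpoints triggered: " + scheduler.triggerCount());
    }
}
```

The exact count printed depends on timing and on the random initial delay, which is why it is not shown here.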

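Following ScheduledTrigger, we eventually reach triggerCheckpoint, the method on the JobMaster side (in CheckpointCoordinator) that triggers a checkpoint. After a series of setup steps, this method generates a unique checkpoint ID, creates a pending checkpoint, and calls execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions), which issues an RPC that makes the Task side generate a barrier and take the checkpoint.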

    checkpointID = checkpointIdCounter.getAndIncrement();
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
            job,
            checkpointID,
            timestamp,
            ackTasks,
            props,
            checkpointStorageLocation,
            executor);

    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            // call the TaskManager RPC taskManagerGateway.triggerCheckpoint
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }
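The trigger path can be summarised in a few lines of plain Java. In this hypothetical sketch (`TriggerSketch`, `TaskGateway` and this `PendingCheckpoint` are illustrative, not Flink's classes), each call allocates a new checkpoint ID from a counter, registers a pending checkpoint that waits for acknowledgements from all tasks, and sends the trigger message only to the source tasks, which then inject the barrier.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the trigger path; names are illustrative, not Flink's API.
class TriggerSketch {
    interface TaskGateway { // stands in for taskManagerGateway.triggerCheckpoint(...)
        void triggerCheckpoint(String taskId, long checkpointId, long timestamp);
    }

    static final class PendingCheckpoint {
        final long checkpointId;
        final long timestamp;
        final Set<String> notYetAcknowledged; // every task that must ack

        PendingCheckpoint(long checkpointId, long timestamp, Set<String> tasksToAck) {
            this.checkpointId = checkpointId;
            this.timestamp = timestamp;
            this.notYetAcknowledged = new HashSet<>(tasksToAck);
        }
    }

    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    final Map<Long, PendingCheckpoint> pendingCheckpoints = new LinkedHashMap<>();

    long triggerCheckpoint(List<String> sourceTasks, Set<String> tasksToAck, TaskGateway gateway) {
        long timestamp = System.currentTimeMillis();
        long checkpointId = checkpointIdCounter.getAndIncrement(); // unique, increasing
        pendingCheckpoints.put(checkpointId, new PendingCheckpoint(checkpointId, timestamp, tasksToAck));
        for (String task : sourceTasks) { // only sources are triggered directly
            gateway.triggerCheckpoint(task, checkpointId, timestamp);
        }
        return checkpointId;
    }

    public static void main(String[] args) {
        TriggerSketch coordinator = new TriggerSketch();
        List<String> sent = new ArrayList<>();
        TaskGateway gateway = (task, id, ts) -> sent.add(task + "@" + id);
        Set<String> allTasks = Set.of("source-0", "source-1", "map-0");
        coordinator.triggerCheckpoint(List.of("source-0", "source-1"), allTasks, gateway);
        coordinator.triggerCheckpoint(List.of("source-0", "source-1"), allTasks, gateway);
        System.out.println(sent); // prints [source-0@1, source-1@1, source-0@2, source-1@2]
        System.out.println(coordinator.pendingCheckpoints.keySet()); // prints [1, 2]
    }
}
```

Downstream tasks such as map-0 never receive a trigger message in this model; they are only listed as tasks whose acknowledgement the pending checkpoint waits for.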
The Source generates a barrier on receiving the command

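When the TaskManager receives the RPC that triggers a checkpoint, it has the Source Task generate a checkpoint barrier. triggerCheckpointBarrier wraps this work in a Runnable and runs it on a separate thread dedicated to generating the barrier.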

    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        // ... (setup of invokable, checkpointMetaData, taskName and
        //      safetyNetCloseableRegistry omitted)

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                }
                // ... (catch/finally blocks omitted)
            }
        };
        // ... (the runnable is then executed asynchronously, off the RPC thread)
    }
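The hand-off performed by triggerCheckpointBarrier (wrap the work in a Runnable, run it on another thread, decline if the task is not ready) can be sketched with a single-threaded `ExecutorService`. This sketch rests on several assumptions: `AsyncTriggerSketch` is hypothetical, the invokable is modelled as a `LongPredicate`, and declining just records the checkpoint ID instead of calling checkpointResponder.declineCheckpoint.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongPredicate;

// Hypothetical sketch of the Task-side hand-off, not Flink's Task class.
class AsyncTriggerSketch {
    private final ExecutorService checkpointTriggerExecutor = Executors.newSingleThreadExecutor();
    final List<Long> declined = new CopyOnWriteArrayList<>(); // stands in for declineCheckpoint(...)
    final List<Long> failed = new CopyOnWriteArrayList<>();   // a real task would fail here

    /** Called on the RPC thread; returns immediately after queueing the work. */
    void triggerCheckpointBarrier(long checkpointId, LongPredicate invokable) {
        Runnable runnable = () -> {
            try {
                // `invokable` returns false when the task is not ready for a checkpoint
                boolean success = invokable.test(checkpointId);
                if (!success) {
                    declined.add(checkpointId);
                }
            } catch (RuntimeException e) {
                failed.add(checkpointId);
            }
        };
        checkpointTriggerExecutor.execute(runnable);
    }

    /** Waits until all queued trigger calls have run. */
    void shutdownAndWait() {
        checkpointTriggerExecutor.shutdown();
        try {
            checkpointTriggerExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncTriggerSketch task = new AsyncTriggerSketch();
        task.triggerCheckpointBarrier(1, id -> true);  // ready: barrier would be emitted
        task.triggerCheckpointBarrier(2, id -> false); // not ready: declined
        task.shutdownAndWait();
        System.out.println("declined: " + task.declined); // prints declined: [2]
    }
}
```

Keeping the work off the RPC thread means the RPC call returns quickly even when taking the checkpoint has to wait for the task's lock.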

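In the Source Task, the work is actually done by performCheckpoint in StreamTask. After a pre-barrier preparation step that is usually trivial, this method does two things: first, it creates a barrier carrying the checkpoint ID and sends it to all downstream tasks; then it saves this task's own state. This is how barriers start travelling through the whole streaming topology.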

    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        synchronized (lock) {
            if (isRunning) {
                // we can do a checkpoint

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            // ... (else branch for a task that is no longer running omitted)
        }
    }
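The ordering inside performCheckpoint matters: the barrier goes out before the snapshot is taken, and both happen under the same lock that record processing uses, so no record can slip in between. A hypothetical sketch of that ordering (`PerformCheckpointSketch` is illustrative; the task state is a single running sum and the asynchronous part is a `CompletableFuture`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the step order in performCheckpoint, not Flink's StreamTask.
class PerformCheckpointSketch {
    private final Object lock = new Object();              // shared by records and checkpoints
    private volatile boolean isRunning = true;
    private long state = 0;                                 // the task's own state: a running sum
    final List<List<String>> outputs = new ArrayList<>();   // one list per downstream channel
    CompletableFuture<Long> lastSnapshot;

    PerformCheckpointSketch(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    void processRecord(long value) {
        synchronized (lock) {
            state += value;
            for (List<String> out : outputs) {
                out.add(String.valueOf(value));
            }
        }
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!isRunning) {
                return false;                        // the caller would decline the checkpoint
            }
            prepareSnapshotPreBarrier(checkpointId); // step (1): normally nothing to do
            for (List<String> out : outputs) {       // step (2): barrier to every output
                out.add("barrier-" + checkpointId);
            }
            final long captured = state;             // step (3): capture state synchronously,
            lastSnapshot = CompletableFuture.supplyAsync(() -> captured); // persist asynchronously
            return true;
        }
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
        // hook left empty in this sketch
    }

    void cancel() {
        isRunning = false;
    }

    public static void main(String[] args) {
        PerformCheckpointSketch task = new PerformCheckpointSketch(2);
        task.processRecord(5);
        task.performCheckpoint(1);
        task.processRecord(7); // after the barrier: not part of checkpoint 1
        System.out.println(task.outputs);             // prints [[5, barrier-1, 7], [5, barrier-1, 7]]
        System.out.println(task.lastSnapshot.join()); // prints 5
    }
}
```

Record 7 appears after the barrier on every output, and the snapshot for checkpoint 1 contains only the state produced by record 5.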

Barrier propagation

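As described above, a Source Task periodically inserts barriers into the original data, following the JobMaster's instructions, and passes them on to downstream operators.
Non-source tasks do not trigger checkpoints periodically while processing data; instead, they trigger a checkpoint whenever they encounter a barrier.
In the code, this is driven by getNextNonBlocked in BarrierBuffer.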

    public BufferOrEvent getNextNonBlocked() throws Exception {
        while (true) {
            // process buffered BufferOrEvents before grabbing new ones
            Optional<BufferOrEvent> next;
            if (currentBuffered == null) {
                next = inputGate.getNextBufferOrEvent();
            }
            else {
                next = Optional.ofNullable(currentBuffered.getNext());
                if (!next.isPresent()) {
                    completeBufferedSequence();
                    return getNextNonBlocked();
                }
            }

            if (!next.isPresent()) {
                if (!endOfStream) {
                    // end of input stream. stream continues with the buffered data
                    endOfStream = true;
                    releaseBlocksAndResetBarriers();
                    return getNextNonBlocked();
                }
                else {
                    // final end of both input and buffered data
                    return null;
                }
            }

            BufferOrEvent bufferOrEvent = next.get();
            if (isBlocked(bufferOrEvent.getChannelIndex())) {
                // if the channel is blocked, we just store the BufferOrEvent
                bufferBlocker.add(bufferOrEvent);
                checkSizeLimit();
            }
            else if (bufferOrEvent.isBuffer()) {
                return bufferOrEvent;
            }
            else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                }
            }
            else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
            }
            else {
                if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                    processEndOfPartition();
                }
                return bufferOrEvent;
            }
        }
    }

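If the stream has not ended yet, the barrier is handled in processBarrier, which uses the number of inputs of the task to decide whether barriers need to be aligned. Once the checkpoint can proceed (immediately with a single input, or after alignment with several inputs), it calls notifyCheckpoint to trigger the checkpoint. This eventually reaches triggerCheckpointOnBarrier, and from there the process is the same as for a Source Task.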
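The decision made in processBarrier can be sketched as follows. The sketch is hypothetical and simplified: `ProcessBarrierSketch` is not Flink's class, the `notifyCheckpoint` callback stands in for the chain that ends in triggerCheckpointOnBarrier, and buffering of blocked channels is left out. A single-input task triggers the checkpoint as soon as the barrier arrives, while a multi-input task waits until barrier n has been seen on every channel.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical, simplified sketch of processBarrier's decision, not Flink's BarrierBuffer.
class ProcessBarrierSketch {
    private final int totalNumberOfInputChannels;
    private final LongConsumer notifyCheckpoint; // stands in for the path to triggerCheckpointOnBarrier
    private final Set<Integer> barriersReceived = new HashSet<>();
    private long currentCheckpointId = -1;

    ProcessBarrierSketch(int totalNumberOfInputChannels, LongConsumer notifyCheckpoint) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
        this.notifyCheckpoint = notifyCheckpoint;
    }

    void processBarrier(long barrierId, int channelIndex) {
        if (totalNumberOfInputChannels == 1) {
            // single input: nothing to align, trigger right away
            if (barrierId > currentCheckpointId) {
                currentCheckpointId = barrierId;
                notifyCheckpoint.accept(barrierId);
            }
            return;
        }
        if (barrierId > currentCheckpointId) {
            // first barrier of a newer checkpoint: (re)start alignment
            currentCheckpointId = barrierId;
            barriersReceived.clear();
        } else if (barrierId < currentCheckpointId) {
            return; // barrier of an older checkpoint: ignore
        }
        barriersReceived.add(channelIndex);
        if (barriersReceived.size() == totalNumberOfInputChannels) {
            // barrier n seen on every channel: alignment done
            barriersReceived.clear();
            notifyCheckpoint.accept(barrierId);
        }
    }

    public static void main(String[] args) {
        List<Long> triggered = new ArrayList<>();
        ProcessBarrierSketch twoInputs = new ProcessBarrierSketch(2, id -> triggered.add(id));
        twoInputs.processBarrier(1, 0);
        System.out.println(triggered); // prints []
        twoInputs.processBarrier(1, 1);
        System.out.println(triggered); // prints [1]
    }
}
```

In this simplified model a newer barrier that arrives before alignment finishes simply restarts the alignment for the newer checkpoint.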

Operator checkpoint

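As described above, when a task performs a checkpoint it first creates the barrier for that checkpoint and broadcasts it, and only then takes the task's own checkpoint. The state is saved by calling each operator's snapshotState through the executeCheckpointing method; each operator implements snapshotState according to its own needs.
For example, in the Kafka consumer operator provided by Flink, snapshotState in FlinkKafkaConsumerBase saves the offset consumed so far for each partition of the subscribed topics.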

    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }
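The bookkeeping around pendingOffsetsToCommit can be illustrated without Kafka. In this hypothetical plain-Java sketch (`OffsetSnapshotSketch` is illustrative; a `LinkedHashMap` plays the role of the insertion-ordered pending map, and "committing" just copies offsets into another map), each snapshot copies the current offsets and remembers them under the checkpoint ID, and old entries are trimmed. The `notifyCheckpointComplete` part, which commits that checkpoint's offsets and forgets it and any older entries, is an assumption about the connector's behaviour beyond the excerpt above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of offset snapshotting with a bounded pending-commit map;
// not the Flink Kafka connector.
class OffsetSnapshotSketch {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

    final Map<String, Long> currentOffsets = new HashMap<>();   // updated as records are read
    final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    final Map<String, Long> committedOffsets = new HashMap<>(); // stands in for offsets committed to Kafka

    Map<String, Long> snapshotState(long checkpointId) {
        Map<String, Long> snapshot = new HashMap<>(currentOffsets); // copy: later reads must not change it
        pendingOffsetsToCommit.put(checkpointId, snapshot);
        // truncate the pending map to prevent unbounded growth; oldest entries come first
        while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            Long oldest = pendingOffsetsToCommit.keySet().iterator().next();
            pendingOffsetsToCommit.remove(oldest);
        }
        return snapshot; // this is what would be written into operator state
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already trimmed
        }
        committedOffsets.putAll(offsets);
        pendingOffsetsToCommit.keySet().removeIf(id -> id <= checkpointId); // forget this and older
    }

    public static void main(String[] args) {
        OffsetSnapshotSketch consumer = new OffsetSnapshotSketch();
        consumer.currentOffsets.put("topic-0", 42L);
        consumer.snapshotState(1);
        consumer.currentOffsets.put("topic-0", 50L); // keeps reading after the snapshot
        consumer.notifyCheckpointComplete(1);
        System.out.println(consumer.committedOffsets); // prints {topic-0=42}
    }
}
```

Copying the offsets at snapshot time is what keeps the committed value at 42 even though the consumer has already moved on to 50.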

From pending checkpoint to completed checkpoint

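When an operator finishes its checkpoint, it reports this to the JobMaster. On receiving the report, the JobMaster moves the operator from the set of tasks that have not yet completed the checkpoint to the set that have. Once every operator has reported completion, the JobMaster changes the checkpoint's status from pending to completed.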
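The state transition described above can be sketched as a small piece of JobMaster-side bookkeeping. In this hypothetical sketch (`AckSketch` and its methods are illustrative names), each pending checkpoint holds the set of tasks that have not acknowledged yet; the last acknowledgement moves it to completed, older pending checkpoints are dropped as subsumed (compare dropSubsumedCheckpoints), and every task gets a completion notification (compare notifyCheckpointComplete).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of JobMaster-side acknowledgement bookkeeping; names are illustrative.
class AckSketch {
    private final List<String> allTasks;
    private final TreeMap<Long, Set<String>> pending = new TreeMap<>(); // checkpoint ID -> tasks not yet acked
    final List<Long> completed = new ArrayList<>();
    final List<String> notifications = new ArrayList<>();

    AckSketch(List<String> allTasks) {
        this.allTasks = allTasks;
    }

    void trigger(long checkpointId) {
        pending.put(checkpointId, new HashSet<>(allTasks));
    }

    /** Returns true if this acknowledgement completed the checkpoint. */
    boolean receiveAcknowledgeMessage(long checkpointId, String task) {
        Set<String> notYetAcknowledged = pending.get(checkpointId);
        if (notYetAcknowledged == null || !notYetAcknowledged.remove(task)) {
            return false; // unknown or subsumed checkpoint, or a duplicate ack
        }
        if (!notYetAcknowledged.isEmpty()) {
            return false; // still waiting for other tasks
        }
        pending.remove(checkpointId);          // pending -> completed
        completed.add(checkpointId);
        pending.headMap(checkpointId).clear(); // drop older pending checkpoints (subsumed)
        for (String t : allTasks) {
            notifications.add(t + ":complete-" + checkpointId); // "notify complete" to every task
        }
        return true;
    }

    public static void main(String[] args) {
        AckSketch coordinator = new AckSketch(List.of("source-0", "map-0"));
        coordinator.trigger(1);
        coordinator.trigger(2);
        coordinator.receiveAcknowledgeMessage(2, "source-0");
        coordinator.receiveAcknowledgeMessage(2, "map-0");
        System.out.println(coordinator.completed); // prints [2]
        System.out.println(coordinator.receiveAcknowledgeMessage(1, "source-0")); // prints false
    }
}
```

Here checkpoint 2 completes before checkpoint 1, so checkpoint 1 is discarded as subsumed and its late acknowledgement is ignored.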

On the Task side:

executeCheckpointing -> asyncCheckpointRunnable -> reportCompletedSnapshotStates -> taskStateManager.reportTaskStateSnapshots; in TaskStateManagerImpl this eventually calls:

    public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

        long checkpointId = checkpointMetaData.getCheckpointId();

        localStateStore.storeLocalState(checkpointId, localState);

        checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }

checkpointResponder.acknowledgeCheckpoint then notifies the JobMaster through an actor message.

On the JobMaster side:

acknowledgeCheckpoint -> checkpointCoordinator.receiveAcknowledgeMessage(ackMessage) -> checkpoint.acknowledgeTask -> completePendingCheckpoint(checkpoint);

    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        final long checkpointId = pendingCheckpoint.getCheckpointId();
        final CompletedCheckpoint completedCheckpoint;

        // As a first step to complete the checkpoint, we register its state with the registry
        Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
        sharedStateRegistry.registerAll(operatorStates.values());

        try {
            try {
                completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            }
            catch (Exception e1) {
                // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
                if (!pendingCheckpoint.isDiscarded()) {
                    pendingCheckpoint.abortError(e1);
                }

                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
            }

            // the pending checkpoint must be discarded after the finalization
            Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

            try {
                completedCheckpointStore.addCheckpoint(completedCheckpoint);
            } catch (Exception exception) {
                // we failed to store the completed checkpoint. Let's clean up
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            completedCheckpoint.discardOnFailedStoring();
                        } catch (Throwable t) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                        }
                    }
                });

                throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
            }
        } finally {
            pendingCheckpoints.remove(checkpointId);

            triggerQueuedRequests();
        }

        rememberRecentCheckpointId(checkpointId);

        // drop those pending checkpoints that are at prior to the completed one
        dropSubsumedCheckpoints(checkpointId);

        // record the time when this was completed, to calculate
        // the 'min delay between checkpoints'
        lastCheckpointCompletionNanos = System.nanoTime();

        LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

        if (LOG.isDebugEnabled()) {
            StringBuilder builder = new StringBuilder();
            builder.append("Checkpoint state: ");
            for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                builder.append(state);
                builder.append(", ");
            }
            // Remove last two chars ", "
            builder.setLength(builder.length() - 2);

            LOG.debug(builder.toString());
        }

        // send the "notify complete" call to all vertices
        final long timestamp = completedCheckpoint.getTimestamp();

        for (ExecutionVertex ev : tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ee.notifyCheckpointComplete(checkpointId, timestamp);
            }
        }
    }