Flink-1.10 Source Code Notes: checkpoint -- 3

Flink Source Code Notes --- checkpoint

This post continues from the previous one:

Flink-1.10 Source Code Notes: checkpoint - 2

Sending the snapshot acknowledgement message

AsyncCheckpointRunnable.run() calls reportCompletedSnapshotStates, which is responsible for reporting the snapshot. There, taskEnvironment calls getTaskStateManager to obtain the TaskStateManager (the concrete type is TaskStateManagerImpl) and then calls reportTaskStateSnapshots to report the state.

private void reportCompletedSnapshotStates(
        TaskStateSnapshot acknowledgedTaskStateSnapshot,
        TaskStateSnapshot localTaskStateSnapshot,
        long asyncDurationMillis) {  // duration of the asynchronous part of the snapshot, in ms

        boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
        boolean hasLocalState = localTaskStateSnapshot.hasState();

        Preconditions.checkState(hasAckState || !hasLocalState,
            "Found cached state but no corresponding primary state is reported to the job " +
                "manager. This indicates a problem.");

        // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
        // to stateless tasks on restore. This enables simple job modifications that only concern
        // stateless without the need to assign them uids to match their (always empty) states.
        // todo: report the task's snapshot state
        // When a checkpoint or savepoint is triggered on a task, the task creates snapshots for all stream operator instances it owns. All operator snapshots from the task are then reported through this interface.
        // A typical implementation dispatches and forwards the reported state to the interested parties, such as the checkpoint coordinator or a local state store.
        taskEnvironment.getTaskStateManager().reportTaskStateSnapshots( // this call resolves to the TaskStateManagerImpl implementation
            checkpointMetaData,
            checkpointMetrics,
            hasAckState ? acknowledgedTaskStateSnapshot : null,
            hasLocalState ? localTaskStateSnapshot : null);

        LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
            taskName, checkpointMetaData.getCheckpointId(), asyncDurationMillis);

        LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
            taskName, checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
    }
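The "report null for stateless tasks" convention in the method above can be illustrated with a small sketch. `Snapshot` and `orNull` are hypothetical names made up for this example and are not Flink classes; only the `hasState() ? snapshot : null` shape is taken from the code above.

```java
import java.util.Map;

// Hypothetical stand-in for TaskStateSnapshot: maps an operator name to its state size.
final class Snapshot {
    private final Map<String, Long> stateSizes;

    Snapshot(Map<String, Long> stateSizes) {
        this.stateSizes = stateSizes;
    }

    // True if at least one operator reported non-empty state.
    boolean hasState() {
        for (long size : stateSizes.values()) {
            if (size > 0) {
                return true;
            }
        }
        return false;
    }

    // Mirrors "hasAckState ? acknowledgedTaskStateSnapshot : null":
    // a stateless task is signalled by null rather than by an empty snapshot.
    static Snapshot orNull(Snapshot snapshot) {
        return snapshot.hasState() ? snapshot : null;
    }
}
```

With this shape, the receiving side only has to check for null to know that there is nothing to assign on restore.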

Next, look at the TaskStateManagerImpl class, which implements the TaskStateManager interface.

public class TaskStateManagerImpl implements TaskStateManager

Now look at reportTaskStateSnapshots. The method first stores the local snapshot state and then calls acknowledgeCheckpoint on checkpointResponder to send the acknowledgement message.

checkpointResponder is of type RpcCheckpointResponder.

    /**
     * @param checkpointMetaData metadata of the checkpoint request.
     * @param checkpointMetrics  task-level metrics for the checkpoint (used for monitoring).
     * @param acknowledgedState  the reported state to acknowledge to the job manager.
     * @param localState         the reported state for local recovery.
     */ 
    @Override
    public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

        long checkpointId = checkpointMetaData.getCheckpointId();

        // store the local snapshot state
        localStateStore.storeLocalState(checkpointId, localState);

        // send the "snapshot succeeded" message
        checkpointResponder.acknowledgeCheckpoint( // calls the method in RpcCheckpointResponder
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }

Inside acknowledgeCheckpoint, the main step is a call to acknowledgeCheckpoint on checkpointCoordinatorGateway, which ends up in JobMaster's acknowledgeCheckpoint method.

    @Override
    public void acknowledgeCheckpoint(
            JobID jobID,
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            TaskStateSnapshot subtaskState) {
        // calls the method of the same name
        // JobMaster --> JobMasterGateway --> JobMasterOperatorEventGateway: JobMaster implements JobMasterGateway, whose parent interface is JobMasterOperatorEventGateway
        // JobMaster is the job master responsible for executing a single JobGraph
        // this RpcCheckpointResponder is created when the connection to the JobManager is established, after the JobManager's leader election has succeeded
        checkpointCoordinatorGateway.acknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            subtaskState);
    }

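JobMaster extends FencedRpcEndpoint and implements the JobMasterGateway and JobMasterService interfaces.

The JobMasterGateway interface extends the JobMasterOperatorEventGateway interface. It also extends CheckpointCoordinatorGateway, which is why the call on checkpointCoordinatorGateway lands in JobMaster.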

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService

Now step into checkpointCoordinatorGateway.acknowledgeCheckpoint:

    /**
     * @param jobID                     job ID of the running job
     * @param executionAttemptID        execution attempt ID of the running task
     * @param checkpointId              ID of this checkpoint
     * @param checkpointMetrics         metrics of this checkpoint
     * @param checkpointState           state handles of the checkpoint
     */
    @Override
    public void acknowledgeCheckpoint(
            final JobID jobID,
            final ExecutionAttemptID executionAttemptID,
            final long checkpointId,
            final CheckpointMetrics checkpointMetrics,
            final TaskStateSnapshot checkpointState) {
        // todo: schedulerNG is the object responsible for scheduling the Flink job
        // this calls the method implemented in SchedulerBase
        schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
    }

Now step into schedulerNG's acknowledgeCheckpoint method.

The method obtains the checkpointCoordinator, builds an AcknowledgeCheckpoint message, looks up the TaskManager location, and finally calls receiveAcknowledgeMessage on the checkpointCoordinator from the ioExecutor.

@Override
    public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
        // make sure we are running in the main thread
        mainThreadExecutor.assertRunningInMainThread();

        // get the checkpoint coordinator from the execution graph
        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        /*
        This message is sent from the {@link org.apache.flink.runtime.taskexecutor} to the
        {@link org.apache.flink.runtime.jobmaster} to signal that the checkpoint of an
        individual task is completed.
         */
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            checkpointState);

        // look up the TaskManager location
        final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

        if (checkpointCoordinator != null) {
            ioExecutor.execute(() -> {
                try {
                    // receives the AcknowledgeCheckpoint message and returns whether it was associated with a pending checkpoint
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
                } catch (Throwable t) {
                    log.warn("Error while processing checkpoint acknowledgement message", t);
                }
            });
        } else {
            String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
            if (executionGraph.getState() == JobStatus.RUNNING) {
                log.error(errorMessage, jobGraph.getJobID());
            } else {
                log.debug(errorMessage, jobGraph.getJobID());
            }
        }
    }
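The hand-off in the method above (build the message on the calling thread, process it on a separate I/O executor) can be sketched as follows. This is a minimal illustration, not Flink code: `AckOffloadSketch`, `Ack` and `handle` are hypothetical names, and the sketch returns a future so the result can be observed, whereas the code above uses `ioExecutor.execute(...)` and only logs failures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class AckOffloadSketch {

    // Hypothetical stand-in for the AcknowledgeCheckpoint message.
    static final class Ack {
        final long checkpointId;

        Ack(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    // Builds the message on the calling thread, then processes it on ioExecutor.
    // The returned future completes with a description of where the work ran.
    static CompletableFuture<String> handle(long checkpointId, Executor ioExecutor) {
        final Ack ack = new Ack(checkpointId); // cheap: done by the caller
        return CompletableFuture.supplyAsync(
            () -> Thread.currentThread().getName() + " handled checkpoint " + ack.checkpointId,
            ioExecutor);
    }
}
```

The likely motivation, as far as I can tell, is that completing a checkpoint can involve blocking I/O, which should not run on the scheduler's main thread.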

Now step into the checkpointCoordinator's receiveAcknowledgeMessage method:

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
        if (shutdown || message == null) {
            return false;
        }

        if (!job.equals(message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
            return false;
        }

        // get the checkpoint ID
        final long checkpointId = message.getCheckpointId();

        // take the coordinator lock so that only one thread runs this section at a time
        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            // make sure the coordinator has not been shut down
            if (shutdown) {
                return false;
            }

            // look up the pending checkpoint
            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {
                // acknowledgeTask acknowledges the task with the given execution attempt ID and the given subtask state
                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    case SUCCESS: // the acknowledgement was accepted
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                        // the ack succeeded; if all tasks have now acknowledged their snapshots
                        if (checkpoint.areTasksFullyAcknowledged()) {
                            // complete the pending checkpoint
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    case DUPLICATE: // this task has already acknowledged
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                        break;
                    case UNKNOWN: // unknown execution attempt
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                        break;
                    case DISCARDED: // the pending checkpoint was discarded
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                                "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle tp avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
            } else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
            } else {
                boolean wasPendingCheckpoint;

                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    wasPendingCheckpoint = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
                        "{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                } else {
                    LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    wasPendingCheckpoint = false;
                }

                // try to discard the state so that we don't have lingering state lying around
                discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                return wasPendingCheckpoint;
            }
        }
    }
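The last branch above distinguishes a late message for a recently pending checkpoint from a message for a completely unknown one by consulting recentPendingCheckpoints. A bounded "recently seen IDs" structure of that kind can be sketched as below. `RecentCheckpointIds` and its method names are hypothetical, and the capacity is an arbitrary assumption of this sketch rather than the value Flink uses.

```java
import java.util.ArrayDeque;

final class RecentCheckpointIds {
    private final ArrayDeque<Long> recent;
    private final int capacity; // assumption: any small positive bound works for the sketch

    RecentCheckpointIds(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.recent = new ArrayDeque<>(capacity);
    }

    // Remembers an ID, evicting the oldest one once the bound is reached.
    void remember(long checkpointId) {
        if (recent.size() >= capacity) {
            recent.removeFirst();
        }
        recent.addLast(checkpointId);
    }

    // True if a late message for this ID belongs to a checkpoint that was pending recently.
    boolean wasRecentlyPending(long checkpointId) {
        return recent.contains(checkpointId);
    }
}
```

Either way the reported state is discarded; the lookup only decides the return value and the log level.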

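After the shutdown and job ID checks, the method verifies that the pending checkpoint is not null and has not been discarded; only then does it enter the main logic.

First look at the method called in the switch statement: the checkpoint's acknowledgeTask method.

It returns a result that depends on the acknowledgement state of the task.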

public TaskAcknowledgeResult acknowledgeTask(
            ExecutionAttemptID executionAttemptId,
            TaskStateSnapshot operatorSubtaskStates,
            CheckpointMetrics metrics) {

        synchronized (lock) {
            // if this pending checkpoint has been discarded, return DISCARDED
            if (discarded) {
                return TaskAcknowledgeResult.DISCARDED;
            }

            // remove the acknowledged task from notYetAcknowledgedTasks,
            // which holds all tasks that have not acknowledged yet
            final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);

            if (vertex == null) {
                // the task is not in notYetAcknowledgedTasks;
                // if it is in acknowledgedTasks (the already acknowledged tasks),
                // return DUPLICATE
                if (acknowledgedTasks.contains(executionAttemptId)) {
                    return TaskAcknowledgeResult.DUPLICATE;
                } else {
                    // otherwise the task is unknown
                    return TaskAcknowledgeResult.UNKNOWN;
                }
            } else {
                // the task was still pending: add it to the set of acknowledged tasks
                acknowledgedTasks.add(executionAttemptId);
            }

            List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
            int subtaskIndex = vertex.getParallelSubtaskIndex();
            long ackTimestamp = System.currentTimeMillis();

            long stateSize = 0L;

            // store the snapshot state of each operator
            if (operatorSubtaskStates != null) {
                for (OperatorID operatorID : operatorIDs) {

                    // returns the subtask state for the given operator ID (or null if not contained)
                    OperatorSubtaskState operatorSubtaskState =
                        operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);

                    // if no real operatorSubtaskState was reported, we insert an empty state
                    // i.e. fall back to an empty state when no operatorSubtaskState was found
                    if (operatorSubtaskState == null) {
                        operatorSubtaskState = new OperatorSubtaskState();
                    }

                    // look up the OperatorState of this operator
                    OperatorState operatorState = operatorStates.get(operatorID);

                    if (operatorState == null) {
                        // no OperatorState registered for this operator yet: create one
                        operatorState = new OperatorState(
                            operatorID,
                            vertex.getTotalNumberOfParallelSubtasks(),
                            vertex.getMaxParallelism());
                        // register the newly created state
                        operatorStates.put(operatorID, operatorState);
                    }
                    // store the subtask's state under its parallel subtask index
                    operatorState.putState(subtaskIndex, operatorSubtaskState);
                    stateSize += operatorSubtaskState.getStateSize();
                }
            }

            ++numAcknowledgedTasks; // number of tasks acknowledged so far

            // publish the checkpoint statistics
            // to prevent null-pointers from concurrent modification, copy reference onto stack
            // this section reports the subtask's checkpoint statistics
            final PendingCheckpointStats statsCallback = this.statsCallback;
            if (statsCallback != null) {
                // Do this in millis because the web frontend works with them
                long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
                long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000;

                SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
                    subtaskIndex,
                    ackTimestamp,
                    stateSize,
                    metrics.getSyncDurationMillis(),
                    metrics.getAsyncDurationMillis(),
                    alignmentDurationMillis,
                    checkpointStartDelayMillis);

                statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
            }
            // finally, report success
            return TaskAcknowledgeResult.SUCCESS;
        }
    }
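Stripped of state handling and statistics, the bookkeeping in acknowledgeTask comes down to moving an ID from a "not yet acknowledged" collection to an "acknowledged" one. The sketch below is a simplified model under that assumption: `PendingAckSketch` is a hypothetical class, execution attempt IDs are plain strings, and only the four result values are taken from the code above.

```java
import java.util.HashSet;
import java.util.Set;

final class PendingAckSketch {
    enum Result { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingAckSketch(Set<String> tasksToAcknowledge) {
        // defensive copy, so the caller's set is never modified
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    synchronized Result acknowledge(String attemptId) {
        if (discarded) {
            return Result.DISCARDED;
        }
        if (!notYetAcknowledged.remove(attemptId)) {
            // not pending: either acknowledged before, or never part of this checkpoint
            return acknowledged.contains(attemptId) ? Result.DUPLICATE : Result.UNKNOWN;
        }
        acknowledged.add(attemptId);
        return Result.SUCCESS;
    }

    // Same condition as areTasksFullyAcknowledged(): nothing left to wait for, and not discarded.
    synchronized boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    synchronized void discard() {
        discarded = true;
    }
}
```

For a checkpoint waiting on tasks "a" and "b", acknowledging "a" twice yields SUCCESS and then DUPLICATE, an ID that was never registered yields UNKNOWN, and the checkpoint counts as fully acknowledged only after "b" has also reported.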

When the switch matches SUCCESS, completePendingCheckpoint may be called. Before that, checkpoint.areTasksFullyAcknowledged() is checked: as the code shows, completePendingCheckpoint is entered only when every task has acknowledged and the checkpoint has not been discarded.

    boolean areTasksFullyAcknowledged() {
        //    no unacknowledged tasks left          not discarded
        return notYetAcknowledgedTasks.isEmpty() && !discarded;
    }

Now follow into completePendingCheckpoint.

The method first registers the operators' state with the shared state registry and then runs the logic that finalizes the checkpoint. With that, the checkpoint process is complete.

private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        final long checkpointId = pendingCheckpoint.getCheckpointId();
        final CompletedCheckpoint completedCheckpoint;

        // As a first step to complete the checkpoint, we register its state with the registry
        // i.e. register the state of all operators with the sharedStateRegistry
        Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
        sharedStateRegistry.registerAll(operatorStates.values());

        try {
            try {
                // run the finalization logic of the pendingCheckpoint
                completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                // reset the count of failed checkpoints
                failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
            } catch (Exception e1) {
                // abort the current pending checkpoint if we fail to finalize it
                if (!pendingCheckpoint.isDiscarded()) {
                    abortPendingCheckpoint(
                        pendingCheckpoint,
                        new CheckpointException(
                            CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
                }

                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            // the pending checkpoint must be discarded after the finalization
            // sanity check: after finalizeCheckpoint() the pendingCheckpoint must be in the discarded state
            Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

            try {
                // store the completed checkpoint
                completedCheckpointStore.addCheckpoint(completedCheckpoint);
            } catch (Exception exception) {
                // we failed to store the completed checkpoint. Let's clean up
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            completedCheckpoint.discardOnFailedStoring();
                        } catch (Throwable t) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                        }
                    }
                });

                throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
            }
        } finally {
            // remove this checkpoint from the set of pending checkpoints
            pendingCheckpoints.remove(checkpointId);

            // resume periodic triggering if it was paused
            resumePeriodicTriggering();
        }

        // remember the ID of this recent checkpoint
        rememberRecentCheckpointId(checkpointId);

        // drop those pending checkpoints that are at prior to the completed one
        // i.e. abort every pending checkpoint whose ID is smaller than checkpointId (forced checkpoints are not dropped)
        dropSubsumedCheckpoints(checkpointId);

        // record the time when this was completed, to calculate
        // the 'min delay between checkpoints'
        // i.e. store the completion time of this checkpoint
        lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();

        // example output, as shown in the Flink web UI:
        // Completed checkpoint 2 for job d36ca92c353c5fc9794d42fdff834b5d (9013 bytes in 106 ms).
        LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

        if (LOG.isDebugEnabled()) {
            StringBuilder builder = new StringBuilder();
            builder.append("Checkpoint state: ");
            for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                builder.append(state);
                builder.append(", ");
            }
            // Remove last two chars ", "
            builder.setLength(builder.length() - 2);

            LOG.debug(builder.toString());
        }

        // send the "notify complete" call to all vertices, coordinators, etc.
        sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
    }
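The dropSubsumedCheckpoints step called above is not shown in this post. Going by the comments, it aborts pending checkpoints that are older than the one just completed, unless they are forced. A minimal sketch of that idea, assuming a hypothetical map from checkpoint ID to a "forced" flag (the real method works on PendingCheckpoint objects and also aborts them):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class SubsumeSketch {

    // pending maps checkpoint ID -> "forced" flag (a simplification made for this sketch).
    // Removes every non-forced entry older than completedId and returns the removed IDs.
    static List<Long> dropSubsumed(Map<Long, Boolean> pending, long completedId) {
        List<Long> dropped = new ArrayList<>();
        Iterator<Map.Entry<Long, Boolean>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> entry = it.next();
            boolean forced = entry.getValue();
            if (entry.getKey() < completedId && !forced) {
                dropped.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return dropped;
    }
}
```

Removing through the iterator avoids a ConcurrentModificationException while the map is being traversed.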

Finally, let's look at how a PendingCheckpoint produces a CompletedCheckpoint. pendingCheckpoint.finalizeCheckpoint is called first; it persists the checkpoint metadata and builds a CompletedCheckpoint from the pending one. The pendingCheckpoint then marks itself as disposed and returns the completed checkpoint.

    public CompletedCheckpoint finalizeCheckpoint() throws IOException {

        synchronized (lock) {

            checkState(!isDiscarded(), "checkpoint is discarded");
            checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet");

            // make sure we fulfill the promise with an exception if something fails
            try {
                // write out the metadata
                // create a CheckpointMetadata object, which encapsulates the metadata of a checkpoint or savepoint
                final CheckpointMetadata savepoint = new CheckpointMetadata(checkpointId, operatorStates.values(), masterStates);

                final CompletedCheckpointStorageLocation finalizedLocation;

                // persist the checkpoint metadata (for example to the file system)
                // createMetadataOutputStream creates the output stream the checkpoint metadata is written to
                try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
                    Checkpoints.storeCheckpointMetadata(savepoint, out);
                    // after all metadata has been written, close the stream and finalize the checkpoint location
                    finalizedLocation = out.closeAndFinalizeCheckpoint();
                }

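                // create a CompletedCheckpoint object
                // A CompletedCheckpoint describes a checkpoint after all required tasks have acknowledged it (with their state); it is then considered successful.
                // The CompletedCheckpoint class holds all of the checkpoint's metadata, i.e. the checkpoint ID, timestamps, and the handles to all of the checkpoint's state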
                CompletedCheckpoint completed = new CompletedCheckpoint(
                        jobId,
                        checkpointId,
                        checkpointTimestamp,
                        System.currentTimeMillis(),
                        operatorStates,
                        masterStates,
                        props,
                        finalizedLocation);
                // complete the CompletableFuture with the CompletedCheckpoint
                onCompletionPromise.complete(completed);

                // to prevent null-pointers from concurrent modification, copy reference onto stack
                // set the discardCallback on the completedCheckpoint
                PendingCheckpointStats statsCallback = this.statsCallback;
                if (statsCallback != null) {
                    // Finalize the statsCallback and give the completed checkpoint a
                    // callback for discards.
                    CompletedCheckpointStats.DiscardCallback discardCallback =
                            statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
                    // set the callback used for tracking when this checkpoint is discarded
                    completed.setDiscardCallback(discardCallback);
                }

                // mark this pending checkpoint as disposed, but do NOT drop the state
                dispose(false);

                return completed;
            }
            catch (Throwable t) {
                onCompletionPromise.completeExceptionally(t);
                ExceptionUtils.rethrowIOException(t);
                return null; // silence the compiler
            }
        }
    }
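The control flow of finalizeCheckpoint (write metadata inside try-with-resources, fulfil a promise on success, fail the same promise on any error before rethrowing) can be shown in isolation. The sketch below is an illustration only: `FinalizeSketch` is a hypothetical class, the "metadata" is just a string, and the target is a plain OutputStream instead of Flink's checkpoint storage location.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class FinalizeSketch {
    private final CompletableFuture<String> onCompletion = new CompletableFuture<>();
    private boolean disposed;

    CompletableFuture<String> completionFuture() {
        return onCompletion;
    }

    boolean isDisposed() {
        return disposed;
    }

    // Writes the "metadata" to out, completes the promise, then marks this object disposed.
    // Any failure is forwarded to the promise before being rethrown to the caller.
    String finalizeTo(OutputStream out, long checkpointId) throws IOException {
        try {
            String metadata = "checkpoint-" + checkpointId;
            try (OutputStream stream = out) { // closed even if the write fails
                stream.write(metadata.getBytes(StandardCharsets.UTF_8));
            }
            onCompletion.complete(metadata);
            disposed = true;
            return metadata;
        } catch (IOException | RuntimeException e) {
            onCompletion.completeExceptionally(e);
            throw e;
        }
    }
}
```

Completing the future exceptionally in the catch block matters for callers that wait on it: without it, a failed finalization would leave them waiting forever.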

Once the pendingCheckpoint has been converted into a completedCheckpoint, failureManager.handleCheckpointSuccess is called.

This method simply resets the count of failed checkpoints.

    public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) {
        clearCount();
    }
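Why resetting the counter matters becomes clearer with a sketch of a consecutive-failure counter. This is a guess at the general pattern, not the actual CheckpointFailureManager: `FailureCounterSketch`, `tolerableFailures` and the method names are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class FailureCounterSketch {
    private final int tolerableFailures; // hypothetical threshold
    private final AtomicInteger continuousFailures = new AtomicInteger(0);

    FailureCounterSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    // Records a failed checkpoint; returns true once the streak exceeds the threshold.
    boolean onFailure() {
        return continuousFailures.incrementAndGet() > tolerableFailures;
    }

    // A successful checkpoint ends the streak, like clearCount() above.
    void onSuccess() {
        continuousFailures.set(0);
    }

    int currentCount() {
        return continuousFailures.get();
    }
}
```

With a threshold of 2, two failures followed by a success leave the count at 0, so only three failures in a row trip the limit.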

Previous posts on checkpoint

Flink-1.10 Source Code Notes: checkpoint - 1

Flink-1.10 Source Code Notes: checkpoint - 2

If you spot any mistakes, corrections are welcome!
