Flink Source Code Reading: How a Checkpoint Is Executed

Preface

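I won't go over the role and importance of checkpoints in Flink again. An earlier article described the overall checkpoint flow ("The periodic triggering of checkpoints"), and readers unfamiliar with it can refer to that post.
In this post, let's follow the source code through the detailed execution of a checkpoint.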

The checkpoint process

Where it starts

As we know, periodic checkpoints are initiated by a component in the JobManager called the CheckpointCoordinator, and the actual work happens in CheckpointCoordinator.triggerCheckpoint. The method is long; in summary, it mainly does the following:

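  1. Pre-checks, including:
  • whether the checkpoint must be forced
  • whether the number of concurrent checkpoints currently queued exceeds the threshold
  • whether the interval since the last successful checkpoint is too short
    Unless the checkpoint is forced, it is skipped when either of the last two checks fails.
  2. Check that all tasks that need to be triggered are in the RUNNING state; otherwise give up. I have been bitten by this before; see "A record of a problem where Flink did not take checkpoints".
  3. Check that all tasks that need to acknowledge the checkpoint are in the RUNNING state; otherwise give up.
    Once all the checks above pass, the checkpoint can actually be taken.
  4. Generate a unique, monotonically increasing checkpointID.
  5. Initialize the CheckpointStorageLocation, the location where this checkpoint's snapshot is stored; it differs between state backends.
  6. Create a PendingCheckpoint, which represents a checkpoint in an intermediate (not yet completed) state, and store it in a checkpointId -> PendingCheckpoint map.
  7. Register a scheduled task that cancels this checkpoint if it times out and then triggers a new checkpoint.
  8. Call Execution.triggerCheckpoint() to send a checkpoint request to every task that needs to be triggered: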
for (Execution execution : executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
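The pre-checks and the bookkeeping in steps 1 to 6 can be modeled roughly as below. This is a minimal sketch under simplifying assumptions, not Flink's code: the class TriggerSketch, its fields maxConcurrent and minPauseMillis, and the String placeholder for a PendingCheckpoint are all hypothetical. It only shows that forcing bypasses the concurrency and minimum-pause checks but not the RUNNING check, and that every accepted trigger gets a fresh increasing ID registered as pending.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the coordinator-side checks; not Flink's API.
class TriggerSketch {
    private final int maxConcurrent;      // limit on checkpoints in flight
    private final long minPauseMillis;    // minimum pause after the last completed checkpoint
    private final AtomicLong idCounter = new AtomicLong(1);  // unique, increasing IDs
    // checkpointId -> placeholder, mirroring the checkpointId -> PendingCheckpoint map
    final Map<Long, String> pending = new ConcurrentHashMap<>();
    private long lastCompletionMillis;
    private boolean anyCompleted;

    TriggerSketch(int maxConcurrent, long minPauseMillis) {
        this.maxConcurrent = maxConcurrent;
        this.minPauseMillis = minPauseMillis;
    }

    /** Returns the new checkpoint ID, or -1 if this trigger is skipped. */
    long tryTrigger(boolean forced, boolean allTasksRunning, long nowMillis) {
        if (!forced) {
            if (pending.size() >= maxConcurrent) {
                return -1;  // too many concurrent checkpoints
            }
            if (anyCompleted && nowMillis - lastCompletionMillis < minPauseMillis) {
                return -1;  // too soon after the last successful checkpoint
            }
        }
        if (!allTasksRunning) {
            return -1;      // tasks to trigger / to ack must be RUNNING, even when forced
        }
        long id = idCounter.getAndIncrement();
        pending.put(id, "PENDING");
        return id;
    }

    /** Marks a checkpoint as completed and records when that happened. */
    void complete(long id, long nowMillis) {
        pending.remove(id);
        lastCompletionMillis = nowMillis;
        anyCompleted = true;
    }
}
```

Rejected triggers do not consume an ID, so the IDs of accepted checkpoints stay consecutive in this sketch.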

Eventually this becomes an RPC call to TaskExecutorGateway.triggerCheckpoint, i.e. a request to execute TaskExecutor.triggerCheckpoint(). Because one TaskExecutor may be running several Tasks, it looks up the Task matching the ExecutionAttemptID of the checkpoint request and then calls Task.triggerCheckpointBarrier():

// Execution
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

// TaskExecutor
@Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }
    }
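The lookup in TaskExecutor.triggerCheckpoint boils down to "resolve the attempt ID to a task, trigger it, or fail the future". A minimal sketch, assuming plain Strings stand in for ExecutionAttemptID and a hypothetical TaskSlotTableSketch stands in for the real taskSlotTable:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: one TaskExecutor hosts several tasks, so the RPC handler
// resolves the attempt ID to a task before triggering the barrier.
class TaskSlotTableSketch {
    interface TaskHandle {
        void triggerCheckpointBarrier(long checkpointId);
    }

    private final Map<String, TaskHandle> tasksByAttempt = new HashMap<>();

    void register(String attemptId, TaskHandle task) {
        tasksByAttempt.put(attemptId, task);
    }

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        TaskHandle task = tasksByAttempt.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId);
            // the ACK only confirms the request was delivered, not that the checkpoint finished
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "TaskManager received a checkpoint request for unknown task " + attemptId + '.'));
        return failed;
    }
}
```

As in the Flink code, the acknowledgement is returned immediately; completion of the snapshot is reported to the CheckpointCoordinator separately, later on.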

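Task.triggerCheckpointBarrier() checks that the task is RUNNING and hands the actual checkpoint logic to AbstractInvokable.triggerCheckpointAsync(...); otherwise it declines the checkpoint: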

public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions,
            final boolean advanceToEndOfEventTime) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {
            try {
                invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
            }
            catch (RejectedExecutionException ex) {
                // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
                LOG.debug(
                    "Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
                    checkpointID, taskNameWithSubtask, executionId);
            }
            catch (Throwable t) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new Exception(
                        "Error while triggering checkpoint " + checkpointID + " for " +
                            taskNameWithSubtask, t));
                } else {
                    LOG.debug("Encountered error while triggering checkpoint {} for " +
                        "{} ({}) while being not in state running.", checkpointID,
                        taskNameWithSubtask, executionId, t);
                }
            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
        }
    }
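The RejectedExecutionException branch exists because the checkpoint is enqueued into the task's mailbox instead of running on the RPC thread. A minimal sketch of that pattern, assuming a single-threaded ExecutorService as a stand-in for Flink's mailbox (the class MailboxSketch is hypothetical): once the executor is shut down, new submissions are rejected, which the caller treats as "the task is shutting down".

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-thread executor plays the role of the task mailbox.
class MailboxSketch {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    volatile long lastCheckpoint = -1;

    /** Returns true if the checkpoint was enqueued, false if the mailbox rejected it. */
    boolean triggerCheckpointAsync(long checkpointId) {
        try {
            mailbox.execute(() -> lastCheckpoint = checkpointId);
            return true;
        } catch (RejectedExecutionException e) {
            // mirrors the LOG.debug branch: the task is shutting down, so just ignore it
            return false;
        }
    }

    /** Stops accepting new work and waits briefly for queued work to finish. */
    void close() {
        mailbox.shutdown();
        try {
            mailbox.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```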

triggerCheckpointAsync is overridden by SourceStreamTask and by the general StreamTask. The main logic lives in StreamTask, which eventually runs performCheckpoint:

private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        final long checkpointId = checkpointMetaData.getCheckpointId();

        if (isRunning) {
            actionExecutor.runThrowing(() -> {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    setSynchronousSavepointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointId,
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

            });

            return true;
        } else {
            actionExecutor.runThrowing(() -> {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                recordWriter.broadcastEvent(message);
            });

            return false;
        }
    }

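It does three things: 1) prepares the checkpoint, which usually involves very little work; 2) sends the CheckpointBarrier downstream; 3) takes and stores the state snapshot. If the task is no longer running, it instead broadcasts a CancelCheckpointMarker so that downstream operators do not wait for input from this task.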
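The ordering of these steps is the important part: the barrier goes out before the (possibly long) snapshot so that downstream alignment is not delayed. A minimal sketch of that ordering, assuming an event log in place of the real operator chain and record writer (PerformCheckpointSketch and its event names are hypothetical). In Flink the three steps run as one action in the task thread, so no record is processed between them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the control flow in performCheckpoint.
class PerformCheckpointSketch {
    final List<String> events = new ArrayList<>();
    boolean isRunning = true;

    boolean performCheckpoint(long checkpointId) {
        if (isRunning) {
            // (1) pre-barrier work: usually nothing or very little
            events.add("prepareSnapshotPreBarrier-" + checkpointId);
            // (2) emit the barrier as early as possible
            events.add("barrier-" + checkpointId);
            // (3) take the snapshot; in Flink most of it continues asynchronously
            events.add("snapshot-" + checkpointId);
            return true;
        }
        // not running: tell downstream not to wait for this input
        events.add("cancelMarker-" + checkpointId);
        return false;
    }
}
```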

Broadcasting the barrier

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }
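As a rough model of what this broadcast means for the data stream: the same barrier object is appended to every output channel, after the records already emitted, so each channel is split into "before checkpoint n" and "after checkpoint n". A minimal sketch, assuming in-memory queues as output channels (BroadcastSketch and its methods are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: one barrier instance is appended to every output channel.
class BroadcastSketch {
    static final class Barrier {
        final long id;
        Barrier(long id) { this.id = id; }
    }

    final List<Deque<Object>> channels = new ArrayList<>();

    BroadcastSketch(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>());
        }
    }

    void emitRecord(int channel, Object record) {
        channels.get(channel).addLast(record);
    }

    void broadcastCheckpointBarrier(long id) {
        Barrier barrier = new Barrier(id);
        for (Deque<Object> channel : channels) {
            channel.addLast(barrier);  // in-band, behind the records already emitted
        }
    }

    /** Counts the records that precede the first barrier in a channel. */
    int recordsBeforeBarrier(int channel) {
        int n = 0;
        for (Object o : channels.get(channel)) {
            if (o instanceof Barrier) {
                break;
            }
            n++;
        }
        return n;
    }
}
```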

Taking the snapshot

private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {
        // resolve where this checkpoint is stored, along with its metadata
        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());
        // wrap the checkpointing process in a CheckpointingOperation object
        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

        checkpointingOperation.executeCheckpointing();
    }

The snapshot of each operator is represented by OperatorSnapshotFutures, which holds the snapshot results of the operator state and the keyed state:

public class OperatorSnapshotFutures {

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
    // ... constructor, getters/setters and cancel() omitted
}

Because one StreamTask may contain several (chained) operators, it keeps an OperatorID -> OperatorSnapshotFutures map internally:

        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

Taking the snapshot has a synchronous part and an asynchronous part:

public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            try {
            // synchronous part
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                // asynchronous part
                // snapshots can be configured to run synchronously or asynchronously;
                // in synchronous mode every runnable future here has already completed
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception ex) {
                // Cleanup to release resources
                for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotResult) {
                        try {
                            operatorSnapshotResult.cancel();
                        } catch (Exception e) {
                            LOG.warn("Could not properly cancel an operator snapshot result.", e);
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    // in the case of a synchronous checkpoint, we always rethrow the exception,
                    // so that the task fails.
                    // this is because the intention is always to stop the job after this checkpointing
                    // operation, and without the failure, the task would go back to normal execution.
                    throw ex;
                } else {
                    owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
                }
            }
        }
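The split between the two parts can be sketched with plain java.util.concurrent types: the synchronous part only creates one RunnableFuture per operator, and a separate pool finalizes them and "acknowledges" the collected handles. This is a minimal sketch, assuming Strings in place of OperatorID and state handles; SyncAsyncSketch is hypothetical, and unlike a real task it waits for the async part so the result can be inspected.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of the synchronous/asynchronous split in executeCheckpointing.
class SyncAsyncSketch {
    static Map<String, String> checkpoint(Map<String, String> operatorStates, ExecutorService asyncPool) {
        // synchronous part: only create the futures; this runs on the task thread, so keep it short
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, String> op : operatorStates.entrySet()) {
            final String state = op.getValue();  // immutable, so safe to hand to another thread
            inProgress.put(op.getKey(), new FutureTask<String>(() -> "handle(" + state + ")"));
        }
        // asynchronous part: finalize every future on another thread, then collect the handles
        FutureTask<Map<String, String>> asyncPart = new FutureTask<Map<String, String>>(() -> {
            Map<String, String> acked = new LinkedHashMap<>();
            for (Map.Entry<String, RunnableFuture<String>> e : inProgress.entrySet()) {
                RunnableFuture<String> f = e.getValue();
                f.run();                      // a no-op if the future already completed
                acked.put(e.getKey(), f.get());
            }
            return acked;
        });
        asyncPool.execute(asyncPart);
        try {
            return asyncPart.get();  // the sketch waits; a real task keeps processing records
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

FutureTask.run() does nothing on an already completed task, which is why the same finalizing loop works whether a snapshot was written synchronously or deferred.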

In the synchronous phase, StreamOperator.snapshotState is called on each operator in turn, and each call returns runnable futures. Depending on whether snapshots are configured to run synchronously or asynchronously, these futures may already be completed or still pending:

private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                // synchronous part: call the operator's snapshotState; the returned OperatorSnapshotFutures may or may not be done yet
                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

The details are in AbstractStreamOperator#snapshotState:

public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());

        try {
            // call the snapshotState hook: raw state such as timers, plus user function state
            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // snapshot the managed operator state
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // snapshot the managed keyed state
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            try {
                snapshotContext.closeExceptionally();
            } catch (IOException e) {
                snapshotException.addSuppressed(e);
            }
            throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
        }

        return snapshotInProgress;
    }

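Recall that state comes in two kinds: raw state and managed state (state managed by Flink). Timers are raw state and also have to be written into the snapshot; the hook below writes them as raw keyed state when the keyed state backend still requires legacy synchronous timer snapshots: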

/**
     * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
        //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
        // all timers are written as raw keyed state
        if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
            ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {

            KeyedStateCheckpointOutputStream out;

            try {
                out = context.getRawKeyedOperatorStateOutput();
            } catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " +
                    getOperatorName() + '.', exception);
            }

            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                for (int keyGroupIdx : allKeyGroups) {
                    out.startNewKeyGroup(keyGroupIdx);

                    timeServiceManager.snapshotStateForKeyGroup(
                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
                }
            } catch (Exception exception) {
                throw new Exception("Could not write timer service of " + getOperatorName() +
                    " to checkpoint state stream.", exception);
            } finally {
                try {
                    out.close();
                } catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                        "might have prevented deleting some state data.", getOperatorName(), closeException);
                }
            }
        }
    }
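Writing raw keyed state key group by key group is what lets the snapshot be redistributed later: startNewKeyGroup(...) marks where each key group begins. A minimal sketch of that idea, assuming a byte array as the checkpoint stream and timer timestamps as the only timer data (RawKeyedTimerSketch and its layout are hypothetical, not Flink's format); a caller would pass one entry per key group in the subtask's range.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: write timers per key group and remember each group's start offset.
class RawKeyedTimerSketch {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    // key-group index -> byte offset where that key group's data starts
    final Map<Integer, Integer> keyGroupOffsets = new TreeMap<>();

    void writeTimers(Map<Integer, List<Long>> timersByKeyGroup) {
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            for (Map.Entry<Integer, List<Long>> e : timersByKeyGroup.entrySet()) {
                keyGroupOffsets.put(e.getKey(), bytes.size());  // role of out.startNewKeyGroup(idx)
                out.writeInt(e.getValue().size());              // number of timers in this group
                for (long timestamp : e.getValue()) {
                    out.writeLong(timestamp);
                }
            }
            out.flush();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

A restoring subtask that is assigned only some of the key groups could seek straight to their offsets instead of reading the whole stream.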

That is what AbstractStreamOperator.snapshotState does. Its subclass AbstractUdfStreamOperator overrides it to additionally snapshot the user function's state:

public void snapshotState(StateSnapshotContext context) throws Exception {
        // call the parent method first, which writes the timers
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }

// StreamingFunctionUtils
public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }


private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        // if the user function implements CheckpointedFunction, call the UDF's own snapshotState
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }
        // if the user function implements ListCheckpointed
        if (userFunction instanceof ListCheckpointed) {
            // first call snapshotState to obtain the current state
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
            // get the default list state from the operator state backend
            ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
            // clear the previous contents
            listState.clear();
            // write the current state into the state backend
            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }
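The dispatch above can be illustrated without Flink on the classpath. The three interfaces below are hypothetical stand-ins that only mirror the roles of CheckpointedFunction, ListCheckpointed and WrappingFunction; they are not the real Flink types. The sketch shows the two behaviors worth remembering: wrapped functions are unwrapped until one can be snapshotted, and list-style state replaces (never appends to) the previous contents of the default list state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of snapshotFunctionState / trySnapshotFunctionState.
class UdfSnapshotSketch {
    interface UserFunction {}
    interface Checkpointed extends UserFunction { void snapshotState(long checkpointId); }
    interface ListCheckpointedLike extends UserFunction { List<String> snapshotState(long checkpointId); }
    interface Wrapping extends UserFunction { UserFunction getWrappedFunction(); }

    // stands in for the backend's default operator list state
    final List<String> defaultListState = new ArrayList<>();

    void snapshotFunctionState(long checkpointId, UserFunction userFunction) {
        while (true) {
            if (trySnapshot(checkpointId, userFunction)) {
                break;
            }
            if (userFunction instanceof Wrapping) {
                userFunction = ((Wrapping) userFunction).getWrappedFunction();  // unwrap and retry
            } else {
                break;  // the function has no checkpointed state
            }
        }
    }

    private boolean trySnapshot(long checkpointId, UserFunction f) {
        if (f instanceof Checkpointed) {
            ((Checkpointed) f).snapshotState(checkpointId);  // the UDF updates its own managed state
            return true;
        }
        if (f instanceof ListCheckpointedLike) {
            List<String> partitions = ((ListCheckpointedLike) f).snapshotState(checkpointId);
            defaultListState.clear();  // replace the previous snapshot's contents
            if (partitions != null) {
                defaultListState.addAll(partitions);
            }
            return true;
        }
        return false;
    }
}
```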

At this point we know how a checkpoint ends up calling the snapshot methods we implement ourselves. Next, let's look at how Flink-managed state is written into the snapshot:

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

Let's start with operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. First, it makes deep copies of all currently registered operator states (both list state and broadcast state). It then wraps the actual writing in an asynchronous FutureTask whose main jobs are: 1) open the output stream; 2) write the state metadata; 3) write the states; 4) close the output stream and obtain the state handle. If asynchronous snapshots are not enabled, this FutureTask runs immediately, during the synchronous phase.
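The four steps of the write task can be sketched with a byte array as the checkpoint stream. This is a minimal sketch with an illustrative byte layout, not Flink's serialization format; OperatorStateWriteSketch and its Handle class are hypothetical. It does mirror one detail of the real code: for each list state it records the stream offset of every element (the long[] partitionOffsets), which is what allows list entries to be redistributed when the job is rescaled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the operator-state write task.
class OperatorStateWriteSketch {
    static final class Handle {
        final byte[] data;
        final Map<String, long[]> offsetsByState;

        Handle(byte[] data, Map<String, long[]> offsetsByState) {
            this.data = data;
            this.offsetsByState = offsetsByState;
        }
    }

    static Handle write(Map<String, List<String>> states) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();          // 1) open the stream
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(states.size());                                    // 2) metadata: state names
            for (String name : states.keySet()) {
                out.writeUTF(name);
            }
            Map<String, long[]> offsets = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> e : states.entrySet()) {   // 3) the states
                List<String> elements = e.getValue();
                long[] partitionOffsets = new long[elements.size()];
                for (int i = 0; i < elements.size(); i++) {
                    partitionOffsets[i] = bytes.size();                     // where element i starts
                    out.writeUTF(elements.get(i));
                }
                offsets.put(e.getKey(), partitionOffsets);
            }
            out.flush();
            return new Handle(bytes.toByteArray(), offsets);                // 4) close -> handle
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```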

public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

        if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }

        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassLoader);
        try {
            // eagerly create deep copies of the list and the broadcast states (if any)
            // in the synchronous phase, so that we can use them in the async writing.
            //獲得已注冊的所有 list state 和 broadcast state 的深拷貝
            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }

            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }
        // wrap the main write operations in an asynchronous FutureTask
        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
            new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                @Override
                protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                    // create the state output stream
                    CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                        streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                    snapshotCloseableRegistry.registerCloseable(localOut);
                    // collect the metadata
                    // get the registered operator state infos ...
                    List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }
                    // collect the broadcast state metadata; all metadata is written to the stream below
                    // ... get the registered broadcast operator state infos ...
                    List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                        new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {
                        broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }
        
                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    // ... and then go for the states ...
                    // write the states
                    // we put BOTH normal and broadcast state metadata here
                    int initialMapCapacity =
                        registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                        new HashMap<>(initialMapCapacity);

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {

                        PartitionableListState<?> value = entry.getValue();
                        long[] partitionOffsets = value.write(localOut);
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and the broadcast states themselves ...
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {

                        BackendWritableBroadcastState<?, ?> value = entry.getValue();
                        long[] partitionOffsets = {value.write(localOut)};
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and, finally, create the state handle.
                    OperatorStateHandle retValue = null;

                    if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
                        // close the output stream and obtain the state handle, which can later be used to read the state back
                        StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                        if (stateHandle != null) {
                            retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                        }

                        return SnapshotResult.of(retValue);
                    } else {
                        throw new IOException("Stream was already unregistered.");
                    }
                }

                @Override
                protected void cleanupProvidedResources() {
                    // nothing to do
                }

                @Override
                protected void logAsyncSnapshotComplete(long startTime) {
                    if (asynchronousSnapshots) {
                        logAsyncCompleted(streamFactory, startTime);
                    }
                }
            };

        final FutureTask<SnapshotResult<OperatorStateHandle>> task =
            snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
        // if snapshots are not asynchronous, run the FutureTask right here, i.e. the state is written in the synchronous phase
        if (!asynchronousSnapshots) {
            task.run();
        }

        return task;
    }
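The core pattern of this method (copy eagerly in the synchronous phase, defer the write in a FutureTask, and run it on the spot when asynchronous snapshots are off) fits in a few lines. A minimal sketch, assuming a list of Strings as the live state and a joined String as the "written" snapshot; DeepCopySnapshotSketch is hypothetical. Copying an ArrayList of Strings is only shallow, but since Strings are immutable it is effectively a deep copy here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of the copy-then-defer pattern in snapshot().
class DeepCopySnapshotSketch {
    final List<String> liveState = new ArrayList<>();
    final boolean asynchronousSnapshots;

    DeepCopySnapshotSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    RunnableFuture<String> snapshot() {
        // copy now, so later updates by the task thread cannot leak into this checkpoint
        final List<String> copy = new ArrayList<>(liveState);
        FutureTask<String> task = new FutureTask<String>(() -> String.join(",", copy));
        if (!asynchronousSnapshots) {
            task.run();  // synchronous mode: the "write" happens right here
        }
        return task;
    }
}
```

In asynchronous mode the returned future is still pending, and whoever runs it later (the AsyncCheckpointRunnable in Flink) sees the state as it was at copy time.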

Writing keyed state follows a similar basic flow, but it is more complex than operator state: keyed state has several storage implementations, including heap-based and RocksDB-based ones, and the RocksDB-based implementation also supports incremental checkpoints.

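That covers the first stage of the snapshot, the synchronous stage. The asynchronous stage is encapsulated in AsyncCheckpointRunnable, whose main work is: 1) run the FutureTasks created in the synchronous stage; 2) once they complete, send an Ack to the CheckpointCoordinator.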
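The compareAndSet on asyncCheckpointState in the code below resolves a race: the async thread finishing the snapshot and a concurrent close or cancel both try to move the state out of RUNNING, and only the winner acts. A minimal sketch of that state machine, assuming a counter in place of reportCompletedSnapshotStates (AsyncCheckpointStateSketch is hypothetical; its RUNNING, DISCARDED and COMPLETED values mirror CheckpointingOperation.AsyncCheckpointState):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: completion and cancellation race for the same state transition.
class AsyncCheckpointStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    volatile int acksSent = 0;  // only the single winning thread ever increments this

    /** Called by the async thread once every operator future is finalized. */
    void onSnapshotFinished() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acksSent++;  // stands in for reportCompletedSnapshotStates(...)
        }
        // otherwise the checkpoint was closed first and nothing is reported
    }

    /** Called on cancellation; returns true if the caller should clean up the results. */
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }
}
```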

protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());

                // Finish the state write of every operator:
                // for a synchronous checkpoint the state has already been written before this point,
                // for an asynchronous checkpoint the state is actually written here
                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                    // finalize the async part of all by executing all snapshot runnables
                    OperatorSnapshotFinalizer finalizedSnapshots =
                        new OperatorSnapshotFinalizer(snapshotInProgress);

                    jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getJobManagerOwnedState());

                    localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getTaskLocalState());
                }

                final long asyncEndNanos = System.nanoTime();
                final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

                checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                    CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    // report that the snapshot has completed
                    reportCompletedSnapshotStates(
                        jobManagerTaskOperatorSubtaskStates,
                        localTaskOperatorSubtaskStates,
                        asyncDurationMillis);

                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                        owner.getName(),
                        checkpointMetaData.getCheckpointId());
                }
            } catch (Exception e) {
                handleExecutionException(e);
            } finally {
                owner.cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }

        private void reportCompletedSnapshotStates(
                TaskStateSnapshot acknowledgedTaskStateSnapshot,
                TaskStateSnapshot localTaskStateSnapshot,
                long asyncDurationMillis) {
            TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
            boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
            boolean hasLocalState = localTaskStateSnapshot.hasState();
            // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
            // to stateless tasks on restore. This enables simple job modifications that only concern
            // stateless operators without the need to assign them uids to match their (always empty) states.
            taskStateManager.reportTaskStateSnapshots(
                checkpointMetaData,
                checkpointMetrics,
                hasAckState ? acknowledgedTaskStateSnapshot : null,
                hasLocalState ? localTaskStateSnapshot : null);
        }
}
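In `run()`, the `compareAndSet(RUNNING, COMPLETED)` call stops a runnable that was closed first (for example because the task was cancelled) from still reporting its snapshot. The sketch below isolates that race using `AtomicReference`. The class `AsyncSnapshotGuard` is hypothetical. Its `DISCARDED` state is an assumption standing in for whatever state the close path sets:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the completion-vs-close race guarded by compareAndSet.
final class AsyncSnapshotGuard {

    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private int reports = 0; // only the CAS winner in complete() increments this, at most once

    // Called when the asynchronous part finishes: report only if nobody closed us first.
    boolean complete() {
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            reports++; // stands in for reportCompletedSnapshotStates(...)
            return true;
        }
        return false; // closed before completion: the result is dropped and no Ack is sent
    }

    // Called on close/cancel: only wins if the snapshot has not completed yet.
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    int reports() {
        return reports;
    }

    State state() {
        return state.get();
    }
}
```

Only one of the two transitions can succeed. A snapshot is therefore either reported exactly once or not at all.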

public class TaskStateManagerImpl implements TaskStateManager {
    @Override
    public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

        long checkpointId = checkpointMetaData.getCheckpointId();

        localStateStore.storeLocalState(checkpointId, localState);

        // send the ACK response to the CheckpointCoordinator
        checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }
}

Checkpoint acknowledgement

A Task responds to a checkpoint through the CheckpointResponder interface:

public interface CheckpointResponder {

    /**
     * Acknowledges the given checkpoint.
     */
    void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState);

    /**
     * Declines the given checkpoint.
     */
    void declineCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        Throwable cause);
}

RpcCheckpointResponder is the concrete implementation of CheckpointResponder. It notifies the CheckpointCoordinatorGateway, i.e. the JobMaster, via RPC. The JobMaster then calls CheckpointCoordinator.receiveAcknowledgeMessage() or CheckpointCoordinator.receiveDeclineMessage() to handle the message.
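The sketch below makes this division of labour concrete. It is a self-contained version of the responder contract with simplified parameter types: strings, longs and `Object` replace JobID, ExecutionAttemptID, CheckpointMetrics and TaskStateSnapshot. It also includes an in-memory implementation that only records the calls. Both `SimpleCheckpointResponder` and `RecordingResponder` are hypothetical. The real RpcCheckpointResponder forwards each call over RPC instead:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of the CheckpointResponder contract (not Flink's signatures).
interface SimpleCheckpointResponder {
    void acknowledgeCheckpoint(String jobId, String attemptId, long checkpointId, Object subtaskState);

    void declineCheckpoint(String jobId, String attemptId, long checkpointId, Throwable cause);
}

// In-memory stand-in for RpcCheckpointResponder: records messages instead of sending them over RPC.
final class RecordingResponder implements SimpleCheckpointResponder {
    final List<String> messages = new ArrayList<>();

    @Override
    public void acknowledgeCheckpoint(String jobId, String attemptId, long checkpointId, Object subtaskState) {
        // A null state marks a stateless task, mirroring the null convention of reportCompletedSnapshotStates.
        messages.add("ack " + checkpointId + " from " + attemptId + (subtaskState == null ? " (stateless)" : ""));
    }

    @Override
    public void declineCheckpoint(String jobId, String attemptId, long checkpointId, Throwable cause) {
        messages.add("decline " + checkpointId + " from " + attemptId + ": " + cause.getMessage());
    }
}
```

A recording implementation like this is also a convenient test double. Task-side code can be exercised without a running JobMaster.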

Completing the checkpoint

After a Task finishes its part of the checkpoint, the CheckpointCoordinator receives its Ack response. The Ack is handled roughly as follows:

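  • Look up the PendingCheckpoint for the Ack's checkpointID in Map<Long, PendingCheckpoint> pendingCheckpoints
  • If a corresponding PendingCheckpoint exists:
    • If it has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack; depending on the result:
      • SUCCESS: check whether all required Acks have now been received; if so, call completePendingCheckpoint to complete this checkpoint
      • DUPLICATE: the Ack was received more than once; ignore it
      • UNKNOWN: the Ack comes from an unknown task execution attempt; discard the state handles carried in the Ack
      • DISCARDED: the checkpoint has already been discarded; discard the state handles carried in the Ack
    • If it has already been discarded but not yet removed, throw an exception
  • If no corresponding PendingCheckpoint exists, discard the state handles carried in the Ack (logging a warning if the Ack belongs to a recently expired checkpoint)

The corresponding code: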
class CheckpointCoordinator {
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
        if (shutdown || message == null) {
            return false;
        }

        if (!job.equals(message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
            return false;
        }

        final long checkpointId = message.getCheckpointId();

        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            if (shutdown) {
                return false;
            }

            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {

                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    case SUCCESS:
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());

                        if (checkpoint.isFullyAcknowledged()) {
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    case DUPLICATE:
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                        break;
                    case UNKNOWN:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                        break;
                    case DISCARDED:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                            "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle to avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
            }
            else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                        "Received message for discarded but non-removed checkpoint " + checkpointId);
            }
            else {
                boolean wasPendingCheckpoint;
                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    wasPendingCheckpoint = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from " +
                        "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
                }
                else {
                    LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob());
                    wasPendingCheckpoint = false;
                }

                // try to discard the state so that we don't have lingering state lying around
                discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                return wasPendingCheckpoint;
            }
        }
    }
}
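The last branch relies on a short history of recently abandoned checkpoint IDs (recentPendingCheckpoints). This history distinguishes a late Ack from an Ack for a checkpoint the coordinator never knew about. Below is a minimal sketch of such a bounded history. The class `RecentCheckpointIds` and its methods are hypothetical, and the capacity is chosen by the caller rather than taken from Flink:

```java
import java.util.ArrayDeque;

// Hypothetical bounded history of recently removed pending checkpoint IDs.
final class RecentCheckpointIds {
    private final ArrayDeque<Long> ids = new ArrayDeque<>();
    private final int capacity;

    RecentCheckpointIds(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.capacity = capacity;
    }

    // Remember an ID when its PendingCheckpoint is removed (expired, aborted, or subsumed).
    void remember(long checkpointId) {
        if (ids.size() >= capacity) {
            ids.removeFirst(); // forget the oldest ID
        }
        ids.addLast(checkpointId);
    }

    // "late" if the ID was pending recently, otherwise "unknown"; in both cases the
    // coordinator would still discard the state carried by the Ack.
    String classifyAckWithoutPending(long checkpointId) {
        return ids.contains(checkpointId) ? "late" : "unknown";
    }
}
```

Keeping the history bounded means very old checkpoints are eventually reported as "unknown" rather than "late". This only affects the log level, because the state is discarded either way.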

How does a checkpoint that has been triggered but not yet completed (a PendingCheckpoint) handle Ack messages? Internally, PendingCheckpoint maintains two Maps:

  • Map<OperatorID, OperatorState> operatorStates: the state handles of the operators whose Acks have been received
  • Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: the tasks that must send an Ack but have not done so yet

Each time an Ack message arrives, PendingCheckpoint removes the corresponding Task from notYetAcknowledgedTasks and stores the state handles carried in the Ack. Once notYetAcknowledgedTasks is empty, all Ack messages have been received.
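This bookkeeping can be sketched in plain Java with the maps described above, plus a set of tasks that have already acknowledged. The extra set is what lets a duplicate Ack be told apart from an unknown one. In the sketch, task and operator identifiers are plain strings, state handles are `Object`, and the result enum only mirrors the four outcomes handled in receiveAcknowledgeMessage. `PendingCheckpointSketch` is hypothetical. The real PendingCheckpoint does more, for example distributing the reported subtask state across operators and collecting statistics:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of PendingCheckpoint's Ack bookkeeping; not Flink's implementation.
final class PendingCheckpointSketch {

    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;                 // tasks that still owe an Ack
    private final Set<String> acknowledgedTasks = new HashSet<>();      // tasks already heard from
    private final Map<String, Object> operatorStates = new HashMap<>(); // collected state handles
    private boolean discarded;

    PendingCheckpointSketch(Set<String> tasksToAck) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAck);
    }

    AckResult acknowledgeTask(String taskId, String operatorId, Object stateHandle) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (!notYetAcknowledgedTasks.remove(taskId)) {
            // Either this task already acknowledged, or it was never expected to.
            return acknowledgedTasks.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }
        acknowledgedTasks.add(taskId);
        if (stateHandle != null) { // stateless tasks report null, so there is nothing to store
            operatorStates.put(operatorId, stateHandle);
        }
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty() && !discarded;
    }

    void abort() {
        discarded = true;
    }

    int collectedStates() {
        return operatorStates.size();
    }
}
```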

Once the PendingCheckpoint has received all Ack messages, the checkpoint can be completed. This involves:

  • Call PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
    • Obtain a CheckpointMetadataOutputStream and write all state handle information to the storage system through it
    • Create a CompletedCheckpoint object
  • Add the CompletedCheckpoint to the CompletedCheckpointStore
    • CompletedCheckpointStore has two implementations: StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore
    • StandaloneCompletedCheckpointStore simply keeps the CompletedCheckpoints in memory, in an array-backed queue
    • ZooKeeperCompletedCheckpointStore provides a high-availability implementation: it first writes the CompletedCheckpoint through a RetrievableStateStorageHelper (usually to a file system), then stores the resulting handle in ZooKeeper
    • Only a limited number of CompletedCheckpoints are retained; older snapshots are deleted
  • Remove the subsumed PendingCheckpoints: since checkpoint IDs increase monotonically, every PendingCheckpoint with a smaller ID than the checkpoint just completed can be discarded
  • Call Execution.notifyCheckpointComplete() for each task, telling it that the checkpoint has completed
    • This is an RPC call to TaskExecutor.confirmCheckpoint() on the TaskExecutor running that task
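Two of the steps above are plain ordered-collection operations:

  • dropping every pending checkpoint whose ID is smaller than the one just completed;
  • keeping only a bounded number of completed checkpoints.

The sketch below does both with a TreeMap and an ArrayDeque. `CheckpointBookkeeping` and its `maxRetained` parameter are hypothetical. In Flink the number of retained checkpoints is configurable, for example via `state.checkpoints.num-retained`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of two completion steps; names and types are illustrative only.
final class CheckpointBookkeeping {
    private final NavigableMap<Long, String> pendingCheckpoints = new TreeMap<>(); // id -> description
    private final ArrayDeque<Long> completedCheckpoints = new ArrayDeque<>();
    private final int maxRetained;

    CheckpointBookkeeping(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    void addPending(long id) {
        pendingCheckpoints.put(id, "pending-" + id);
    }

    // Completes checkpoint `id`; returns the IDs of older pending checkpoints that were subsumed.
    List<Long> complete(long id) {
        pendingCheckpoints.remove(id);
        // IDs increase monotonically, so every pending checkpoint below `id` is obsolete.
        NavigableMap<Long, String> subsumed = pendingCheckpoints.headMap(id, false);
        List<Long> dropped = new ArrayList<>(subsumed.keySet());
        subsumed.clear(); // clearing the view removes the entries from pendingCheckpoints
        completedCheckpoints.addLast(id);
        while (completedCheckpoints.size() > maxRetained) {
            completedCheckpoints.removeFirst(); // a real store would also delete the old snapshot's data
        }
        return dropped;
    }

    List<Long> retained() {
        return new ArrayList<>(completedCheckpoints);
    }

    List<Long> stillPending() {
        return new ArrayList<>(pendingCheckpoints.keySet());
    }
}
```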

After receiving the notifyCheckpointComplete confirmation, a Task carries out any follow-up work. One example is the commit phase of the Kafka producer's two-phase commit.
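This split between taking the snapshot and receiving notifyCheckpointComplete is what makes such two-phase commits possible:

  • data written during a checkpoint interval is only pre-committed when the snapshot is taken;
  • it is committed once the coordinator confirms the checkpoint.

The sketch below shows that protocol in plain Java. `TwoPhaseSinkSketch` and its methods are hypothetical. They only loosely follow the pattern of Flink's TwoPhaseCommitSinkFunction and make no Kafka client calls:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical two-phase commit sink: nothing becomes visible until the checkpoint is confirmed.
final class TwoPhaseSinkSketch {
    private List<String> currentTransaction = new ArrayList<>();
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>(); // checkpointId -> transaction
    final List<String> committed = new ArrayList<>(); // what downstream readers would see

    void invoke(String record) {
        currentTransaction.add(record); // written into the open transaction only
    }

    // Phase 1, on snapshot: pre-commit the open transaction under this checkpoint ID and start a new one.
    void snapshotState(long checkpointId) {
        preCommitted.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    // Phase 2, on notifyCheckpointComplete: commit every transaction up to and including this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = preCommitted.headMap(checkpointId, true);
        for (List<String> txn : ready.values()) {
            committed.addAll(txn);
        }
        ready.clear(); // clearing the view removes the committed transactions from preCommitted
    }
}
```

The sketch commits everything up to and including the confirmed ID, not just that one checkpoint. This is because a completion notification for an earlier checkpoint may never arrive once a later checkpoint has subsumed it.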

Summary

This article analyzed how a checkpoint takes its snapshot. It covered broadcasting the barriers, taking the snapshot, and the ACK process after the checkpoint completes.

© All rights reserved by the author. For reprints or content collaboration, please contact the author.