Flink Source Code: ExecutionGraph

Flink Source Code Analysis Series Index

Please see: Flink Source Code Analysis Series Index

From JobGraph to ExecutionGraph

The JobGraph is submitted via the Dispatcher.submitJob method, which is the entry point for the rest of the flow. It calls Dispatcher.internalSubmitJob and then Dispatcher.persistAndRunJob.

Dispatcher.persistAndRunJob persists the job and then runs it:

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}

Dispatcher.runJob takes two parameters: the JobGraph and the execution type. There are two execution types: job submission (SUBMISSION) and job recovery (RECOVERY).

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
    // Ensure no job with this JobID is currently running, to avoid duplicate submission
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    // Record the initialization timestamp
    long initializationTimestamp = System.currentTimeMillis();
    // Create the JobManagerRunner here
    // The JobManagerRunner will then construct the JobManager
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
        createJobManagerRunner(jobGraph, initializationTimestamp);

    // Wrap the JobGraph-related information for use by the Dispatcher
    DispatcherJob dispatcherJob =
        DispatcherJob.createFor(
        jobManagerRunnerFuture,
        jobGraph.getJobID(),
        jobGraph.getName(),
        initializationTimestamp);
    
    // 將當(dāng)前作業(yè)的ID加入runningJob集合
    // 表示當(dāng)前作業(yè)已處于運(yùn)行狀態(tài)
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);

    final JobID jobId = jobGraph.getJobID();

    // Handle the result of running the job
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
        dispatcherJob
        .getResultFuture()
        .handleAsync(
        (dispatcherJobResult, throwable) -> {
            Preconditions.checkState(
                runningJobs.get(jobId) == dispatcherJob,
                "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

            if (dispatcherJobResult != null) {
                return handleDispatcherJobResult(
                    jobId, dispatcherJobResult, executionType);
            } else {
                return dispatcherJobFailed(jobId, throwable);
            }
        },
        getMainThreadExecutor());

    // When the job terminates, remove the JobID from runningJobs
    final CompletableFuture<Void> jobTerminationFuture =
        cleanupJobStateFuture
        .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
        .thenCompose(Function.identity());

    // Register the job ID and its termination future in dispatcherJobTerminationFutures
    FutureUtils.assertNoException(jobTerminationFuture);
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}
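The future chaining in runJob is easy to miss: handleAsync receives either the result or the throwable, and because removeJob itself returns a future, thenApply yields a nested future that thenCompose(Function.identity()) flattens. A minimal JDK-only sketch of the same pattern (all names here are illustrative, not Flink's):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class RunJobFuturesSketch {
    enum CleanupState { GLOBAL, LOCAL }

    // Mirrors the handleAsync step: map either a result or a failure to a cleanup state.
    static CompletableFuture<CleanupState> toCleanupState(CompletableFuture<String> resultFuture) {
        return resultFuture.handle(
                (result, throwable) -> result != null ? CleanupState.LOCAL : CleanupState.GLOBAL);
    }

    // Mirrors the thenApply + thenCompose(Function.identity()) flattening:
    // the cleanup step returns a future itself, so the nested future must be unwrapped.
    static CompletableFuture<Void> terminationFuture(CompletableFuture<CleanupState> cleanup) {
        return cleanup
                .thenApply(state -> CompletableFuture.<Void>completedFuture(null))
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");
        System.out.println(toCleanupState(ok).join()); // LOCAL

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));
        System.out.println(toCleanupState(failed).join()); // GLOBAL

        terminationFuture(toCleanupState(ok)).join(); // completes without throwing
    }
}
```

Flink uses exactly this flatten-with-identity idiom because thenApply over a future-returning function would otherwise produce a CompletableFuture of a CompletableFuture.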

Next comes Dispatcher.createJobManagerRunner. The JobManagerRunner is created in the Dispatcher and then started; the creation logic lives in the createJobManagerRunner method, shown below:

CompletableFuture<JobManagerRunner> createJobManagerRunner(
    JobGraph jobGraph, long initializationTimestamp) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
        () -> {
            try {
                // Create the JobManagerRunner via the factory,
                // passing in the JobGraph and the high-availability services
                JobManagerRunner runner =
                    jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(
                        jobManagerMetricGroup),
                    fatalErrorHandler,
                    initializationTimestamp);
                // Start the JobManagerRunner
                // This actually starts the leader election service that elects the JM leader
                runner.start();
                return runner;
            } catch (Exception e) {
                throw new CompletionException(
                    new JobInitializationException(
                        jobGraph.getJobID(),
                        "Could not instantiate JobManager.",
                        e));
            }
        },
        ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
    // JobManager creation
}
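The ioExecutor argument in the code above is the important detail: if the runner were created on the Dispatcher's main thread executor, the Dispatcher could not serve other RPCs until creation finished. The same pattern can be shown with plain JDK futures (the thread name and the slow step are made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCreationSketch {
    // Simulates a slow, blocking creation step that must not run on the main thread.
    static String createRunner() {
        try {
            Thread.sleep(50); // stand-in for expensive JobManagerRunner construction
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "runner@" + Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "io-thread"));

        // The main thread only sets up the future and stays free for other work,
        // just as the Dispatcher's main thread stays free for RPC handling.
        CompletableFuture<String> runnerFuture =
                CompletableFuture.supplyAsync(AsyncCreationSketch::createRunner, ioExecutor);

        System.out.println(runnerFuture.join()); // created on an io-thread, not on main
        ioExecutor.shutdown();
    }
}
```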

At this point the JobManager begins leader election. To avoid a single point of failure, Flink provides JobManager high availability: multiple JobManager instances can run at the same time. In standalone deployments, leader election is implemented with ZooKeeper; in YARN mode, high availability relies on YARN automatically restarting a failed ApplicationMaster. For details on leader election, see "Flink Source Code: Leader Election (ZooKeeper)".

Once a leader JM has been elected, the election service calls grantLeadership on the corresponding JM. The method looks like this:

@Override
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug(
                "JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        leadershipOperation =
            leadershipOperation.thenRun(
            ThrowingRunnable.unchecked(
                () -> {
                    synchronized (lock) {
                        // The core logic:
                        // verify the job scheduling status and start the JobManager
                        verifyJobSchedulingStatusAndStartJobManager(
                            leaderSessionID);
                    }
                }));

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}

Next we follow the code into the verifyJobSchedulingStatusAndStartJobManager method.

@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
    throws FlinkException {
    // If the JobManagerRunner has already been shut down, return immediately
    if (shutdown) {
        log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
        return;
    }

    // Fetch the job scheduling status from the RunningJobsRegistry
    final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
        getJobSchedulingStatus();

    // If the job has already finished, run the job-already-done logic
    // (handles the case where the job was not run to completion by this JobManager)
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
        jobAlreadyDone();
    } else {
        // Start the JobMaster
        startJobMaster(leaderSessionId);
    }
}

The flow now reaches JobManagerRunnerImpl.startJobMaster.

This method starts the JobMaster: it registers the job as running, starts the JobMaster service, and confirms this JobMaster as the leader.

@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException {
    log.info(
        "JobManager runner for job {} ({}) was granted leadership with session id {}.",
        jobGraph.getName(),
        jobGraph.getJobID(),
        leaderSessionId);

    try {
        // Register the job as running
        // Depending on the deployment mode (standalone, ZooKeeper, or K8s), the JobID is stored differently
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
        throw new FlinkException(
            String.format(
                "Failed to set the job %s to running in the running jobs registry.",
                jobGraph.getJobID()),
            e);
    }

    // Start the JobMaster service
    startJobMasterServiceSafely(leaderSessionId);

    // Confirm this JobMaster as the leader
    if (jobMasterService != null) {
        confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
    }
}

JobManagerRunnerImpl.startJobMasterServiceSafely then calls DefaultJobMasterServiceFactory.createJobMasterService, which creates the JobMaster and starts its RPC service.

Next, the JobMaster constructor contains the logic for building Flink's job scheduler: JobMaster.createScheduler calls DefaultSlotPoolServiceSchedulerFactory.createScheduler, which in turn calls the scheduler factory method DefaultSchedulerFactory.createInstance to create the scheduler instance.

The flow then arrives at DefaultScheduler, the default implementation of Flink's job scheduler. It extends SchedulerBase, which in turn implements the SchedulerNG interface.

The SchedulerBase constructor calls the createAndRestoreExecutionGraph method.

SchedulerBase.createAndRestoreExecutionGraph looks like this:

private ExecutionGraph createAndRestoreExecutionGraph(
    JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    ShuffleMaster<?> shuffleMaster,
    JobMasterPartitionTracker partitionTracker,
    ExecutionDeploymentTracker executionDeploymentTracker,
    long initializationTimestamp,
    ComponentMainThreadExecutor mainThreadExecutor,
    JobStatusListener jobStatusListener)
    throws Exception {

    // Create the ExecutionGraph
    ExecutionGraph newExecutionGraph =
        createExecutionGraph(
        currentJobManagerJobMetricGroup,
        completedCheckpointStore,
        checkpointsCleaner,
        checkpointIdCounter,
        shuffleMaster,
        partitionTracker,
        executionDeploymentTracker,
        initializationTimestamp);

    // Get the CheckpointCoordinator created inside the ExecutionGraph
    // (its creation is covered in a later section)
    final CheckpointCoordinator checkpointCoordinator =
        newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        // i.e., whether a recent retained checkpoint can be restored
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
            new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            // If no checkpoint was found, try to restore from a savepoint instead
            tryRestoreExecutionGraphFromSavepoint(
                newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    // 設(shè)置任務(wù)失敗監(jiān)聽器
    newExecutionGraph.setInternalTaskFailuresListener(
        new UpdateSchedulerNgOnInternalFailuresListener(this));
    // 設(shè)置作業(yè)狀態(tài)監(jiān)聽器
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    // 設(shè)置JobMaster的主線程ThreadExecutor
    newExecutionGraph.start(mainThreadExecutor);

    return newExecutionGraph;
}
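The restore logic above is a two-step fallback: prefer the latest retained checkpoint, and only if none exists consult the savepoint restore settings. As a hedged, JDK-only sketch of that decision (the method and names are illustrative, not Flink's API):

```java
import java.util.Optional;

public class RestoreFallbackSketch {
    // Returns the state source a newly built ExecutionGraph would restore from, if any:
    // a retained checkpoint wins; otherwise fall back to a configured savepoint.
    static Optional<String> chooseRestoreSource(Optional<String> latestCheckpoint,
                                                Optional<String> configuredSavepoint) {
        if (latestCheckpoint.isPresent()) {
            return latestCheckpoint;
        }
        return configuredSavepoint;
    }

    public static void main(String[] args) {
        System.out.println(chooseRestoreSource(Optional.of("chk-42"), Optional.of("sp-1"))); // Optional[chk-42]
        System.out.println(chooseRestoreSource(Optional.empty(), Optional.of("sp-1")));      // Optional[sp-1]
        System.out.println(chooseRestoreSource(Optional.empty(), Optional.empty()));         // Optional.empty
    }
}
```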

SchedulerBase.createExecutionGraph calls DefaultExecutionGraphBuilder to create the ExecutionGraph:

private ExecutionGraph createExecutionGraph(
    JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    ShuffleMaster<?> shuffleMaster,
    final JobMasterPartitionTracker partitionTracker,
    ExecutionDeploymentTracker executionDeploymentTracker,
    long initializationTimestamp)
    throws JobExecutionException, JobException {

    // Create the execution deployment listener
    ExecutionDeploymentListener executionDeploymentListener =
        new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    // Create the execution state-update listener
    ExecutionStateUpdateListener executionStateUpdateListener =
        (execution, newState) -> {
        if (newState.isTerminal()) {
            executionDeploymentTracker.stopTrackingDeploymentOf(execution);
        }
    };

    // Build the ExecutionGraph
    return DefaultExecutionGraphBuilder.buildGraph(
        jobGraph,
        jobMasterConfiguration,
        futureExecutor,
        ioExecutor,
        userCodeLoader,
        completedCheckpointStore,
        checkpointsCleaner,
        checkpointIdCounter,
        rpcTimeout,
        currentJobManagerJobMetricGroup,
        blobWriter,
        log,
        shuffleMaster,
        partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
            jobGraph.getJobType()),
        executionDeploymentListener,
        executionStateUpdateListener,
        initializationTimestamp,
        new DefaultVertexAttemptNumberStore());
}

ExecutionGraph Concepts

The ExecutionGraph is the physical execution plan of a Flink job, used to coordinate the distributed execution of the dataflow.

Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated in the JobManager.

The ExecutionGraph also has the concept of a vertex. A vertex in the ExecutionGraph is an ExecutionJobVertex, corresponding to a JobVertex in the JobGraph. The transition from JobGraph to ExecutionGraph introduces parallelism: an ExecutionJobVertex contains all parallel tasks of its corresponding JobVertex, and each parallel task is represented by an ExecutionVertex. In other words, an ExecutionJobVertex with parallelism N contains N ExecutionVertex instances.

An ExecutionVertex can be executed one or more times (due to task recovery, recomputation, or reconfiguration). Each execution of an ExecutionVertex produces an Execution object, which tracks the state changes and resource usage of that task execution.

IntermediateResult corresponds to the IntermediateDataSet of a JobVertex in the JobGraph and represents the staging point for data passed between two adjacent ExecutionJobVertexes. IntermediateResults are created when the ExecutionJobVertex is constructed; each one is split into IntermediateResultPartitions, whose number matches the vertex's parallelism.
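The counting rules above (one ExecutionVertex per parallel subtask, totals summed across ExecutionJobVertexes) can be illustrated with a toy model; these classes are simplified stand-ins, not Flink's real ones:

```java
import java.util.ArrayList;
import java.util.List;

public class GraphModelSketch {
    // Toy stand-in for an ExecutionJobVertex: expands a logical vertex by its parallelism.
    static class ExecutionJobVertex {
        final String name;
        final List<String> executionVertices = new ArrayList<>();

        ExecutionJobVertex(String name, int parallelism) {
            this.name = name;
            // One ExecutionVertex per parallel subtask, named "vertex (i/p)" as in Flink's UI.
            for (int i = 0; i < parallelism; i++) {
                executionVertices.add(name + " (" + (i + 1) + "/" + parallelism + ")");
            }
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertex source = new ExecutionJobVertex("Source", 2);
        ExecutionJobVertex map = new ExecutionJobVertex("Map", 4);

        // The total vertex count is the sum of parallelisms,
        // like numVerticesTotal in attachJobGraph.
        int total = source.executionVertices.size() + map.executionVertices.size();
        System.out.println(total); // 6
        System.out.println(source.executionVertices.get(0)); // Source (1/2)
    }
}
```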

The buildGraph Method of DefaultExecutionGraphBuilder

public static ExecutionGraph buildGraph(
    JobGraph jobGraph,
    Configuration jobManagerConfig,
    ScheduledExecutorService futureExecutor,
    Executor ioExecutor,
    ClassLoader classLoader,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    Time rpcTimeout,
    MetricGroup metrics,
    BlobWriter blobWriter,
    Logger log,
    ShuffleMaster<?> shuffleMaster,
    JobMasterPartitionTracker partitionTracker,
    TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
    ExecutionDeploymentListener executionDeploymentListener,
    ExecutionStateUpdateListener executionStateUpdateListener,
    long initializationTimestamp,
    VertexAttemptNumberStore vertexAttemptNumberStore)
    throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    // Get the job name and job ID
    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    // Create the JobInformation,
    // a wrapper around the job-related configuration used by the ExecutionGraph
    final JobInformation jobInformation =
        new JobInformation(
        jobId,
        jobName,
        jobGraph.getSerializedExecutionConfig(),
        jobGraph.getJobConfiguration(),
        jobGraph.getUserJarBlobKeys(),
        jobGraph.getClasspaths());

    // Maximum number of prior execution attempts kept in history
    final int maxPriorAttemptsHistoryLength =
        jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    // Load the factory for the IntermediateResultPartition release strategy
    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
        PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
        jobManagerConfig);

    // create a new execution graph, if none exists so far
    // Create the ExecutionGraph (analyzed in a later section)
    final DefaultExecutionGraph executionGraph;
    try {
            executionGraph =
                    new DefaultExecutionGraph(
                            jobInformation,
                            futureExecutor,
                            ioExecutor,
                            rpcTimeout,
                            maxPriorAttemptsHistoryLength,
                            classLoader,
                            blobWriter,
                            partitionReleaseStrategyFactory,
                            shuffleMaster,
                            partitionTracker,
                            partitionLocationConstraint,
                            executionDeploymentListener,
                            executionStateUpdateListener,
                            initializationTimestamp,
                            vertexAttemptNumberStore);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    // set the basic properties

    try {
        // 設(shè)置json格式的執(zhí)行計(jì)劃
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        // If generating the JSON plan from the jobGraph fails, fall back to an empty plan
        executionGraph.setJsonPlan("{}");
    }

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits

    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);

    for (JobVertex vertex : jobGraph.getVertices()) {
        // Get the invokable class name of the vertex, i.e., the task it runs
        String executableClass = vertex.getInvokableClassName();
        // Every vertex must have an invokable class
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(
                jobId,
                "The vertex "
                + vertex.getID()
                + " ("
                + vertex.getName()
                + ") has no invokable class.");
        }

        try {
            // Run the vertex's master-side initialization logic, which differs by vertex type
            vertex.initializeOnMaster(classLoader);
        } catch (Throwable t) {
            throw new JobExecutionException(
                jobId,
                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                t);
        }
    }

    log.info(
        "Successfully ran initialization on master in {} ms.",
        (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    // i.e., get all job vertices in dataflow order
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug(
            "Adding {} vertices from job graph {} ({}).",
            sortedTopology.size(),
            jobName,
            jobId);
    }
    // Attach all job vertices to the executionGraph
    executionGraph.attachJobGraph(sortedTopology);

    if (log.isDebugEnabled()) {
        log.debug(
            "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    }

    // configure the state checkpointing
    // Configure checkpointing, if it is enabled
    if (isCheckpointingEnabled(jobGraph)) {
        // Read the checkpoint settings from the JobGraph
        // (snapshotSettings is populated in StreamingJobGraphGenerator)
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
        
        // Vertices that trigger checkpoints, i.e., all source vertices
        List<ExecutionJobVertex> triggerVertices =
            idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        // Vertices that must acknowledge checkpoints, i.e., all vertices
        List<ExecutionJobVertex> ackVertices =
            idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        // Vertices notified when a checkpoint is committed, i.e., all vertices
        List<ExecutionJobVertex> confirmVertices =
            idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        // Maximum number of remembered checkpoints kept in history
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        // Create the checkpoint stats tracker, which works together with the CheckpointCoordinator
        CheckpointStatsTracker checkpointStatsTracker =
            new CheckpointStatsTracker(
            historySize,
            ackVertices,
            snapshotSettings.getCheckpointCoordinatorConfiguration(),
            metrics);

    // load the state backend from the application settings
        final StateBackend applicationConfiguredBackend;
        final SerializedValue<StateBackend> serializedAppConfigured =
            snapshotSettings.getDefaultStateBackend();

        if (serializedAppConfigured == null) {
            applicationConfiguredBackend = null;
        } else {
            try {
                // Deserialize the application-configured state backend
                applicationConfiguredBackend =
                    serializedAppConfigured.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                    jobId, "Could not deserialize application-defined state backend.", e);
            }
        }

        final StateBackend rootBackend;
        try {
        // Resolve the effective state backend:
        // use the application-configured backend if present, else the one from the config file,
        // else the default
            rootBackend =
                StateBackendLoader.fromApplicationOrConfigOrDefault(
                applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                jobId, "Could not instantiate configured state backend", e);
        }
        
    // load the checkpoint storage from the application settings
        final CheckpointStorage applicationConfiguredStorage;
        final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
            snapshotSettings.getDefaultCheckpointStorage();

        if (serializedAppConfiguredStorage == null) {
            applicationConfiguredStorage = null;
        } else {
            try {
                applicationConfiguredStorage =
                    serializedAppConfiguredStorage.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                    jobId,
                    "Could not deserialize application-defined checkpoint storage.",
                    e);
            }
        }
        
    // As with the state backend, resolve the checkpoint storage from the application settings and the Flink configuration
        final CheckpointStorage rootStorage;
        try {
            rootStorage =
                CheckpointStorageLoader.load(
                applicationConfiguredStorage,
                null,
                rootBackend,
                jobManagerConfig,
                classLoader,
                log);
        } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                jobId, "Could not instantiate configured checkpoint storage", e);
        }

        // instantiate the user-defined checkpoint hooks

    // Instantiate the user-defined checkpoint hooks,
    // which run when a snapshot is triggered or restored
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
            snapshotSettings.getMasterHooks();
        final List<MasterTriggerRestoreHook<?>> hooks;

        if (serializedHooks == null) {
            hooks = Collections.emptyList();
        } else {
            final MasterTriggerRestoreHook.Factory[] hookFactories;
            try {
                hookFactories = serializedHooks.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                    jobId, "Could not instantiate user-defined checkpoint hooks", e);
            }

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);

            try {
                hooks = new ArrayList<>(hookFactories.length);
                for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                    hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                }
            } finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

    // Get the checkpoint coordinator configuration
        final CheckpointCoordinatorConfiguration chkConfig =
            snapshotSettings.getCheckpointCoordinatorConfiguration();

    // Apply the checkpoint settings to the executionGraph
        executionGraph.enableCheckpointing(
            chkConfig,
            triggerVertices,
            ackVertices,
            confirmVertices,
            hooks,
            checkpointIdCounter,
            completedCheckpointStore,
            rootBackend,
            checkpointStatsTracker,
            checkpointsCleaner);
    }

    // create all the metrics for the Execution Graph
    // Gauges for restart time, downtime, and uptime
    metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

    return executionGraph;
}
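buildGraph depends on jobGraph.getVerticesSortedTopologicallyFromSources() so that every vertex is attached after all of its upstream vertices. That sort can be sketched with Kahn's algorithm over a toy vertex/edge model (the graph below is invented for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopoSortSketch {
    // Kahn's algorithm: repeatedly emit vertices whose inputs have all been emitted.
    static List<String> sortFromSources(Map<String, List<String>> edges, List<String> vertices) {
        Map<String, Integer> inDegree = new HashMap<>();
        vertices.forEach(v -> inDegree.put(v, 0));
        edges.forEach((from, tos) -> tos.forEach(to -> inDegree.merge(to, 1, Integer::sum)));

        Deque<String> sources = new ArrayDeque<>();
        for (String v : vertices) {
            if (inDegree.get(v) == 0) sources.add(v); // true sources have no inputs
        }

        List<String> sorted = new ArrayList<>();
        while (!sources.isEmpty()) {
            String v = sources.poll();
            sorted.add(v);
            for (String to : edges.getOrDefault(v, List.of())) {
                // A vertex becomes ready once its last remaining input has been emitted.
                if (inDegree.merge(to, -1, Integer::sum) == 0) sources.add(to);
            }
        }
        return sorted;
    }

    public static void main(String[] args) {
        // source -> map -> sink, plus a side input broadcast -> map
        Map<String, List<String>> edges = Map.of(
                "source", List.of("map"),
                "broadcast", List.of("map"),
                "map", List.of("sink"));
        List<String> order = sortFromSources(edges,
                List.of("source", "broadcast", "map", "sink"));
        System.out.println(order); // [source, broadcast, map, sink]
    }
}
```

The guarantee attachJobGraph needs is exactly what this ordering provides: when connectToPredecessors runs for a vertex, the IntermediateResults of all upstream vertices already exist.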

The main steps of creating an ExecutionGraph are roughly:

  • Gather the job information
  • Create the ExecutionGraph itself
  • Attach the JobGraph vertices
  • Configure checkpointing
  • Configure the state backend
  • Configure the checkpoint storage
  • Install the checkpoint hooks
  • Register the job metrics

The ExecutionGraph Constructor

  • jobInformation: a wrapper around the job information, including the job ID, name, configuration, user code, and classpath.
  • futureExecutor: thread pool for asynchronous operations.
  • ioExecutor: thread pool for I/O operations.
  • rpcTimeout: timeout for RPC calls.
  • maxPriorAttemptsHistoryLength: maximum number of prior execution attempts kept in history.
  • classLoader: class loader for user code.
  • blobWriter: writes data to the blob server.
  • partitionReleaseStrategyFactory: factory for the IntermediateResultPartition release strategy.
  • shuffleMaster: registers IntermediateResultPartitions (intermediate result partitions) and their producers with the JobMaster.
  • partitionTracker: tracks and releases partitions.
  • partitionLocationConstraint: whether partition locations may still be unknown at deployment time. In batch mode they may be unknown; in streaming mode they must be known.
  • executionDeploymentListener: listener for execution deployments
  • executionStateUpdateListener: listener for execution state updates
  • initializationTimestamp: the initialization timestamp
  • vertexAttemptNumberStore: stores the attempt count of each job vertex

The attachJobGraph Method

This method attaches the JobGraph to the ExecutionGraph.

@Override
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    // Assert that we are running in the JobMaster main thread
    assertRunningInJobMasterMainThread();

    LOG.debug(
        "Attaching {} topologically sorted vertices to existing job graph with {} "
        + "vertices and {} intermediate results.",
        topologiallySorted.size(),
        tasks.size(),
        intermediateResults.size());

    // Collection holding the newly created ExecutionJobVertexes
    final ArrayList<ExecutionJobVertex> newExecJobVertices =
        new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    for (JobVertex jobVertex : topologiallySorted) {

        // If any source vertex is not stoppable,
        // mark the whole ExecutionGraph as not stoppable
        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv =
            new ExecutionJobVertex(
            this,
            jobVertex,
            maxPriorAttemptsHistoryLength,
            rpcTimeout,
            createTimestamp,
            this.initialAttemptCounts.getAttemptCounts(jobVertex.getID()));

        // 設(shè)置前一個節(jié)點(diǎn)的IntermediateResult給當(dāng)前ejv
        // 完成連接到前置節(jié)點(diǎn)這個邏輯,即這個方法名的含義
        ejv.connectToPredecessors(this.intermediateResults);

        // Put the (job vertex ID, ejv) pair into the ExecutionGraph
        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        
        // A non-null previousTask means two JobGraph vertices share the same ID, which is an error
        if (previousTask != null) {
            throw new JobException(
                String.format(
                    "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
        }

        // Iterate over the IntermediateResults of this ejv,
        // created in the ExecutionJobVertex constructor
        // from the IntermediateDataSets of the corresponding JobVertex
        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet =
                this.intermediateResults.putIfAbsent(res.getId(), res);
            // Likewise, result IDs must not collide
            if (previousDataSet != null) {
                throw new JobException(
                    String.format(
                        "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
            }
        }

        // Record the ejv in vertex creation order
        this.verticesInCreationOrder.add(ejv);
        // Count the total number of vertices: at runtime one vertex is deployed per parallel subtask,
        // so we sum up the parallelism of each ejv
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    // Register all ExecutionVertexes and the IntermediateResultPartitions they produce
    // An ExecutionVertex is a physical execution node; an ExecutionJobVertex contains one per unit of parallelism
    registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    // Build the execution topology,
    // which contains the ExecutionVertexes, ResultPartitions, and PipelinedRegions
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    // Create the partition release strategy
    partitionReleaseStrategy =
        partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

Pipelined Region

While creating the executionTopology we run into the notion of a pipelined region. Before explaining it, we need the difference between a pipelined result and a blocking result.

A pipelined result is a continuous stream of data: downstream tasks can start consuming as soon as upstream tasks produce records. A pipelined result may produce data indefinitely; this is the result type of streaming jobs.

A blocking result can only be consumed after the upstream has finished producing it. A blocking result is always finite; the typical scenario is batch jobs.

A pipelined region is a part of the ExecutionGraph: it contains consecutive tasks connected by pipelined data exchanges (producing pipelined results). An ExecutionGraph can therefore be split into several pipelined regions, connected to each other by blocking data exchanges.

The point of a pipelined region is that all consumers within the region must continuously consume what their upstream producers emit, so as not to block the producers or cause backpressure. Consequently, all tasks of the same pipelined region must be scheduled together at startup and restarted together on failure.
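Splitting a graph into pipelined regions can be sketched as a connectivity problem: vertices joined by pipelined edges are merged into one region, while blocking edges act as cuts. A minimal union-find sketch (toy integer vertices, not Flink's DefaultExecutionTopology):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PipelinedRegionSketch {
    // Union-find with path halving over vertex ids.
    static int find(int[] parent, int v) {
        while (parent[v] != v) { parent[v] = parent[parent[v]]; v = parent[v]; }
        return v;
    }

    // Groups vertices into regions: pipelined edges merge regions, blocking edges
    // are simply omitted from the edge list and thus act as cuts.
    static Map<Integer, List<Integer>> regions(int numVertices, int[][] pipelinedEdges) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) parent[i] = i;
        for (int[] e : pipelinedEdges) {
            parent[find(parent, e[0])] = find(parent, e[1]);
        }
        Map<Integer, List<Integer>> byRoot = new TreeMap<>();
        for (int v = 0; v < numVertices; v++) {
            byRoot.computeIfAbsent(find(parent, v), k -> new ArrayList<>()).add(v);
        }
        return byRoot;
    }

    public static void main(String[] args) {
        // 0 -> 1 pipelined, 1 -> 2 blocking, 2 -> 3 pipelined:
        // the blocking edge 1 -> 2 splits the graph into regions {0,1} and {2,3}.
        Map<Integer, List<Integer>> r = regions(4, new int[][] {{0, 1}, {2, 3}});
        System.out.println(r.values()); // [[0, 1], [2, 3]]
    }
}
```

Each resulting group is a unit that a region-based scheduler would deploy or restart atomically, which is the scheduling property described above.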

For the official write-up on pipelined region scheduling, see: https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html

The enableCheckpointing Method

This method initializes the checkpoint-related configuration for the ExecutionGraph. Its main job is to create and configure the CheckpointCoordinator object:

@Override
public void enableCheckpointing(
    CheckpointCoordinatorConfiguration chkConfig,
    List<MasterTriggerRestoreHook<?>> masterHooks,
    CheckpointIDCounter checkpointIDCounter,
    CompletedCheckpointStore checkpointStore,
    StateBackend checkpointStateBackend,
    CheckpointStorage checkpointStorage,
    CheckpointStatsTracker statsTracker,
    CheckpointsCleaner checkpointsCleaner) {

    // The job must be in the CREATED state
    checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
    // The CheckpointCoordinator must not exist yet, to avoid enabling checkpointing twice
    checkState(checkpointCoordinator == null, "checkpointing already enabled");

    // Collect the OperatorCoordinators of each ExecutionJobVertex
    // An OperatorCoordinator runs on the JobManager, is associated with an operator in a job vertex, and interacts with it
    final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators =
        buildOpCoordinatorCheckpointContexts();

    checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");

    // Create the checkpoint failure manager, which runs the handling logic when a checkpoint fails
    CheckpointFailureManager failureManager =
        new CheckpointFailureManager(
        chkConfig.getTolerableCheckpointFailureNumber(),
        new CheckpointFailureManager.FailJobCallback() {
            @Override
            public void failJob(Throwable cause) {
                getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
            }

            @Override
            public void failJobDueToTaskFailure(
                Throwable cause, ExecutionAttemptID failingTask) {
                getJobMasterMainThreadExecutor()
                    .execute(
                    () ->
                    failGlobalIfExecutionIsStillRunning(
                        cause, failingTask));
            }
        });

    // create the timer that the CheckpointCoordinator uses to trigger checkpoints periodically
    checkState(checkpointCoordinatorTimer == null);

    checkpointCoordinatorTimer =
        Executors.newSingleThreadScheduledExecutor(
        new DispatcherThreadFactory(
            Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

    // create the CheckpointCoordinator, which coordinates the checkpoints of all operators across the cluster, triggering and committing them
    // create the coordinator that triggers and commits checkpoints and holds the state
    checkpointCoordinator =
        new CheckpointCoordinator(
        jobInformation.getJobId(),
        chkConfig,
        operatorCoordinators,
        checkpointIDCounter,
        checkpointStore,
        checkpointStorage,
        ioExecutor,
        checkpointsCleaner,
        new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
        SharedStateRegistry.DEFAULT_FACTORY,
        failureManager,
        createCheckpointPlanCalculator(),
        new ExecutionAttemptMappingProvider(getAllExecutionVertices()));

    // register the master hooks on the checkpoint coordinator
    // the hooks are invoked when a checkpoint is triggered or when restoring from one
    for (MasterTriggerRestoreHook<?> hook : masterHooks) {
        if (!checkpointCoordinator.addMasterHook(hook)) {
            LOG.warn(
                "Trying to register multiple checkpoint hooks with the name: {}",
                hook.getIdentifier());
        }
    }

    // wire in the checkpoint stats tracker
    checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

    // interval of max long value indicates disable periodic checkpoint,
    // the CheckpointActivatorDeactivator should be created only if the interval is not max
    // value
    // if periodic checkpointing is not disabled, register a job status listener
    // the listener is used by the CheckpointCoordinator to react to job status changes
    if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
        // the periodic checkpoint scheduler is activated and deactivated as a result of
        // job status changes (running -> on, all other states -> off)
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }

    // record the names of the state backend and the checkpoint storage
    this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
    this.checkpointStorageName = checkpointStorage.getClass().getSimpleName();
}
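The checkpoint timer above is a plain single-thread ScheduledExecutorService, and an interval of Long.MAX_VALUE means periodic triggering is disabled. A minimal, self-contained sketch of that pattern is shown below; the class and method names are invented for illustration and stand in for the CheckpointCoordinator and CheckpointIDCounter, not Flink's real implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of periodic checkpoint triggering on a single-thread timer,
// mirroring ExecutionGraph's checkpointCoordinatorTimer (names are illustrative).
public class CheckpointTimerSketch {
    // stands in for CheckpointIDCounter
    private final AtomicLong checkpointId = new AtomicLong();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "Checkpoint Timer"));

    public void start(long intervalMillis) {
        // an interval of Long.MAX_VALUE means periodic checkpointing is disabled
        if (intervalMillis == Long.MAX_VALUE) {
            return;
        }
        timer.scheduleAtFixedRate(
                this::triggerCheckpoint, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void triggerCheckpoint() {
        long id = checkpointId.incrementAndGet();
        System.out.println("Triggering checkpoint " + id);
    }

    public long lastCheckpointId() {
        return checkpointId.get();
    }

    public void stop() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CheckpointTimerSketch sketch = new CheckpointTimerSketch();
        sketch.start(50);          // trigger every 50 ms
        Thread.sleep(180);         // let a few checkpoints fire
        sketch.stop();
        System.out.println("Triggered " + sketch.lastCheckpointId() + " checkpoints");
    }
}
```

In Flink itself the scheduling is additionally switched on and off by the CheckpointCoordinatorDeActivator as the job status changes, which is what registerJobStatusListener sets up above.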

From ExecutionGraph to deploying Tasks

Each time an ExecutionVertex is executed, a new Execution object is created for that attempt.

After the JobManager has started, it tells the Scheduler to begin scheduling (by calling SchedulerBase.startScheduling). After several intermediate layers of calls (fairly involved, omitted here), execution eventually reaches Execution.deploy.

Execution.deploy contains the actual deployment logic. It builds a task deployment descriptor from the task's assigned resources and the ExecutionVertex. The descriptor carries the task's execution configuration and is passed to the TaskManager via RPC, which then creates a task matching that specification.

public void deploy() throws JobException {
    // must be called from the JobMaster main thread
    assertRunningInJobMasterMainThread();

    // the resource (slot) assigned to this execution
    final LogicalSlot slot = assignedResource;

    checkNotNull(
        slot,
        "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    // Check if the TaskManager died in the meantime
    // This only speeds up the response to TaskManagers failing concurrently to deployments.
    // The more general check is the rpcTimeout of the deployment call
    if (!slot.isAlive()) {
        throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
    }

    // make sure exactly one deployment call happens from the correct state
    // note: the transition from CREATED to DEPLOYING is for testing purposes only
    // remember the previous state and switch this execution to DEPLOYING
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {
            // race condition, someone else beat us to the deploying call.
            // this should actually not happen and indicates a race somewhere else
            throw new IllegalStateException(
                "Cannot deploy task: Concurrent deployment call race.");
        }
    } else {
        // vertex may have been cancelled, or it was already scheduled
        throw new IllegalStateException(
            "The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
            + previous);
    }

    // verify that this slot was actually assigned to this execution
    if (this != slot.getPayload()) {
        throw new IllegalStateException(
            String.format(
                "The execution %s has not been assigned to the assigned slot.", this));
    }

    try {

        // race double check, did we fail/cancel and do we need to release the slot?
        // check again that the state is still DEPLOYING
        if (this.state != DEPLOYING) {
            slot.releaseSlot(
                new FlinkException(
                    "Actual state of execution "
                    + this
                    + " ("
                    + state
                    + ") does not match expected state DEPLOYING."));
            return;
        }

        LOG.info(
            "Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
            vertex.getTaskNameWithSubtaskIndex(),
            attemptNumber,
            vertex.getCurrentExecutionAttempt().getAttemptId(),
            getAssignedResourceLocation(),
            slot.getAllocationId());

        // create a task deployment descriptor
        // it carries the ExecutionVertex, its assigned resources and further execution information
        final TaskDeploymentDescriptor deployment =
            TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
            .createDeploymentDescriptor(
            slot.getAllocationId(),
            taskRestore,
            producedPartitions.values());

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
            vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

        getVertex().notifyPendingDeployment(this);
        // We run the submission in the future executor so that the serialization of large TDDs
        // does not block
        // the main thread and sync back to the main thread once submission is completed.
        // asynchronously invoke taskManagerGateway, notifying the TaskManager via RPC
        // the task deployment descriptor is sent to the TaskManager,
        // which starts deploying the task once it arrives
        CompletableFuture.supplyAsync(
            () -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
            .thenCompose(Function.identity())
            .whenCompleteAsync(
            (ack, failure) -> {
                if (failure == null) {
                    vertex.notifyCompletedDeployment(this);
                } else {
                    if (failure instanceof TimeoutException) {
                        String taskname =
                            vertex.getTaskNameWithSubtaskIndex()
                            + " ("
                            + attemptId
                            + ')';

                        markFailed(
                            new Exception(
                                "Cannot deploy task "
                                + taskname
                                + " - TaskManager ("
                                + getAssignedResourceLocation()
                                + ") not responding after a rpcTimeout of "
                                + rpcTimeout,
                                failure));
                    } else {
                        markFailed(failure);
                    }
                }
            },
            jobMasterMainThreadExecutor);

    } catch (Throwable t) {
        markFailed(t);
    }
}
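The supplyAsync(...).thenCompose(Function.identity()) chain above produces a CompletableFuture<CompletableFuture<Acknowledge>> and then flattens it, so the potentially expensive serialization of a large TaskDeploymentDescriptor happens on the futures executor instead of the main thread. A standalone illustration of the flattening pattern, with a stand-in submitTask (the names here are invented for the example):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class FlattenFutureDemo {
    // Stand-in for taskManagerGateway.submitTask: it already returns a future.
    static CompletableFuture<String> submitTask(String deployment) {
        return CompletableFuture.completedFuture("ACK:" + deployment);
    }

    public static String deploy(String deployment) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // supplyAsync yields CompletableFuture<CompletableFuture<String>>;
            // thenCompose(Function.identity()) flattens it to CompletableFuture<String>.
            CompletableFuture<String> ack =
                    CompletableFuture.supplyAsync(() -> submitTask(deployment), executor)
                            .thenCompose(Function.identity());
            return ack.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deploy("tdd-1")); // prints ACK:tdd-1
    }
}
```

Calling thenCompose with the identity function is the idiomatic way to turn "a future of a future" into a single future without blocking any thread in between.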

The method above calls TaskExecutor.submitTask through the TaskManagerGateway.

@Override
public CompletableFuture<Acknowledge> submitTask(
    TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    // ...
    // create the Task
    Task task =
        new Task(
            jobInformation,
            taskInformation,
            tdd.getExecutionAttemptId(),
            tdd.getAllocationId(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber(),
            tdd.getProducedPartitions(),
            tdd.getInputGates(),
            memoryManager,
            taskExecutorServices.getIOManager(),
            taskExecutorServices.getShuffleEnvironment(),
            taskExecutorServices.getKvStateService(),
            taskExecutorServices.getBroadcastVariableManager(),
            taskExecutorServices.getTaskEventDispatcher(),
            externalResourceInfoProvider,
            taskStateManager,
            taskManagerActions,
            inputSplitProvider,
            checkpointResponder,
            taskOperatorEventGateway,
            aggregateManager,
            classLoaderHandle,
            fileCache,
            taskManagerConfiguration,
            taskMetricGroup,
            resultPartitionConsumableNotifier,
            partitionStateChecker,
            getRpcService().getExecutor());
    // ...
}

At this point the concrete Task object has been created inside the TaskManager. This concludes the walkthrough from JobGraph to ExecutionGraph and, finally, to the deployed Task.

This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
