Flink JobManager 詳解

JobManager 詳解

JobMaster 在實(shí)現(xiàn)中,也依賴了很多的服務(wù),其中最重要的是 SchedulerNGSlotPool,JobMaster 對(duì)外提供的接口實(shí)現(xiàn)中大都是使用前面這兩個(gè)服務(wù)的方法。

// JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    // LegacyScheduler: 用于調(diào)度作業(yè)的 ExecutionGraph
    private SchedulerNG schedulerNG;
    // SlotPoolImpl: 從名字也能看出它主要處理 slot 相關(guān)的內(nèi)容,在 JM 這邊的一個(gè)抽象
    private final SlotPool slotPool;
    // HA 服務(wù),這里主要用于監(jiān)控 RM leader,如果 RM Leader 有變化,這里會(huì)與新的 leader 建立連接
    private final HighAvailabilityServices highAvailabilityServices;

    /**
    * 下面這些都是創(chuàng)建上面 SchedulerNG(即 LegacyScheduler)需要使用到的服務(wù)
    */
    // 用于將數(shù)據(jù)上傳到 BlobServer,這里上傳的主要是 JobInformation 和 TaskInformation
    private final BlobWriter blobWriter;
    // 作業(yè)的 JobGraph 信息
    private final JobGraph jobGraph;
    // SchedulerImpl: 它也是一個(gè)調(diào)度器,將 slot 分配給對(duì)應(yīng)的 task,它會(huì)調(diào)用 SlotPool 的相關(guān)接口(它里面有一個(gè) slotSelectionStrategy 對(duì)象,用來決定一個(gè) slot 分配的最佳算法)
    private final Scheduler scheduler;
    // 用于注冊(cè) Intermediate result partition,在作業(yè)調(diào)度的時(shí)候會(huì)用到
    private final ShuffleMaster<?> shuffleMaster;
    // 用于追蹤 Intermediate result partition 的服務(wù)
    private final PartitionTracker partitionTracker;
    // --------- BackPressure --------
    private final BackPressureStatsTracker backPressureStatsTracker;
}

JobMaster 中涉及到重要組件如下圖所示:

JobMaster 中的組件組成

JobMaster 主要有兩個(gè)服務(wù):

  1. LegacyScheduler: ExecutionGraph 相關(guān)的調(diào)度都是在這里實(shí)現(xiàn)的,它類似更深層的抽象,封裝了 ExecutionGraph 和 BackPressureStatsTracker,JobMaster 不直接去調(diào)用 ExecutionGraph 和 BackPressureStatsTracker 的相關(guān)方法,都是通過 LegacyScheduler 間接去調(diào)用;
  2. SlotPool: 它是 JobMaster 管理其 slot 的服務(wù),它負(fù)責(zé)向 RM 申請(qǐng)/釋放 slot 資源,并維護(hù)其相應(yīng)的 slot 信息。

從前面的圖中可以看出,如果 LegacyScheduler 想調(diào)用 CheckpointCoordinator 的方法,比如 LegacySchedulertriggerSavepoint() 方法,它是需要先通過 executionGraphgetCheckpointCoordinator() 方法拿到 CheckpointCoordinator,然后再調(diào)用 CheckpointCoordinatortriggerSavepoint() 方法來觸發(fā)這個(gè)作業(yè)的 savepoint。

JobMaster 的 API 概述

目前 JobMaster 對(duì)外提供的 API 列表如下(主要還是 JobMasterGateway 接口對(duì)應(yīng)的實(shí)現(xiàn)):

  1. cancel(): 取消當(dāng)前正在執(zhí)行的作業(yè),如果作業(yè)還在調(diào)度,會(huì)執(zhí)行停止,如果作業(yè)正在運(yùn)行的話,它會(huì)向?qū)?yīng)的 TM 發(fā)送取消 task 的請(qǐng)求(cancelTask() 請(qǐng)求);
  2. updateTaskExecutionState(): 更新某個(gè) task 的狀態(tài)信息,這個(gè)是 TM 主動(dòng)向 JM 發(fā)送的更新請(qǐng)求;
  3. requestNextInputSplit(): Source ExecutionJobVertex 請(qǐng)求 next InputSlipt,這個(gè)一般是針對(duì)批處理讀取而言,有興趣的可以看下 FLIP-27: Refactor Source Interface,這里是社區(qū)計(jì)劃對(duì) Source 做的改進(jìn),未來會(huì)將批和流統(tǒng)一到一起;
  4. requestPartitionState(): 獲取指定 Result Partition 對(duì)應(yīng)生產(chǎn)者 JobVertex 的執(zhí)行狀態(tài);
  5. scheduleOrUpdateConsumers(): TM 通知 JM 對(duì)應(yīng)的 Result Partition 的數(shù)據(jù)已經(jīng)可用,每個(gè) ExecutionVertex 的每個(gè) ResultPartition 都會(huì)調(diào)用一次這個(gè)方法(可能是在第一次生產(chǎn)數(shù)據(jù)時(shí)調(diào)用或者所有數(shù)據(jù)已經(jīng)就緒時(shí)調(diào)用);
  6. disconnectTaskManager(): TM 心跳超時(shí)或者作業(yè)取消時(shí),會(huì)調(diào)用這個(gè)方法,JM 會(huì)釋放這個(gè) TM 上的所有 slot 資源;
  7. acknowledgeCheckpoint(): 當(dāng)一個(gè) Task 做完 snapshot 后,通過這個(gè)接口通知 JM,JM 再做相應(yīng)的處理,如果這個(gè) checkpoint 所有的 task 都已經(jīng) ack 了,那就意味著這個(gè) checkpoint 完成了;
  8. declineCheckpoint(): TM 向 JM 發(fā)送這個(gè)消息,告訴 JM 的 Checkpoint Coordinator 這個(gè) checkpoint request 沒有響應(yīng),比如:TM 觸發(fā) checkpoint 失敗,然后 Checkpoint Coordinator 就會(huì)知道這個(gè) checkpoint 處理失敗了,再做相應(yīng)的處理;
  9. requestKvStateLocation(): 請(qǐng)求某個(gè)注冊(cè)過 registrationName 對(duì)應(yīng)的 KvState 的位置信息;
  10. notifyKvStateRegistered(): 當(dāng)注冊(cè)一個(gè) KvState 的時(shí)候,會(huì)調(diào)用這個(gè)方法,一些 operator 在初始化的時(shí)候會(huì)調(diào)用這個(gè)方法注冊(cè)一個(gè) KvState;
  11. notifyKvStateUnregistered(): 取消一個(gè) KVState 的注冊(cè),這里是在 operator 關(guān)閉 state backend 時(shí)調(diào)用的(比如:operator 的生命周期結(jié)束了,就會(huì)調(diào)用這個(gè)方法);
  12. offerSlots(): TM 通知 JM 其上分配到的 slot 列表;
  13. failSlot(): 如果 TM 分配 slot 失敗(情況可能很多,比如:slot 分配時(shí)狀態(tài)轉(zhuǎn)移失敗等),將會(huì)通過這個(gè)接口告知 JM;
  14. registerTaskManager(): 向這個(gè) JM 注冊(cè) TM,JM 會(huì)將 TM 注冊(cè)到 SlotPool 中(只有注冊(cè)過的 TM 的 Slot 才被認(rèn)為是有效的,才可以做相應(yīng)的分配),并且會(huì)通過心跳監(jiān)控對(duì)應(yīng)的 TM;
  15. disconnectResourceManager(): 與 ResourceManager 斷開連接,這個(gè)是有三種情況會(huì)觸發(fā),JM 與 ResourceManager 心跳超時(shí)、作業(yè)取消、重連 RM 時(shí)會(huì)斷開連接(比如:RM leader 切換、RM 的心跳超時(shí));
  16. heartbeatFromTaskManager(): TM 向 JM 發(fā)送心跳信息;
  17. heartbeatFromResourceManager(): JM 向 ResourceManager 發(fā)送一個(gè)心跳信息,ResourceManager 只會(huì)監(jiān)聽 JM 是否超時(shí);
  18. requestJobDetails(): 請(qǐng)求這個(gè)作業(yè)的 JobDetails(作業(yè)的概況信息,比如:作業(yè)執(zhí)行了多長(zhǎng)時(shí)間、作業(yè)狀態(tài)等);
  19. requestJobStatus(): 請(qǐng)求這個(gè)作業(yè)的執(zhí)行狀態(tài) JobStatus;
  20. requestJob(): 請(qǐng)求這個(gè)作業(yè)的 ArchivedExecutionGraph(它是 ExecutionGraph 序列化之后的結(jié)果);
  21. triggerSavepoint(): 對(duì)這個(gè)作業(yè)觸發(fā)一次 savepoint;
  22. stopWithSavepoint(): 停止作業(yè)前觸發(fā)一次 savepoint(觸發(fā)情況是:用戶手動(dòng)停止作業(yè)時(shí)指定一個(gè) savepoint 路徑,這樣的話,會(huì)在停止前做一次 savepoint);
  23. requestOperatorBackPressureStats(): 匯報(bào)某個(gè) operator 反壓的情況;
  24. notifyAllocationFailure(): 如果 RM 分配 slot 失敗的話,將會(huì)通過這個(gè)接口通知 JM;

這里可以看到有部分接口的方法是在跟 RM 通信使用的,所以在 RM 的接口中也可以看到對(duì)應(yīng)的方法。另外,JobMaster 上面這些方法在實(shí)現(xiàn)時(shí)基本都是在調(diào)用 LegacySchedulerSlotPool 的具體實(shí)現(xiàn)方法來實(shí)現(xiàn)的。

SlotPool

SlotPool 是為當(dāng)前作業(yè)的 slot 請(qǐng)求而服務(wù)的,它會(huì)向 ResourceManager 請(qǐng)求 slot 資源;SlotPool 會(huì)維護(hù)請(qǐng)求到的 slot 列表信息(即使 ResourceManager 掛掉了,SlotPool 也可以使用當(dāng)前作業(yè)空閑的 slot 資源進(jìn)行分配),而如果一個(gè) slot 不再使用的話,即使作業(yè)在運(yùn)行,也是可以釋放掉的(所有的 slot 都是通過 AllocationID 來區(qū)分的)。

目前 SlotPool 提供的 API 列表如下:

  1. connectToResourceManager(): SlotPool 與 ResourceManager 建立連接,之后 SlotPool 就可以向 ResourceManager 請(qǐng)求 slot 資源了;
  2. disconnectResourceManage(): SlotPool 與 ResourceManager 斷開連接,這個(gè)方法被調(diào)用后,SlotPool 就不能從 ResourceManager 請(qǐng)求 slot 資源了,并且所有正在排隊(duì)等待的 Slot Request 都被取消;
  3. allocateAvailableSlot(): 將指定的 Slot Request 分配到指定的 slot 上,這里只是記錄其對(duì)應(yīng)關(guān)系(哪個(gè) slot 對(duì)應(yīng)哪個(gè) slot 請(qǐng)求);
  4. releaseSlot(): 釋放一個(gè) slot;
  5. requestNewAllocatedSlot(): 從 RM 請(qǐng)求一個(gè)新的 slot 資源分配,申請(qǐng)到的 slot 之后也會(huì)添加到 SlotPool 中;
  6. requestNewAllocatedBatchSlot(): 上面的方法是 Stream 類型,這里是 batch 類型,但向 RM 申請(qǐng)的時(shí)候,這里并沒有區(qū)別,只是為了做相應(yīng)的標(biāo)識(shí);
  7. getAvailableSlotsInformation(): 獲取當(dāng)前可用的 slot 列表;
  8. failAllocation(): 分配失敗,并釋放相應(yīng)的 slot,可能是因?yàn)檎?qǐng)求超時(shí)由 JM 觸發(fā)或者 TM 分配失敗;
  9. registerTaskManager(): 注冊(cè) TM,這里會(huì)記錄一下注冊(cè)過來的 TM,只能向注冊(cè)過來的 TM 分配 slot;
  10. releaseTaskManager(): 注銷 TM,這個(gè) TM 相關(guān)的 slot 都會(huì)被釋放,task 將會(huì)被取消,SlotPool 會(huì)通知相應(yīng)的 TM 釋放其 slot;
  11. createAllocatedSlotReport(): 匯報(bào)指定 TM 上的 slot 分配情況;

通過上面 SlotPool 對(duì)外提供的 API 列表,可以看到其相關(guān)方法都是跟 Slot 相關(guān)的,整體可以分為下面幾部分:

  1. 與 ResourceManager 建立/取消 連接;
  2. 注冊(cè)/注銷 TM,這里只是記錄注冊(cè)過 TM 列表,只有是注冊(cè)過的 TM 才允許使用其上面的 slot 資源;
  3. 向 ResourceManager 請(qǐng)求 slot 資源;
  4. 分配/釋放 slot,這里只是更新其狀態(tài)信息,并不做實(shí)質(zhì)的操作。

SlotPool 這里,更多只是維護(hù)一個(gè)狀態(tài)信息,以及與 ResourceManager(請(qǐng)求 slot 資源)和 TM(釋放對(duì)應(yīng)的 slot)做一些交互工作,它對(duì)這些功能做了相應(yīng)的封裝,方便 JobMaster 來調(diào)用。

LegacyScheduler

如前面所述,LegacyScheduler 其實(shí)是對(duì) ExecutionGraphBackPressureStatsTracker 方法的一個(gè)抽象,它還負(fù)責(zé)為作業(yè)創(chuàng)建對(duì)應(yīng)的 ExecutionGraph 以及對(duì)這個(gè)作業(yè)進(jìn)行調(diào)度。關(guān)于 LegacyScheduler 提供的 API 這里就不再展開,有興趣的可以直接看下源碼,它提供的大部分 API 都是在 JobMaster 的 API 列表中,因?yàn)?JobMaster 的很多方法實(shí)現(xiàn)本身就是調(diào)用 LegacyScheduler 對(duì)應(yīng)的方法。

作業(yè)調(diào)度的詳細(xì)流程

有了前面的講述,這里看下一個(gè)新提交的作業(yè),JobMaster 是如何調(diào)度起來的。當(dāng) JobMaster 調(diào)用 LegacyScheduler 的 startScheduling() 方法后,就會(huì)開始對(duì)這個(gè)作業(yè)進(jìn)行相應(yīng)的調(diào)度,申請(qǐng)對(duì)應(yīng)的 slot,并部署 task,其實(shí)現(xiàn)如下:

// LegacyScheduler.java
//note: ExecutionGraph 開始調(diào)度
@Override
public void startScheduling() {
    //note: 啟動(dòng)這個(gè)線程
    mainThreadExecutor.assertRunningInMainThread();

    try {
        //note: 調(diào)度這個(gè) graph
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}

一個(gè)作業(yè)開始調(diào)度后詳細(xì)流程如下圖所示(其中比較核心方法已經(jīng)標(biāo)成黃顏色):

一個(gè)作業(yè)調(diào)度的詳細(xì)流程

ExecutionGraph 通過 scheduleForExecution() 方法對(duì)這個(gè)作業(yè)調(diào)度執(zhí)行,其方法實(shí)現(xiàn)如下:

/note: 把 CREATED 狀態(tài)轉(zhuǎn)換為 RUNNING 狀態(tài),并做相應(yīng)的調(diào)度,如果有異常這里會(huì)拋出
public void scheduleForExecution() throws JobException {

    assertRunningInJobMasterMainThread();

    final long currentGlobalModVersion = globalModVersion;

    //note: 先將作業(yè)狀態(tài)轉(zhuǎn)移為 RUNNING
    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

        //note: 這里會(huì)真正調(diào)度相應(yīng)的 Execution Graph
        final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
            scheduleMode,
            getAllExecutionVertices(),
            this);

        if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
            schedulingFuture = newSchedulingFuture;
            //note: 前面調(diào)度完成后,如果最后的結(jié)果有異常,這里會(huì)做相應(yīng)的處理
            newSchedulingFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (!(strippedThrowable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(strippedThrowable);
                        }
                    }
                });
        } else {
            newSchedulingFuture.cancel(false);
        }
    }
    else {
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }
}

配合前面圖中的流程,接下來,看下這個(gè)作業(yè)在 SchedulingUtils 中是如何調(diào)度的:

// SchedulingUtils.java
public static CompletableFuture<Void> schedule(
        ScheduleMode scheduleMode,
        final Iterable<ExecutionVertex> vertices,
        final ExecutionGraph executionGraph) {

    switch (scheduleMode) {
        // LAZY 的意思是:是有上游數(shù)據(jù)就緒后,下游的 task 才能調(diào)度,這個(gè)主要是批場(chǎng)景會(huì)用到,流不能走這個(gè)模式
        case LAZY_FROM_SOURCES:
        case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
            return scheduleLazy(vertices, executionGraph);

        // 流默認(rèn)的是這個(gè)調(diào)度模式
        case EAGER:
            return scheduleEager(vertices, executionGraph);

        default:
            throw new IllegalStateException(String.format("Schedule mode %s is invalid.", scheduleMode));
    }
}

/**
 * Schedule vertices eagerly. That means all vertices will be scheduled at once.
 * note: 所有的節(jié)點(diǎn)會(huì)被同時(shí)調(diào)度
 *
 * @param vertices Topologically sorted vertices to schedule.
 * @param executionGraph The graph the given vertices belong to.
 */
public static CompletableFuture<Void> scheduleEager(
        final Iterable<ExecutionVertex> vertices,
        final ExecutionGraph executionGraph) {

    executionGraph.assertRunningInJobMasterMainThread();

    checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently");

    // Important: reserve all the space we need up front.
    // that way we do not have any operation that can fail between allocating the slots
    // and adding them to the list. If we had a failure in between there, that would
    // cause the slots to get lost

    // collecting all the slots may resize and fail in that operation without slots getting lost
    final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();

    final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
    final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
        computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));

    // allocate the slots (obtain all their futures)
    for (ExecutionVertex ev : vertices) {
        // these calls are not blocking, they only return futures
        //note: 給每個(gè) Execution 分配相應(yīng)的資源
        CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
            slotProviderStrategy,
            LocationPreferenceConstraint.ALL,
            allPreviousAllocationIds);

        allAllocationFutures.add(allocationFuture);
    }

    // this future is complete once all slot futures are complete.
    // the future fails once one slot future fails.
    final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

    return allAllocationsFuture.thenAccept(
        (Collection<Execution> executionsToDeploy) -> {
            for (Execution execution : executionsToDeploy) {
                try {
                    //note: 部署每個(gè) Execution
                    execution.deploy();
                } catch (Throwable t) {
                    throw new CompletionException(
                        new FlinkException(
                            String.format("Could not deploy execution %s.", execution),
                            t));
                }
            }
        })
        // Generate a more specific failure message for the eager scheduling
        .exceptionally(
            //...
        );
}

由于對(duì)于流作業(yè)來說,它默認(rèn)的調(diào)度模式(ScheduleMode)是 ScheduleMode.EAGER,也就是說,所有 task 會(huì)同時(shí)調(diào)度起來,上面的代碼里也可以看到調(diào)度的時(shí)候有兩個(gè)主要方法:

  1. allocateResourcesForExecution(): 它的作用是給這個(gè) Execution 分配資源,獲取要分配的 slot(它還會(huì)向 ShuffleMaster 注冊(cè) produced partition,這個(gè) shuffle 部分內(nèi)容后面文章再講述,這里就不展開了);
  2. deploy(): 這個(gè)方法會(huì)直接向 TM 提交這個(gè) task 任務(wù);

這里,主要展開一下 allocateResourcesForExecution() 方法的實(shí)現(xiàn),deploy() 的實(shí)現(xiàn)將會(huì)在后面 TaskManager 這篇文章中講述。

如何給 ExecutionVertex 分配 slot

通過前面的代碼,我們知道,allocateResourcesForExecution() 方法會(huì)給每一個(gè) ExecutionVertex 分配一個(gè) slot,而它具體是如何分配的,這個(gè)流程是在 Execution 的 allocateAndAssignSlotForExecution() 方法中實(shí)現(xiàn)的,代碼如下如下:

/**
 * Allocates and assigns a slot obtained from the slot provider to the execution.
 * note: 從 slot provider 獲取一個(gè) slot,將任務(wù)分配到這個(gè) slot 上
 *
 * @param slotProviderStrategy to obtain a new slot from
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
 *                                                 Can be empty if the allocation ids are not required for scheduling.
 * @return Future which is completed with the allocated slot once it has been assigned
 *          or with an exception if an error occurred.
 */
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
        SlotProviderStrategy slotProviderStrategy,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {

    checkNotNull(slotProviderStrategy);

    assertRunningInJobMasterMainThread();

    //note: 獲取這個(gè) vertex 的相關(guān)信息
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    // sanity check
    //note: 做相應(yīng)的檢查
    if (locationConstraint != null && sharingGroup == null) {
        throw new IllegalStateException(
                "Trying to schedule with co-location constraint but without slot sharing allowed.");
    }

    // this method only works if the execution is in the state 'CREATED'
    //note: 這個(gè)只會(huì)在 CREATED 下工作
    if (transitionState(CREATED, SCHEDULED)) {

        final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;

        //note: 創(chuàng)建一個(gè) ScheduledUnit 對(duì)象(跟 sharingGroup/locationConstraint 都有關(guān)系)
        ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, slotSharingGroupId) :
                new ScheduledUnit(this, slotSharingGroupId, locationConstraint);

        // try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
        //note: 如果能找到之前調(diào)度的 AllocationID,會(huì)盡量先重新調(diào)度在同一個(gè) slot 上
        ExecutionVertex executionVertex = getVertex();
        AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();

        Collection<AllocationID> previousAllocationIDs =
            lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();

        // calculate the preferred locations
        //note: 這里先根據(jù) state 和上游數(shù)據(jù)的輸入節(jié)點(diǎn)獲取這個(gè) Task Execution 的最佳 TM location
        final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
            calculatePreferredLocations(locationPreferenceConstraint);

        final SlotRequestId slotRequestId = new SlotRequestId();

        //note: 根據(jù)指定的需求分配這個(gè) slot
        final CompletableFuture<LogicalSlot> logicalSlotFuture =
            preferredLocationsFuture.thenCompose(
                (Collection<TaskManagerLocation> preferredLocations) ->
                    slotProviderStrategy.allocateSlot(
                        slotRequestId,
                        toSchedule,
                        new SlotProfile(
                            vertex.getResourceProfile(),
                            preferredLocations,
                            previousAllocationIDs,
                            allPreviousExecutionGraphAllocationIds)));

        // register call back to cancel slot request in case that the execution gets canceled
        releaseFuture.whenComplete(
            (Object ignored, Throwable throwable) -> {
                if (logicalSlotFuture.cancel(false)) {
                    slotProviderStrategy.cancelSlotRequest(
                        slotRequestId,
                        slotSharingGroupId,
                        new FlinkException("Execution " + this + " was released."));
                }
            });

        // This forces calls to the slot pool back into the main thread, for normal and exceptional completion
        //note: 返回 LogicalSlot
        return logicalSlotFuture.handle(
            (LogicalSlot logicalSlot, Throwable failure) -> {

                if (failure != null) {
                    throw new CompletionException(failure);
                }

                if (tryAssignResource(logicalSlot)) {
                    return logicalSlot;
                } else {
                    // release the slot
                    logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
                    throw new CompletionException(
                        new FlinkException(
                            "Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
                }
            });
    } else {
        // call race, already deployed, or already done
        throw new IllegalExecutionStateException(this, CREATED, state);
    }
}

這里,簡(jiǎn)單總結(jié)一下上面這個(gè)方法的流程:

  1. 狀態(tài)轉(zhuǎn)換,將這個(gè) Execution 的狀態(tài)(ExecutionState)從 CREATED 轉(zhuǎn)為 SCHEDULED 狀態(tài);
  2. 根據(jù)是否是一個(gè)有狀態(tài)的 operator 以及它上游輸入節(jié)點(diǎn)位置,來計(jì)算一個(gè)最佳的 TM 位置列表(TaskManagerLocation)列表;
  3. 如果這個(gè) Execution 之前有調(diào)度記錄,也就是說,這次由 failover 導(dǎo)致的重啟,這里會(huì)拿到上次調(diào)度的 TM 位置信息;
  4. 根據(jù) 2、3 拿到 TM 位置信息,去調(diào)用 SlotProviderStrategy 的 allocateSlot() 獲取要分配的 slot。

在 SchedulerImpl 去分配 slot 的時(shí)候,其實(shí)是會(huì)分兩種情況的:

  1. allocateSingleSlot(): 如果對(duì)應(yīng)的 task 節(jié)點(diǎn)沒有設(shè)置 SlotSharingGroup,會(huì)直接走這個(gè)方法,就不會(huì)考慮 share group 的情況,直接給這個(gè) task 分配對(duì)應(yīng)的 slot;
  2. allocateSharedSlot(): 如果對(duì)應(yīng)的 task 節(jié)點(diǎn)有設(shè)置 SlotSharingGroup,就會(huì)走到這個(gè)方法,在分配 slot 的時(shí)候,考慮的因素就會(huì)多一些。

分配時(shí)如何選擇最優(yōu)的 TM 列表

這里,我們先來看下如何給這個(gè) slot 選擇一個(gè)最佳的 TM 列表,具體的方法實(shí)現(xiàn)是在 Execution 中的 calculatePreferredLocations() 方法中實(shí)現(xiàn)的,其具體的實(shí)現(xiàn)如下:

// Execution.java
/**
 * Calculates the preferred locations based on the location preference constraint.
 * note: 根據(jù) LocationPreferenceConstraint 策略計(jì)算前置輸入節(jié)點(diǎn)的 TaskManagerLocation
 *
 * @param locationPreferenceConstraint constraint for the location preference
 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
 *      have been a resource assigned.
 */
@VisibleForTesting
public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    //note: 獲取一個(gè)最佳分配的 TM location 集合
    final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocations();
    final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;

    switch(locationPreferenceConstraint) {
        case ALL:
            //note: 默認(rèn)是 ALL,就是前面拿到的列表,這里都可以使用
            preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
            break;
        case ANY:
            //note: 遍歷所有 input,先獲取已經(jīng)完成 assign 的 input 列表
            final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());

            for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
                if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
                    //note: 在這個(gè) future 完成(沒有異常的情況下),這里會(huì)使用這個(gè) taskManagerLocation 對(duì)象
                    final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);

                    if (taskManagerLocation == null) {
                        throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
                    }

                    completedTaskManagerLocations.add(taskManagerLocation);
                }
            }

            preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
            break;
        default:
            throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
    }

    return preferredLocationsFuture;
}

從上面的實(shí)現(xiàn)可以看出,這里是先通過 ExecutionVertexgetPreferredLocations() 方法獲取一個(gè) TaskManagerLocation 列表,然后再根據(jù) LocationPreferenceConstraint 的模式做過濾,如果是 ALL,那么前面拿到的所有列表都會(huì)直接返回,而如果是 ANY,只會(huì)把那些已經(jīng)分配好的 input 節(jié)點(diǎn)的 TaskManagerLocation 返回。

這里,看下 ExecutionVertexgetPreferredLocations() 方法的實(shí)現(xiàn)邏輯:

// ExecutionVertex.java
/**
 * Gets the overall preferred execution location for this vertex's current execution.
 * The preference is determined as follows:
 *
 * <ol>
 *     <li>If the task execution has state to load (from a checkpoint), then the location preference
 *         is the location of the previous execution (if there is a previous execution attempt).
 *     <li>If the task execution has no state or no previous location, then the location preference
 *         is based on the task's inputs.
 * </ol>
 * note: 如果這個(gè) task Execution 是從 checkpoint 加載的狀態(tài),那么這個(gè) location preference 就是之前執(zhí)行的狀態(tài);
 * note: 如果這個(gè) task Execution 沒有狀態(tài)信息或之前的 location 記錄,這個(gè) location preference 依賴于 task 的輸入;
 *
 * <p>These rules should result in the following behavior:
 *
 * note: 1. 無狀態(tài) task 總是基于與輸入共享的方式調(diào)度;
 * note: 2. 有狀態(tài) task 基于與輸入共享的方式來初始化他們最開始的調(diào)度;
 * note: 3. 有狀態(tài) task 的重復(fù)執(zhí)行會(huì)盡量與他們的 state 共享執(zhí)行;
 * <ul>
 *     <li>Stateless tasks are always scheduled based on co-location with inputs.
 *     <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
 *     <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
 * </ul>
 */
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
    Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
    return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
}


/**
 * Gets the preferred location to execute the current task execution attempt, based on the state that the execution attempt will resume.
 * note: 根據(jù)這個(gè) Execution 試圖恢復(fù)的狀態(tài)來獲取當(dāng)前 task execution 的首選位置
 */
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
    TaskManagerLocation priorLocation;
    if (currentExecution.getTaskRestore() != null && (priorLocation = getLatestPriorLocation()) != null) {
        return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
    }
    else {
        return null;
    }
}

這里簡(jiǎn)單介紹一下其處理邏輯:

  1. 如果這個(gè)作業(yè)是從 Checkpoint 恢復(fù)的話,這里會(huì)根據(jù)它之前的狀態(tài)信息獲取上次的位置信息,直接返回這個(gè)位置信息;
  2. 另一種情況是,根據(jù)這個(gè) ExecutionVertex 的 inputEdges,獲取其上游 ExecutionVertex 的位置信息列表,但是如果這個(gè)列表的數(shù)目超過閾值(默認(rèn)是 8),就會(huì)直接返回 null(上游過于分散,再根據(jù) input 位置信息去分配就沒有太大意義了)。

可以看出,在選取最優(yōu)的 TaskManagerLocation 列表時(shí),主要是根據(jù) state 和 input 的位置信息來判斷,會(huì)優(yōu)先選擇 state,也就是上次 checkpoint 中記錄的位置。

最優(yōu)的 slot 分配算法

在上面選擇了最優(yōu)的 TaskManagerLocation 列表后,這里來看下如何給 task 選擇具體的 slot,這個(gè)是在 SlotSelectionStrategy 中的 selectBestSlotForProfile() 方法中做的,目前 SlotSelectionStrategy 有兩個(gè)實(shí)現(xiàn)類:PreviousAllocationSlotSelectionStrategyLocationPreferenceSlotSelectionStrategy,這個(gè)是在 state.backend.local-recovery 參數(shù)中配置的,默認(rèn)是 false,選擇的是 PreviousAllocationSlotSelectionStrategy,如果配置為 true,那么就會(huì)選擇 PreviousAllocationSlotSelectionStrategy,這部分的邏輯如下:

// DefaultSchedulerFactory.java
@Nonnull
private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
    // 根據(jù) state.backend.local-recover 配置選擇
    if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
        return PreviousAllocationSlotSelectionStrategy.INSTANCE;
    } else {
        return LocationPreferenceSlotSelectionStrategy.INSTANCE;
    }
}

這里分別看下這兩個(gè)實(shí)現(xiàn)類的 selectBestSlotForProfile() 的實(shí)現(xiàn)邏輯:

  1. PreviousAllocationSlotSelectionStrategy: 它會(huì)根據(jù)上次的分配記錄,如果這個(gè)位置剛好在 SlotPool 的可用列表里,這里就會(huì)直接選這個(gè) slot,否則會(huì)走到 LocationPreferenceSlotSelectionStrategy 的處理邏輯;
  2. LocationPreferenceSlotSelectionStrategy: 這個(gè)是對(duì)可用的 slot 列表做打分,選擇分?jǐn)?shù)最高的(分?jǐn)?shù)相同的話,會(huì)選擇第一個(gè)),如果 slot 在前面得到的最優(yōu) TaskManagerLocation 列表中,分?jǐn)?shù)就會(huì)比較高。

allocateSharedSlot VS allocateSingleSlot

在分配 slot 時(shí),這里分為兩種情況:

  1. allocateSingleSlot(): 如果沒有設(shè)置 SlotSharingGroup 將會(huì)走到這個(gè)方法,直接給這個(gè) SlotRequestId 分配一個(gè) slot,具體選擇哪個(gè) slot 就是上面的邏輯;
  2. allocateSharedSlot(): 而如果設(shè)置了 SlotSharingGroup 就會(huì)走到這里,先根據(jù) SlotSharingGroupId 獲取或創(chuàng)建對(duì)應(yīng)的 SlotSharingManager,然后創(chuàng)建(或者根據(jù) SlotSharingGroup 獲?。┮粋€(gè)的 MultiTaskSlot(每個(gè) SlotSharingGroup 會(huì)對(duì)應(yīng)一個(gè) MultiTaskSlot 對(duì)象),這里再將這個(gè) task 分配到這個(gè) MultiTaskSlot 上(這個(gè)只是簡(jiǎn)單介紹,后面在調(diào)度模型文章中,將會(huì)詳細(xì)講述)。

小結(jié)

到這里,F(xiàn)link JobManager 的大部分內(nèi)容已經(jīng)講述完了,還有一些小點(diǎn)會(huì)在后面的系列文章中再給大家講述。這里總結(jié)一下,JobManager 主要是為一個(gè)具體的作業(yè)而服務(wù)的,它負(fù)責(zé)這個(gè)作業(yè)每個(gè) task 的調(diào)度、checkpoint/savepoint(后面 checkpoint 的文章中會(huì)詳述其流程)的觸發(fā)以及容錯(cuò)恢復(fù),它有兩個(gè)非常重點(diǎn)的服務(wù)組件 —— LegacySchedulerSlotPool,其中:

  1. LegacyScheduler: 它封裝了作業(yè)的 ExecutionGraph 以及 BackPressureStatsTracker 中的接口,它會(huì)負(fù)責(zé)這個(gè)作業(yè)具體調(diào)度、savepoint 觸發(fā)等工作;
  2. SlotPool: 它主要負(fù)責(zé)這個(gè)作業(yè) slot 相關(guān)的內(nèi)容,像與 ResourceManager 通信、分配或釋放 slot 資源等工作。

文章的后半部分,又總結(jié)了一個(gè)作業(yè)是如何調(diào)度起來的,首先是分配 slot,最后是通過 deploy() 接口向 TM 提交這個(gè) task,本文著重關(guān)注了 slot 的分配,task 的部署將會(huì)在下節(jié)的 TaskManager 詳解中給大家介紹。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容