Flink Source Code: Slot

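Slot Concepts

In Flink, a slot is a bundle of resources: CPU cores, task heap memory, task off-heap memory, managed memory and network memory. A slot is also Flink's unit of resource allocation.

A TaskManager contains one or more slots. Depending on the slot sharing configuration, several tasks can run in the same slot at once; they live in the slot as worker threads.

The relationship between TaskManager, slot, task and parallelism is shown in the figure below (taken from the official Flink documentation):

[Figure: Flink Slot]

Slot-Related Classes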

SchedulerImpl

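SchedulerImpl assigns slots to the tasks of Execution vertices.

The two most important SchedulerImpl methods in the analysis below are allocateSlot and allocateBatchSlot. Their logic is essentially the same; the only difference is that the former takes an additional slot allocation timeout parameter.

The slot allocation flow itself is fairly involved and is covered later, when we walk through the slot request flow.

SlotSharingManager

SlotSharingManager handles slot sharing, which means running different tasks in the same slot.

SlotSharingManager maintains a slot hierarchy. The root and the intermediate nodes of the hierarchy are MultiTaskSlot instances. A MultiTaskSlot can belong to another MultiTaskSlot, and it can itself contain several MultiTaskSlot and SingleTaskSlot children, which is what forms the hierarchy. A SingleTaskSlot is a leaf of the hierarchy and has exactly one parent.

Slot sharing is expressed through this hierarchy: a slot shared by several tasks is represented as one MultiTaskSlot containing several SingleTaskSlot nodes.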
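
The shape of this hierarchy can be illustrated with a small tree. This is only a sketch: the Sketch* classes and their methods are hypothetical stand-ins, not Flink's actual MultiTaskSlot and SingleTaskSlot implementations, and they model nothing beyond the parent/child structure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the slot hierarchy (not Flink's classes).
abstract class SketchTaskSlot {
    final String name;
    SketchMultiTaskSlot parent; // null for a root node

    SketchTaskSlot(String name) {
        this.name = name;
    }

    // Number of leaf (single-task) nodes at or below this node.
    abstract int countTasks();
}

// Root or intermediate node: may contain multi-task and single-task children.
class SketchMultiTaskSlot extends SketchTaskSlot {
    final List<SketchTaskSlot> children = new ArrayList<>();

    SketchMultiTaskSlot(String name) {
        super(name);
    }

    SketchMultiTaskSlot addMulti(String childName) {
        SketchMultiTaskSlot child = new SketchMultiTaskSlot(childName);
        child.parent = this;
        children.add(child);
        return child;
    }

    SketchSingleTaskSlot addSingle(String childName) {
        SketchSingleTaskSlot leaf = new SketchSingleTaskSlot(childName);
        leaf.parent = this;
        children.add(leaf);
        return leaf;
    }

    @Override
    int countTasks() {
        int total = 0;
        for (SketchTaskSlot child : children) {
            total += child.countTasks();
        }
        return total;
    }
}

// Leaf node: holds exactly one task and always has a parent.
class SketchSingleTaskSlot extends SketchTaskSlot {
    SketchSingleTaskSlot(String name) {
        super(name);
    }

    @Override
    int countTasks() {
        return 1;
    }
}
```

In this model, a root with one direct leaf and one nested multi-task node holding two leaves represents a single physical slot shared by three tasks.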

Let's look at a few important methods.

createRootSlot

Creates a root slot node. The node is of type MultiTaskSlot.

@Nonnull
MultiTaskSlot createRootSlot(
    SlotRequestId slotRequestId,
    CompletableFuture<? extends SlotContext> slotContextFuture,
    SlotRequestId allocatedSlotRequestId) {
    LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);

    final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
    // Create a root node.
    // createAndRegisterRootSlot also stores the new MultiTaskSlot in allTaskSlots and unresolvedRootSlots
    final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(
        slotRequestId,
        allocatedSlotRequestId,
        slotContextFutureAfterRootSlotResolution);
    // Runs once slotContextFuture completes.
    // slotContextFuture represents the slot request made to the SlotPool;
    // it is completed in SlotPoolImpl's tryFulfillSlotRequestOrMakeAvailable method
    FutureUtils.forward(
        slotContextFuture.thenApply(
            (SlotContext slotContext) -> {
                // add the root node to the set of resolved root nodes once the SlotContext future has
                // been completed and we know the slot's TaskManagerLocation
                // The slot has been allocated at this point: remove it from
                // unresolvedRootSlots and store it in resolvedRootSlots
                tryMarkSlotAsResolved(slotRequestId, slotContext);
                return slotContext;
            }),
        slotContextFutureAfterRootSlotResolution);

    return rootMultiTaskSlot;
}

SlotPool

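SlotPool caches slots. It receives the slot requests issued by the ExecutionGraph and hands out the slots needed to run tasks. If the SlotPool cannot serve a request itself, it tries to obtain a new slot from the ResourceManager. The request fails if the ResourceManager is currently unavailable, if it rejects the request, or if the request times out. Because the SlotPool caches slots, it can still hand out registered idle slots while the ResourceManager is unavailable. These slots are only released once they are no longer used, for example when the job is running but some slots remain idle.

Start method

SlotPool is started in JobMaster's startJobMasterServices. The start method registers two periodic tasks: checking for idle slots and checking for timed-out batch slot requests.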

public void start(
    @Nonnull JobMasterId jobMasterId,
    @Nonnull String newJobManagerAddress,
    @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {

    this.jobMasterId = jobMasterId;
    this.jobManagerAddress = newJobManagerAddress;
    this.componentMainThreadExecutor = componentMainThreadExecutor;

    scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);

    if (log.isDebugEnabled()) {
        scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

checkIdleSlot

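This method is one of the SlotPool's periodic tasks. It periodically checks for idle slots and releases those that have been idle for longer than idleSlotTimeout.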

protected void checkIdleSlot() {

    // The timestamp in SlotAndTimestamp is relative
    // Get the current (relative) time
    final long currentRelativeTimeMillis = clock.relativeTimeMillis();

    // Create the list that collects the expired idle slots
    final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());

    // Find all slots that have been idle for longer than the timeout
    for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
        if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
            expiredSlots.add(slotAndTimestamp.slot);
        }
    }

    final FlinkException cause = new FlinkException("Releasing idle slot.");

    for (AllocatedSlot expiredSlot : expiredSlots) {
        // Get the allocation ID of each expired slot
        final AllocationID allocationID = expiredSlot.getAllocationId();
        // Remove the slot registered under this allocation ID
        if (availableSlots.tryRemove(allocationID) != null) {

            log.info("Releasing idle slot [{}].", allocationID);
            // RPC call to the TaskManager that hosts the idle slot, telling it to free the slot
            final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                allocationID,
                cause,
                rpcTimeout);

            // Runs when the RPC call completes.
            // If freeing the slot failed, the slot is discarded; its status is synced with the TaskManager on the next heartbeat
            FutureUtils.whenCompleteAsyncIfNotDone(
                freeSlotFuture,
                componentMainThreadExecutor,
                (Acknowledge ignored, Throwable throwable) -> {
                    if (throwable != null) {
                        // The slot status will be synced to task manager in next heartbeat.
                        log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
                                    allocationID, expiredSlot.getTaskManagerId(), throwable);
                    }
                });
        }
    }

    // Schedule the next run, which makes the check periodic
    scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
}
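
The expiry test in the first loop of checkIdleSlot can be isolated as a small sketch. The class and method names below are hypothetical, and plain String ids with idle-since timestamps stand in for AllocatedSlot and SlotAndTimestamp; only the comparison mirrors the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the idle-slot expiry check (not Flink code).
class IdleSlotSketch {

    // Returns the ids of all slots that have been idle for strictly longer than the timeout.
    static List<String> findExpired(
            Map<String, Long> idleSinceMillis, long nowMillis, long idleTimeoutMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : idleSinceMillis.entrySet()) {
            // Same comparison as in checkIdleSlot: now - timestamp > timeout
            if (nowMillis - entry.getValue() > idleTimeoutMillis) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

Because the comparison is strict, a slot that has been idle for exactly the timeout is kept until the next run.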

checkBatchSlotTimeout

protected void checkBatchSlotTimeout() {
    // Return immediately if the batch slot request timeout check is disabled
    if (!batchSlotRequestTimeoutCheckEnabled) {
        return;
    }

    final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();

    if (!pendingBatchRequests.isEmpty()) {
        // Get the resource profiles of the slots that are already allocated
        final Set<ResourceProfile> allocatedResourceProfiles = getAllocatedResourceProfiles();

        // Partition the pending requests by resource requirement: those that an already allocated slot could fulfill go into one group, the rest into the other
        final Map<Boolean, List<PendingRequest>> fulfillableAndUnfulfillableRequests = pendingBatchRequests
            .stream()
            .collect(Collectors.partitioningBy(canBeFulfilledWithAllocatedSlot(allocatedResourceProfiles)));

        // Extract the fulfillable and the unfulfillable group of pending requests
        final List<PendingRequest> fulfillableRequests = fulfillableAndUnfulfillableRequests.get(true);
        final List<PendingRequest> unfulfillableRequests = fulfillableAndUnfulfillableRequests.get(false);

        final long currentTimestamp = clock.relativeTimeMillis();

        // Mark these requests as fulfillable
        for (PendingRequest fulfillableRequest : fulfillableRequests) {
            fulfillableRequest.markFulfillable();
        }

        for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
            // Mark the request as unfulfillable and record the timestamp
            unfulfillableRequest.markUnfulfillable(currentTimestamp);

            if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMilliseconds() <= currentTimestamp) {
                // The request has timed out: run the timeout handling shown below
                timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
            }
        }
    }

    // Schedule the next run, which makes the check periodic
    scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
}

// Handles a pending request that has timed out
protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
    log.info("Pending slot request [{}] timed out.", slotRequestId);
    // Remove the request from waitingForResourceManager and pendingRequests
    final PendingRequest pendingRequest = removePendingRequest(slotRequestId);

    // Complete the request's future exceptionally with a timeout exception
    if (pendingRequest != null) {
        pendingRequest
            .getAllocatedSlotFuture()
            .completeExceptionally(new TimeoutException("Pending slot request timed out in SlotPool."));
    }
}
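
The partition-then-timeout logic can be sketched in isolation. Everything in this block is a simplified assumption: the Pending class is hypothetical, profiles are compared by string equality rather than with Flink's resource matching, and the sentinel Long.MAX_VALUE for "currently fulfillable" is an assumption about how the timestamp is tracked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of the batch slot request timeout check (not Flink code).
class BatchTimeoutSketch {

    static final class Pending {
        final String id;
        final String profile;
        // Assumption: Long.MAX_VALUE means the request is currently fulfillable.
        long unfulfillableSince = Long.MAX_VALUE;

        Pending(String id, String profile) {
            this.id = id;
            this.profile = profile;
        }
    }

    // Returns the ids of the requests that have been unfulfillable for at least timeoutMillis.
    static List<String> check(
            List<Pending> pending, Set<String> allocatedProfiles, long now, long timeoutMillis) {
        // true -> an allocated slot has a matching profile, false -> no such slot exists
        Map<Boolean, List<Pending>> parts = pending.stream()
            .collect(Collectors.partitioningBy(p -> allocatedProfiles.contains(p.profile)));

        for (Pending p : parts.get(true)) {
            p.unfulfillableSince = Long.MAX_VALUE; // mark fulfillable
        }

        List<String> timedOut = new ArrayList<>();
        for (Pending p : parts.get(false)) {
            if (p.unfulfillableSince == Long.MAX_VALUE) {
                p.unfulfillableSince = now; // first time we see it as unfulfillable
            }
            if (p.unfulfillableSince + timeoutMillis <= now) {
                timedOut.add(p.id);
            }
        }
        return timedOut;
    }
}
```

The point of the two groups is that a batch request is only timed out when no already allocated slot could ever serve it; a request that merely waits for a busy but matching slot keeps waiting.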

ResourceManager

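Like the JobManager and the TaskManager, the ResourceManager is one of Flink's key roles. It is responsible for allocating and reclaiming resources and for keeping track of them. The ResourceManager supports HA and can take part in leader election. It also holds the connections to JobManagers. TaskManagers that are created later register themselves with the ResourceManager through the registerTaskExecutor method.

The most important member of ResourceManager is the SlotManager, which maintains the available slots.

ResourceManager itself is an abstract class. It has two subclasses:

  • StandaloneResourceManager: used for standalone deployments.
  • ActiveResourceManager: used for non-standalone deployments. It has a ResourceManagerDriver member. ResourceManagerDriver has several subclasses, which provide support for Kubernetes, Mesos and Yarn respectively.

SlotManager

SlotManager maintains all registered slots. It keeps track of all registered slots, free slots, pending slots awaiting allocation (pendingSlot), pending slot requests (pendingSlotRequest) and fulfilled slot requests.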

TaskSlotTable

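The table of slot and task assignments on a TaskManager, and the container for TaskSlot instances. It maintains several indices for fast access to tasks and the slots assigned to them.

Let's look at its main methods.

The start method

A TaskSlotTable must be started before it is used. The start method is shown below: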

@Override
public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
    // Check the state; it must be CREATED
    Preconditions.checkState(
        state == State.CREATED,
        "The %s has to be just created before starting",
        TaskSlotTableImpl.class.getSimpleName());
    // Set the slotActions (analyzed below)
    this.slotActions = Preconditions.checkNotNull(initialSlotActions);
    // Set the main thread executor
    this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
    // A timer service that can schedule multiple events and notifies the corresponding timeout listener when one expires
    timerService.start(this);

    // Switch the state to RUNNING to prevent repeated starts
    state = State.RUNNING;
}

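SlotActions

SlotActions contains the callbacks for slot allocation actions. The interface defines two callbacks:

  • freeSlot: called when a slot is freed
  • timeoutSlot: called when a slot times out

The interface is defined as follows: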

public interface SlotActions {

    /**
     * Free the task slot with the given allocation id.
     *
     * @param allocationId to identify the slot to be freed
     */
    void freeSlot(AllocationID allocationId);

    /**
     * Timeout the task slot for the given allocation id. The timeout is identified by the given
     * ticket to filter invalid timeouts out.
     *
     * @param allocationId identifying the task slot to be timed out
     * @param ticket allowing to filter invalid timeouts out
     */
    void timeoutSlot(AllocationID allocationId, UUID ticket);
}

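AllocationID is the unique identifier of a physical slot that the JobManager allocates through the ResourceManager. It is assigned when the JobManager first makes the request and stays the same across retries. TaskManager and ResourceManager use this ID to track and synchronize the slot allocation state. SlotRequestId, by contrast, is used when a task requests a logical slot from the SlotPool. Because of slot sharing, several logical slot requests may map to the same physical slot request.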
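
The many-to-one relationship between logical and physical identifiers can be pictured with a small map. This is a sketch under stated assumptions: SlotIdSketch and its methods are hypothetical, and java.util.UUID stands in for both SlotRequestId and AllocationID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical bookkeeping: logical slot request id -> physical allocation id (not Flink code).
class SlotIdSketch {
    private final Map<UUID, UUID> logicalToPhysical = new HashMap<>();

    // Registers a new logical slot request that is served by the given physical slot.
    UUID requestLogicalSlot(UUID allocationId) {
        UUID slotRequestId = UUID.randomUUID(); // every logical request gets its own id
        logicalToPhysical.put(slotRequestId, allocationId);
        return slotRequestId;
    }

    // Number of logical slots currently mapped onto one physical slot.
    long logicalSlotsOn(UUID allocationId) {
        return logicalToPhysical.values().stream().filter(allocationId::equals).count();
    }
}
```

With slot sharing, two tasks that request logical slots get two distinct request ids, while both entries point at the same allocation id.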

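The only implementation of SlotActions is SlotActionsImpl, located in TaskExecutor.java. We will analyze it when it comes up later.

The allocateSlot method

Allocates a slot with the given index for the given job. If the index is negative, an auto-incremented index is used; in the code below, such a slot is not stored in taskSlots and uses the resource profile passed in by the caller. Returns true if the slot could be allocated.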

@Override
public boolean allocateSlot(
        int index,
        JobID jobId,
        AllocationID allocationId,
        ResourceProfile resourceProfile,
        Time slotTimeout) {
    // Check that the TaskSlotTable is in state RUNNING
    checkRunning();

    Preconditions.checkArgument(index < numberSlots);

    // Check whether a slot has already been allocated for this allocation ID.
    // If so, return false right away
    TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
    if (taskSlot != null) {
        LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
        return false;
    }

    // If taskSlots already contains this index
    if (taskSlots.containsKey(index)) {
        // Get the existing (duplicate) task slot
        TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
        LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
            index,
            duplicatedTaskSlot.getResourceProfile(),
            duplicatedTaskSlot.getJobId(),
            duplicatedTaskSlot.getAllocationId());
        // The allocation is only accepted if the existing task slot has the same job ID and allocation ID
        return duplicatedTaskSlot.getJobId().equals(jobId) &&
            duplicatedTaskSlot.getAllocationId().equals(allocationId);
    } else if (allocatedSlots.containsKey(allocationId)) {
        // Return true if a slot has already been allocated for this allocation ID.
        // This is questionable: the same condition was checked above, so this branch is unlikely to be reached
        return true;
    }

    // For index >= 0 use the default ResourceProfile, otherwise use the resourceProfile passed to the method
    resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;

    // Check whether the required resources can still be reserved
    if (!budgetManager.reserve(resourceProfile)) {
        LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
                + "while the currently remaining available resources are {}, total is {}.",
            resourceProfile,
            budgetManager.getAvailableBudget(),
            budgetManager.getTotalBudget());
        return false;
    }

    // Create a new TaskSlot
    taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);
    if (index >= 0) {
        // Store it in the taskSlots map
        taskSlots.put(index, taskSlot);
    }

    // update the allocation id to task slot map
    // Add it to the map of allocated slots
    allocatedSlots.put(allocationId, taskSlot);

    // register a timeout for this slot since it's in state allocated
    // Register a timeout timer for the slot; the timeout handling runs when the slot times out
    timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());

    // add this slot to the set of job slots
    // Associate the slot with the job ID
    Set<AllocationID> slots = slotsPerJob.get(jobId);

    if (slots == null) {
        slots = new HashSet<>(4);
        slotsPerJob.put(jobId, slots);
    }

    slots.add(allocationId);

    return true;
}

addTask

Adds a task to the slot identified by the task's allocation ID.

@Override
public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
    // Check that the TaskSlotTable is running
    checkRunning();
    Preconditions.checkNotNull(task);

    // Look up the taskSlot in allocatedSlots
    TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());

    if (taskSlot != null) {
        // If the taskSlot is active and its job ID and allocation ID match those of the task
        if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
            // Assign the task to the task slot and record the mapping
            if (taskSlot.add(task)) {
                taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));

                return true;
            } else {
                return false;
            }
        } else {
            throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
        }
    } else {
        throw new SlotNotFoundException(task.getAllocationId());
    }
}

createSlotReport

Returns a report on the current slot allocation of this TaskManager.

The returned SlotReport is a collection of slot statuses of a TaskExecutor.

@Override
public SlotReport createSlotReport(ResourceID resourceId) {
    List<SlotStatus> slotStatuses = new ArrayList<>();

    // Collect the status of the statically allocated slots
    for (int i = 0; i < numberSlots; i++) {
        SlotID slotId = new SlotID(resourceId, i);
        SlotStatus slotStatus;
        if (taskSlots.containsKey(i)) {
            TaskSlot<T> taskSlot = taskSlots.get(i);

            slotStatus = new SlotStatus(
                slotId,
                taskSlot.getResourceProfile(),
                taskSlot.getJobId(),
                taskSlot.getAllocationId());
        } else {
            slotStatus = new SlotStatus(
                slotId,
                defaultSlotResourceProfile,
                null,
                null);
        }

        slotStatuses.add(slotStatus);
    }

    // Collect the status of the dynamically allocated slots
    for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
        // A slot index below 0 means the slot was allocated dynamically
        if (taskSlot.getIndex() < 0) {
            SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
            SlotStatus slotStatus = new SlotStatus(
                slotID,
                taskSlot.getResourceProfile(),
                taskSlot.getJobId(),
                taskSlot.getAllocationId());
            slotStatuses.add(slotStatus);
        }
    }

    final SlotReport slotReport = new SlotReport(slotStatuses);

    return slotReport;
}

TaskSlot

TaskSlot is the wrapper type for the slots maintained in TaskSlotTable. A TaskSlot is a container: its tasks field holds all tasks that belong to the same slot.

TaskSlot member variables

/** Index of the task slot. */
// Index of the task slot.
// A value below 0 denotes a dynamically allocated slot
private final int index;

/** Resource characteristics for this slot. */
private final ResourceProfile resourceProfile;

/** Tasks running in this slot. */
private final Map<ExecutionAttemptID, T> tasks;

private final MemoryManager memoryManager;

/** State of this slot. */
private TaskSlotState state;

/** Job id to which the slot has been allocated. */
private final JobID jobId;

/** Allocation id of this slot. */
private final AllocationID allocationId;

/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;

ResourceProfile

ResourceProfile wraps the resource requirements of a slot. All of its fields are final, so it cannot be modified once created.

The main member variables of ResourceProfile are:

/** How many cpu cores are needed. Can be null only if it is unknown. */
// Number of CPU cores
@Nullable
private final Resource cpuCores;

/** How much task heap memory is needed. */
// Task heap memory
@Nullable // can be null only for UNKNOWN
private final MemorySize taskHeapMemory;

/** How much task off-heap memory is needed. */
// Task off-heap memory
@Nullable // can be null only for UNKNOWN
private final MemorySize taskOffHeapMemory;

/** How much managed memory is needed. */
// Managed memory
@Nullable // can be null only for UNKNOWN
private final MemorySize managedMemory;

/** How much network memory is needed. */
// Network (transfer buffer) memory
@Nullable // can be null only for UNKNOWN
private final MemorySize networkMemory;

/** A extensible field for user specified resources from {@link ResourceSpec}. */
// Other resource types, as specified by the user via ResourceSpec
private final Map<String, Resource> extendedResources = new HashMap<>(1);

AllocationID

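AllocationID is the unique identifier used when the JobManager requests a physical slot through the ResourceManager. It is created and fixed in SlotPoolImpl's requestSlotFromResourceManager method; from then on it does not change, even when the request is retried.

Call Flow

The call flow between the main classes involved in Flink slot allocation is shown in the figure below.

[Figure: call flow]

Slot Request Flow

We trace the slot request flow starting from the point where the ExecutionGraph allocates resources, all the way until the slot is created in the TaskExecutor.

We start at the JobManager's resource allocation entry point and follow the calls one by one.

Execution's allocateAndAssignSlotForExecution

This method allocates the slot needed to run a vertex of the ExecutionGraph.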

private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
        SlotProviderStrategy slotProviderStrategy,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {

    checkNotNull(slotProviderStrategy);

    // Assert that this runs in the JobMaster's main thread
    assertRunningInJobMasterMainThread();

    // Get the slot sharing group configured for this ExecutionGraph vertex.
    // The slotSharingGroup is a soft constraint: tasks in the same slotSharingGroup may run in the same slot
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    // CoLocationConstraint governs where the task is executed
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    // this method only works if the execution is in the state 'CREATED'
    // Transition from CREATED to SCHEDULED; returns true only if the current state is CREATED
    if (transitionState(CREATED, SCHEDULED)) {

        // Get the ID of the SlotSharingGroup
        final SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();

        // Build the scheduling unit, which bundles the execution, the slotSharingGroup and the locationConstraint
        ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, slotSharingGroupId) :
                new ScheduledUnit(this, slotSharingGroupId, locationConstraint);

        // try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
        ExecutionVertex executionVertex = getVertex();
        // Get the allocation ID of the most recent prior execution
        AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();

        // Put the allocation ID into a collection
        Collection<AllocationID> previousAllocationIDs =
            lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();

        // calculate the preferred locations
        // Compute the preferred locations of the task, i.e. on which TaskManagers it should run
        final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
            calculatePreferredLocations(locationPreferenceConstraint);

        final SlotRequestId slotRequestId = new SlotRequestId();

        final CompletableFuture<LogicalSlot> logicalSlotFuture =
            preferredLocationsFuture.thenCompose(
                (Collection<TaskManagerLocation> preferredLocations) -> {
                    LOG.info("Allocating slot with SlotRequestID {} for the execution attempt {}.", slotRequestId, attemptId);
                    // Composes with the previous CompletableFuture:
                    // once the preferred locations are known, call the slot allocation logic of SlotProviderStrategy
                    return slotProviderStrategy.allocateSlot(
                        slotRequestId,
                        toSchedule,
                        SlotProfile.priorAllocation(
                            vertex.getResourceProfile(),
                            getPhysicalSlotResourceProfile(vertex),
                            preferredLocations,
                            previousAllocationIDs,
                            allPreviousExecutionGraphAllocationIds));
                });

        // register call back to cancel slot request in case that the execution gets canceled
        // Invoked when the resources assigned to this execution are released
        releaseFuture.whenComplete(
            (Object ignored, Throwable throwable) -> {
                // If the slot future could still be cancelled (it had not completed yet),
                // cancel the slot request as well
                if (logicalSlotFuture.cancel(false)) {
                    slotProviderStrategy.cancelSlotRequest(
                        slotRequestId,
                        slotSharingGroupId,
                        new FlinkException("Execution " + this + " was released."));
                }
            });

        // This forces calls to the slot pool back into the main thread, for normal and exceptional completion
        // Return the future that carries the LogicalSlot
        return logicalSlotFuture.handle(
            (LogicalSlot logicalSlot, Throwable failure) -> {

                if (failure != null) {
                    throw new CompletionException(failure);
                }

                // tryAssignResource returns true if the logicalSlot could be assigned to this execution
                if (tryAssignResource(logicalSlot)) {
                    return logicalSlot;
                } else {
                    // release the slot
                    // If it could not be assigned, release the slot
                    logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
                    throw new CompletionException(
                        new FlinkException(
                            "Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
                }
            });
    } else {
        // call race, already deployed, or already done
        throw new IllegalExecutionStateException(this, CREATED, state);
    }
}

Two classes are involved here: SlotSharingGroup and CoLocationConstraint.

SlotSharingGroup is the soft constraint for slot sharing. Execution vertices with the same group ID can be scheduled into the same slot. It has three member variables:

// IDs of the job vertices that belong to this group
private final Set<JobVertexID> ids = new TreeSet<>();

// Group ID, made up of a lowerPart and an upperPart of type long
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();

// Resource requirements of all tasks in the group
private ResourceSpec resourceSpec = ResourceSpec.ZERO;

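In contrast to SlotSharingGroup, CoLocationConstraint is the hard constraint for slot sharing. It determines where a task (an execution vertex) runs. CoLocationConstraint binds a CoLocationGroup to a TaskManagerLocation: tasks that belong to the same CoLocationGroup run on the designated TaskManager. A CoLocationGroup holds a collection of JobVertex instances. The code is not reproduced here.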
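
The "hard" nature of the constraint comes from locking a location once and then rejecting everything else. The following is a minimal sketch of that idea only: CoLocationSketch is hypothetical, a String stands in for TaskManagerLocation, and the accepts method is an illustration rather than part of Flink's CoLocationConstraint.

```java
import java.util.Objects;

// Hypothetical model of a co-location constraint that can be locked once (not Flink code).
class CoLocationSketch {
    private String lockedLocation; // null until a location has been assigned

    boolean isAssigned() {
        return lockedLocation != null;
    }

    // Binds the constraint to a location; a second lock attempt is an error.
    void lockLocation(String taskManagerLocation) {
        if (lockedLocation != null) {
            throw new IllegalStateException("Location is already locked.");
        }
        lockedLocation = Objects.requireNonNull(taskManagerLocation);
    }

    // Before locking any location is acceptable; afterwards only the locked one.
    boolean accepts(String candidateLocation) {
        return lockedLocation == null || lockedLocation.equals(candidateLocation);
    }
}
```

The first task scheduled under the constraint effectively chooses the location, and every later task that shares the constraint has to follow it.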

Next we focus on the allocateSlot method of SlotProviderStrategy.
SlotProviderStrategy has two subclasses:

  • BatchSlotProviderStrategy: does not specify a timeout for the slot allocation
  • NormalSlotProviderStrategy: specifies a timeout for the slot allocation; apart from that, its logic is identical to BatchSlotProviderStrategy

Taking NormalSlotProviderStrategy as the example, its allocateSlot method calls SchedulerImpl's allocateSlot. Following the calls: allocateSlot -> allocateSlotInternal -> internalAllocateSlot -> allocateSharedSlot

SchedulerImpl's allocateSharedSlot

private CompletableFuture<LogicalSlot> allocateSharedSlot(
    SlotRequestId slotRequestId,
    ScheduledUnit scheduledUnit,
    SlotProfile slotProfile,
    @Nullable Time allocationTimeout) {
    // allocate slot with slot sharing
    // Get the SlotSharingManager for this slot sharing group, creating it if absent.
    // It manages slot sharing, which allows one slot to run different tasks
    final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
        scheduledUnit.getSlotSharingGroupId(),
        id -> new SlotSharingManager(
            id,
            slotPool,
            this));

    // MultiTaskSlotLocality wraps a MultiTaskSlot together with a Locality
    final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
    try {
        // Depending on whether a co-location constraint exists, call a different multi task slot allocation method
        if (scheduledUnit.getCoLocationConstraint() != null) {
            multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
                scheduledUnit.getCoLocationConstraint(),
                multiTaskSlotManager,
                slotProfile,
                allocationTimeout);
        } else {
            multiTaskSlotLocality = allocateMultiTaskSlot(
                scheduledUnit.getJobVertexId(),
                multiTaskSlotManager,
                slotProfile,
                allocationTimeout);
        }
    } catch (NoResourceAvailableException noResourceException) {
        return FutureUtils.completedExceptionally(noResourceException);
    }

    // sanity check
    // Check that neither the MultiTaskSlot nor any of its child slots already contains this jobVertex ID
    Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));

    // Allocate a SingleTaskSlot below this MultiTaskSlot
    final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
        slotRequestId,
        slotProfile.getTaskResourceProfile(),
        scheduledUnit.getJobVertexId(),
        multiTaskSlotLocality.getLocality());
    return leaf.getLogicalSlotFuture();
}

allocateCoLocatedMultiTaskSlot

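This method allocates a MultiTaskSlot that is subject to a co-location constraint.

Before looking at the method we need to understand the Locality enum, which describes how a task is placed relative to its location preferences. Its values are:

  • UNCONSTRAINED: there is no constraint on where the task is scheduled
  • LOCAL: the task is placed on the same TaskManager
  • HOST_LOCAL: the task is placed on the same host
  • NON_LOCAL: the task is placed somewhere outside its locality preferences
  • UNKNOWN: unknown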
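
To make the values concrete, here is a minimal sketch of how a placement could be classified against a list of preferred locations. SketchLocality, LocalitySketch and Location are hypothetical names, the UNKNOWN value is left out, and the classification rule is an assumption for illustration rather than Flink's slot selection code.

```java
import java.util.List;

// Hypothetical mirror of the locality values described above (UNKNOWN omitted).
enum SketchLocality { UNCONSTRAINED, LOCAL, HOST_LOCAL, NON_LOCAL }

class LocalitySketch {

    // Hypothetical stand-in for a TaskManager location: a host plus a TaskManager id.
    static final class Location {
        final String host;
        final String taskManagerId;

        Location(String host, String taskManagerId) {
            this.host = host;
            this.taskManagerId = taskManagerId;
        }
    }

    // Classifies where a task ended up relative to its preferred locations.
    static SketchLocality classify(List<Location> preferred, Location actual) {
        if (preferred.isEmpty()) {
            return SketchLocality.UNCONSTRAINED; // no preference was expressed
        }
        boolean sameHost = false;
        for (Location p : preferred) {
            if (p.taskManagerId.equals(actual.taskManagerId)) {
                return SketchLocality.LOCAL; // same TaskManager as a preferred location
            }
            if (p.host.equals(actual.host)) {
                sameHost = true; // same machine, different TaskManager
            }
        }
        return sameHost ? SketchLocality.HOST_LOCAL : SketchLocality.NON_LOCAL;
    }
}
```

In the method analyzed next, only LOCAL satisfies an already assigned co-location constraint; any other result leads to the slot being released.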

The code of allocateCoLocatedMultiTaskSlot, with comments, follows:

private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
    CoLocationConstraint coLocationConstraint,
    SlotSharingManager multiTaskSlotManager,
    SlotProfile slotProfile,
    @Nullable Time allocationTimeout) throws NoResourceAvailableException {
    final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();

    if (coLocationSlotRequestId != null) {
        // we have a slot assigned --> try to retrieve it
        // Look up the taskSlot registered in the SlotSharingManager under this slot request ID
        final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);

        if (taskSlot != null) {
            // This slot must be a MultiTaskSlot
            Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);

            SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;

            // If the resources held by this MultiTaskSlot may satisfy the slotProfile, return the slot with locality LOCAL (same TaskManager)
            if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getTaskResourceProfile())) {
                return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
            }

            // Otherwise throw an exception: not enough resources
            throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
        } else {
            // the slot may have been cancelled in the mean time
            // The slot may have been cancelled while this method was running, hence this branch
            coLocationConstraint.setSlotRequestId(null);
        }
    }

    // If the location of this constraint has already been assigned
    if (coLocationConstraint.isAssigned()) {
        // refine the preferred locations of the slot profile
        // Update the slot profile with the preferred location (the TaskManager location)
        slotProfile = SlotProfile.priorAllocation(
            slotProfile.getTaskResourceProfile(),
            slotProfile.getPhysicalSlotResourceProfile(),
            Collections.singleton(coLocationConstraint.getLocation()),
            slotProfile.getPreferredAllocations(),
            slotProfile.getPreviousExecutionGraphAllocations());
    }

    // get a new multi task slot
    // As established above, no usable slot has been requested for this constraint yet, so allocate one here
    SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
        coLocationConstraint.getGroupId(),
        multiTaskSlotManager,
        slotProfile,
        allocationTimeout);

    // check whether we fulfill the co-location constraint
    // Check the constraint state and whether the slot can run on the same TaskManager
    if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
        // If not, the constraint is violated: release this slot
        multiTaskSlotLocality.getMultiTaskSlot().release(
            new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
        
        // Throw a "no resource available" exception
        throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
                                               "co location constraint " + coLocationConstraint + '.');
    }

    // Allocate a child MultiTaskSlot below this MultiTaskSlot
    final SlotRequestId slotRequestId = new SlotRequestId();
    final SlotSharingManager.MultiTaskSlot coLocationSlot =
        multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
        slotRequestId,
        coLocationConstraint.getGroupId());

    // mark the requested slot as co-located slot for other co-located tasks
    // Associate the coLocationConstraint with the slot request, marking this slot as the one bound to the location constraint
    coLocationConstraint.setSlotRequestId(slotRequestId);

    // lock the co-location constraint once we have obtained the allocated slot
    // Runs once the slot allocation has completed
    coLocationSlot.getSlotContextFuture().whenComplete(
        (SlotContext slotContext, Throwable throwable) -> {
            if (throwable == null) {
                // check whether we are still assigned to the co-location constraint
                // No exception occurred: if the constraint still refers to this slot request, lock its location
                if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
                    coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
                } else {
                    log.debug("Failed to lock colocation constraint {} because assigned slot " +
                              "request {} differs from fulfilled slot request {}.",
                              coLocationConstraint.getGroupId(),
                              coLocationConstraint.getSlotRequestId(),
                              slotRequestId);
                }
            } else {
                log.debug("Failed to lock colocation constraint {} because the slot " +
                          "allocation for slot request {} failed.",
                          coLocationConstraint.getGroupId(),
                          coLocationConstraint.getSlotRequestId(),
                          throwable);
            }
        });

    return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
}

Next, let's look at how a MultiTaskSlot is allocated.

allocateMultiTaskSlot

private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
    AbstractID groupId,
    SlotSharingManager slotSharingManager,
    SlotProfile slotProfile,
    @Nullable Time allocationTimeout) {

    // List the info of all resolved root slots
    Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
        slotSharingManager.listResolvedRootSlotInfo(groupId);

    // Use the slot selection strategy to pick the best-fitting root slot from the SlotSharingManager
    SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
        slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);

    // Wrap the selected slot in a MultiTaskSlotLocality
    final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
        new SlotSharingManager.MultiTaskSlotLocality(
        slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
        bestResolvedRootSlotWithLocality.getLocality()) :
    null;

    // If the selected slot satisfies the location preference (Locality.LOCAL), return it directly
    if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
        return multiTaskSlotLocality;
    }

    final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
    final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

    // Try to find the best-fitting slot among the slots available in the SlotPool
    Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);

    // If one was found
    if (optionalPoolSlotAndLocality.isPresent()) {
        SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
        // Use the pool slot if it is LOCAL, or if no suitable root slot was found in the SlotSharingManager
        if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {

            final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
            // Create the MultiTaskSlot for this slot in the SlotSharingManager
            final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
                multiTaskSlotRequestId,
                CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
                allocatedSlotRequestId);

            // Assign multiTaskSlot as the payload of allocatedSlot
            if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
                return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
            } else {
                multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
                                                         allocatedSlot.getAllocationId() + '.'));
            }
        }
    }

    if (multiTaskSlotLocality != null) {
        // prefer slot sharing group slots over unused slots
        // If both the SlotSharingManager and the SlotPool offered a matching slot, prefer the SlotSharingManager's
        // and release the matching slot taken from the SlotPool
        if (optionalPoolSlotAndLocality.isPresent()) {
            slotPool.releaseSlot(
                allocatedSlotRequestId,
                new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
        }
        return multiTaskSlotLocality;
    }

    // there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
    // No slot is available right now: look for an unresolved root slot (one whose allocation has not completed yet)
    SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);

    // If none was found
    if (multiTaskSlot == null) {
        // it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
        // At this point we have to request a new slot from the ResourceManager
        final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(
            allocatedSlotRequestId,
            slotProfile,
            allocationTimeout);

        // Create a root slot in the SlotSharingManager
        multiTaskSlot = slotSharingManager.createRootSlot(
            multiTaskSlotRequestId,
            slotAllocationFuture,
            allocatedSlotRequestId);

        // Define what happens once the allocation completes, successfully or not
        slotAllocationFuture.whenComplete(
            (PhysicalSlot allocatedSlot, Throwable throwable) -> {
                final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);

                if (taskSlot != null) {
                    // still valid
                    // On an exception (or an unexpected task slot type), release the slot
                    if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
                        taskSlot.release(throwable);
                    } else {
                        if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
                            taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
                                                                allocatedSlot.getAllocationId() + '.'));
                        }
                    }
                } else {
                    slotPool.releaseSlot(
                        allocatedSlotRequestId,
                        new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
                }
            });
    }

    return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
}

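While we are here, let's take a closer look at SlotSelectionStrategy. It is the common interface of the strategies that pick the best-matching slot. Its implementations are:

  • DefaultLocationPreferenceSlotSelectionStrategy: returns the first matching slot it finds among the candidates.
  • EvenlySpreadOutLocationPreferenceSlotSelectionStrategy: among all matching slots, returns the one whose TaskManager has the lowest resource utilization.
  • PreviousAllocationSlotSelectionStrategy: first tries to reuse the slot of a previous allocation recorded in the SlotProfile; if there is none, it falls back to another slot selection strategy.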
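
To make the difference between the three strategies concrete, here is a minimal Java sketch. It is not Flink's implementation: `SlotCandidate`, `SlotSelectionSketch` and all of their members are hypothetical names, and it assumes that the "previous allocation" strategy prefers a slot whose allocation ID was used before. Resource profiles and location preferences, which the real strategies also weigh, are left out.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical, simplified view of a candidate slot; not a Flink class. */
class SlotCandidate {
    final String allocationId;
    final double taskManagerUtilization; // share of that TaskManager's slots already in use

    SlotCandidate(String allocationId, double taskManagerUtilization) {
        this.allocationId = allocationId;
        this.taskManagerUtilization = taskManagerUtilization;
    }
}

class SlotSelectionSketch {

    /** "Default" flavour: take the first candidate. */
    static Optional<SlotCandidate> first(List<SlotCandidate> candidates) {
        return candidates.stream().findFirst();
    }

    /** "Evenly spread out" flavour: take the candidate on the least utilized TaskManager. */
    static Optional<SlotCandidate> leastUtilized(List<SlotCandidate> candidates) {
        SlotCandidate best = null;
        for (SlotCandidate c : candidates) {
            if (best == null || c.taskManagerUtilization < best.taskManagerUtilization) {
                best = c;
            }
        }
        return Optional.ofNullable(best);
    }

    /** "Previous allocation" flavour: reuse an earlier slot if possible, otherwise delegate. */
    static Optional<SlotCandidate> preferPrevious(
            List<SlotCandidate> candidates,
            Set<String> previousAllocationIds,
            Function<List<SlotCandidate>, Optional<SlotCandidate>> fallback) {
        for (SlotCandidate c : candidates) {
            if (previousAllocationIds.contains(c.allocationId)) {
                return Optional.of(c);
            }
        }
        return fallback.apply(candidates);
    }
}
```

Passing the fallback as a function mirrors the idea that the previous-allocation strategy wraps one of the other two rather than duplicating their logic.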

Next we analyze SchedulerImpl's requestNewAllocatedSlot method, which requests a newly allocated slot.

@Nonnull
private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        SlotRequestId slotRequestId,
        SlotProfile slotProfile,
        @Nullable Time allocationTimeout) {
    if (allocationTimeout == null) {
        return slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
    } else {
        return slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
    }
}

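This method calls the corresponding SlotPool method depending on whether an allocation timeout was specified. requestNewAllocatedBatchSlot and requestNewAllocatedSlot share essentially the same logic; the latter only adds timeout detection. We analyze requestNewAllocatedSlot, the more complex of the two.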

SlotPoolImpl's requestNewAllocatedSlot method is shown below:

@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        @Nonnull SlotRequestId slotRequestId,
        @Nonnull ResourceProfile resourceProfile,
        @Nullable Time timeout) {

    // Assert that the method runs in the main thread
    componentMainThreadExecutor.assertRunningInMainThread();

    // Create a slot request object
    // The SlotPool keeps the request pending first; it is only fulfilled once a TaskManager provides a slot
    final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

    // If a timeout was passed in, register the timeout handling
    if (timeout != null) {
        // register request timeout
        FutureUtils
            .orTimeout(
                pendingRequest.getAllocatedSlotFuture(),
                timeout.toMilliseconds(),
                TimeUnit.MILLISECONDS,
                componentMainThreadExecutor)
            .whenComplete(
                (AllocatedSlot ignored, Throwable throwable) -> {
                    if (throwable instanceof TimeoutException) {
                        timeoutPendingSlotRequest(slotRequestId);
                    }
                });
    }

    // Call requestNewAllocatedSlotInternal to request the new slot
    return requestNewAllocatedSlotInternal(pendingRequest)
        .thenApply((Function.identity()));
}
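
The timeout registration above can be illustrated with plain JDK classes. The following is a sketch under stated assumptions, not Flink code: it uses the JDK's own `CompletableFuture.orTimeout` (Java 9 and later) as a stand-in for Flink's `FutureUtils.orTimeout`, and `PendingRequestTimeoutSketch`, `allocatedSlotFuture` and `timeoutHandled` are hypothetical names. The flag stands in for the call to timeoutPendingSlotRequest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical pending request that reacts to a timeout; not a Flink class. */
class PendingRequestTimeoutSketch {
    final CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();
    final AtomicBoolean timeoutHandled = new AtomicBoolean(false);

    /**
     * Fails the pending future after the given delay unless it completes earlier,
     * and runs the timeout handling only for a TimeoutException.
     */
    CompletableFuture<String> registerTimeout(long timeoutMillis) {
        return allocatedSlotFuture
            .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .whenComplete((slot, failure) -> {
                if (failure instanceof TimeoutException) {
                    // Stand-in for the cleanup of the timed-out request
                    timeoutHandled.set(true);
                }
            });
    }
}
```

Other failures pass through the callback untouched, which matches the listing: only a timeout triggers the cleanup of the pending request.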

SlotPoolImpl's requestNewAllocatedSlotInternal method is shown below. In this method, the SlotPool asks the ResourceManager to allocate a new slot.

@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {

    if (resourceManagerGateway == null) {
        // No ResourceManager gateway yet: stash the request in waitingForResourceManager
        // This LinkedHashMap maps slot request IDs to slot requests
        stashRequestWaitingForResourceManager(pendingRequest);
    } else {
        // Request the slot from the ResourceManager
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    return pendingRequest.getAllocatedSlotFuture();
}

Let's focus on SlotPoolImpl's requestSlotFromResourceManager method:

private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {

    checkNotNull(resourceManagerGateway);
    checkNotNull(pendingRequest);

    // Create an allocation ID; its lowerPart and upperPart are random longs
    final AllocationID allocationId = new AllocationID();
    // Assign the allocation ID to pendingRequest
    pendingRequest.setAllocationId(allocationId);

    // Put the request into the pendingRequests collection
    // This is a two-key map, keyed by both the slot request ID and the allocation ID
    pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

    // Define what to do when the slot allocation completes
    pendingRequest.getAllocatedSlotFuture().whenComplete(
        (AllocatedSlot allocatedSlot, Throwable throwable) -> {
            if (throwable != null) {
                // the allocation id can be remapped so we need to get it from the pendingRequest
                // where it will be updated timely
                // Re-read the allocation ID, since it may change while the slot is being requested
                final Optional<AllocationID> updatedAllocationId = pendingRequest.getAllocationId();

                // Error handling: cancel the slot request
                if (updatedAllocationId.isPresent()) {
                    // cancel the slot request if there is a failure
                    resourceManagerGateway.cancelSlotRequest(updatedAllocationId.get());
                }
            }
        });

    log.info("Requesting new slot [{}] and profile {} with allocation id {} from resource manager.",
        pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile(), allocationId);

    // Send a SlotRequest to the ResourceManager to request the slot
    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);

    // Runs once the slot request RPC has completed
    FutureUtils.whenCompleteAsyncIfNotDone(
        rmResponse,
        componentMainThreadExecutor,
        (Acknowledge ignored, Throwable failure) -> {
            // on failure, fail the request future
            if (failure != null) {
                // On failure, run the failure handling, which calls completeExceptionally on the future
                slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
            }
        });
}
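
The pendingRequests collection above is addressed by two keys, the slot request ID and the allocation ID. A minimal sketch of such a two-key map follows. `TwoKeyMapSketch` and its method names are hypothetical and only illustrate the idea; the class Flink actually uses may behave differently, for example in iteration order.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical two-key map; a simplified stand-in, not Flink's own class. */
class TwoKeyMapSketch<A, B, V> {
    private final Map<A, V> valueByPrimary = new HashMap<>();
    private final Map<A, B> secondaryByPrimary = new HashMap<>();
    private final Map<B, A> primaryBySecondary = new HashMap<>();

    /** Stores the value under both keys, dropping any older entry that used either key. */
    void put(A primary, B secondary, V value) {
        removeByPrimary(primary);
        removeBySecondary(secondary);
        valueByPrimary.put(primary, value);
        secondaryByPrimary.put(primary, secondary);
        primaryBySecondary.put(secondary, primary);
    }

    V getByPrimary(A primary) {
        return valueByPrimary.get(primary);
    }

    V getBySecondary(B secondary) {
        A primary = primaryBySecondary.get(secondary);
        return primary == null ? null : valueByPrimary.get(primary);
    }

    /** Removes the entry and keeps both index maps consistent. */
    V removeByPrimary(A primary) {
        B secondary = secondaryByPrimary.remove(primary);
        if (secondary != null) {
            primaryBySecondary.remove(secondary);
        }
        return valueByPrimary.remove(primary);
    }

    V removeBySecondary(B secondary) {
        A primary = primaryBySecondary.get(secondary);
        return primary == null ? null : removeByPrimary(primary);
    }

    int size() {
        return valueByPrimary.size();
    }
}
```

With this shape, the scheduler side can look a request up by slot request ID, while responses that only carry an allocation ID can still find the same pending request.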

The ResourceManager's logic for handling the slot request is as follows:

@Override
public CompletableFuture<Acknowledge> requestSlot(
        JobMasterId jobMasterId,
        SlotRequest slotRequest,
        final Time timeout) {

    JobID jobId = slotRequest.getJobId();
    // Look up the JobManager registered for this job ID
    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

    if (null != jobManagerRegistration) {
        if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
            log.info("Request slot with profile {} for job {} with allocation id {}.",
                slotRequest.getResourceProfile(),
                slotRequest.getJobId(),
                slotRequest.getAllocationId());

            try {
                // Register the slot request with the SlotManager
                slotManager.registerSlotRequest(slotRequest);
            } catch (ResourceManagerException e) {
                return FutureUtils.completedExceptionally(e);
            }

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
                jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
        }

    } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
    }
}

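At this point we can see that the ResourceManager ultimately hands the SlotRequest to its internal SlotManager. The SlotManager's registerSlotRequest method is as follows: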

@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    // Check that the state is started
    checkInit();

    // Check whether any fulfilled or pending slot request has the same allocation ID as this request
    // If it is a duplicate, return false
    if (checkDuplicateRequest(slotRequest.getAllocationId())) {
        LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

        return false;
    } else {
        PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

        // Store the wrapped request in the pendingSlotRequests collection
        pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

        try {
            // Call the internal slot request method, analyzed below
            internalRequestSlot(pendingSlotRequest);
        } catch (ResourceManagerException e) {
            // requesting the slot failed --> remove pending slot request
            pendingSlotRequests.remove(slotRequest.getAllocationId());

            throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
        }

        return true;
    }
}
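
The pattern in registerSlotRequest (reject duplicates, record the request, roll back the record if the internal request fails) can be sketched on its own. Everything below is hypothetical and simplified: `SlotRequestRegistrySketch` keeps plain strings instead of Flink's request objects, and the `internalRequest` callback stands in for internalRequestSlot.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical registry of slot requests keyed by allocation ID; not a Flink class. */
class SlotRequestRegistrySketch {
    private final Map<String, String> pendingRequests = new HashMap<>(); // allocation ID -> resource profile
    private final Set<String> fulfilledRequests = new HashSet<>();

    /** Returns false for a duplicate, true once the request was recorded and handed on. */
    boolean register(String allocationId, String resourceProfile, Consumer<String> internalRequest) {
        if (pendingRequests.containsKey(allocationId) || fulfilledRequests.contains(allocationId)) {
            return false;
        }
        pendingRequests.put(allocationId, resourceProfile);
        try {
            internalRequest.accept(resourceProfile);
        } catch (RuntimeException e) {
            // Roll back so that a failed request does not linger as pending
            pendingRequests.remove(allocationId);
            throw new IllegalStateException("Could not fulfill slot request " + allocationId + '.', e);
        }
        return true;
    }

    /** Moves a pending request to the fulfilled set. */
    void markFulfilled(String allocationId) {
        if (pendingRequests.remove(allocationId) != null) {
            fulfilledRequests.add(allocationId);
        }
    }

    boolean isPending(String allocationId) {
        return pendingRequests.containsKey(allocationId);
    }
}
```

The rollback matters because a request that stayed in the pending map after a failure would make every retry with the same allocation ID look like a duplicate.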

Let's continue by tracing the internalRequestSlot method.

internalRequestSlot

private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    // Get the resource requirements of the slot request
    final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

    OptionalConsumer.of(findMatchingSlot(resourceProfile))
        .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
        .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}

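Here, the findMatchingSlot method uses slotMatchingStrategy to search the freeSlots collection for a slot that matches the resource requirements. It verifies that the matching slot is in state SlotState.FREE, removes it from freeSlots, and returns it.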

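Flink encapsulates the slot-matching logic in slotMatchingStrategy, which has two implementations:

  • AnyMatchingSlotMatchingStrategy: returns the first matching slot, that is, the first slot whose resources can satisfy the requested resource profile.
  • LeastUtilizationSlotMatchingStrategy: builds on the previous strategy and additionally computes the slot utilization of each TaskExecutor, returning a slot on the TaskExecutor with the lowest utilization.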

If findMatchingSlot finds a matching slot, allocateSlot is called to tell the TaskExecutor to allocate the slot. If nothing matches, fulfillPendingSlotRequestWithPendingTaskManagerSlot is called.
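
The find-then-dispatch flow of internalRequestSlot can be sketched as follows. This is an illustration under simplifying assumptions, not Flink code: `SlotManagerDispatchSketch` is a hypothetical class, a resource profile is reduced to a number of CPU cores, only the "any matching" behaviour is shown, and the two branches return a label instead of calling allocateSlot or creating a pending slot.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified free-slot bookkeeping; not a Flink class. */
class SlotManagerDispatchSketch {
    private final Map<String, Integer> freeSlots = new LinkedHashMap<>(); // slot ID -> CPU cores

    void addFreeSlot(String slotId, int cpuCores) {
        freeSlots.put(slotId, cpuCores);
    }

    /** Finds the first free slot that is large enough and removes it from the free set. */
    Optional<String> findMatchingSlot(int requiredCores) {
        Iterator<Map.Entry<String, Integer>> it = freeSlots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() >= requiredCores) {
                String slotId = entry.getKey();
                it.remove();
                return Optional.of(slotId);
            }
        }
        return Optional.empty();
    }

    /** Either allocates a matching free slot or falls back to a pending slot. */
    String request(int requiredCores) {
        return findMatchingSlot(requiredCores)
            .map(slotId -> "allocated:" + slotId)
            .orElse("pending");
    }

    int freeSlotCount() {
        return freeSlots.size();
    }
}
```

Removing the slot from the free set at lookup time means two back-to-back requests can never be handed the same slot.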

fulfillPendingSlotRequestWithPendingTaskManagerSlot

Based on the required resources (pendingSlotRequest.getResourceProfile()), this method creates a PendingTaskManagerSlot and stores it in pendingSlot. Slots in this pending state are registered (registerSlot) during registerTaskManager. Only at that point is the pending slot actually allocated and created in the corresponding TaskExecutor.

private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    // Get the resource requirements of the PendingSlotRequest
    ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
    // Look for a PendingTaskManagerSlot that satisfies the resource requirements
    Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);

    // If no pending slot matches the resource requirements,
    // allocate the resources given by resourceProfile and create a pending slot
    if (!pendingTaskManagerSlotOptional.isPresent()) {
        pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
    }

    // If a pending slot is available, call assignPendingTaskManagerSlot,
    // which associates pendingSlotRequest with pendingTaskManagerSlot
    OptionalConsumer.of(pendingTaskManagerSlotOptional)
        .ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot))
        .ifNotPresent(() -> {
            // request can not be fulfilled by any free slot or pending slot that can be allocated,
            // check whether it can be fulfilled by allocated slots
            if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {
                throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
            }
        });
}

allocateSlot

Now let's analyze the other branch of internalRequestSlot: the call to allocateSlot.

SlotManagerImpl's allocateSlot method is as follows:

private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
    Preconditions.checkState(taskManagerSlot.getState() == SlotState.FREE);

    // Get the connection to the TaskManager from the slot
    TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
    // Get the RPC gateway
    TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
    
    final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
    final AllocationID allocationId = pendingSlotRequest.getAllocationId();
    final SlotID slotId = taskManagerSlot.getSlotId();
    final InstanceID instanceID = taskManagerSlot.getInstanceId();
    
    // Assign the PendingSlotRequest to the slot
    taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
    pendingSlotRequest.setRequestFuture(completableFuture);
    
    // If this PendingSlotRequest was already assigned a pending TaskManager slot, return that one first
    returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
    
    // Look up the registered TaskManager
    TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
    
    if (taskManagerRegistration == null) {
        throw new IllegalStateException("Could not find a registered task manager for instance id " +
            instanceID + '.');
    }
    
    // Mark this TaskManager as in use
    taskManagerRegistration.markUsed();
    
    // RPC call to the task manager
    // Remotely invoke the TaskExecutor's requestSlot method
    CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
        slotId,
        pendingSlotRequest.getJobId(),
        allocationId,
        pendingSlotRequest.getResourceProfile(),
        pendingSlotRequest.getTargetAddress(),
        resourceManagerId,
        taskManagerRequestTimeout);
    
    // When the RPC finishes, complete completableFuture with its result or its failure
    requestFuture.whenComplete(
        (Acknowledge acknowledge, Throwable throwable) -> {
            if (acknowledge != null) {
                completableFuture.complete(acknowledge);
            } else {
                completableFuture.completeExceptionally(throwable);
            }
        });
    
    completableFuture.whenCompleteAsync(
        (Acknowledge acknowledge, Throwable throwable) -> {
            try {
                if (acknowledge != null) {
                    // Allocation succeeded: update the slot information
                    updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
                } else {
                    if (throwable instanceof SlotOccupiedException) {
                        SlotOccupiedException exception = (SlotOccupiedException) throwable;
                        // The slot is occupied: update the slot information with the current owner
                        updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
                    } else {
                        // Otherwise, remove the slot request from the slot
                        removeSlotRequestFromSlot(slotId, allocationId);
                    }
    
                    if (!(throwable instanceof CancellationException)) {
                        // If the failure is not a cancellation, run the failed slot request handling
                        handleFailedSlotRequest(slotId, allocationId, throwable);
                    } else {
                        LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
                    }
                }
            } catch (Exception e) {
                LOG.error("Error while completing the slot allocation.", e);
            }
        },
        mainThreadExecutor);
}

Above, the ResourceManager invoked TaskExecutor's requestSlot method through RPC.

requestSlot

TaskExecutor.requestSlot is as follows:

@Override
public CompletableFuture<Acknowledge> requestSlot(
    final SlotID slotId,
    final JobID jobId,
    final AllocationID allocationId,
    final ResourceProfile resourceProfile,
    final String targetAddress,
    final ResourceManagerId resourceManagerId,
    final Time timeout) {
    // TODO: Filter invalid requests from the resource manager by using the instance/registration Id

    log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
        allocationId, jobId, resourceManagerId);

    // Check whether we are connected to this ResourceManager
    if (!isConnectedToResourceManager(resourceManagerId)) {
        final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
        log.debug(message);
        return FutureUtils.completedExceptionally(new TaskManagerException(message));
    }

    try {
        // Call the TaskExecutor's allocateSlot method
        allocateSlot(
            slotId,
            jobId,
            allocationId,
            resourceProfile);
    } catch (SlotAllocationException sae) {
        return FutureUtils.completedExceptionally(sae);
    }

    final JobTable.Job job;

    try {
        // Get the job's entry from the job table, creating it if it does not exist yet
        job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
    } catch (Exception e) {
        // free the allocated slot
        try {
            taskSlotTable.freeSlot(allocationId);
        } catch (SlotNotFoundException slotNotFoundException) {
            // slot no longer existent, this should actually never happen, because we've
            // just allocated the slot. So let's fail hard in this case!
            onFatalError(slotNotFoundException);
        }

        // release local state under the allocation id.
        localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

        // sanity check
        if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            onFatalError(new Exception("Could not free slot " + slotId));
        }

        return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
    }

    if (job.isConnected()) {
        offerSlotsToJobManager(jobId);
    }

    return CompletableFuture.completedFuture(Acknowledge.get());
}

Finally, we arrive at TaskExecutor's allocateSlot method. It is short, and it ultimately calls TaskSlotTable's allocateSlot method.

TaskExecutor's allocateSlot method

private void allocateSlot(
        SlotID slotId,
        JobID jobId,
        AllocationID allocationId,
        ResourceProfile resourceProfile) throws SlotAllocationException {
    // If the slot is free
    if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
        // Let the taskSlotTable allocate the slot
        if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
            log.info("Allocated slot for {}.", allocationId);
        } else {
            log.info("Could not allocate slot for {}.", allocationId);
            throw new SlotAllocationException("Could not allocate slot.");
        }
    } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
        // Reaching this branch means the slot has been allocated to a different job
        final String message = "The slot " + slotId + " has already been allocated for a different job.";

        log.info(message);

        final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
        throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
    }
}
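
The three cases handled by allocateSlot (free slot, repeated request for the same allocation, slot held by someone else) can be sketched in a few lines. This is a hypothetical illustration, not Flink code: `TaskSlotSketch` tracks only an owner string per slot index and throws a plain `IllegalStateException` where the listing throws SlotOccupiedException.

```java
/** Hypothetical, simplified slot table; not a Flink class. */
class TaskSlotSketch {
    // owners[i] holds "jobId/allocationId" for slot i, or null while the slot is free
    private final String[] owners;

    TaskSlotSketch(int numberOfSlots) {
        this.owners = new String[numberOfSlots];
    }

    /** Allocates a free slot, accepts a repeated request, and rejects a conflicting one. */
    void allocate(int slotIndex, String jobId, String allocationId) {
        String requested = jobId + "/" + allocationId;
        if (owners[slotIndex] == null) {
            owners[slotIndex] = requested;
        } else if (!owners[slotIndex].equals(requested)) {
            throw new IllegalStateException(
                "The slot " + slotIndex + " has already been allocated to a different owner.");
        }
        // Same job and allocation ID: a repeated request, nothing to do
    }

    boolean isFree(int slotIndex) {
        return owners[slotIndex] == null;
    }
}
```

Treating the repeated request as a no-op makes the call idempotent, so a retried RPC from the ResourceManager does not fail an allocation that already succeeded.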

At this point we have traced the whole path from the Execution vertex to the TaskManager finally creating a TaskSlot.

Slot allocation logic at TaskManager startup

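We start from TaskExecutor's startTaskExecutorServices method. It starts the service that retrieves the ResourceManager leader information and registers a listener that tracks changes of the ResourceManager leader in real time. It then starts the TaskSlotTable, the job leader service, and the file cache.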

startTaskExecutorServices

private void startTaskExecutorServices() throws Exception {
    try {
        // start by connecting to the ResourceManager
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

        // tell the task slot table who's responsible for the task slot actions
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

        // start the job leader service
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}

Once a ResourceManager leader has been elected, the ResourceManagerLeaderListener is notified through its notifyLeaderAddress method.

notifyLeaderAddress

@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
    runAsync(
        () -> notifyOfNewResourceManagerLeader(
            leaderAddress,
            ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}

This asynchronously calls the notifyOfNewResourceManagerLeader method. Let's trace it.

notifyOfNewResourceManagerLeader

private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}

This method first stores the address of the elected ResourceManager leader, and then establishes a connection to the ResourceManager.

Let's trace the method that establishes the connection.

reconnectToResourceManager

private void reconnectToResourceManager(Exception cause) {
    closeResourceManagerConnection(cause);
    startRegistrationTimeout();
    tryConnectToResourceManager();
}

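To keep the logic uniform, the reconnect path is used here. It first closes the connection to the ResourceManager, then creates a registration timeout task (the timeout is read from the configuration), and finally tries to connect to the ResourceManager.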

Let's continue with the tryConnectToResourceManager method.

tryConnectToResourceManager

private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        connectToResourceManager();
    }
}

private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(establishedResourceManagerConnection == null);
    assert(resourceManagerConnection == null);

    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

    // Create a TaskExecutor registration object
    // It contains the TaskExecutor's address and ports, resource configuration, hardware description, and so on
    final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
        getAddress(),
        getResourceID(),
        unresolvedTaskManagerLocation.getDataPort(),
        JMXService.getPort().orElse(-1),
        hardwareDescription,
        memoryConfiguration,
        taskManagerConfiguration.getDefaultSlotResourceProfile(),
        taskManagerConfiguration.getTotalResourceProfile()
    );

    // Create the connection to the ResourceManager and start it
    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            taskExecutorRegistration);
    resourceManagerConnection.start();
}

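The TaskExecutor registers with the ResourceManager and opens the connection. Note that a connection state listener is created here. After registration and connection succeed, the onRegistrationSuccess method of ResourceManagerRegistrationListener is called.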

onRegistrationSuccess

@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
    final ResourceID resourceManagerId = success.getResourceManagerId();
    final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
    final ClusterInformation clusterInformation = success.getClusterInformation();
    final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();

    runAsync(
        () -> {
            // filter out outdated connections
            //noinspection ObjectEquality
            if (resourceManagerConnection == connection) {
                try {
                    establishResourceManagerConnection(
                        resourceManagerGateway,
                        resourceManagerId,
                        taskExecutorRegistrationId,
                        clusterInformation);
                } catch (Throwable t) {
                    log.error("Establishing Resource Manager connection in Task Executor failed", t);
                }
            }
        });
}
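
The identity check in this callback (resourceManagerConnection == connection) filters out notifications from connections that have since been replaced. A minimal sketch of that guard, with hypothetical names (`ConnectionGuardSketch`, `connect`, `establishedCount`) and plain `Object` instances standing in for connection objects:

```java
/** Hypothetical illustration of filtering callbacks from outdated connections; not a Flink class. */
class ConnectionGuardSketch {
    private Object currentConnection;
    private int establishedCount = 0;

    /** Replaces the current connection with a fresh one, as a reconnect would. */
    Object connect() {
        currentConnection = new Object();
        return currentConnection;
    }

    /** Only the connection that is still current may complete the registration. */
    void onRegistrationSuccess(Object connection) {
        if (currentConnection == connection) {
            establishedCount++;
        }
        // Otherwise the callback belongs to an outdated connection and is ignored
    }

    int establishedCount() {
        return establishedCount;
    }
}
```

Reference equality is the point here: two connections to the same address are still different attempts, and only the latest attempt should take effect.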

The callback asynchronously calls TaskExecutor's establishResourceManagerConnection, which runs the logic that follows a successful connection.

establishResourceManagerConnection

private void establishResourceManagerConnection(
        ResourceManagerGateway resourceManagerGateway,
        ResourceID resourceManagerResourceId,
        InstanceID taskExecutorRegistrationId,
        ClusterInformation clusterInformation) {

    // Asynchronously send the slot report to the ResourceManager
    final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
        getResourceID(),
        taskExecutorRegistrationId,
        // Create the slot report through the TaskSlotTable
        taskSlotTable.createSlotReport(getResourceID()),
        taskManagerConfiguration.getTimeout());

    slotReportResponseFuture.whenCompleteAsync(
        (acknowledge, throwable) -> {
            if (throwable != null) {
                // On an exception, try to reconnect to the ResourceManager
                reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
            }
        }, getMainThreadExecutor());

    // monitor the resource manager as heartbeat target
    // Monitor the heartbeat between this TaskExecutor and the ResourceManager
    resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            // After receiving a heartbeat, send heartbeat information back to the ResourceManager
            resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            // the TaskManager won't send heartbeat requests to the ResourceManager
        }
    });

    // set the propagated blob server address
    // Build the blob server address
    final InetSocketAddress blobServerAddress = new InetSocketAddress(
        clusterInformation.getBlobServerHostname(),
        clusterInformation.getBlobServerPort());

    // Configure the blob cache with it
    blobCacheService.setBlobServerAddress(blobServerAddress);

    // Store the information about the established ResourceManager connection
    establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
        resourceManagerGateway,
        resourceManagerResourceId,
        taskExecutorRegistrationId);

    // Stop the registration timeout timer
    stopRegistrationTimeout();
}

The method that sends the SlotReport is ResourceManager's sendSlotReport.

sendSlotReport

@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
    final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);

    if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
        // Register the TaskManager through the SlotManager
        if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
            onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
    }
}

This method registers the TaskManager with the SlotManager. Let's keep tracing.

registerTaskManager

The SlotManagerImpl.registerTaskManager method is as follows:

@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
    // Check the SlotManager state to make sure it has been started
    checkInit();

    LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());

    // we identify task managers by their instance id
    // If the TaskExecutor's instance id is already known, this task executor has registered before:
    // refresh the slot information stored in slots from the report and return false
    if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
        reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        return false;
    } else {
        // Check whether registering these slots would push the slot count over the limit
        // The maximum number of slots is configured via slotmanager.number-of-slots.max
        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
            LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
            // Release the resource
            resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
            return false;
        }

        // first register the TaskManager
        ArrayList<SlotID> reportedSlots = new ArrayList<>();

        // Collect every slot id in the slot report into reportedSlots
        for (SlotStatus slotStatus : initialSlotReport) {
            reportedSlots.add(slotStatus.getSlotID());
        }

        // Create and store the TaskManager's registration
        TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
            taskExecutorConnection,
            reportedSlots);

        taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);

        // next register the new slots
        // Register the slots one by one
        for (SlotStatus slotStatus : initialSlotReport) {
            registerSlot(
                slotStatus.getSlotID(),
                slotStatus.getAllocationID(),
                slotStatus.getJobID(),
                slotStatus.getResourceProfile(),
                taskExecutorConnection);
        }

        return true;
    }

}

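This method returns a boolean. It returns true if the TaskManager had not been registered before and the registration succeeds. Otherwise (the TaskManager is already registered, or the slot limit would be exceeded) it returns false.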
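The three outcomes of registerTaskManager can be reproduced with a small stand-alone model. This is a sketch under stated assumptions, not the SlotManagerImpl implementation: the class RegisterTaskManagerSketch and its String ids are hypothetical, and the limit check is assumed to be "registered slots + reported slots > max", which leaves out anything else the real isMaxSlotNumExceededAfterRegistration may take into account.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the registration outcomes; not Flink's SlotManagerImpl.
class RegisterTaskManagerSketch {
    private final int maxSlotNum;
    // instance id -> slot ids reported at registration
    private final Map<String, List<String>> registrations = new HashMap<>();
    private int totalSlots = 0;

    RegisterTaskManagerSketch(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    boolean registerTaskManager(String instanceId, List<String> reportedSlots) {
        if (registrations.containsKey(instanceId)) {
            // Already registered: the quoted code refreshes the slot status here
            return false;
        }
        // Assumed form of the limit check (see the lead-in)
        if (totalSlots + reportedSlots.size() > maxSlotNum) {
            // Over the limit: the quoted code releases the resource here
            return false;
        }
        registrations.put(instanceId, new ArrayList<>(reportedSlots));
        totalSlots += reportedSlots.size();
        return true;
    }

    public static void main(String[] args) {
        RegisterTaskManagerSketch slotManager = new RegisterTaskManagerSketch(3);
        System.out.println(slotManager.registerTaskManager("tm-a", Arrays.asList("a-0", "a-1"))); // true
        System.out.println(slotManager.registerTaskManager("tm-a", Arrays.asList("a-0", "a-1"))); // false
        System.out.println(slotManager.registerTaskManager("tm-b", Arrays.asList("b-0", "b-1"))); // false
    }
}
```

The second call returns false because the instance id is already known, the third because 2 + 2 slots would exceed the limit of 3.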

Next we take a close look at the SlotManagerImpl.registerSlot method. The SlotManager uses it to register the slots of a TaskManager.

registerSlot

private void registerSlot(
        SlotID slotId,
        AllocationID allocationId,
        JobID jobId,
        ResourceProfile resourceProfile,
        TaskExecutorConnection taskManagerConnection) {

    // If a slot with the same slot id already exists, remove the existing one first
    if (slots.containsKey(slotId)) {
        // remove the old slot first
        removeSlot(
            slotId,
            new SlotManagerException(
                String.format(
                    "Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
                    slotId)));
    }

    // Create a new TaskManagerSlot and register it (store it in the slots collection)
    final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);

    final PendingTaskManagerSlot pendingTaskManagerSlot;

    if (allocationId == null) {
        // No allocationId: look for a pending slot whose resource profile matches exactly
        pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
    } else {
        pendingTaskManagerSlot = null;
    }

    // No matching pending slot was found: update the slot's information
    if (pendingTaskManagerSlot == null) {
        updateSlot(slotId, allocationId, jobId);
    } else {
        // Remove it from pendingSlots
        pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
        // Get the pending slot request assigned to the pending slot
        final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();

        if (assignedPendingSlotRequest == null) {
            // Nobody has requested this slot: put it into the free slot collection
            handleFreeSlot(slot);
        } else {
            // Start allocating the slot
            assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
            allocateSlot(slot, assignedPendingSlotRequest);
        }
    }
}

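At this point the ResourceManager starts the actual slot allocation, which takes place in the SlotManagerImpl.allocateSlot method. The rest is the same as the slot request flow and is not repeated here.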
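The branching in registerSlot boils down to three possible follow-up actions. The sketch below restates that decision as a pure function so the cases are easy to see. Everything in it is hypothetical (the class RegisterSlotDecisionSketch, the Action enum and the boolean parameters), and it only mirrors the control flow of the method quoted above.

```java
// Hypothetical restatement of the branches in registerSlot; not Flink code.
class RegisterSlotDecisionSketch {
    public enum Action { UPDATE_SLOT, HANDLE_FREE_SLOT, ALLOCATE_SLOT }

    public static Action decide(
            boolean hasAllocationId,
            boolean matchingPendingSlotExists,
            boolean pendingSlotHasAssignedRequest) {
        // A pending slot is only looked up when the reported slot carries no allocation id
        boolean pendingSlotFound = !hasAllocationId && matchingPendingSlotExists;
        if (!pendingSlotFound) {
            // Corresponds to updateSlot(slotId, allocationId, jobId)
            return Action.UPDATE_SLOT;
        }
        // The pending slot is consumed; what happens next depends on its assigned request
        return pendingSlotHasAssignedRequest ? Action.ALLOCATE_SLOT : Action.HANDLE_FREE_SLOT;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, true));   // UPDATE_SLOT
        System.out.println(decide(false, true, false)); // HANDLE_FREE_SLOT
        System.out.println(decide(false, true, true));  // ALLOCATE_SLOT
    }
}
```

Only the last case leads into allocateSlot. A slot that already carries an allocation id never consumes a pending slot.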

This blog is the author's original work. Discussion, criticism and corrections are welcome. Please credit the source when reposting.
