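Slot concepts
In Flink, a slot is a bundle of resources: CPU cores, task heap memory, task off-heap memory, managed memory and network memory. The slot is also Flink's unit of resource allocation.
A TaskManager contains one or more slots. Depending on the slot sharing configuration, several tasks can run in one slot at the same time; each of them runs as a worker thread inside the slot.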
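As a rough illustration of what slot sharing means for resource demand, the sketch below estimates how many slots a job needs. It assumes that all tasks of the job share one slot sharing group; SlotDemandSketch and its methods are hypothetical helpers, not Flink classes.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: estimates how many slots a job needs.
final class SlotDemandSketch {

    // With slot sharing, one slot can host one subtask of every task,
    // so the job needs as many slots as its highest task parallelism.
    static int slotsWithSharing(List<Integer> taskParallelisms) {
        return taskParallelisms.stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    // Without slot sharing, every subtask occupies a slot of its own.
    static int slotsWithoutSharing(List<Integer> taskParallelisms) {
        return taskParallelisms.stream().mapToInt(Integer::intValue).sum();
    }
}
```

For three tasks with parallelism 2, 4 and 4, the first method yields 4 slots and the second 10.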
The relationship between TaskManager, slot, task and parallelism is shown in the figure below (taken from the official Flink documentation):

Slot-related classes
SchedulerImpl
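SchedulerImpl allocates slots for the tasks of Execution nodes.
The two most important SchedulerImpl methods in the analysis that follows are allocateSlot and allocateBatchSlot. Their logic is essentially the same; the only difference is that the former takes an additional slot allocation timeout parameter.
The allocation flow itself is fairly involved and is covered later, in the walkthrough of the slot request flow.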
SlotSharingManager
SlotSharingManager is responsible for slot sharing, that is, running different tasks in the same slot.
SlotSharingManager maintains a slot hierarchy. The root node and the intermediate nodes are MultiTaskSlots. A MultiTaskSlot can belong to another MultiTaskSlot and can itself contain several MultiTaskSlots or SingleTaskSlots, which is what forms the hierarchy. A SingleTaskSlot is a leaf of the hierarchy and has exactly one parent.
Slot sharing is expressed through this hierarchy: a slot shared by several tasks is represented as one MultiTaskSlot containing several SingleTaskSlots.
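The hierarchy can be pictured as a small composite structure. The sketch below is a simplified stand-in rather than Flink's implementation: the Sketch* classes and their string ids are illustrative assumptions, and only the parent/child relationship and a contains check are modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the nodes of the slot hierarchy; all names are illustrative.
abstract class SketchTaskSlot {
    final String slotRequestId;
    final SketchMultiTaskSlot parent; // null for the root node

    SketchTaskSlot(String slotRequestId, SketchMultiTaskSlot parent) {
        this.slotRequestId = slotRequestId;
        this.parent = parent;
    }

    // true if this node, or any node below it, runs the given job vertex
    abstract boolean contains(String jobVertexId);
}

// Leaf node: exactly one task, exactly one parent.
final class SketchSingleTaskSlot extends SketchTaskSlot {
    final String jobVertexId;

    SketchSingleTaskSlot(String slotRequestId, String jobVertexId, SketchMultiTaskSlot parent) {
        super(slotRequestId, parent);
        this.jobVertexId = jobVertexId;
    }

    @Override
    boolean contains(String jobVertexId) {
        return this.jobVertexId.equals(jobVertexId);
    }
}

// Root or intermediate node: holds further multi task slots or leaves.
final class SketchMultiTaskSlot extends SketchTaskSlot {
    final List<SketchTaskSlot> children = new ArrayList<>();

    SketchMultiTaskSlot(String slotRequestId, SketchMultiTaskSlot parent) {
        super(slotRequestId, parent);
    }

    SketchMultiTaskSlot allocateMultiTaskSlot(String slotRequestId) {
        SketchMultiTaskSlot child = new SketchMultiTaskSlot(slotRequestId, this);
        children.add(child);
        return child;
    }

    SketchSingleTaskSlot allocateSingleTaskSlot(String slotRequestId, String jobVertexId) {
        SketchSingleTaskSlot leaf = new SketchSingleTaskSlot(slotRequestId, jobVertexId, this);
        children.add(leaf);
        return leaf;
    }

    @Override
    boolean contains(String jobVertexId) {
        return children.stream().anyMatch(child -> child.contains(jobVertexId));
    }
}
```

A root created with `new SketchMultiTaskSlot("root", null)` that receives two leaves for the vertices "source" and "map" then models one physical slot shared by two tasks.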
Let us look at a few of its important methods.
createRootSlot
Creates a root slot node; the slot is of type MultiTaskSlot.
@Nonnull
MultiTaskSlot createRootSlot(
SlotRequestId slotRequestId,
CompletableFuture<? extends SlotContext> slotContextFuture,
SlotRequestId allocatedSlotRequestId) {
LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
// create the root node;
// this call also stores the new MultiTaskSlot in allTaskSlots and unresolvedRootSlots
final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(
slotRequestId,
allocatedSlotRequestId,
slotContextFutureAfterRootSlotResolution);
// runs once slotContextFuture has completed;
// slotContextFuture represents the slot request made to SlotPool and is
// completed in SlotPoolImpl's tryFulfillSlotRequestOrMakeAvailable method
FutureUtils.forward(
slotContextFuture.thenApply(
(SlotContext slotContext) -> {
// add the root node to the set of resolved root nodes once the SlotContext future has
// been completed and we know the slot's TaskManagerLocation;
// at this point the slot has been allocated: it is moved from unresolvedRootSlots
// to resolvedRootSlots
tryMarkSlotAsResolved(slotRequestId, slotContext);
return slotContext;
}),
slotContextFutureAfterRootSlotResolution);
return rootMultiTaskSlot;
}
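The forwarding step in createRootSlot can be mimicked with plain CompletableFuture. The sketch below is an assumption about what FutureUtils.forward amounts to (copying the outcome of one future into another), not its actual source; ForwardSketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical re-creation of the "forward" idea: copy the outcome of one future into another.
final class ForwardSketch {

    static <T> void forward(CompletableFuture<? extends T> source, CompletableFuture<T> target) {
        source.whenComplete((value, failure) -> {
            if (failure != null) {
                // propagate the failure to the target future
                target.completeExceptionally(failure);
            } else {
                // propagate the successful result to the target future
                target.complete(value);
            }
        });
    }
}
```

In createRootSlot the source is the slotContextFuture after the thenApply step that marks the slot as resolved, so the future handed to the root slot completes only after that bookkeeping has run.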
SlotPool
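SlotPool caches slots. It receives the slot requests issued by the ExecutionGraph and allocates the slots needed to run the tasks. If SlotPool cannot serve a request itself, it tries to contact the ResourceManager to obtain a new slot. The request fails if the ResourceManager is currently unavailable, rejects the request, or the request times out. Because SlotPool caches slots, it can still hand out registered idle slots when the ResourceManager is unavailable. Slots are released only when they are no longer used, for example when the job is still running but some slots sit idle.
Startup method
SlotPool is started in JobMaster's startJobMasterServices. The start method registers two periodic tasks: the idle slot check and the batch slot timeout check.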
public void start(
@Nonnull JobMasterId jobMasterId,
@Nonnull String newJobManagerAddress,
@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
this.jobMasterId = jobMasterId;
this.jobManagerAddress = newJobManagerAddress;
this.componentMainThreadExecutor = componentMainThreadExecutor;
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
if (log.isDebugEnabled()) {
scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}
checkIdleSlot
This method is one of SlotPool's periodic tasks; it is used to check for idle slots at regular intervals.
protected void checkIdleSlot() {
// The timestamp in SlotAndTimestamp is relative
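// current (relative) time
final long currentRelativeTimeMillis = clock.relativeTimeMillis();
// list that collects the slots whose idle time has expired
final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());
// find every available slot that has been idle for longer than idleSlotTimeout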
for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
expiredSlots.add(slotAndTimestamp.slot);
}
}
final FlinkException cause = new FlinkException("Releasing idle slot.");
for (AllocatedSlot expiredSlot : expiredSlots) {
// allocation ID of the expired slot
final AllocationID allocationID = expiredSlot.getAllocationId();
// remove the slot that belongs to this allocation ID from the available slots
if (availableSlots.tryRemove(allocationID) != null) {
log.info("Releasing idle slot [{}].", allocationID);
// RPC to the TaskManager that hosts the idle slot, asking it to free the slot
final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
allocationID,
cause,
rpcTimeout);
// runs once the RPC has completed;
// if freeing the slot failed, the slot is discarded and its status is synced with the TaskManager on the next heartbeat
FutureUtils.whenCompleteAsyncIfNotDone(
freeSlotFuture,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
// The slot status will be synced to task manager in next heartbeat.
log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
allocationID, expiredSlot.getTaskManagerId(), throwable);
}
});
}
}
// schedule the next run, which makes the check periodic
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
}
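The expiry test at the core of checkIdleSlot can be isolated into a few lines. This is a minimal sketch under simplifying assumptions: slots are identified by plain strings and the available-slot bookkeeping is reduced to a map from slot id to the timestamp at which the slot became idle; IdleSlotSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the idle-slot check to its timestamp comparison.
final class IdleSlotSketch {

    // Returns the ids of all slots that have been idle for longer than timeoutMs.
    static List<String> findExpired(Map<String, Long> idleSinceMillis, long nowMillis, long timeoutMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : idleSinceMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

The real method additionally removes each expired slot from the available slots, asks the owning TaskManager to free it via RPC, and reschedules itself.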
checkBatchSlotTimeout
protected void checkBatchSlotTimeout() {
// return immediately if the batch slot timeout check is disabled
if (!batchSlotRequestTimeoutCheckEnabled) {
return;
}
final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();
if (!pendingBatchRequests.isEmpty()) {
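// collect the resource profiles of the slots that are already allocated
final Set<ResourceProfile> allocatedResourceProfiles = getAllocatedResourceProfiles();
// partition the pending requests: those that a slot with an already allocated resource profile could fulfill go into one group, the rest into the other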
final Map<Boolean, List<PendingRequest>> fulfillableAndUnfulfillableRequests = pendingBatchRequests
.stream()
.collect(Collectors.partitioningBy(canBeFulfilledWithAllocatedSlot(allocatedResourceProfiles)));
// extract the fulfillable and the unfulfillable group of pending requests
final List<PendingRequest> fulfillableRequests = fulfillableAndUnfulfillableRequests.get(true);
final List<PendingRequest> unfulfillableRequests = fulfillableAndUnfulfillableRequests.get(false);
final long currentTimestamp = clock.relativeTimeMillis();
// mark these requests as fulfillable
for (PendingRequest fulfillableRequest : fulfillableRequests) {
fulfillableRequest.markFulfillable();
}
for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
// mark the request as unfulfillable and record since when
unfulfillableRequest.markUnfulfillable(currentTimestamp);
if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMilliseconds() <= currentTimestamp) {
// the request has timed out: run the timeout handling analyzed below
timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
}
}
}
// schedule the next run, which makes the check periodic
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
}
// handles a pending request that has timed out
protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
log.info("Pending slot request [{}] timed out.", slotRequestId);
// remove the request from waitingForResourceManager and pendingRequests
final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
// complete the request's future exceptionally with a timeout exception
if (pendingRequest != null) {
pendingRequest
.getAllocatedSlotFuture()
.completeExceptionally(new TimeoutException("Pending slot request timed out in SlotPool."));
}
}
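The partition-then-timeout logic of checkBatchSlotTimeout can be sketched in isolation. Everything below is a simplification made for illustration: resource profiles are reduced to a memory size in MB, "can be fulfilled" means some allocated slot is at least that large, and the assumption that markUnfulfillable only records the first timestamp is mine rather than something shown in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical reduction of the batch slot timeout check.
final class BatchTimeoutSketch {

    static final class Pending {
        final String id;
        final int requiredMemoryMb;
        long unfulfillableSince = Long.MAX_VALUE; // MAX_VALUE means "currently fulfillable"

        Pending(String id, int requiredMemoryMb) {
            this.id = id;
            this.requiredMemoryMb = requiredMemoryMb;
        }

        void markFulfillable() {
            unfulfillableSince = Long.MAX_VALUE;
        }

        // assumption: only the first transition to "unfulfillable" records the timestamp
        void markUnfulfillable(long now) {
            if (unfulfillableSince == Long.MAX_VALUE) {
                unfulfillableSince = now;
            }
        }
    }

    // Returns the ids of the requests that have been unfulfillable for at least timeoutMs.
    static List<String> check(List<Pending> pending, Set<Integer> allocatedMemoryMb, long now, long timeoutMs) {
        Map<Boolean, List<Pending>> parts = pending.stream().collect(
                Collectors.partitioningBy(
                        p -> allocatedMemoryMb.stream().anyMatch(mb -> mb >= p.requiredMemoryMb)));

        parts.get(true).forEach(Pending::markFulfillable);

        List<String> timedOut = new ArrayList<>();
        for (Pending p : parts.get(false)) {
            p.markUnfulfillable(now);
            if (p.unfulfillableSince + timeoutMs <= now) {
                timedOut.add(p.id);
            }
        }
        return timedOut;
    }
}
```

A request therefore times out only after it has stayed unfulfillable across consecutive checks for the whole timeout; becoming fulfillable in between resets the clock.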
ResourceManager
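Like JobManager and TaskManager, ResourceManager is one of the key roles in Flink. It allocates and reclaims resources and keeps a registry of them. ResourceManager supports high availability and takes part in leader election. It also holds the connections to the JobManagers. TaskManagers that are created later register themselves with the ResourceManager through the registerTaskExecutor method.
The most important member of ResourceManager is SlotManager, which maintains the available slots.
ResourceManager itself is an abstract class. It has two subclasses:
- StandaloneResourceManager: used for standalone deployments.
- ActiveResourceManager: used for non-standalone deployments. It has a member variable of type ResourceManagerDriver, which has several implementations that support Kubernetes, Mesos and Yarn respectively.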
SlotManager
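SlotManager maintains all registered slots. It keeps track of all registered slots, the free slots, the pending slots that are waiting to be allocated (pendingSlot), the pending slot requests (pendingSlotRequest) and the slot requests that have already been fulfilled.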
TaskSlotTable
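TaskSlotTable is the table of slots and tasks on a TaskManager and the container for TaskSlot instances. It maintains several indices for fast access to tasks and the slots assigned to them.
Let us look at its main methods.
The start method
TaskSlotTable must be started before it can be used. The start method is shown below: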
@Override
public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
// the state must be CREATED
Preconditions.checkState(
state == State.CREATED,
"The %s has to be just created before starting",
TaskSlotTableImpl.class.getSimpleName());
// set the slot actions, analyzed below
this.slotActions = Preconditions.checkNotNull(initialSlotActions);
// set the main thread executor
this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
// a timer service that can schedule multiple timeouts and notifies the timeout listener when one expires
timerService.start(this);
// switch to RUNNING so that the table cannot be started twice
state = State.RUNNING;
}
SlotActions
SlotActions holds the callbacks for slot allocation actions. The interface defines two callbacks:
- freeSlot: called when a slot is freed
- timeoutSlot: called when a slot times out
The interface is defined as follows:
public interface SlotActions {
/**
* Free the task slot with the given allocation id.
*
* @param allocationId to identify the slot to be freed
*/
void freeSlot(AllocationID allocationId);
/**
* Timeout the task slot for the given allocation id. The timeout is identified by the given
* ticket to filter invalid timeouts out.
*
* @param allocationId identifying the task slot to be timed out
* @param ticket allowing to filter invalid timeouts out
*/
void timeoutSlot(AllocationID allocationId, UUID ticket);
}
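AllocationID is the unique identifier of a physical slot that a JobManager allocates through the ResourceManager. It is assigned when the JobManager first issues the request and stays the same across retries. TaskManager and ResourceManager use this ID to track and synchronize the allocation state of slots. SlotRequestId, by contrast, is used when a task requests a logical slot from SlotPool. Because of slot sharing, several logical slot requests may map to the same physical slot request.
The only implementation of SlotActions is SlotActionsImpl, located in TaskExecutor.java. It is analyzed later, at the point where it is used.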
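The allocateSlot method
Allocates a slot with the given index for the given job. A negative index requests a dynamically allocated slot instead of one of the fixed, indexed slots. Returns true if the slot could be allocated.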
@Override
public boolean allocateSlot(
int index,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile,
Time slotTimeout) {
// check that the TaskSlotTable is in state RUNNING
checkRunning();
Preconditions.checkArgument(index < numberSlots);
// check whether a slot has already been allocated for this allocation id;
// if so, return false right away
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
return false;
}
// if taskSlots already contains this index
if (taskSlots.containsKey(index)) {
// fetch the existing task slot with the same index
TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
index,
duplicatedTaskSlot.getResourceProfile(),
duplicatedTaskSlot.getJobId(),
duplicatedTaskSlot.getAllocationId());
// the allocation is accepted only if the existing task slot has the same job id and allocation id
return duplicatedTaskSlot.getJobId().equals(jobId) &&
duplicatedTaskSlot.getAllocationId().equals(allocationId);
} else if (allocatedSlots.containsKey(allocationId)) {
// if a slot has already been allocated for this allocation id, return true
// (questionable: the same condition was already checked above, so this branch is unlikely to be reached)
return true;
}
// for index >= 0 use the default ResourceProfile, otherwise the resourceProfile passed in
resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
// check whether enough resources are left to reserve this profile
if (!budgetManager.reserve(resourceProfile)) {
LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
+ "while the currently remaining available resources are {}, total is {}.",
resourceProfile,
budgetManager.getAvailableBudget(),
budgetManager.getTotalBudget());
return false;
}
// create a new TaskSlot
taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);
if (index >= 0) {
// store it in taskSlots
taskSlots.put(index, taskSlot);
}
// update the allocation id to task slot map
allocatedSlots.put(allocationId, taskSlot);
// register a timeout for this slot since it's in state allocated;
// the timeout handling runs when the timer expires
timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
// add this slot to the set of job slots
Set<AllocationID> slots = slotsPerJob.get(jobId);
if (slots == null) {
slots = new HashSet<>(4);
slotsPerJob.put(jobId, slots);
}
slots.add(allocationId);
return true;
}
addTask
Adds a task to the slot identified by the task's allocation id.
@Override
public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
// check that the TaskSlotTable is running
checkRunning();
Preconditions.checkNotNull(task);
// look up the task slot in allocatedSlots
TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
if (taskSlot != null) {
// the slot must be active and match the task's job id and allocation id
if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
// add the task to the slot and record the mapping
if (taskSlot.add(task)) {
taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
return true;
} else {
return false;
}
} else {
throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
}
} else {
throw new SlotNotFoundException(task.getAllocationId());
}
}
createSlotReport
This method returns a SlotReport, a report of the allocation status of all slots in the current TaskManager (TaskExecutor).
@Override
public SlotReport createSlotReport(ResourceID resourceId) {
List<SlotStatus> slotStatuses = new ArrayList<>();
// collect the status of the fixed (indexed) slots
for (int i = 0; i < numberSlots; i++) {
SlotID slotId = new SlotID(resourceId, i);
SlotStatus slotStatus;
if (taskSlots.containsKey(i)) {
TaskSlot<T> taskSlot = taskSlots.get(i);
slotStatus = new SlotStatus(
slotId,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
} else {
slotStatus = new SlotStatus(
slotId,
defaultSlotResourceProfile,
null,
null);
}
slotStatuses.add(slotStatus);
}
// collect the status of the dynamically allocated slots
for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
// an index below 0 means the slot was allocated dynamically
if (taskSlot.getIndex() < 0) {
SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
SlotStatus slotStatus = new SlotStatus(
slotID,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
slotStatuses.add(slotStatus);
}
}
final SlotReport slotReport = new SlotReport(slotStatuses);
return slotReport;
}
TaskSlot
TaskSlot is the slot type maintained by TaskSlotTable. It is a container: its tasks field holds all tasks that belong to the same slot.
Member variables of TaskSlot
/** Index of the task slot. */
// a negative value denotes a dynamically allocated slot
private final int index;
/** Resource characteristics for this slot. */
private final ResourceProfile resourceProfile;
/** Tasks running in this slot. */
private final Map<ExecutionAttemptID, T> tasks;
private final MemoryManager memoryManager;
/** State of this slot. */
private TaskSlotState state;
/** Job id to which the slot has been allocated. */
private final JobID jobId;
/** Allocation id of this slot. */
private final AllocationID allocationId;
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;
ResourceProfile
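ResourceProfile wraps the resource requirements of a slot. All of its fields are final, so it cannot be modified once it has been created.
The main member variables of ResourceProfile are:
/** How many cpu cores are needed. Can be null only if it is unknown. */
// number of CPU cores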
@Nullable
private final Resource cpuCores;
/** How much task heap memory is needed. */
// task heap memory
@Nullable // can be null only for UNKNOWN
private final MemorySize taskHeapMemory;
/** How much task off-heap memory is needed. */
// task off-heap memory
@Nullable // can be null only for UNKNOWN
private final MemorySize taskOffHeapMemory;
/** How much managed memory is needed. */
// managed memory
@Nullable // can be null only for UNKNOWN
private final MemorySize managedMemory;
/** How much network memory is needed. */
// network memory (network buffers)
@Nullable // can be null only for UNKNOWN
private final MemorySize networkMemory;
/** A extensible field for user specified resources from {@link ResourceSpec}. */
// other resource types, as specified in ResourceSpec
private final Map<String, Resource> extendedResources = new HashMap<>(1);
AllocationID
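AllocationID is the unique identifier used when a JobManager requests a physical slot through the ResourceManager. It is created and fixed in SlotPoolImpl's requestSlotFromResourceManager method; it does not change afterwards, even when the request is retried.
Call flow
The figure below shows the call flow across the main classes involved in the full Flink slot allocation process.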

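Slot request flow
We trace the slot request flow from the point where the ExecutionGraph allocates resources all the way to the creation of the slot in TaskExecutor.
We start at the JobManager's entry point for resource allocation and follow the calls one by one.
Execution's allocateAndAssignSlotForExecution
This method allocates the slot that a vertex of the ExecutionGraph needs in order to run.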
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
SlotProviderStrategy slotProviderStrategy,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
checkNotNull(slotProviderStrategy);
// must run in the JobMaster main thread
assertRunningInJobMasterMainThread();
// slot sharing group of the ExecutionGraph vertex;
// it is a soft constraint: tasks in the same SlotSharingGroup may run in the same slot
final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
// CoLocationConstraint governs where the task runs
final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
// this method only works if the execution is in the state 'CREATED':
// the transition to SCHEDULED succeeds only from CREATED
if (transitionState(CREATED, SCHEDULED)) {
// ID of the SlotSharingGroup
final SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
// build the scheduling unit, bundling the execution, its slot sharing group and its co-location constraint
ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
// try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
ExecutionVertex executionVertex = getVertex();
// allocation id of the most recent prior execution
AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
// wrap the allocation id in a collection
Collection<AllocationID> previousAllocationIDs =
lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
// calculate the preferred locations, i.e. the TaskManagers the task should preferably run on
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
calculatePreferredLocations(locationPreferenceConstraint);
final SlotRequestId slotRequestId = new SlotRequestId();
final CompletableFuture<LogicalSlot> logicalSlotFuture =
preferredLocationsFuture.thenCompose(
(Collection<TaskManagerLocation> preferredLocations) -> {
LOG.info("Allocating slot with SlotRequestID {} for the execution attempt {}.", slotRequestId, attemptId);
// chained onto the previous CompletableFuture: once the preferred locations are known,
// call the slot allocation logic of the SlotProviderStrategy
return slotProviderStrategy.allocateSlot(
slotRequestId,
toSchedule,
SlotProfile.priorAllocation(
vertex.getResourceProfile(),
getPhysicalSlotResourceProfile(vertex),
preferredLocations,
previousAllocationIDs,
allPreviousExecutionGraphAllocationIds));
});
// register call back to cancel slot request in case that the execution gets canceled
releaseFuture.whenComplete(
(Object ignored, Throwable throwable) -> {
// if the slot future could still be cancelled, i.e. the request has not completed yet,
// cancel the slot request as well
if (logicalSlotFuture.cancel(false)) {
slotProviderStrategy.cancelSlotRequest(
slotRequestId,
slotSharingGroupId,
new FlinkException("Execution " + this + " was released."));
}
});
// This forces calls to the slot pool back into the main thread, for normal and exceptional completion;
// the returned future carries the LogicalSlot
return logicalSlotFuture.handle(
(LogicalSlot logicalSlot, Throwable failure) -> {
if (failure != null) {
throw new CompletionException(failure);
}
// tryAssignResource returns true if the logical slot could be assigned to this execution
if (tryAssignResource(logicalSlot)) {
return logicalSlot;
} else {
// the slot could not be assigned, so release it
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
throw new CompletionException(
new FlinkException(
"Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
});
} else {
// call race, already deployed, or already done
throw new IllegalExecutionStateException(this, CREATED, state);
}
}
Two classes are involved here: SlotSharingGroup and CoLocationConstraint.
SlotSharingGroup is the soft constraint for slot sharing: execution vertices with the same group id can be scheduled into the same slot. It has three member variables:
// IDs of the job vertices that belong to this group
private final Set<JobVertexID> ids = new TreeSet<>();
// group id, made up of a long lowerPart and a long upperPart
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
// resource requirements of all tasks in the group
private ResourceSpec resourceSpec = ResourceSpec.ZERO;
In contrast to SlotSharingGroup, CoLocationConstraint is the hard constraint for slot sharing. It determines where a task (execution vertex) runs: it binds a CoLocationGroup to a TaskManagerLocation, and all tasks of the same CoLocationGroup run on that TaskManager. A CoLocationGroup holds a collection of JobVertex instances. The code is not reproduced here.
Next we follow SlotProviderStrategy's allocateSlot method.
SlotProviderStrategy has two subclasses:
- BatchSlotProviderStrategy: does not specify a timeout for the slot allocation.
- NormalSlotProviderStrategy: specifies a timeout for the slot allocation; apart from that its logic is identical to BatchSlotProviderStrategy.
Taking NormalSlotProviderStrategy as the example, its allocateSlot method calls SchedulerImpl's allocateSlot. Following the calls: allocateSlot -> allocateSlotInternal -> internalAllocateSlot -> allocateSharedSlot.
SchedulerImpl's allocateSharedSlot
private CompletableFuture<LogicalSlot> allocateSharedSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
// allocate slot with slot sharing:
// get or create the SlotSharingManager of this slot sharing group;
// it manages slot sharing, which lets one slot run different tasks
final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
scheduledUnit.getSlotSharingGroupId(),
id -> new SlotSharingManager(
id,
slotPool,
this));
// MultiTaskSlotLocality wraps a MultiTaskSlot together with its Locality
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
try {
// pick the allocation method depending on whether a co-location constraint exists
if (scheduledUnit.getCoLocationConstraint() != null) {
multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
scheduledUnit.getCoLocationConstraint(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
} else {
multiTaskSlotLocality = allocateMultiTaskSlot(
scheduledUnit.getJobVertexId(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
}
} catch (NoResourceAvailableException noResourceException) {
return FutureUtils.completedExceptionally(noResourceException);
}
// sanity check: neither this MultiTaskSlot nor any of its children may already contain the job vertex id
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));
// allocate a SingleTaskSlot under this MultiTaskSlot
final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
slotRequestId,
slotProfile.getTaskResourceProfile(),
scheduledUnit.getJobVertexId(),
multiTaskSlotLocality.getLocality());
return leaf.getLogicalSlotFuture();
}
allocateCoLocatedMultiTaskSlot
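This method allocates a MultiTaskSlot that is subject to a co-location constraint.
Before analyzing it, we need to understand the Locality enum, which describes how a task's placement relates to its location preference. Its values are:
- UNCONSTRAINED: there is no constraint on where the task is scheduled
- LOCAL: the task is placed in the same TaskManager
- HOST_LOCAL: the task is placed on the same host
- NON_LOCAL: the task is placed somewhere outside its locality preference
- UNKNOWN: unknown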
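To make the values concrete, the sketch below classifies a placement against a set of preferred locations. It is an illustration only: SketchLocality mirrors the enum values listed above, while LocalitySketch, its Location class and the classification rule are assumptions, not the logic Flink uses.

```java
import java.util.Collection;

// Mirrors the Locality values described above; the enum itself is a stand-in.
enum SketchLocality { UNCONSTRAINED, LOCAL, HOST_LOCAL, NON_LOCAL, UNKNOWN }

// Hypothetical classifier: how well does an actual placement match the preferred locations?
final class LocalitySketch {

    static final class Location {
        final String taskManagerId;
        final String host;

        Location(String taskManagerId, String host) {
            this.taskManagerId = taskManagerId;
            this.host = host;
        }
    }

    static SketchLocality classify(Collection<Location> preferred, Location actual) {
        if (preferred.isEmpty()) {
            // nothing was requested, so nothing can be violated
            return SketchLocality.UNCONSTRAINED;
        }
        boolean sameHost = false;
        for (Location p : preferred) {
            if (p.taskManagerId.equals(actual.taskManagerId)) {
                return SketchLocality.LOCAL;
            }
            if (p.host.equals(actual.host)) {
                sameHost = true;
            }
        }
        return sameHost ? SketchLocality.HOST_LOCAL : SketchLocality.NON_LOCAL;
    }
}
```

UNKNOWN does not come out of this classifier; in the code analyzed here it is the value used when a slot has been requested but its location is not known yet.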
The code and analysis of allocateCoLocatedMultiTaskSlot follow:
private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
CoLocationConstraint coLocationConstraint,
SlotSharingManager multiTaskSlotManager,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) throws NoResourceAvailableException {
final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
if (coLocationSlotRequestId != null) {
// we have a slot assigned --> try to retrieve it
// look up the task slot registered in SlotSharingManager under this slot request id
final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
if (taskSlot != null) {
// the slot must be a MultiTaskSlot
Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
// if the resources held by this MultiTaskSlot may satisfy the slotProfile, return it with locality LOCAL (same TaskManager)
if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getTaskResourceProfile())) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
}
// otherwise fail: not enough resources
throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
} else {
// the slot may have been cancelled in the mean time, so clear the stale slot request id
coLocationConstraint.setSlotRequestId(null);
}
}
// if the constraint already has a location assigned
if (coLocationConstraint.isAssigned()) {
// refine the preferred locations of the slot profile with that TaskManager location
slotProfile = SlotProfile.priorAllocation(
slotProfile.getTaskResourceProfile(),
slotProfile.getPhysicalSlotResourceProfile(),
Collections.singleton(coLocationConstraint.getLocation()),
slotProfile.getPreferredAllocations(),
slotProfile.getPreviousExecutionGraphAllocations());
}
// get a new multi task slot: no usable slot was found for the constraint above
SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
coLocationConstraint.getGroupId(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
// check whether we fulfill the co-location constraint:
// the constraint already has a location assigned, but the new slot is not on that TaskManager
if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
// the constraint is violated, so release the slot
multiTaskSlotLocality.getMultiTaskSlot().release(
new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
// and report that no suitable resource is available
throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
"co location constraint " + coLocationConstraint + '.');
}
// allocate a child MultiTaskSlot under this MultiTaskSlot
final SlotRequestId slotRequestId = new SlotRequestId();
final SlotSharingManager.MultiTaskSlot coLocationSlot =
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
slotRequestId,
coLocationConstraint.getGroupId());
// mark the requested slot as co-located slot for other co-located tasks
coLocationConstraint.setSlotRequestId(slotRequestId);
// lock the co-location constraint once we have obtained the allocated slot
coLocationSlot.getSlotContextFuture().whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (throwable == null) {
// check whether we are still assigned to the co-location constraint; if so, lock its location
if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
} else {
log.debug("Failed to lock colocation constraint {} because assigned slot " +
"request {} differs from fulfilled slot request {}.",
coLocationConstraint.getGroupId(),
coLocationConstraint.getSlotRequestId(),
slotRequestId);
}
} else {
log.debug("Failed to lock colocation constraint {} because the slot " +
"allocation for slot request {} failed.",
coLocationConstraint.getGroupId(),
coLocationConstraint.getSlotRequestId(),
throwable);
}
});
return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
}
Next we look at how a MultiTaskSlot is allocated.
allocateMultiTaskSlot
private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
AbstractID groupId,
SlotSharingManager slotSharingManager,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
// info about the resolved root slots
Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
slotSharingManager.listResolvedRootSlotInfo(groupId);
// let the slot selection strategy pick the best matching root slot from SlotSharingManager
SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);
// wrap the selected slot in a MultiTaskSlotLocality
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
new SlotSharingManager.MultiTaskSlotLocality(
slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
bestResolvedRootSlotWithLocality.getLocality()) :
null;
// if the selected slot has locality LOCAL, return it right away
if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
return multiTaskSlotLocality;
}
final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
// try to find the best matching slot among the available slots in SlotPool
Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);
// if one was found
if (optionalPoolSlotAndLocality.isPresent()) {
SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
// use it if its locality is LOCAL, or if SlotSharingManager had no matching root slot
if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {
final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
// create the MultiTaskSlot for this slot in SlotSharingManager
final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
allocatedSlotRequestId);
// assign the multiTaskSlot as the payload of the allocated slot
if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
} else {
multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
}
if (multiTaskSlotLocality != null) {
// prefer slot sharing group slots over unused slots:
// if both SlotSharingManager and SlotPool had a matching slot, use the one from SlotSharingManager
// and release the one taken from SlotPool
if (optionalPoolSlotAndLocality.isPresent()) {
slotPool.releaseSlot(
allocatedSlotRequestId,
new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
}
return multiTaskSlotLocality;
}
// there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);
// if there is none either
if (multiTaskSlot == null) {
// it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(
allocatedSlotRequestId,
slotProfile,
allocationTimeout);
// create a root slot in SlotSharingManager
multiTaskSlot = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
slotAllocationFuture,
allocatedSlotRequestId);
// runs once the allocation has completed, successfully or not
slotAllocationFuture.whenComplete(
(PhysicalSlot allocatedSlot, Throwable throwable) -> {
final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
if (taskSlot != null) {
// still valid; release the slot if the allocation failed
if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
taskSlot.release(throwable);
} else {
if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
} else {
slotPool.releaseSlot(
allocatedSlotRequestId,
new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
}
});
}
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
}
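While we are here, a closer look at SlotSelectionStrategy: it is the common interface of the classes that pick the best matching slot. Its implementations are:
- DefaultLocationPreferenceSlotSelectionStrategy: returns the first matching slot from the candidates.
- EvenlySpreadOutLocationPreferenceSlotSelectionStrategy: among all matching slots, returns the one whose TaskManager has the lowest resource utilization.
- PreviousAllocationSlotSelectionStrategy: first tries the slots of the previous allocations recorded in the SlotProfile; if none is available, it falls back to another slot selection strategy.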
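The difference between the first two strategies can be shown with a toy candidate list. This is a sketch under assumptions: SlotSelectionSketch and Candidate are hypothetical, candidates are assumed to be pre-filtered to matching slots, and utilization is a plain number between 0 and 1.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of "take the first match" versus "take the least utilized match".
final class SlotSelectionSketch {

    static final class Candidate {
        final String slotId;
        final double taskExecutorUtilization; // 0.0 = idle TaskManager, 1.0 = fully used

        Candidate(String slotId, double taskExecutorUtilization) {
            this.slotId = slotId;
            this.taskExecutorUtilization = taskExecutorUtilization;
        }
    }

    // Default behaviour as described above: the first matching candidate wins.
    static Optional<Candidate> selectFirst(List<Candidate> matching) {
        return matching.stream().findFirst();
    }

    // Evenly-spread-out behaviour: the candidate on the least utilized TaskManager wins.
    static Optional<Candidate> selectLeastUtilized(List<Candidate> matching) {
        return matching.stream()
                .min(Comparator.comparingDouble((Candidate c) -> c.taskExecutorUtilization));
    }
}
```

With candidates s1 (utilization 0.75), s2 (0.25) and s3 (0.5), the first method picks s1 and the second picks s2.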
Now we turn to SchedulerImpl's requestNewAllocatedSlot, which requests the allocation of a new slot.
@Nonnull
private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
SlotRequestId slotRequestId,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
if (allocationTimeout == null) {
return slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
} else {
return slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
}
}
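This method calls the matching SlotPool method depending on whether an allocation timeout was given. requestNewAllocatedBatchSlot and requestNewAllocatedSlot share most of their logic; the latter adds timeout handling. We analyze requestNewAllocatedSlot, the more complex of the two.
SlotPoolImpl's requestNewAllocatedSlot method is shown below: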
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nullable Time timeout) {
// Assert that the method runs in the main thread
componentMainThreadExecutor.assertRunningInMainThread();
// Create a slot request object
// The SlotPool caches the slot request first; the slot is only actually created once a TaskManager obtains it
final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);
// If a timeout was passed in, register the timeout handling
if (timeout != null) {
// register request timeout
FutureUtils
.orTimeout(
pendingRequest.getAllocatedSlotFuture(),
timeout.toMilliseconds(),
TimeUnit.MILLISECONDS,
componentMainThreadExecutor)
.whenComplete(
(AllocatedSlot ignored, Throwable throwable) -> {
if (throwable instanceof TimeoutException) {
timeoutPendingSlotRequest(slotRequestId);
}
});
}
// Call requestNewAllocatedSlotInternal to request the new slot
return requestNewAllocatedSlotInternal(pendingRequest)
.thenApply((Function.identity()));
}
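The timeout registration above can be tried in isolation. The following is a minimal sketch, assuming the JDK's own CompletableFuture.orTimeout (Java 9+) as a stand-in for Flink's FutureUtils.orTimeout. The class TimeoutSketch, the method withTimeout and the timedOut list are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: fail a pending slot future after a timeout and react only to timeouts. */
class TimeoutSketch {
    /** Ids of requests that timed out; stands in for timeoutPendingSlotRequest(slotRequestId). */
    static final List<String> timedOut = new CopyOnWriteArrayList<>();

    static CompletableFuture<String> withTimeout(
            String requestId, CompletableFuture<String> pendingSlot, long timeoutMillis) {
        return pendingSlot
                // JDK stand-in (assumption) for FutureUtils.orTimeout: completes the future
                // with a TimeoutException if nobody completed it in time.
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((slot, failure) -> {
                    // Only timeouts trigger the cleanup; other failures are ignored here.
                    if (failure instanceof TimeoutException) {
                        timedOut.add(requestId);
                    }
                });
    }
}
```

A future that is already completed passes through unchanged, while one that is never completed fails with a TimeoutException and its request id is recorded.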
SlotPoolImpl's requestNewAllocatedSlotInternal method is shown below. In this method the SlotPool asks the ResourceManager to allocate a new slot.
@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {
if (resourceManagerGateway == null) {
// No ResourceManager gateway yet: stash the request in waitingForResourceManager
// This LinkedHashMap maps each slot request id to its slot request
stashRequestWaitingForResourceManager(pendingRequest);
} else {
// Request the slot from the ResourceManager
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
return pendingRequest.getAllocatedSlotFuture();
}
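The stash-or-send decision above can be sketched on its own. This is a simplified illustration, not SlotPoolImpl itself: StashingSlotPool and FakeGateway are hypothetical classes, request ids are plain strings, and replaying the stashed requests once a gateway connects is an assumption about the follow-up step, which is not part of the code shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the stash-or-send decision of the slot pool. */
class StashingSlotPool {
    /** Hypothetical stand-in for ResourceManagerGateway: records the request ids it was sent. */
    static final class FakeGateway {
        final List<String> received = new ArrayList<>();

        void requestSlot(String requestId) {
            received.add(requestId);
        }
    }

    // Insertion-ordered, so stashed requests are replayed in arrival order.
    private final Map<String, CompletableFuture<String>> waitingForResourceManager =
            new LinkedHashMap<>();
    private FakeGateway gateway; // null until a ResourceManager is connected

    CompletableFuture<String> requestNewSlot(String requestId) {
        CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();
        if (gateway == null) {
            // No ResourceManager yet: keep the request until one shows up.
            waitingForResourceManager.put(requestId, allocatedSlotFuture);
        } else {
            gateway.requestSlot(requestId);
        }
        return allocatedSlotFuture;
    }

    /** Assumption: on connect, every stashed request is forwarded and the stash is cleared. */
    void connectToResourceManager(FakeGateway newGateway) {
        gateway = newGateway;
        for (String requestId : waitingForResourceManager.keySet()) {
            gateway.requestSlot(requestId);
        }
        waitingForResourceManager.clear();
    }

    int stashedCount() {
        return waitingForResourceManager.size();
    }
}
```

Either way the caller receives the same kind of future, so it does not need to know whether the request was sent immediately or stashed.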
Let us focus on SlotPoolImpl's requestSlotFromResourceManager method:
private void requestSlotFromResourceManager(
final ResourceManagerGateway resourceManagerGateway,
final PendingRequest pendingRequest) {
checkNotNull(resourceManagerGateway);
checkNotNull(pendingRequest);
// Create an AllocationID; its lowerPart and upperPart are random longs
final AllocationID allocationId = new AllocationID();
// Assign the AllocationID to the pendingRequest
pendingRequest.setAllocationId(allocationId);
// Put it into the pendingRequests collection
// This is a map with a composite key: both the slot request id and the allocation id act as keys
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
// Define what to do when the slot allocation completes
pendingRequest.getAllocatedSlotFuture().whenComplete(
(AllocatedSlot allocatedSlot, Throwable throwable) -> {
if (throwable != null) {
// the allocation id can be remapped while the slot is being requested, so we
// re-read it from the pendingRequest, where it is kept up to date
final Optional<AllocationID> updatedAllocationId = pendingRequest.getAllocationId();
// Failure handling: cancel the slot request
if (updatedAllocationId.isPresent()) {
// cancel the slot request if there is a failure
resourceManagerGateway.cancelSlotRequest(updatedAllocationId.get());
}
}
});
log.info("Requesting new slot [{}] and profile {} with allocation id {} from resource manager.",
pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile(), allocationId);
// Send a SlotRequest to the ResourceManager to request a slot
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
rpcTimeout);
// Runs once the slot request RPC has completed
FutureUtils.whenCompleteAsyncIfNotDone(
rmResponse,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable failure) -> {
// on failure, fail the request future
if (failure != null) {
// On failure, run the failure handling, which calls completeExceptionally on the future
slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
}
});
}
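The pendingRequests collection above is addressed by two keys at once. A minimal sketch of such a two-key map follows; TwoKeyMap and its method names are hypothetical and only illustrate the idea, they are not Flink's actual class or API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal map whose values can be reached through either of two keys. */
class TwoKeyMap<A, B, V> {
    private static final class Entry<K2, T> {
        final K2 keyB;
        final T value;

        Entry(K2 keyB, T value) {
            this.keyB = keyB;
            this.value = value;
        }
    }

    private final Map<A, Entry<B, V>> byA = new HashMap<>();
    private final Map<B, A> bToA = new HashMap<>();

    void put(A keyA, B keyB, V value) {
        // Drop stale mappings first so both indexes stay consistent.
        removeKeyA(keyA);
        removeKeyB(keyB);
        byA.put(keyA, new Entry<>(keyB, value));
        bToA.put(keyB, keyA);
    }

    V getKeyA(A keyA) {
        Entry<B, V> entry = byA.get(keyA);
        return entry == null ? null : entry.value;
    }

    V getKeyB(B keyB) {
        A keyA = bToA.get(keyB);
        return keyA == null ? null : getKeyA(keyA);
    }

    V removeKeyA(A keyA) {
        Entry<B, V> entry = byA.remove(keyA);
        if (entry == null) {
            return null;
        }
        bToA.remove(entry.keyB);
        return entry.value;
    }

    V removeKeyB(B keyB) {
        A keyA = bToA.remove(keyB);
        if (keyA == null) {
            return null;
        }
        Entry<B, V> entry = byA.remove(keyA);
        return entry == null ? null : entry.value;
    }

    int size() {
        return byA.size();
    }
}
```

With this shape, the slot pool side can look a request up by its slot request id, while a response that only carries an allocation id can still find the same entry.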
The ResourceManager's requestSlot logic is as follows:
@Override
public CompletableFuture<Acknowledge> requestSlot(
JobMasterId jobMasterId,
SlotRequest slotRequest,
final Time timeout) {
JobID jobId = slotRequest.getJobId();
// Look up the JobManager registration for the job ID
JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
log.info("Request slot with profile {} for job {} with allocation id {}.",
slotRequest.getResourceProfile(),
slotRequest.getJobId(),
slotRequest.getAllocationId());
try {
// Register the slot request with the SlotManager
slotManager.registerSlotRequest(slotRequest);
} catch (ResourceManagerException e) {
return FutureUtils.completedExceptionally(e);
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
}
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
}
}
At this point we see that the ResourceManager ultimately hands the SlotRequest over to its internal SlotManager, whose registerSlotRequest method is shown below.
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
// Check that the state is started
checkInit();
// Check whether a fulfilled or pending slot request already uses the same allocation id as the argument below
// If it is a duplicate, return false
if (checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
return false;
} else {
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
// The wrapped slot request is stored in the pendingSlotRequests collection
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {
// Call the internal slot request method, analyzed below
internalRequestSlot(pendingSlotRequest);
} catch (ResourceManagerException e) {
// requesting the slot failed --> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());
throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
}
return true;
}
}
We continue with the internalRequestSlot method.
internalRequestSlot
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
// Get the resource requirement of the slot request
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
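Here findMatchingSlot uses slotMatchingStrategy to look up a slot in the freeSlots collection that satisfies the resource requirement, checks that the matched slot is in state SlotState.FREE, removes it from freeSlots and returns it.
Flink encapsulates the matching logic in the SlotMatchingStrategy interface, which has two implementations:
- AnyMatchingSlotMatchingStrategy: returns the first matching slot, that is, it returns as soon as it finds a slot whose resources satisfy the requirement.
- LeastUtilizationSlotMatchingStrategy: on top of the previous strategy, it also computes the slot utilization of each TaskExecutor and returns a slot on the TaskExecutor with the lowest utilization.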
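The difference between the two matching strategies can be sketched as follows. This is an illustration under simplifying assumptions, not Flink's implementation: a single memory figure in MB stands in for the full ResourceProfile, utilization is taken as the share of non-free slots per TaskExecutor, and SlotMatchingSketch, Slot, anyMatching and leastUtilization are hypothetical names.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the two matching strategies over a list of free slots. */
class SlotMatchingSketch {
    static final class Slot {
        final String id;
        final String taskExecutor;
        final int memoryMb; // simplified stand-in for a ResourceProfile

        Slot(String id, String taskExecutor, int memoryMb) {
            this.id = id;
            this.taskExecutor = taskExecutor;
            this.memoryMb = memoryMb;
        }
    }

    /** First free slot whose resources satisfy the requirement. */
    static Optional<Slot> anyMatching(List<Slot> freeSlots, int requiredMb) {
        return freeSlots.stream().filter(s -> s.memoryMb >= requiredMb).findFirst();
    }

    /**
     * Among the matching slots, pick one on the TaskExecutor with the lowest utilization.
     * Assumes totalSlotsPerExecutor has an entry for every TaskExecutor in freeSlots.
     */
    static Optional<Slot> leastUtilization(
            List<Slot> freeSlots, int requiredMb, Map<String, Integer> totalSlotsPerExecutor) {
        Map<String, Integer> freePerExecutor = new HashMap<>();
        for (Slot s : freeSlots) {
            freePerExecutor.merge(s.taskExecutor, 1, Integer::sum);
        }
        return freeSlots.stream()
                .filter(s -> s.memoryMb >= requiredMb)
                .min(Comparator.comparingDouble((Slot s) -> {
                    int total = totalSlotsPerExecutor.get(s.taskExecutor);
                    int free = freePerExecutor.get(s.taskExecutor);
                    return (total - free) / (double) total; // share of slots in use
                }));
    }
}
```

The first strategy stops at the first hit and is therefore cheaper, while the second one has to look at every free slot but tends to spread work across TaskExecutors.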
If findMatchingSlot finds a matching slot, allocateSlot is called to tell the TaskExecutor to allocate the slot. If nothing matches, fulfillPendingSlotRequestWithPendingTaskManagerSlot is called.
fulfillPendingSlotRequestWithPendingTaskManagerSlot
Based on the required resources (pendingSlotRequest.getResourceProfile()), this method creates a PendingTaskManagerSlot and stores it in pendingSlots. These pending slots are registered (registerSlot) during registerTaskManager. Only at that point is the pending slot actually allocated and created in the corresponding TaskExecutor.
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
// Get the resource requirement of the PendingSlotRequest
ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
// Look for a PendingTaskManagerSlot that satisfies the resource requirement
Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
// If no pending slot with matching resources was found,
// allocate the resources described by resourceProfile, which creates a pending slot
if (!pendingTaskManagerSlotOptional.isPresent()) {
pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
}
// If a pending slot is available (found or newly created), call assignPendingTaskManagerSlot,
// which links pendingSlotRequest and pendingTaskManagerSlot to each other
OptionalConsumer.of(pendingTaskManagerSlotOptional)
.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot))
.ifNotPresent(() -> {
// request can not be fulfilled by any free slot or pending slot that can be allocated,
// check whether it can be fulfilled by allocated slots
if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {
throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
}
});
}
allocateSlot
Now let us analyze the other branch of internalRequestSlot: the call to allocateSlot.
SlotManagerImpl's allocateSlot method is as follows:
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
Preconditions.checkState(taskManagerSlot.getState() == SlotState.FREE);
// Get the connection to the TaskManager from the slot
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
// Get the RPC gateway
TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
final InstanceID instanceID = taskManagerSlot.getInstanceId();
// Assign the PendingSlotRequest to the slot
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
// If this PendingSlotRequest was already assigned a pending TaskManager slot, return that one first
returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
// Look up the registered TaskManager
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
if (taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " +
instanceID + '.');
}
// Mark this TaskManager as in use
taskManagerRegistration.markUsed();
// RPC call to the task manager
// Remotely invokes TaskExecutor's requestSlot method
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout);
// Once the RPC call has finished, complete completableFuture with its result
requestFuture.whenComplete(
(Acknowledge acknowledge, Throwable throwable) -> {
if (acknowledge != null) {
completableFuture.complete(acknowledge);
} else {
completableFuture.completeExceptionally(throwable);
}
});
completableFuture.whenCompleteAsync(
(Acknowledge acknowledge, Throwable throwable) -> {
try {
if (acknowledge != null) {
// If the allocation succeeded, update the slot information
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if (throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
// If the slot is occupied, update the slot information with its current owner
updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
// Otherwise, remove the slot request from the slot
removeSlotRequestFromSlot(slotId, allocationId);
}
if (!(throwable instanceof CancellationException)) {
// Unless the slot allocation was cancelled, run the failed-slot-request handling
handleFailedSlotRequest(slotId, allocationId, throwable);
} else {
LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
}
} catch (Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
},
mainThreadExecutor);
}
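The branching in the completion callback above is easy to misread, so here is a small sketch that only records which handlers would run for a given outcome. AllocationOutcomeSketch, SlotOccupied and actionsFor are hypothetical names; the action strings merely echo the method names from the listing and nothing here calls into Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;

/** Hypothetical sketch: which handlers run for each outcome of the requestSlot RPC. */
class AllocationOutcomeSketch {
    /** Hypothetical stand-in for Flink's SlotOccupiedException. */
    static final class SlotOccupied extends Exception {
        private static final long serialVersionUID = 1L;

        SlotOccupied() {
            super("slot occupied");
        }
    }

    static List<String> actionsFor(boolean acknowledged, Throwable failure) {
        List<String> actions = new ArrayList<>();
        if (acknowledged) {
            actions.add("updateSlot(requester)");
            return actions;
        }
        // First decision: what the slot's bookkeeping should look like afterwards.
        if (failure instanceof SlotOccupied) {
            actions.add("updateSlot(currentOwner)");
        } else {
            actions.add("removeSlotRequestFromSlot");
        }
        // Second, independent decision: whether the pending request needs failure handling.
        if (!(failure instanceof CancellationException)) {
            actions.add("handleFailedSlotRequest");
        } else {
            actions.add("logCancelled");
        }
        return actions;
    }
}
```

Note that the two decisions are independent: a cancellation still removes the request from the slot, but it is the only failure that skips handleFailedSlotRequest.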
Above, the ResourceManager called TaskExecutor's requestSlot method via RPC.
requestSlot
TaskExecutor.requestSlot is as follows:
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, resourceManagerId);
// Check whether we are connected to this ResourceManager
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}
try {
// Run TaskExecutor's allocateSlot method
allocateSlot(
slotId,
jobId,
allocationId,
resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
final JobTable.Job job;
try {
// Get the job, creating it if it does not exist yet
job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
}
if (job.isConnected()) {
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
Finally we arrive at TaskExecutor's allocateSlot method. It is short and ultimately delegates to TaskSlotTable's allocateSlot method.
TaskExecutor's allocateSlot method
private void allocateSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile) throws SlotAllocationException {
// If the slot is free
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
// let the taskSlotTable allocate the slot
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
// Reaching this branch means the slot has been allocated to a different job
final String message = "The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
}
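The method above has three outcomes, one of which is implicit: when the slot is already allocated to the same request, neither branch runs and the call is a silent no-op. A minimal sketch of that decision follows, assuming a plain map as the slot table and keying only on the allocation id (the real check also compares the job id); SlotTableSketch and Outcome are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the three outcomes of allocating a slot on the TaskExecutor side. */
class SlotTableSketch {
    enum Outcome { ALLOCATED, ALREADY_ALLOCATED_TO_REQUESTER, OCCUPIED_BY_OTHER }

    // slot index -> allocation id currently holding the slot (absent means free)
    private final Map<Integer, String> allocationBySlot = new HashMap<>();

    Outcome allocate(int slotIndex, String allocationId) {
        String current = allocationBySlot.get(slotIndex);
        if (current == null) {
            // Free slot: allocate it to the requester.
            allocationBySlot.put(slotIndex, allocationId);
            return Outcome.ALLOCATED;
        }
        if (current.equals(allocationId)) {
            // Repeated request for the same allocation: nothing to do, which makes retries safe.
            return Outcome.ALREADY_ALLOCATED_TO_REQUESTER;
        }
        // Held by someone else: the real method throws SlotOccupiedException here.
        return Outcome.OCCUPIED_BY_OTHER;
    }
}
```

Treating the repeated request as a no-op is what allows the ResourceManager to resend a request after a lost acknowledgement without corrupting the slot table.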
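At this point we have walked through the whole path from an Execution vertex down to the TaskManager finally creating the TaskSlot.
Slot allocation logic at TaskManager startup
We start with TaskExecutor's startTaskExecutorServices method. It starts the service that retrieves the ResourceManager leader information and registers a listener that tracks changes of the ResourceManager leader in real time. It then starts the TaskSlotTable, the job leader service and the file cache.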
startTaskExecutorServices
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
Once the ResourceManager leader has been elected, the ResourceManagerLeaderListener is notified through its notifyLeaderAddress method.
notifyLeaderAddress
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
This asynchronously calls notifyOfNewResourceManagerLeader. Let us follow it.
notifyOfNewResourceManagerLeader
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
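This method first stores the address of the elected ResourceManager leader and then establishes the connection to the ResourceManager.
Let us follow the method that establishes the connection.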
reconnectToResourceManager
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
startRegistrationTimeout();
tryConnectToResourceManager();
}
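To keep the logic uniform, the reconnect path is used here as well. It first closes the connection to the ResourceManager, then starts a registration timeout check (the timeout is read from the configuration), and finally tries to connect to the ResourceManager.
We continue with the tryConnectToResourceManager method.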
tryConnectToResourceManager
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
private void connectToResourceManager() {
assert(resourceManagerAddress != null);
assert(establishedResourceManagerConnection == null);
assert(resourceManagerConnection == null);
log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
// Create a TaskExecutor registration object
// It carries the TaskExecutor's address and ports, resource configuration, hardware description and so on
final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
getAddress(),
getResourceID(),
unresolvedTaskManagerLocation.getDataPort(),
JMXService.getPort().orElse(-1),
hardwareDescription,
memoryConfiguration,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile()
);
// Create the connection to the ResourceManager and start it
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
getRpcService(),
taskManagerConfiguration.getRetryingRegistrationConfiguration(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
getMainThreadExecutor(),
new ResourceManagerRegistrationListener(),
taskExecutorRegistration);
resourceManagerConnection.start();
}
The TaskExecutor registers with the ResourceManager and opens the connection. Note that a registration listener is created here. Once registration and connection succeed, ResourceManagerRegistrationListener's onRegistrationSuccess method is called.
onRegistrationSuccess
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId = success.getResourceManagerId();
final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
final ClusterInformation clusterInformation = success.getClusterInformation();
final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
runAsync(
() -> {
// filter out outdated connections
//noinspection ObjectEquality
if (resourceManagerConnection == connection) {
try {
establishResourceManagerConnection(
resourceManagerGateway,
resourceManagerId,
taskExecutorRegistrationId,
clusterInformation);
} catch (Throwable t) {
log.error("Establishing Resource Manager connection in Task Executor failed", t);
}
}
});
}
The callback asynchronously invokes TaskExecutor's establishResourceManagerConnection, which runs the logic that follows a successful connection.
establishResourceManagerConnection
private void establishResourceManagerConnection(
ResourceManagerGateway resourceManagerGateway,
ResourceID resourceManagerResourceId,
InstanceID taskExecutorRegistrationId,
ClusterInformation clusterInformation) {
// Asynchronously send the slot report to the ResourceManager
final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
getResourceID(),
taskExecutorRegistrationId,
// The slot report is created by the TaskSlotTable
taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
slotReportResponseFuture.whenCompleteAsync(
(acknowledge, throwable) -> {
if (throwable != null) {
// On failure, try to reconnect to the ResourceManager
reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
}
}, getMainThreadExecutor());
// monitor the resource manager as heartbeat target
// Monitor the heartbeat with the ResourceManager
resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
// After receiving a heartbeat, send the heartbeat payload back to the ResourceManager
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
}
@Override
public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
// the TaskManager won't send heartbeat requests to the ResourceManager
}
});
// set the propagated blob server address
// Build the blob server address
final InetSocketAddress blobServerAddress = new InetSocketAddress(
clusterInformation.getBlobServerHostname(),
clusterInformation.getBlobServerPort());
// Configure the blob cache with it
blobCacheService.setBlobServerAddress(blobServerAddress);
// Store the established ResourceManager connection
establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
resourceManagerGateway,
resourceManagerResourceId,
taskExecutorRegistrationId);
// Stop the registration timeout timer
stopRegistrationTimeout();
}
The slot report is handled by ResourceManager's sendSlotReport method.
sendSlotReport
@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
// 通過SlotManager注冊TaskManager
if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
}
}
This method registers the TaskManager with the SlotManager. We keep following.
registerTaskManager
SlotManagerImpl.registerTaskManager is as follows:
@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
// Check that the SlotManager has been started
checkInit();
LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
// If the instance id is already known, this TaskExecutor has registered before:
// update the slot information kept in slots and return false
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
// Check whether registering these slots would exceed the maximum number of slots
// The maximum is configured through slotmanager.number-of-slots.max
if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
// Release the resource
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
return false;
}
// first register the TaskManager
ArrayList<SlotID> reportedSlots = new ArrayList<>();
// Collect the slot ids from the slot report into reportedSlots
for (SlotStatus slotStatus : initialSlotReport) {
reportedSlots.add(slotStatus.getSlotID());
}
// Create and store the TaskManager registration
TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
taskExecutorConnection,
reportedSlots);
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
// next register the new slots
// Register the slots one by one
for (SlotStatus slotStatus : initialSlotReport) {
registerSlot(
slotStatus.getSlotID(),
slotStatus.getAllocationID(),
slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
return true;
}
}
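This method returns a boolean: true if the TaskManager had not been registered before and the registration succeeded, false otherwise.
Next we focus on SlotManagerImpl.registerSlot, through which the SlotManager registers the slots of a TaskManager.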
registerSlot
private void registerSlot(
SlotID slotId,
AllocationID allocationId,
JobID jobId,
ResourceProfile resourceProfile,
TaskExecutorConnection taskManagerConnection) {
// If a slot with the same slot id already exists, that existing slot has to be removed first
if (slots.containsKey(slotId)) {
// remove the old slot first
removeSlot(
slotId,
new SlotManagerException(
String.format(
"Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
slotId)));
}
// Create a new TaskManagerSlot and register it (store it in the slots collection)
final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
final PendingTaskManagerSlot pendingTaskManagerSlot;
if (allocationId == null) {
// No allocationId: look for a pending slot whose resource profile matches exactly
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
} else {
pendingTaskManagerSlot = null;
}
// If no matching pending slot was found, just update the slot information
if (pendingTaskManagerSlot == null) {
updateSlot(slotId, allocationId, jobId);
} else {
// Remove it from pendingSlots
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
// Get the slot request assigned to the pending slot
final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
if (assignedPendingSlotRequest == null) {
// Nobody is waiting for this slot: put it into the free slot collection
handleFreeSlot(slot);
} else {
// Start allocating the slot
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
allocateSlot(slot, assignedPendingSlotRequest);
}
}
}
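The matching of a newly reported slot against the pending slots can be sketched in a few lines. This is a simplified illustration under stated assumptions: a memory figure in MB stands in for the ResourceProfile, requests are plain strings, only the case without an allocation id is covered, and a slot with no matching pending slot is simply recorded as free. RegisterSlotSketch, PendingSlot and registerFreeSlot are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of how a newly reported slot is matched against pending slots. */
class RegisterSlotSketch {
    /** A slot that was requested from the cluster but has not been reported yet. */
    static final class PendingSlot {
        final int memoryMb;
        final String assignedRequest; // null if no request is waiting for this pending slot

        PendingSlot(int memoryMb, String assignedRequest) {
            this.memoryMb = memoryMb;
            this.assignedRequest = assignedRequest;
        }
    }

    final List<PendingSlot> pendingSlots = new ArrayList<>();
    final List<String> freeSlots = new ArrayList<>();
    final Map<String, String> allocations = new HashMap<>(); // slot id -> request id

    /** Registers a slot that was reported without an allocation id. */
    void registerFreeSlot(String slotId, int memoryMb) {
        PendingSlot match = null;
        for (Iterator<PendingSlot> it = pendingSlots.iterator(); it.hasNext(); ) {
            PendingSlot candidate = it.next();
            if (candidate.memoryMb == memoryMb) { // exact profile match only
                match = candidate;
                it.remove(); // the pending slot has now materialized
                break;
            }
        }
        if (match == null || match.assignedRequest == null) {
            freeSlots.add(slotId); // nobody is waiting for it
        } else {
            allocations.put(slotId, match.assignedRequest); // hand it to the waiting request
        }
    }
}
```

The point of the exact match is that the real slot takes the place of the placeholder that was created when the request could not be served, so the waiting request is satisfied as soon as the TaskManager reports in.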
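At this point the ResourceManager starts the actual slot allocation, which takes place in SlotManagerImpl.allocateSlot. The rest is identical to the slot request flow and is not repeated here.
This blog is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.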