//包裝成一個(gè)任務(wù)級(jí)進(jìn)行提交
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
stage.resourceProfileId))
TaskSchedulerImpl.submitTasks
//任務(wù)集管理器
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
}
//調(diào)度buid加入管理器
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
調(diào)度器初始化
def initialize(backend: SchedulerBackend): Unit = {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
FIFOSchedulableBuilder.addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
//任務(wù)池
rootPool.addSchedulable(manager)
}
backend.reviveOffers()
CoarseGrainedSchedulerBackend.reviveOffers
override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
//給自己發(fā)送ReviveOffers消息
driverEndpoint.send(ReviveOffers)
}
自己接收消息
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data, resources) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
... ...
case ReviveOffers =>
//接收到ReviveOffers消息
makeOffers()
CoarseGrainedSchedulerBackend.makeOffers
private def makeOffers(): Unit = {
// Make sure no executor is killed while some task is launching on it
//得到任務(wù)的描述信息
val taskDescs = withLock {
... ...
(rName, rInfo.availableAddrs.toBuffer)
}, executorData.resourceProfileId)
}.toIndexedSeq
//調(diào)度任務(wù),從任務(wù)池里取任務(wù) 執(zhí)行
scheduler.resourceOffers(workOffers, true)
}
if (taskDescs.nonEmpty) {
//任務(wù)運(yùn)行
launchTasks(taskDescs)
}
}
resourceOffers
val sortedTaskSets = rootPool.getSortedTaskSetQueue
//判斷本地化級(jí)別
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus,
availableResources, tasks, addressesWithDescs)
launchedTaskAtCurrentMaxLocality = minLocality.isDefined
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
noDelaySchedulingRejects &= noDelayScheduleReject
globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
} while (launchedTaskAtCurrentMaxLocality)
}
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
//跟據(jù)調(diào)度算法進(jìn)行manager排序 schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
}
sortedTaskSetQueue
}
private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
//不同的調(diào)度模式 有不同算法
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
case _ =>
val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
throw new IllegalArgumentException(msg)
}
}
最終拿到任務(wù)就開始執(zhí)行了
launchTasks
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
遍歷每一個(gè)任務(wù)描述
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
//是否task序列化的size超出限制
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
// Do resources allocation here. The allocated resources will get released after the task
// finishes.
val rpId = executorData.resourceProfileId
val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
executorData.freeCores -= taskCpus
task.resources.foreach { case (rName, rInfo) =>
assert(executorData.resourcesInfo.contains(rName))
executorData.resourcesInfo(rName).acquire(rInfo.addresses)
}
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
//找到對(duì)應(yīng)executor的終端,發(fā)送LaunchTask消息
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}