任務調度-源碼分析

// Wrap the tasks into a TaskSet and submit it to the task scheduler
taskScheduler.submitTasks(new TaskSet(
       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
       stage.resourceProfileId))

TaskSchedulerImpl.submitTasks

/**
 * Creates the task set manager for a task set.
 *
 * @param taskSet         the task set to be managed
 * @param maxTaskFailures failure limit handed to the new manager
 * @return a new TaskSetManager bound to this scheduler, its health tracker option and clock
 */
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager =
  new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
// Add the manager to the schedulable builder
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

調度器初始化

 /**
  * Stores the scheduler backend and sets up the schedulable builder.
  *
  * The builder is chosen from `schedulingMode`: FIFO or FAIR. Any other mode is rejected
  * with an IllegalArgumentException. Once chosen, the builder is asked to build its pools.
  *
  * @param backend the scheduler backend to keep a reference to
  */
 def initialize(backend: SchedulerBackend): Unit = {
   this.backend = backend
   // One builder implementation per supported scheduling mode
   schedulableBuilder = schedulingMode match {
     case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
     case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
     case _ =>
       throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
         s"$schedulingMode")
   }
   schedulableBuilder.buildPools()
 }

FIFOSchedulableBuilder.addTaskSetManager

  /**
   * Adds the given schedulable to the root pool (the task pool).
   * The `properties` argument is not used by this implementation.
   */
  override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit =
    rootPool.addSchedulable(manager)

backend.reviveOffers()
CoarseGrainedSchedulerBackend.reviveOffers

 /**
  * Sends a ReviveOffers message to the driver endpoint (i.e. to itself), wrapped in
  * `Utils.tryLogNonFatalError`.
  */
 override def reviveOffers(): Unit = {
   Utils.tryLogNonFatalError {
     driverEndpoint.send(ReviveOffers)
   }
 }

自己接收消息

 // Message handler of the endpoint (excerpt: parts of the body are elided with "... ...").
 override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data, resources) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
          ... ...

      case ReviveOffers =>
      // The ReviveOffers message has been received: make resource offers
        makeOffers()

CoarseGrainedSchedulerBackend.makeOffers

// Builds resource offers, lets the scheduler pick tasks for them, and launches the result
// (excerpt: the construction of workOffers is elided with "... ...").
private def makeOffers(): Unit = {
      // Make sure no executor is killed while some task is launching on it
     // Obtain the task descriptions
      val taskDescs = withLock {
           ... ...
                (rName, rInfo.availableAddrs.toBuffer)
              }, executorData.resourceProfileId)
        }.toIndexedSeq
        // Schedule tasks: take tasks from the task pool to run on these offers
        scheduler.resourceOffers(workOffers, true)
      }
      if (taskDescs.nonEmpty) {
        // Run the tasks
        launchTasks(taskDescs)
      }
    }

resourceOffers

    val sortedTaskSets = rootPool.getSortedTaskSetQueue
// Determine the locality level: walk through the task set's locality levels in order
 for (currentMaxLocality <- taskSet.myLocalityLevels) {
          var launchedTaskAtCurrentMaxLocality = false
          // Keep offering at this level for as long as the previous round launched a task
          do {
            val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
              taskSet, currentMaxLocality, shuffledOffers, availableCpus,
              availableResources, tasks, addressesWithDescs)
            launchedTaskAtCurrentMaxLocality = minLocality.isDefined
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            noDelaySchedulingRejects &= noDelayScheduleReject
            globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
          } while (launchedTaskAtCurrentMaxLocality)
        }
 /**
  * Collects the schedulable task set managers under this pool, in scheduling order.
  *
  * The child schedulables are first sorted with the comparator of
  * `taskSetSchedulingAlgorithm`. Each child's own sorted queue is then appended in that
  * order, keeping only the managers whose `isSchedulable` is true.
  *
  * @return a new buffer holding the sorted, schedulable task set managers
  */
 override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    // Sort the managers according to the scheduling algorithm
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
    }
    sortedTaskSetQueue
  }


 /**
  * Scheduling algorithm matching the scheduling mode: each supported mode (FAIR, FIFO) has
  * its own algorithm. Any other mode is rejected with an IllegalArgumentException.
  */
 private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = schedulingMode match {
   case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
   case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
   case _ =>
     throw new IllegalArgumentException(
       s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead.")
 }

最終拿到任務就開始執行了
launchTasks

 /**
  * Launches the given tasks on their assigned executors.
  *
  * Each task description is serialized first. If the serialized size reaches
  * `maxRpcMessageSize`, the task is not sent; instead the task set manager registered for the
  * task id (if any) is aborted with an explanatory message. Otherwise the executor's free
  * cores and resource addresses are reserved and a LaunchTask message carrying the serialized
  * task is sent to the executor's endpoint.
  *
  * @param tasks task descriptions to launch; the nested sequences are flattened here
  */
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
     // Iterate over every task description
     for (task <- tasks.flatten) {
       val serializedTask = TaskDescription.encode(task)
       // Check whether the serialized task exceeds the size limit
       if (serializedTask.limit() >= maxRpcMessageSize) {
         Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
           try {
             val template = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
               s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
               s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
             val msg =
               template.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
             taskSetMgr.abort(msg)
           } catch {
             case e: Exception => logError("Exception in error callback", e)
           }
         }
       }
       else {
         val executorData = executorDataMap(task.executorId)
         // Do resources allocation here. The allocated resources will get released after the task
         // finishes.
         val rpId = executorData.resourceProfileId
         val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
         val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
         executorData.freeCores -= taskCpus
         task.resources.foreach { case (rName, rInfo) =>
           assert(executorData.resourcesInfo.contains(rName))
           executorData.resourcesInfo(rName).acquire(rInfo.addresses)
         }

         logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
           s"${executorData.executorHost}.")
         // Find the endpoint of the target executor and send it the LaunchTask message
         executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
       }
     }
   }

©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容