Spark-Core Source Code Walkthrough (12): Analyzing the Task Submission Flow

In the previous article we saw that after the DAGScheduler splits a Job into a DAG of Stages, it generates ShuffleMapTasks and ResultTasks according to each Stage's type, wraps them in a TaskSet, and finally submits that TaskSet through the TaskScheduler's submitTasks method (in practice, TaskSchedulerImpl's submitTasks). Below we analyze the task submission process in detail.

直接進(jìn)入TaskSchedulerImpl的submitTasks方法:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  // Use synchronized to serialize TaskSet submission
  this.synchronized {
    // 首先創(chuàng)建一個TaskSetManager,主要負(fù)責(zé)調(diào)度TaskSet中的Tasks
    // 默認(rèn)的最大失敗重試次數(shù)是4次,可以通過spark.task.maxFailures進(jìn)行配置
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Register the TaskSetManager (together with the TaskSet's properties) with the schedulableBuilder
    // The SchedulableBuilder determines whether TaskSetManagers are scheduled FIFO or FAIR; the default is FIFO
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}

進(jìn)入backend的reviveOffers()方法:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

As we can see, this simply sends a ReviveOffers message to the DriverEndpoint (recall that when we analyzed the DriverEndpoint's registration earlier, it sent itself a ReviveOffers message as well). Upon receiving the message, the DriverEndpoint handles it as follows:

case ReviveOffers =>
  makeOffers()

Next, let's look at makeOffers():

private def makeOffers() {
  // Filter out executors under killing
  // Keep only the executors that are alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // 構(gòu)建由WorkerOffer組成的集合,WorkerOffer就代表Executor上可用的計算資源
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // Assign resources to tasks and launch them on the executors
  launchTasks(scheduler.resourceOffers(workOffers))
}

makeOffers() first assembles the workOffers available for computation (describing the free cores on every alive ExecutorBackend).
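As a sketch only: the filter-and-build step above can be mimicked in Python, with a plain dict standing in for executorDataMap and illustrative field names (not Spark's actual ExecutorData members):

```python
from dataclasses import dataclass

@dataclass
class WorkerOffer:
    executor_id: str
    host: str
    cores: int

def make_offers(executor_data_map, is_alive):
    """Mimic makeOffers(): build one WorkerOffer per alive executor."""
    return [WorkerOffer(eid, data["host"], data["free_cores"])
            for eid, data in executor_data_map.items() if is_alive(eid)]

executors = {
    "exec-1": {"host": "node-a", "free_cores": 4},
    "exec-2": {"host": "node-b", "free_cores": 2},
    "exec-3": {"host": "node-a", "free_cores": 1},  # being killed, filtered out
}
alive = {"exec-1", "exec-2"}
offers = make_offers(executors, lambda eid: eid in alive)
# offers now describe only the alive executors and their free cores
```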

Before we get to launchTasks, let's focus on what scheduler.resourceOffers(workOffers) actually does:

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  // 標(biāo)記slave為alive并記錄hostname的信息
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    // 跨機(jī)架的情況,我們這里不做考慮
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  // One buffer per executor, pre-sized to its core count (an upper bound on the tasks it can take in this round)
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // Array of the number of free cores on each executor
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

As we can see, the main job of TaskSchedulerImpl's resourceOffers method is to assign concrete compute resources to each task. Its input is the available resources; its output is a nested sequence of TaskDescriptions (Seq[Seq[TaskDescription]]). Each TaskDescription records an executorId, i.e., which ExecutorBackend the task will run on. Let's now walk through the method step by step:

對可用的資源進(jìn)行標(biāo)記并記錄hostname,同時判斷是否有新的executor加入

This corresponds to the following part of resourceOffers:

var newExecAvail = false
for (o <- offers) {
  executorIdToHost(o.executorId) = o.host
  executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
  if (!executorsByHost.contains(o.host)) {
    executorsByHost(o.host) = new HashSet[String]()
    executorAdded(o.executorId, o.host)
    newExecAvail = true
  }
  for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
  }
}

This part is not our focus here; it is enough to know what it does.

將所有可用的計算資源隨機(jī)打散

對應(yīng)的源碼:

// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)

Random.shuffle reshuffles the offers so that tasks are not always placed on the same set of workers, maximizing load balance.

根據(jù)每個ExecutorBackend的cores的個數(shù)聲明類型為TaskDescription的ArrayBuffer數(shù)組

對應(yīng)的源碼:

val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))

At this point the buffers are merely allocated; no TaskDescription has been added to them yet.

獲得根據(jù)具體的調(diào)度策略排序后的TaskSets

對應(yīng)的源碼:

val sortedTaskSets = rootPool.getSortedTaskSetQueue

調(diào)度策略即SchedulableBuilder,具體的實(shí)現(xiàn)分為FIFOSchedulableBuilder和FairSchedulableBuilder,而默認(rèn)使用的就是FIFO的調(diào)度策略。

If a new executor has joined, each TaskSetManager's executorAdded method is called so that it can recompute its locality levels against the updated set of executors

對應(yīng)的源碼:

for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}

確定最高優(yōu)先級本地性

var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}

The locality levels, from most to least local, are PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY. NO_PREF means the task has no locality preference at all, so it can be launched anywhere without a locality penalty.
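The shape of this nested loop can be sketched in Python. The per-level pending-task queues below are a toy stand-in for resourceOfferSingleTaskSet; the point is that one level is drained before the next, less local, level is tried:

```python
def schedule(task_sets):
    """Sketch of the loop in resourceOffers: for each TaskSet, walk its
    locality levels from most to least local, and keep offering at one
    level while the previous round still launched at least one task."""
    launch_order = []
    for ts in task_sets:
        for level in ts["levels"]:
            launched = True
            while launched:
                launched = False
                pending = ts["pending"].get(level, [])
                if pending:
                    launch_order.append((ts["id"], level, pending.pop(0)))
                    launched = True  # retry the same level
    return launch_order

ts = {"id": "ts0",
      "levels": ["PROCESS_LOCAL", "NODE_LOCAL", "ANY"],
      "pending": {"PROCESS_LOCAL": ["t0", "t1"], "ANY": ["t2"]}}
order = schedule([ts])
# both PROCESS_LOCAL tasks launch before the ANY task is considered
```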

Now let's look at this resourceOfferSingleTaskSet method:

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
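Before walking through it, here is a simplified Python simulation of this method's core loop; CPUS_PER_TASK is fixed at 1, and popping a pending list is a toy stand-in for taskSet.resourceOffer:

```python
CPUS_PER_TASK = 1  # Spark's default; configurable via spark.task.cpus

def resource_offer_single_task_set(pending, offers, available_cpus, tasks):
    """One pass over the shuffled offers: each executor with enough free
    CPUs receives at most one task, and CPUS_PER_TASK is deducted."""
    launched = False
    for i, _executor in enumerate(offers):
        if available_cpus[i] >= CPUS_PER_TASK and pending:
            tasks[i].append(pending.pop(0))
            available_cpus[i] -= CPUS_PER_TASK
            launched = True
    return launched

offers = ["exec-1", "exec-2"]
available = [2, 1]          # free cores per executor
tasks = [[], []]            # per-executor TaskDescription buffers
pending = ["t0", "t1", "t2", "t3"]
# the caller loops while a round still launched something
while resource_offer_single_task_set(pending, offers, available, tasks):
    pass
# tasks spread round-robin until every executor runs out of cores
```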

首先循環(huán)遍歷判斷每個Executor上可用的cores的個數(shù)是否滿足每個Task所需要的CPU的個數(shù),默認(rèn)CPUS_PER_TASK的個數(shù)為1;然后通過調(diào)用TaskSetManager的resourceOffer方法最終確定每個Task具體運(yùn)行在哪個ExecutorBackend的具體的Locality Level:

@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        // Serialize the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // If the task cannot be serialized, then there's no point to re-attempt the task,
          // as it will always fail. So just abort the whole task-set.
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        // Warn if the serialized size exceeds TaskSetManager's recommended limit (default 100 KB)
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)
        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
          s"$taskLocality, ${serializedTask.limit} bytes)")
        // Report to the DAGScheduler that the task has started
        sched.dagScheduler.taskStarted(task, info)
        // Return the TaskDescription
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
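The getAllowedLocalityLevel call near the top of this method implements delay scheduling. A much-simplified Python sketch of that fallback logic (the real method also checks whether any pending tasks remain at each level before advancing; the 3000 ms value reflects the spark.locality.wait default):

```python
def get_allowed_locality_level(levels, waits, current_index, last_launch, now):
    """If no task launched at the current level within its wait time,
    fall back one level, advancing the launch timer at each jump."""
    while (current_index < len(levels) - 1 and
           now - last_launch >= waits[current_index]):
        last_launch += waits[current_index]
        current_index += 1
    return levels[current_index], current_index

levels = ["PROCESS_LOCAL", "NODE_LOCAL", "ANY"]
waits = [3000, 3000, 0]  # milliseconds of patience per level
level, idx = get_allowed_locality_level(levels, waits, 0, 0, now=4000)
# 4 s with nothing launched at PROCESS_LOCAL exceeds the 3 s wait,
# so the allowed level degrades to NODE_LOCAL
```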

The maxLocality passed into this method is the one supplied by the for loop in TaskSchedulerImpl's resourceOffers above, which walks the locality levels from best to worst, so the optimal case is PROCESS_LOCAL. resourceOffer returns Some(TaskDescription) to resourceOfferSingleTaskSet, which then updates its bookkeeping structures and deducts the cores the task will occupy (one per task by default) from the available resources. The assigned tasks (Seq[ArrayBuffer[TaskDescription]]) are returned to TaskSchedulerImpl's resourceOffers, which in turn hands the final result (Seq[Seq[TaskDescription]]) back to CoarseGrainedSchedulerBackend's makeOffers, which finally executes launchTasks:

private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

這里需要補(bǔ)充一點(diǎn):我們之前已經(jīng)分析過DAGScheduler是從數(shù)據(jù)的層面,也就是RDD的層面考慮的preferredLocation,即DAGScheduler部分已經(jīng)確定了Task要被發(fā)到哪個Executor上運(yùn)行;而TaskScheduler是從具體計算Task的角度考慮計算的本地性,也就是說具體的計算是發(fā)生在內(nèi)存中還是發(fā)生在本地磁盤等等(PROCESS_LOCAL、NODE_LOCAL...),由此也印證了DAGScheduler負(fù)責(zé)高層的調(diào)度任務(wù),而TaskScheduler負(fù)責(zé)底層的調(diào)度任務(wù)。

接下來我們就進(jìn)入launchTasks方法:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Note: here the TaskDescription itself is serialized
    val serializedTask = ser.serialize(task)
    // Check whether the serialized size exceeds the frame-size limit
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Look up the ExecutorData of the executor this TaskDescription is destined for
      val executorData = executorDataMap(task.executorId)
      // Deduct the compute resources (cores) this task will use
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // 將Task發(fā)送到具體的Executor上
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

Here akkaFrameSize defaults to 128 MB in Spark 1.6.3 (configurable via spark.akka.frameSize) and AkkaUtils.reservedSizeBytes is 200 KB, so the serialized size must stay below 128 MB - 200 KB. If it is within this limit, the task is sent to the corresponding ExecutorBackend.
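The resulting ceiling is easy to compute from the defaults quoted above (a 128 MB frame and 200 KB reserved):

```python
# assumed defaults from the text: spark.akka.frameSize = 128 (MB),
# AkkaUtils.reservedSizeBytes = 200 KB
akka_frame_size = 128 * 1024 * 1024
reserved = 200 * 1024
max_task_bytes = akka_frame_size - reserved

def fits_in_frame(serialized_size):
    """A serialized task must stay under frameSize minus the reserved
    overhead; otherwise the whole TaskSet is aborted with a hint to
    use broadcast variables instead."""
    return serialized_size < max_task_bytes
```

That is 134,217,728 - 204,800 = 134,012,928 bytes for a single serialized task.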

When the ExecutorBackend (CoarseGrainedExecutorBackend in Standalone mode) receives this message, it calls the executor's launchTask method to execute the task:

case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

由此可見真正發(fā)送tasks的不是TaskSetManager,而是CoarseGrainedSchedulerBackend,TaskSetManager只是負(fù)責(zé)監(jiān)控task的運(yùn)行。

This completes the task submission flow. In the next article we will analyze how the tasks are executed.

This article follows the Spark 1.6.3 source; links to the corresponding Spark 2.1.0 source are also given:

Spark 1.6.3 source

Spark 2.1.0 source

This is an original article. Reposting is welcome; please credit the source and the author. Thanks!
