Spark Source Code Analysis: Job and Task Scheduling Flow

I. Introduction

The job and task scheduling system is the core of Spark. A Spark job is built from a series of operations on RDDs and executed on Executors. These operators fall into two groups, transformations and actions; transformations are lazy, i.e. their evaluation is deferred, and only an action triggers job submission. The two most important schedulers are DAGScheduler and TaskScheduler: DAGScheduler is responsible for logical scheduling, splitting a job into stages (task sets with dependencies between them), while TaskScheduler is responsible for scheduling and executing the concrete tasks.

Some related terminology first:

  • 1. Job: one or more stages generated when an action operator is triggered on an RDD
  • 2. Stage: each job is split into groups of tasks according to the dependencies between RDDs; each such group is called a Stage, also known as a TaskSet. Stages are carved out by DAGScheduler and come in two kinds, ShuffleMapStage and ResultStage
  • 3. DAGScheduler: the stage-oriented scheduler. For a submitted job it divides stages according to RDD dependencies, determines the number of tasks from the partitions of the last RDD in each stage, computes the optimal task locations, and wraps everything into a TaskSet submitted to TaskScheduler
  • 4. TaskScheduler: receives the stages submitted by DAGScheduler and dispatches their tasks to Executors on worker nodes for execution

SparkContext holds three important components: DAGScheduler, TaskScheduler, and SchedulerBackend.

1.DAGScheduler
DAGScheduler divides the DAG of the user application into stages, where each stage consists of a group of tasks that can run concurrently; these tasks share exactly the same execution logic and differ only in the data they operate on.

2.TaskScheduler
Responsible for the concrete scheduling and execution of tasks: it receives the task sets of the different stages from DAGScheduler, assigns them, according to the scheduling algorithm, to the Executors allocated to the application, and launches speculative backup copies for tasks that run exceptionally slowly.

3.SchedulerBackend
Allocates the currently available resources, i.e. assigns computing resources (Executors) to the tasks waiting for them, and launches tasks on the assigned Executors, completing the scheduling cycle. It does this through reviveOffers.


Next we analyze the job and task scheduling flow from the source code.

II. Analysis of the Job and Task Scheduling Flow

1. Dividing stages

Let us start from an action operator to trace the whole job flow.

For example, foreach in the RDD class calls SparkContext's runJob; drilling down layer by layer, we find that it ultimately invokes DAGScheduler's runJob.

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

In DAGScheduler's runJob, submitJob is called; runJob then blocks until the job finishes or fails.

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

Then submitJob creates a JobWaiter object that encapsulates the job information and, via the internal message loop, posts it to DAGScheduler's nested class DAGSchedulerEventProcessLoop for processing.

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Submit the job; eventProcessLoop is the DAGSchedulerEventProcessLoop instance
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

Finally, in DAGSchedulerEventProcessLoop's message handler onReceive, doOnReceive is called; it pattern-matches the received JobSubmitted case class and calls DAGScheduler's handleJobSubmitted to submit the job. That method is where the stage splitting happens.

The source of handleJobSubmitted:

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    // ... (rest omitted)

Here the last stage is obtained by calling createResultStage.

 private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

createResultStage above calls getOrCreateParentStages. The main flow is to walk back from finalRDD and check whether any ancestor RDD involves a shuffle operation. If there is no shuffle, the job consists of a single ResultStage with no parent stage; if there is a shuffle, the job has one ResultStage and at least one ShuffleMapStage. Concretely, the stage decision is made with two sets and a stack, and at most the direct parent stages of the current stage are returned, without tracing further back. The resulting ResultStage is called finalStage.

The core getShuffleDependencies code:

 private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

The process above can be summarized as follows:
DAGScheduler starts from the last RDD and traverses the whole dependency tree depth-first with an explicit stack, dividing it into stages. The division criterion is whether a dependency is a wide dependency (ShuffleDependency): whenever an RDD operation involves a shuffle, the graph is cut into two stages at that shuffle boundary.

Once all stages are divided, they form a dependency graph; the parent stages of each stage are collected into a List called parents, through which all parent stages of the current stage can be obtained. Stages are the essential units into which the execution of a Spark job is divided.

This completes the stage division phase.
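To make the division rule concrete, here is a simplified Python model (not Spark's Scala code; the Rdd class and names are illustrative stand-ins) of the stack-based walk in getShuffleDependencies: it collects only the direct shuffle parents and does not trace past a shuffle boundary.

```python
# Simplified model of DAGScheduler.getShuffleDependencies: walk the
# dependency graph back from the final RDD with an explicit stack
# (depth-first) and collect only the *direct* shuffle dependencies.
class Rdd:
    def __init__(self, name, deps=()):
        self.name = name
        self.deps = deps  # list of (parent_rdd, is_shuffle) pairs

def get_shuffle_dependencies(rdd):
    parents, visited = set(), set()
    waiting = [rdd]
    while waiting:
        to_visit = waiting.pop()
        if to_visit in visited:
            continue
        visited.add(to_visit)
        for parent, is_shuffle in to_visit.deps:
            if is_shuffle:
                parents.add(parent.name)   # stage boundary: stop here
            else:
                waiting.append(parent)     # narrow dep: keep walking back
    return parents

# src -> mapped (narrow) -> reduced (shuffle) -> final (narrow)
src = Rdd("src")
mapped = Rdd("mapped", [(src, False)])
reduced = Rdd("reduced", [(mapped, True)])
final = Rdd("final", [(reduced, False)])
print(get_shuffle_dependencies(final))  # {'mapped'}
```

Note that "src" is never visited: the walk stops at the shuffle edge, which is why the method returns at most the direct parent stages, and "mapped" (the map side of the shuffle) is what becomes a ShuffleMapStage.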

2. Submitting stages

From the finalStage produced in the previous phase, a job instance (ActiveJob) is created; during its execution, the job and stage progress is tracked through the listener bus.

The source of handleJobSubmitted (continued):

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {

  ....
   val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

Next, submitStage calls getMissingParentStages to find the missing parent stages of finalStage. If there is no missing parent stage, the stage is submitted for execution via submitMissingTasks; if parent stages exist, the current stage is parked in the waitingStages list.

The source of submitStage:

//Recursively find stages to submit
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing: List[Stage] = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          //submitMissingTasks is eventually executed here
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

If missing parents exist, submitStage is called recursively: every stage that still has parents is put into the waitingStages list, and stages without parents become the entry points of the job run. This is a recursive process whose exit is reaching the first stages of the stage tree rooted at finalStage.

The source of getMissingParentStages:

private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ArrayStack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

The algorithm above implements a pattern of "recursion across stages, iteration within a stage", which avoids the stack overflow that deep recursion would cause when tracing back to the first stage. In the end, every stage passed to submitMissingTasks is a source stage of the stage dependency tree.
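The "recursion across stages" pattern can be sketched as a minimal Python model (illustrative, not Spark code: here a stage is just a dict with a parents list and an available flag):

```python
# Minimal model of DAGScheduler.submitStage: find unavailable parent
# stages; if there are none, submit this stage (in Spark this is the
# submitMissingTasks call), otherwise recurse into each missing parent
# and park this stage in the waiting list.
def submit_stage(stage, submitted, waiting):
    missing = [p for p in stage["parents"] if not p["available"]]
    if not missing:
        submitted.append(stage["id"])
    else:
        for parent in missing:
            submit_stage(parent, submitted, waiting)
        waiting.append(stage["id"])

# A chain: ShuffleMapStage 0 -> ShuffleMapStage 1 -> ResultStage 2.
# Nothing has been computed yet, so only the root stage is submitted
# and the others wait for their parents to complete.
s0 = {"id": 0, "parents": [], "available": False}
s1 = {"id": 1, "parents": [s0], "available": False}
s2 = {"id": 2, "parents": [s1], "available": False}
submitted, waiting = [], []
submit_stage(s2, submitted, waiting)
print(submitted, waiting)  # [0] [1, 2]
```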

3. Submitting tasks

After stage splitting is complete, DAGScheduler's submitMissingTasks creates one task per partition of the last RDD in the stage.

 private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    ....
}

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }

Then submitMissingTasks computes the best location for each task by calling getPreferredLocs:

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

getPreferredLocsInternal first checks whether the partition is cached and, if so, returns the cached locations; otherwise it returns the RDD's preferred-location list, obtained via each RDD's own getPreferredLocations implementation. If an RDD does not implement that method, rddPrefs is empty, and the method checks whether the RDD has narrow dependencies, recursing into the parent RDD of the first narrow dependency that yields any preferred locations.

The source of getPreferredLocsInternal:

private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
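The resolution order (cache, then the RDD's own preferences, then narrow dependencies) can be modeled in a few lines of Python. This is a simplified sketch, not Spark code: RDDs are plain dicts, and unlike the real code it assumes the same partition index across a narrow dependency rather than mapping through getParents.

```python
# Model of getPreferredLocsInternal's resolution order:
# 1) cached partition locations, 2) the RDD's own preferred locations,
# 3) recurse into the first narrow dependency that yields a preference.
def preferred_locs(rdd, partition, visited=None):
    visited = visited if visited is not None else set()
    if (id(rdd), partition) in visited:   # avoid exponential re-visits
        return []
    visited.add((id(rdd), partition))
    if rdd.get("cached", {}).get(partition):
        return rdd["cached"][partition]
    if rdd.get("prefs", {}).get(partition):
        return rdd["prefs"][partition]
    for dep in rdd.get("narrow_deps", []):
        locs = preferred_locs(dep, partition, visited)
        if locs:
            return locs
    return []

# A map RDD with no preferences over an HDFS-backed RDD: the task
# inherits the input block's hosts through the narrow dependency.
hadoop_rdd = {"prefs": {0: ["host1", "host2"]}}
mapped_rdd = {"narrow_deps": [hadoop_rdd]}
print(preferred_locs(mapped_rdd, 0))  # ['host1', 'host2']
```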

Afterwards submitMissingTasks builds the task set. A different task type is created depending on the stage type: a ResultTask for a ResultStage and a ShuffleMapTask for a ShuffleMapStage. Finally these tasks form a task set that is submitted to TaskScheduler for processing.

A TaskSet contains all tasks of the stage; these tasks share identical processing logic and differ only in the data they process, each task corresponding to one partition. Note that the tasks are serialized and broadcast in binary form.

val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      //Submit the tasks in the form of a TaskSet
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

Above, DAGScheduler hands the task set to TaskScheduler. When TaskScheduler receives it, the submitTasks method builds a TaskSetManager instance to manage the lifecycle of this task set, then adds the manager to the system scheduling pool (schedulableBuilder.addTaskSetManager), where scheduling is coordinated centrally. This scheduler operates at the application level and supports two scheduling modes, FIFO and FAIR.

 override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
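The FIFO ordering behind the scheduling pool can be approximated as follows. This is a hedged Python sketch of Spark's FIFOSchedulingAlgorithm, which compares job priority (effectively the jobId) first and breaks ties by stageId; the dict-based "TaskSetManagers" here are illustrative.

```python
# Sketch of FIFO pool ordering (cf. FIFOSchedulingAlgorithm): a lower
# priority value (an earlier jobId) runs first; ties between task sets
# of the same job are broken by stageId.
def fifo_key(tsm):
    return (tsm["priority"], tsm["stage_id"])

pool = [
    {"name": "job2-stage3", "priority": 2, "stage_id": 3},
    {"name": "job1-stage1", "priority": 1, "stage_id": 1},
    {"name": "job1-stage0", "priority": 1, "stage_id": 0},
]
ordered = sorted(pool, key=fifo_key)
print([t["name"] for t in ordered])
# ['job1-stage0', 'job1-stage1', 'job2-stage3']
```

FAIR mode instead compares task sets by their running-task counts against per-pool minimum shares and weights, so it is not a simple tuple sort like the above.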

Then resources are allocated and tasks are run via the scheduler backend's reviveOffers. The schedulerBackend here is the coarse-grained CoarseGrainedSchedulerBackend, and the method sends a message to the driverEndpoint.

 override def reviveOffers() {
    //Submit the tasks to the driver; the DriverEndpoint inner class of this class receives the message in its receive method
    driverEndpoint.send(ReviveOffers)
  }

The DriverEndpoint above is an inner class of CoarseGrainedSchedulerBackend. Instead of calling into the inner class directly, the code follows the message-passing convention and goes through the rpcEnv; that is, messages are processed via the Dispatcher's producer-consumer model, which keeps message handling uniform.

Ultimately DriverEndpoint's receive method matches ReviveOffers and calls makeOffers:

 // Make fake resource offers on all executors
    private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
        // Filter out executors under killing
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers)
      }
      if (!taskDescs.isEmpty) {
        //Launch the tasks on the executors
        launchTasks(taskDescs)
      }
    }

DriverEndpoint's makeOffers first collects the Executors currently usable in the cluster, then hands them to TaskSchedulerImpl to allocate resources to the tasks of the task sets. During allocation, resourceOffers sorts the TaskSetManagers according to the scheduling policy, then assigns resources to each TaskSetManager by data locality, in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {

   ....
   val shuffledOffers = shuffleOffers(filteredOffers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
}
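The double loop above (over locality levels, then repeatedly over offers) can be condensed into a simplified Python model. This sketch is illustrative only: the task/offer shapes are hypothetical, and it omits Spark's delay-scheduling timers (spark.locality.wait) that govern when a task set may relax to a worse level.

```python
# Model of the locality-aware offer loop in resourceOffers: for each
# locality level in preference order, keep launching tasks at that
# level until none can be launched, then relax to the next level.
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY"]

def offer_resources(pending, free_cores_by_host):
    launched = []
    for level in LEVELS:
        launched_at_level = True
        while launched_at_level:            # re-offer at this level
            launched_at_level = False
            for task in list(pending):
                ok_hosts = [h for h in free_cores_by_host
                            if free_cores_by_host[h] > 0
                            and (level == "ANY"
                                 or (not task["prefs"] and level == "NO_PREF")
                                 or task["prefs"].get(h) == level)]
                if ok_hosts:
                    host = ok_hosts[0]
                    free_cores_by_host[host] -= 1
                    pending.remove(task)
                    launched.append((task["id"], host, level))
                    launched_at_level = True
    return launched

tasks = [{"id": 0, "prefs": {"hostA": "NODE_LOCAL"}},
         {"id": 1, "prefs": {}}]
print(offer_resources(tasks, {"hostA": 1, "hostB": 1}))
# [(0, 'hostA', 'NODE_LOCAL'), (1, 'hostB', 'NO_PREF')]
```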

The tasks with allocated resources are finally handed to launchTasks, which sends them one by one to the CoarseGrainedExecutorBackend on the worker nodes, where the embedded Executor runs them.

4. Executing tasks

When CoarseGrainedExecutorBackend receives a LaunchTask message, it calls Executor's launchTask, which builds a TaskRunner to wrap the task. The TaskRunner manages the details of the task's runtime and is then dropped into the thread pool for execution.

//Launch the task
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        //The Executor launches the task
        executor.launchTask(this, taskDesc)
      }

In TaskRunner's run method, the task itself and the jars and files it depends on are deserialized first, and then runTask is invoked on the deserialized task. Since Task is an abstract class, the concrete runTask is implemented by its two subclasses, ShuffleMapTask and ResultTask.

For a ShuffleMapTask:
runTask first deserializes the RDD and its dependencies from the broadcast variable, then obtains the ShuffleHandle from the shuffle dependency and asks the shuffleManager (in version 2.3.1 only SortShuffleManager exists) to create the corresponding ShuffleWriter via getWriter.
getWriter inspects the handle type to decide which shuffle write path to use; Spark has three shuffle writers: BypassMergeSortShuffleWriter, UnsafeShuffleWriter, and SortShuffleWriter.

The correspondence between these writers and the ShuffleHandles is:

  • UnsafeShuffleWriter: SerializedShuffleHandle
  • BypassMergeSortShuffleWriter: BypassMergeSortShuffleHandle
  • SortShuffleWriter: BaseShuffleHandle

ShuffleMapTask's runTask method:

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    //Deserialize the current RDD and its shuffle dependency
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager

      /**
        * Obtain the ShuffleHandle from the dependency and call getWriter to create the corresponding ShuffleWriter
        */
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //Write to disk
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

Then the chosen ShuffleWriter's write method is called. Through the iterator of the last RDD in the stage, data is pulled from the source, and the chained iterator calls form a pipeline. The results are stored in the local BlockManager, and what is ultimately returned to DAGScheduler is a MapStatus object. MapStatus holds the storage information about where the ShuffleMapTask's output lives in the BlockManager, not the output itself; the tasks of the next stage use this information to locate their input data.
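The "pipeline via iterators" idea is simply lazy iterator chaining, which can be demonstrated in plain Python (not Spark code; the functions below stand in for narrow transformations):

```python
# Each narrow transformation wraps its parent's iterator, so pulling
# from the last iterator drags records through the whole chain one at
# a time ("pipelining") without materializing intermediate results.
def source():
    for x in [1, 2, 3, 4]:
        yield x

def mapped(parent):       # stand-in for rdd.map(_ * 10)
    for x in parent:
        yield x * 10

def filtered(parent):     # stand-in for rdd.filter(_ > 10)
    for x in parent:
        if x > 10:
            yield x

pipeline = filtered(mapped(source()))
print(list(pipeline))  # [20, 30, 40]
```

Nothing runs until the final `list(...)` consumes the pipeline, mirroring how a ShuffleWriter consuming `rdd.iterator(...)` drives the computation of the whole stage.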

Taking BypassMergeSortShuffleWriter as an example, we can see the call into the RDD's iterator, which forms the pipeline that pulls data from upstream.

public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    if (!records.hasNext()) {
      partitionLengths = new long[numPartitions];
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      //partitioner.getPartition(key) yields the partition id this record is written to
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      final DiskBlockObjectWriter writer = partitionWriters[i];
      partitionWriterSegments[i] = writer.commitAndGet();
      writer.close();
    }

    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    File tmp = Utils.tempFileWith(output);
    try {
      partitionLengths = writePartitionedFile(tmp);
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }

BypassMergeSortShuffleWriter's write first obtains the serializer, then gets a DiskBlockObjectWriter per partition from the BlockManager, i.e. a disk file IO stream with a 32 KB buffer. For each record, partitioner.getPartition(key) determines its partition id, and the record is written through the corresponding partition's stream. The per-partition files are finally merged into one large file, ordered by partition.
This process requires neither much memory, nor frequent serialization and deserialization, nor expensive comparison operations.
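The per-partition file strategy can be modeled without any I/O. The following is an illustrative Python sketch (the real code writes through DiskBlockObjectWriter to temp files and commits an index file; here lists stand in for files):

```python
# Model of BypassMergeSortShuffleWriter: one bucket per reduce
# partition; each record is routed by partitioner(key), then the
# buckets are concatenated in partition order, and the per-partition
# lengths play the role of the index that lets reducers find their slice.
def bypass_write(records, num_partitions, partitioner):
    buckets = [[] for _ in range(num_partitions)]
    for key, value in records:
        buckets[partitioner(key)].append((key, value))
    data_file = [rec for bucket in buckets for rec in bucket]  # "merged file"
    partition_lengths = [len(b) for b in buckets]              # "index file"
    return data_file, partition_lengths

records = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
data, lengths = bypass_write(records, 2, lambda k: 0 if k == "a" else 1)
print(data)     # [('a', 1), ('a', 3), ('b', 2), ('c', 4)]
print(lengths)  # [2, 2]
```

Note the output is grouped by partition but not sorted within a partition, which is exactly why this path avoids comparison costs.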

For a ResultTask:
its final return value is simply the result of applying the func function.

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

5. Collecting execution results

For an Executor's computation result, different strategies apply depending on the result size:
1. If the result is larger than 1 GB, it is dropped; this limit is configurable via maxResultSize.

2. If the result size lies between the direct-result threshold and 1 GB, the result is stored in the BlockManager under a block id derived from the task id, and that id is sent via Netty to the driverEndpoint. The threshold is the minimum of the Netty frame maximum (128 MB) and the configured maxDirectResultSize (default 1 MB).

3. Results below that threshold (roughly 0-1 MB) are sent via Netty directly to the driverEndpoint.
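The three-way decision can be summarized in a few lines. This is a sketch, not Spark code; the default thresholds below assume spark.driver.maxResultSize = 1 GB and a 1 MB direct-result limit, as described above.

```python
# Model of the result routing at the end of TaskRunner.run: oversized
# results are dropped (only their size is reported), mid-sized results
# travel indirectly via the BlockManager (only the block id is sent),
# and small results go straight to the driver.
def route_result(result_size, max_result_size=1 << 30,
                 max_direct_result_size=1 << 20):
    if max_result_size > 0 and result_size > max_result_size:
        return "dropped (IndirectTaskResult carrying only the size)"
    if result_size > max_direct_result_size:
        return "via BlockManager (IndirectTaskResult with block id)"
    return "direct (DirectTaskResult sent to the driver)"

print(route_result(2 << 30))  # dropped (IndirectTaskResult carrying only the size)
print(route_result(5 << 20))  # via BlockManager (IndirectTaskResult with block id)
print(route_result(100))      # direct (DirectTaskResult sent to the driver)
```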

The handling of the Executor's computation result at the end of the task's run method:

      // directSend = sending directly back to the driver
        val serializedResult: ByteBuffer = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize > maxDirectResultSize) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
      // ... (middle part omitted)
       setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

After the task finishes, TaskRunner sends the execution result to the driverEndpoint, which forwards it to TaskScheduler's statusUpdate method; there, different task states are handled differently.

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
      try {
        taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            if (state == TaskState.LOST) {
              // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
              // where each executor corresponds to a single task, so mark the executor as failed.
              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
              if (executorIdToRunningTaskIds.contains(execId)) {
                reason = Some(
                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
                removeExecutor(execId, reason.get)
                failedExecutor = Some(execId)
              }
            }
            if (TaskState.isFinished(state)) {
              cleanupTaskState(tid)
              taskSet.removeRunningTask(tid)
              if (state == TaskState.FINISHED) {
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates) or its " +
                "executor has been marked as failed.")
                .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      assert(reason.isDefined)
      dagScheduler.executorLost(failedExecutor.get, reason.get)
      backend.reviveOffers()
    }
  }

statusUpdate對(duì)于不同的任務(wù)狀態(tài)的處理方式如下:
1.如果類型為Taskstate.Finished,那么會(huì)調(diào)用TaskResultGetter的enquenceSuccessfulTask方法進(jìn)行處理,該方法的主要是根據(jù)TaskResult的發(fā)送方式去做出相應(yīng)的處理,如果是indirectTaskResult,就根據(jù)blockid獲取結(jié)果,如果是directTaskResult,那么結(jié)果就無(wú)需遠(yuǎn)程獲取了(因?yàn)橹苯影l(fā)送到driverEndpoint端點(diǎn)了,不需要從blockmanager去拉取數(shù)據(jù))。

2.如果類型是Taskstate.failed或者Taskstate.killed,或者Taskstate.lost,調(diào)用TaskResultGetter的enquenceFailedTask進(jìn)行處理,對(duì)于Taskstate.lost,還需要將其所在的executor標(biāo)記為failed,并且根據(jù)更新后的executor去重新調(diào)度。
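The branching described in these two points can be condensed into a toy dispatcher. This is an illustrative Python model of statusUpdate's control flow, not Spark's code; the action strings are hypothetical labels for the real method calls:

```python
FINISHED, FAILED, KILLED, LOST, RUNNING = (
    "FINISHED", "FAILED", "KILLED", "LOST", "RUNNING")
# TaskState.isFinished(state) is true for any terminal state:
TERMINAL = {FINISHED, FAILED, KILLED, LOST}

def status_update(state):
    """Return the list of actions statusUpdate would take for this state."""
    actions = []
    if state == LOST:
        # LOST additionally marks the executor itself as failed
        # (used only by the deprecated Mesos fine-grained mode).
        actions.append("mark-executor-failed")
    if state in TERMINAL:
        if state == FINISHED:
            actions.append("enqueueSuccessfulTask")
        else:
            actions.append("enqueueFailedTask")
    return actions
```

A non-terminal state such as RUNNING falls through both branches and produces no action, matching the fact that statusUpdate only cleans up and enqueues results for finished tasks.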

enqueueSuccessfulTask eventually calls the TaskScheduler's handleSuccessfulTask method, which in turn ends up in DAGScheduler's handleTaskCompletion method.
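Before that handoff, enqueueSuccessfulTask must first materialize the result value, which differs by result kind as described above. A minimal sketch (the `DirectResult`/`IndirectResult` classes here are illustrative stand-ins for Spark's DirectTaskResult/IndirectTaskResult, and `block_store` stands in for a BlockManager lookup):

```python
class DirectResult:
    """Result value shipped inside the status update itself."""
    def __init__(self, value):
        self.value = value

class IndirectResult:
    """Only a block id shipped; the bytes live in a BlockManager."""
    def __init__(self, block_id):
        self.block_id = block_id

def fetch_result(result, block_store):
    if isinstance(result, DirectResult):
        # Already deserialized locally on the driver; nothing to fetch.
        return result.value
    # Remote fetch by block id from the executor's BlockManager.
    return block_store[result.block_id]

store = {"taskresult_7": 42}
print(fetch_result(DirectResult(7), store))
print(fetch_result(IndirectResult("taskresult_7"), store))
```

The point of the split is latency versus driver memory: small results skip a network round trip entirely, while large ones stay out of the RPC path.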

Recall that when the ShuffleWriter phase wrote its output through the BlockManager, what was ultimately returned to the DAGScheduler was a MapStatus object. That object is serialized into the IndirectTaskResult or DirectTaskResult, and handleTaskCompletion extracts it and registers the MapStatus with the MapOutputTrackerMaster, completing the handling of a ShuffleMapTask.

DAGScheduler's handleTaskCompletion handles the ShuffleMapTask case as follows:

case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
    shuffleStage.pendingPartitions -= task.partitionId
  }
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    mapOutputTracker.registerMapOutput(
      shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
    shuffleStage.pendingPartitions -= task.partitionId
  }
  if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    markStageAsFinished(shuffleStage)
    logInfo("looking for newly runnable stages")
    logInfo("running: " + runningStages)
    logInfo("waiting: " + waitingStages)
    logInfo("failed: " + failedStages)

    mapOutputTracker.incrementEpoch()
    clearCacheLocs()

    if (!shuffleStage.isAvailable) {
      logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
        ") because some of its tasks had failed: " +
        shuffleStage.findMissingPartitions().mkString(", "))
      submitStage(shuffleStage)
    } else {
      markMapStageJobsAsFinished(shuffleStage)
      submitWaitingChildStages(shuffleStage)
    }
  }
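The bookkeeping in this branch can be reduced to a miniature model: each completed ShuffleMapTask removes its partition from pendingPartitions and registers its MapStatus, and the stage is considered finished only when every partition has reported. This is a hypothetical sketch, not Spark's classes; the executor id stands in for a full MapStatus:

```python
class MiniShuffleStage:
    """Toy model of a ShuffleMapStage's completion bookkeeping."""
    def __init__(self, num_partitions):
        self.pending = set(range(num_partitions))  # pendingPartitions
        self.map_outputs = {}  # partitionId -> "MapStatus" (executor id here)

    def task_completed(self, partition, exec_id):
        """Record one ShuffleMapTask completion; return True when the
        whole stage can be marked finished."""
        self.pending.discard(partition)
        # Analogue of mapOutputTracker.registerMapOutput: reduce tasks
        # will later look up each partition's output location here.
        self.map_outputs[partition] = exec_id
        return len(self.pending) == 0

stage = MiniShuffleStage(3)
print(stage.task_completed(0, "exec-1"))  # stage not finished yet
print(stage.task_completed(1, "exec-2"))
print(stage.task_completed(2, "exec-1"))  # last partition: stage finished
```

In real Spark the final check is subtler (the stage may be "finished" but not "available" if some map outputs were lost, triggering a resubmit), but the pending-set-drains-to-empty shape is the same.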

If instead the task is a ResultTask, the scheduler checks whether its job is now complete; if so, it marks the job as finished, releases the resources the job depended on, and posts a job-completion message to the listener bus.

三.Summary

[Figure: task scheduling flow]