【Spark】Stage生成和Stage源碼淺析

引入

上一篇文章《DAGScheduler源碼淺析》中,介紹了handleJobSubmitted函數(shù),它作為生成finalStage的重要函數(shù)存在,這一篇文章中,我將就DAGScheduler生成Stage過(guò)程繼續(xù)學(xué)習(xí),同時(shí)介紹Stage的相關(guān)源碼。

Stage生成

Stage的調(diào)度是由DAGScheduler完成的。由RDD的有向無(wú)環(huán)圖DAG切分出了Stage的有向無(wú)環(huán)圖DAG。Stage的DAG通過(guò)最后執(zhí)行的Stage為根進(jìn)行廣度優(yōu)先遍歷,遍歷到最開(kāi)始執(zhí)行的Stage執(zhí)行,如果提交的Stage仍有未完成的父母Stage,則Stage需要等待其父Stage執(zhí)行完才能執(zhí)行。同時(shí)DAGScheduler中還維持了幾個(gè)重要的Key-Value集合結(jié)構(gòu),用來(lái)記錄Stage的狀態(tài),這樣能夠避免過(guò)早執(zhí)行和重復(fù)提交Stage。waitingStages中記錄仍有未執(zhí)行的父母Stage,防止過(guò)早執(zhí)行。runningStages中保存正在執(zhí)行的Stage,防止重復(fù)執(zhí)行。failedStages中保存執(zhí)行失敗的Stage,需要重新執(zhí)行,這里的設(shè)計(jì)是出于容錯(cuò)的考慮。

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

依賴關(guān)系

RDD的窄依賴是指父RDD的所有輸出都會(huì)被指定的子RDD消費(fèi),即輸出路徑是固定的;寬依賴是指父RDD的輸出會(huì)由不同的子RDD消費(fèi),即輸出路徑不固定。
調(diào)度器會(huì)計(jì)算RDD之間的依賴關(guān)系,將擁有持續(xù)窄依賴的RDD歸并到同一個(gè)Stage中,而寬依賴則作為劃分不同Stage的判斷標(biāo)準(zhǔn)。
導(dǎo)致窄依賴的Transformation操作:map、flatMap、filter、sample;導(dǎo)致寬依賴的Transformation操作:sortByKey、reduceByKey、groupByKey、cogroupByKey、join、cartensian。

Stage分為兩種:
ShuffleMapStage, in which case its tasks' results are input for another stage
其實(shí)就是,非最終stage, 后面還有其他的stage, 所以它的輸出一定是需要shuffle并作為后續(xù)的輸入。

這種Stage是以Shuffle為輸出邊界,其輸入邊界可以是從外部獲取數(shù)據(jù),也可以是另一個(gè)ShuffleMapStage的輸出
其輸出可以是另一個(gè)Stage的開(kāi)始。
ShuffleMapStage的最后Task就是ShuffleMapTask。
在一個(gè)Job里可能有該類型的Stage,也可以能沒(méi)有該類型Stage。

ResultStage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)
最終的stage, 沒(méi)有輸出, 而是直接產(chǎn)生結(jié)果或存儲(chǔ)。

這種Stage是直接輸出結(jié)果,其輸入邊界可以是從外部獲取數(shù)據(jù),也可以是另一個(gè)ShuffleMapStage的輸出。
ResultStage的最后Task就是ResultTask,在一個(gè)Job里必定有該類型Stage。
一個(gè)Job含有一個(gè)或多個(gè)Stage,但至少含有一個(gè)ResultStage。

Stage類

stage的RDD參數(shù)只有一個(gè)RDD, final RDD, 而不是一系列的RDD。
因?yàn)樵谝粋€(gè)stage中的所有RDD都是map, partition不會(huì)有任何改變, 只是在data依次執(zhí)行不同的map function所以對(duì)于TaskScheduler而言, 一個(gè)RDD的狀況就可以代表這個(gè)stage。

Stage參數(shù)說(shuō)明:
val id: Int //Stage的序號(hào)數(shù)值越大,優(yōu)先級(jí)越高
val rdd: RDD[], //歸屬于本Stage的最后一個(gè)rdd
val numTasks: Int, //創(chuàng)建的Task數(shù)目,等于父RDD的輸出Partition數(shù)目
val shuffleDep: Option[ShuffleDependency[
, _, _]], //是否存在SuffleDependency,寬依賴
val parents: List[Stage], //父Stage列表
val jobId: Int //作業(yè)ID

private[spark] class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
    val parents: List[Stage],
    val jobId: Int,
    val callSite: CallSite)
  extends Logging {

  val isShuffleMap = shuffleDep.isDefined
  val numPartitions = rdd.partitions.size
  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
  var numAvailableOutputs = 0

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */
  var resultOfJob: Option[ActiveJob] = None
  var pendingTasks = new HashSet[Task[_]]

  private var nextAttemptId = 0

  val name = callSite.shortForm
  val details = callSite.longForm

  /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
  var latestInfo: StageInfo = StageInfo.fromStage(this)

  def isAvailable: Boolean = {
    if (!isShuffleMap) {
      true
    } else {
      numAvailableOutputs == numPartitions
    }
  }

  def addOutputLoc(partition: Int, status: MapStatus) {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList == Nil) {
      numAvailableOutputs += 1
    }
  }

  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
    val prevList = outputLocs(partition)
    val newList = prevList.filterNot(_.location == bmAddress)
    outputLocs(partition) = newList
    if (prevList != Nil && newList == Nil) {
      numAvailableOutputs -= 1
    }
  }

  /**
   * Removes all shuffle outputs associated with this executor. Note that this will also remove
   * outputs which are served by an external shuffle server (if one exists), as they are still
   * registered with this execId.
   */
  def removeOutputsOnExecutor(execId: String) {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_.location.executorId == execId)
      outputLocs(partition) = newList
      if (prevList != Nil && newList == Nil) {
        becameUnavailable = true
        numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable) {
      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
        this, execId, numAvailableOutputs, numPartitions, isAvailable))
    }
  }

  /** Return a new attempt id, starting with 0. */
  def newAttemptId(): Int = {
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }

  def attemptId: Int = nextAttemptId

  override def toString = "Stage " + id

  override def hashCode(): Int = id

  override def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }
}

處理Job,分割Job為Stage,封裝Stage成TaskSet,最終提交給TaskScheduler的調(diào)用鏈

dagScheduler.handleJobSubmitted-->dagScheduler.submitStage-->dagScheduler.submitMissingTasks-->taskScheduler.submitTasks。

handleJobSubmitted函數(shù)

函數(shù)handleJobSubmitted和submitStage主要負(fù)責(zé)依賴性分析,對(duì)其處理邏輯做進(jìn)一步的分析。
handleJobSubmitted最主要的工作是生成Stage,并根據(jù)finalStage來(lái)產(chǎn)生ActiveJob。

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      //錯(cuò)誤處理,告訴監(jiān)聽(tīng)器作業(yè)失敗,返回....
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // 很短、沒(méi)有父stage的本地操作,比如 first() or take() 的操作本地執(zhí)行
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        // collect等操作走的是這個(gè)過(guò)程,更新相關(guān)的關(guān)系映射,用監(jiān)聽(tīng)器監(jiān)聽(tīng),然后提交作業(yè)
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()
  }

newStage函數(shù)

  /**
   * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
   * of a shuffle map stage in newOrUsedStage.  The stage will be associated with the provided
   * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
   * directly.
   */
  private def newStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    val parentStages = getParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

其中,Stage的初始化參數(shù):在創(chuàng)建一個(gè)Stage之前,需要知道該Stage需要從多少個(gè)Partition讀入數(shù)據(jù),這個(gè)數(shù)值直接影響要?jiǎng)?chuàng)建多少個(gè)Task。也就是說(shuō),創(chuàng)建Stage時(shí),已經(jīng)清楚該Stage需要從多少不同的Partition讀入數(shù)據(jù),并寫(xiě)出到多少個(gè)不同的Partition中,輸入和輸出的個(gè)數(shù)均已明確。

getParentStages函數(shù):
通過(guò)不停的遍歷它之前的rdd,如果碰到有依賴是ShuffleDependency類型的,就通過(guò)getShuffleMapStage方法計(jì)算出來(lái)它的Stage來(lái)。

  /**
   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
   * provided jobId if they haven't already been created with a lower jobId.
   */
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (!waitingForVisit.isEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

ActiveJob類

用戶所提交的job在得到DAGScheduler的調(diào)度后,會(huì)被包裝成ActiveJob,同時(shí)會(huì)啟動(dòng)JobWaiter阻塞監(jiān)聽(tīng)job的完成狀況。
同時(shí)依據(jù)job中RDD的dependency和dependency屬性(NarrowDependency,ShufflerDependecy),DAGScheduler會(huì)根據(jù)依賴關(guān)系的先后產(chǎn)生出不同的stage DAG(result stage, shuffle map stage)。
在每一個(gè)stage內(nèi)部,根據(jù)stage產(chǎn)生出相應(yīng)的task,包括ResultTask或是ShuffleMapTask,這些task會(huì)根據(jù)RDD中partition的數(shù)量和分布,產(chǎn)生出一組相應(yīng)的task,并將其包裝為T(mén)askSet提交到TaskScheduler上去。

/**
 * Tracks information about an active job in the DAGScheduler.
 */
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  val numPartitions = partitions.length
  val finished = Array.fill[Boolean](numPartitions)(false)
  var numFinished = 0
}

submitStage函數(shù)

submitStage函數(shù)中會(huì)根據(jù)依賴關(guān)系劃分stage,通過(guò)遞歸調(diào)用從finalStage一直往前找它的父stage,直到stage沒(méi)有父stage時(shí)就調(diào)用submitMissingTasks方法提交改stage。這樣就完成了將job劃分為一個(gè)或者多個(gè)stage。
submitStage處理流程:

  • 所依賴的Stage是否都已經(jīng)完成,如果沒(méi)有完成則先執(zhí)行所依賴的Stage
  • 如果所有的依賴已經(jīng)完成,則提交自身所處的Stage
  • 最后會(huì)在submitMissingTasks函數(shù)中將stage封裝成TaskSet通過(guò)taskScheduler.submitTasks函數(shù)提交給TaskScheduler處理。
  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id) // 根據(jù)final stage發(fā)現(xiàn)是否有parent stage
        logDebug("missing: " + missing)
        if (missing == Nil) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get) // 如果沒(méi)有parent stage需要執(zhí)行, 則直接submit當(dāng)前stage
        } else {
          for (parent <- missing) {
            submitStage(parent) // 如果有parent stage,需要先submit parent, 因?yàn)閟tage之間需要順序執(zhí)行
          }
          waitingStages += stage // 當(dāng)前stage放到waitingStages中
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

getMissingParentStages

getMissingParentStages通過(guò)圖的遍歷,來(lái)找出所依賴的所有父Stage。

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        if (getCacheLocs(rdd).contains(Nil)) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>  // 如果發(fā)現(xiàn)ShuffleDependency, 說(shuō)明遇到新的stage
                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                // check shuffleToMapStage, 如果該stage已經(jīng)被創(chuàng)建則直接返回, 否則newStage
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] => // 對(duì)于NarrowDependency, 說(shuō)明仍然在這個(gè)stage中
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (!waitingForVisit.isEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

submitMissingTasks

可見(jiàn)無(wú)論是哪種stage,都是對(duì)于每個(gè)stage中的每個(gè)partitions創(chuàng)建task,并最終封裝成TaskSet,將該stage提交給taskscheduler。

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingTasks.clear()

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = {
      if (stage.isShuffleMap) {
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
      } else {
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    }

    val properties = if (jobIdToActiveJob.contains(jobId)) {
      jobIdToActiveJob(stage.jobId).properties
    } else {
      // this stage will be assigned to "default" pool
      null
    }

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
    outputCommitCoordinator.stageStart(stage.id)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] =
        if (stage.isShuffleMap) {
          closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
        } else {
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
        }
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }
  }

參考資料

fxjwind--Spark源碼分析--Stage

轉(zhuǎn)載請(qǐng)注明作者Jason Ding及其出處
GitCafe博客主頁(yè)(http://jasonding1354.gitcafe.io/)
Github博客主頁(yè)(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡(jiǎn)書(shū)主頁(yè)(http://www.itdecent.cn/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進(jìn)入我的博客主頁(yè)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容