每天起床第一句,看看Spark調(diào)度器

之前呢,我們詳細(xì)地分析了DAGScheduler的執(zhí)行過程,我們知道,RDD形成的DAG經(jīng)過DAGScheduler,依據(jù)shuffle將DAG劃分為若干個stage,再由taskScheduler提交task到executor中執(zhí)行,那么執(zhí)行task的過程,就需要調(diào)度器來參與了。

Spark調(diào)度器主要有兩種模式,也是大家耳熟能詳?shù)?strong>FIFO和FAIR模式。默認(rèn)情況下,Spark是FIFO(先入先出)模式,即誰先提交誰先執(zhí)行。而FAIR(公平調(diào)度)模式會在調(diào)度池中為任務(wù)進行分組,可以有不同的權(quán)重,根據(jù)權(quán)重來決定執(zhí)行順序。

那么源碼中是怎么實現(xiàn)的呢?

首先,當(dāng)Stage劃分好,會調(diào)用TaskSchedulerImpl.submitTasks()方法,以TaskSet的形式提交給TaskScheduler,并創(chuàng)建一個TaskSetManger對象添加進調(diào)度池。

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    //....
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
    //.....
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

SchedulerBulider通過TaskSchedulerImpl.initialize()進行了實例化,并調(diào)用了SchedulerBulider.buildPools()方法。具體怎么個build,就要看用戶選擇的schedulingMode了。

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

然后我們來看一下兩個調(diào)度器的buildPools()方法。

override def buildPools() {
    // nothing
  }

FIFO什么也沒干~~

override def buildPools() {
    var fileData: Option[(InputStream, String)] = None
    try {
      fileData = schedulerAllocFile.map { f =>
        val fis = new FileInputStream(f)
        logInfo(s"Creating Fair Scheduler pools from $f")
        Some((fis, f))
      }.getOrElse {
        val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
        if (is != null) {
          logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
          Some((is, DEFAULT_SCHEDULER_FILE))
        } else {
          logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
            s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
            s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
          None
        }
      }

      fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
    } catch {
      case NonFatal(t) =>
        val defaultMessage = "Error while building the fair scheduler pools"
        val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
          .getOrElse(defaultMessage)
        logError(message, t)
        throw t
    } finally {
      fileData.foreach { case (is, fileName) => is.close() }
    }

    // finally create "default" pool
    buildDefaultPool()
  }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容