[Spark源碼剖析]Pool-Standalone模式下的隊列

Pool-Spark Standalone模式下的隊列

org.apache.spark.scheduler.Pool是 Spark Standalone 模式下的隊列。從其重要成員及成員函數(shù)來剖析這個在 TaskScheduler 調(diào)度中起關鍵作用的類。

成員

下圖展示了 Pool 的所有成員及一些簡要說明

其中,taskSetSchedulingAlgorithm的類型由schedulingMode決定,下文會對FairSchedulingAlgorithmFIFOSchedulingAlgorithm做詳細分析

  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

成員函數(shù)

先來看看如何向一個 Pool 中添加 TaskSetManager 或 Pool,說明都寫在注釋中。

  override def addSchedulable(schedulable: Schedulable) {
    //<f 判斷 schedulable 不為 null
    require(schedulable != null)
    //< 往隊列中添加schedulable 對象,可以是taskSet,也可以是子隊列
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    //< 將該 schedulable 對象的父親設置為自己
    schedulable.parent = this
  }

以下為如何 remove 一個 TaskSetManager 或 Pool,需要注意的是schedulableQueue為ConcurrentLinkedQueue類型,其 remove 方法可以刪除與參數(shù)值相等的元素

  override def removeSchedulable(schedulable: Schedulable) {
    schedulableQueue.remove(schedulable)
    schedulableNameToSchedulable.remove(schedulable.name)
  }

當有 executor 丟失時,會調(diào)用 executorLost 方法

  override def executorLost(executorId: String, host: String) {
    schedulableQueue.foreach(_.executorLost(executorId, host))
  }

若該隊列中某個元素為 TaskSetManager 類型,會調(diào)用 TaskSetManager.executorLost 方法,該方法將查找是否有自己管理的 task 在 lost 的 executor 上運行,若有,則重新將該 lost 的 task 插入隊列,等待執(zhí)行;若某元素為 Pool 類型,即子隊列,那么 Pool.executorLost 方法會對其schedulableQueue的所有元素調(diào)用 executorLost 方法,這樣一來,若根 Pool 調(diào)用 executorLost 方法,則該隊列下的所有 TaskSetManager 對象都能調(diào)用 executorLost 方法,那么因某個 executor lost 而 lost 的 task 都將被重新插入隊列執(zhí)行

getSortedTaskSetQueue方法是 Pool 最重要的方法,它將以該 Pool 為根隊列的所有 TaskSetManager 排序后存在一個數(shù)組中,下標越小的數(shù)組越早被執(zhí)行。代碼如下:

  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

這個函數(shù)的實現(xiàn)邏輯主要分為兩步,假設現(xiàn)在調(diào)用 tmpPool.getSortedTaskSetQueue,tmpPool 為 Pool 類型:

  1. 對 tmpPool 的直接子 Pool 和 TaskSetManager 進行排序,排序的算法根據(jù)Pool 的 schedulingMode 而定,F(xiàn)AIR 和 FIFO 不相同。排序后得到sortedSchedulableQueue
  2. 遍歷sortedSchedulableQueue所有元素。若元素為 TaskSetManager 類型,則將該元素添加到sortedTaskSetQueue: ArrayBuffer[TaskSetManager]尾部,若為 Pool 類型,則執(zhí)行第一步
  3. 返回包含對 tmpPool 下所有 TaskSetManager 排序過后的數(shù)組

經(jīng)過這幾部,就能將一個 Pool 下的所有 TaskSetManager 排序,也就能確定哪個 TaskSetManager 的 tasks 要優(yōu)先被 TaskScheduler 調(diào)度。

如上所述,排序的關鍵是taskSetSchedulingAlgorithm.comparator,上文中已經(jīng)提到taskSetSchedulingAlgorithm根據(jù)schedulingMode值的不同,可以有FairSchedulingAlgorithmFIFOSchedulingAlgorithm兩種類型。先來看
FIFOSchedulingAlgorithm的排序

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

FIFOSchedulingAlgorithm比較邏輯很簡單,可概括為下面兩句話:

  1. 首先比較優(yōu)先級值,優(yōu)先級值越小的更優(yōu)先(好拗口)
  2. 若優(yōu)先級值相等,則比較 stageId 值,stageId 值越小的越優(yōu)先

FairSchedulingAlgorithm的比較邏輯會復雜一些,代碼如下:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      //< s1中正在執(zhí)行的 tasks 個數(shù)小于 s1的最小 cpu 核數(shù);且s2中正在執(zhí)行的 tasks 個數(shù)等于 s2的最小 cpu 核數(shù)。則 s1優(yōu)先
      return true
    } else if (!s1Needy && s2Needy) {
      //< s2中正在執(zhí)行的 tasks 個數(shù)小于 s2的最小 cpu 核數(shù);且s1中正在執(zhí)行的 tasks 個數(shù)等于 s1的最小 cpu 核數(shù)。則 s2優(yōu)先
      return false
    } else if (s1Needy && s2Needy) {
      //< s1,s2中正在執(zhí)行的 tasks 個數(shù)小于其最小 cpu 核數(shù)。則比較各自 runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble 的比值,小的優(yōu)先
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      //< s1,s2中正在執(zhí)行的 tasks 個數(shù)等于其最小 cpu 核數(shù)。則比較runningTasks1.toDouble / s1.weight.toDouble,小的優(yōu)先
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      //< 若以上比較都相等,則比較 s1和 s2的名字
      s1.name < s2.name
    }
  }
}

FairSchedulingAlgorithm的比較規(guī)則以在上面代碼的注釋中說明

PS

Pool 的成員stageId 初始值為-1,但搜遍整個 Spark 源碼也沒有找到哪里有對該值的重新賦值。這個 stageId 的具體含義及如何發(fā)揮作用還沒有完全搞明白,若哪位朋友知道,麻煩告知,多謝

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容