Pool-Spark Standalone模式下的隊列
org.apache.spark.scheduler.Pool是 Spark Standalone 模式下的隊列。從其重要成員及成員函數(shù)來剖析這個在 TaskScheduler 調(diào)度中起關鍵作用的類。
成員
下圖展示了 Pool 的所有成員及一些簡要說明

其中,taskSetSchedulingAlgorithm的類型由schedulingMode決定,下文會對FairSchedulingAlgorithm和FIFOSchedulingAlgorithm做詳細分析
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
}
}
成員函數(shù)
先來看看如何向一個 Pool 中添加 TaskSetManager 或 Pool,說明都寫在注釋中。
override def addSchedulable(schedulable: Schedulable) {
//<f 判斷 schedulable 不為 null
require(schedulable != null)
//< 往隊列中添加schedulable 對象,可以是taskSet,也可以是子隊列
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
//< 將該 schedulable 對象的父親設置為自己
schedulable.parent = this
}
以下為如何 remove 一個 TaskSetManager 或 Pool,需要注意的是schedulableQueue為ConcurrentLinkedQueue類型,其 remove 方法可以刪除與參數(shù)值相等的元素
override def removeSchedulable(schedulable: Schedulable) {
schedulableQueue.remove(schedulable)
schedulableNameToSchedulable.remove(schedulable.name)
}
當有 executor 丟失時,會調(diào)用 executorLost 方法
override def executorLost(executorId: String, host: String) {
schedulableQueue.foreach(_.executorLost(executorId, host))
}
若該隊列中某個元素為 TaskSetManager 類型,會調(diào)用 TaskSetManager.executorLost 方法,該方法將查找是否有自己管理的 task 在 lost 的 executor 上運行,若有,則重新將該 lost 的 task 插入隊列,等待執(zhí)行;若某元素為 Pool 類型,即子隊列,那么 Pool.executorLost 方法會對其schedulableQueue的所有元素調(diào)用 executorLost 方法,這樣一來,若根 Pool 調(diào)用 executorLost 方法,則該隊列下的所有 TaskSetManager 對象都能調(diào)用 executorLost 方法,那么因某個 executor lost 而 lost 的 task 都將被重新插入隊列執(zhí)行
getSortedTaskSetQueue方法是 Pool 最重要的方法,它將以該 Pool 為根隊列的所有 TaskSetManager 排序后存在一個數(shù)組中,下標越小的數(shù)組越早被執(zhí)行。代碼如下:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
sortedTaskSetQueue
}
這個函數(shù)的實現(xiàn)邏輯主要分為兩步,假設現(xiàn)在調(diào)用 tmpPool.getSortedTaskSetQueue,tmpPool 為 Pool 類型:
- 對 tmpPool 的直接子 Pool 和 TaskSetManager 進行排序,排序的算法根據(jù)Pool 的 schedulingMode 而定,F(xiàn)AIR 和 FIFO 不相同。排序后得到sortedSchedulableQueue
- 遍歷sortedSchedulableQueue所有元素。若元素為 TaskSetManager 類型,則將該元素添加到
sortedTaskSetQueue: ArrayBuffer[TaskSetManager]尾部,若為 Pool 類型,則執(zhí)行第一步 - 返回包含對 tmpPool 下所有 TaskSetManager 排序過后的數(shù)組
經(jīng)過這幾部,就能將一個 Pool 下的所有 TaskSetManager 排序,也就能確定哪個 TaskSetManager 的 tasks 要優(yōu)先被 TaskScheduler 調(diào)度。
如上所述,排序的關鍵是taskSetSchedulingAlgorithm.comparator,上文中已經(jīng)提到taskSetSchedulingAlgorithm根據(jù)schedulingMode值的不同,可以有FairSchedulingAlgorithm和FIFOSchedulingAlgorithm兩種類型。先來看
FIFOSchedulingAlgorithm的排序
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
true
} else {
false
}
}
}
FIFOSchedulingAlgorithm比較邏輯很簡單,可概括為下面兩句話:
- 首先比較優(yōu)先級值,優(yōu)先級值越小的更優(yōu)先(好拗口)
- 若優(yōu)先級值相等,則比較 stageId 值,stageId 值越小的越優(yōu)先
FairSchedulingAlgorithm的比較邏輯會復雜一些,代碼如下:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare: Int = 0
if (s1Needy && !s2Needy) {
//< s1中正在執(zhí)行的 tasks 個數(shù)小于 s1的最小 cpu 核數(shù);且s2中正在執(zhí)行的 tasks 個數(shù)等于 s2的最小 cpu 核數(shù)。則 s1優(yōu)先
return true
} else if (!s1Needy && s2Needy) {
//< s2中正在執(zhí)行的 tasks 個數(shù)小于 s2的最小 cpu 核數(shù);且s1中正在執(zhí)行的 tasks 個數(shù)等于 s1的最小 cpu 核數(shù)。則 s2優(yōu)先
return false
} else if (s1Needy && s2Needy) {
//< s1,s2中正在執(zhí)行的 tasks 個數(shù)小于其最小 cpu 核數(shù)。則比較各自 runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble 的比值,小的優(yōu)先
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
//< s1,s2中正在執(zhí)行的 tasks 個數(shù)等于其最小 cpu 核數(shù)。則比較runningTasks1.toDouble / s1.weight.toDouble,小的優(yōu)先
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
//< 若以上比較都相等,則比較 s1和 s2的名字
s1.name < s2.name
}
}
}
FairSchedulingAlgorithm的比較規(guī)則以在上面代碼的注釋中說明
PS
Pool 的成員stageId 初始值為-1,但搜遍整個 Spark 源碼也沒有找到哪里有對該值的重新賦值。這個 stageId 的具體含義及如何發(fā)揮作用還沒有完全搞明白,若哪位朋友知道,麻煩告知,多謝