本文旨在說明 Spark 的延遲調度及其是如何工作的
什么是延遲調度
在 Spark 中,若 task 與其輸入數據在同一個 jvm 中,我們稱 task 的本地性為 PROCESS_LOCAL,這種本地性(locality level)是最優(yōu)的,避免了網絡傳輸及文件 IO,是最快的;其次是 task 與輸入數據在同一節(jié)點上的 NODE_LOCAL,數據在哪都一樣的 NO_PREF,數據與 task 在同一機架不同節(jié)點的 RACK_LOCAL 及最糟糕的不在同一機架的 ANY。
本地性越好,對于 task 來說,花在網絡傳輸及文件 IO 的時間越少,整個 task 執(zhí)行耗時也就更少。而對于很多 task 來說,執(zhí)行 task 的時間往往會比網絡傳輸/文件 IO 的耗時要短的多。所以 Spark 希望盡量以更優(yōu)的本地性啟動 task。延遲調度就是為此而存在的。
在Spark的位置優(yōu)先(1): TaskSetManager 的有效 Locality Levels這篇文章中,我們可以知道,假設一個 task 的最優(yōu)本地性為 N,那么該 task 同時也具有其他所有本地性比 N 差的本地性。
假設調度器上一次以 locality level(本地性) M 為某個 taskSetManager 啟動 task 失敗,則說明該 taskSetManager 中包含本地性 M 的 tasks 的本地性 M 對應的所有節(jié)點均沒有空閑資源。此時,只要當期時間與上一次以 M 為 taskSetManager 啟動 task 時間差小于配置的值,調度器仍然會以 locality level M 來為 taskSetManager 啟動 task
延時調度如何工作
函數TaskSetManager#getAllowedLocalityLevel是實現延時調度最關鍵的地方,用來返回當前該 taskSetManager 中未執(zhí)行的 tasks 的最高可能 locality level。以下為其實現
/**
* Get the level we can launch tasks according to delay scheduling, based on current wait time.
*/
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
// Remove the scheduled or finished tasks lazily
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
pendingTaskIds.remove(indexOffset)
}
}
false
}
// Walk through the list of tasks that can be scheduled at each location and returns true
// if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
// already been scheduled.
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
// The key could be executorId, host or rackId
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
if (!moreTasks) {
// This is a performance optimization: if there are no more tasks that can
// be scheduled at a particular locality level, there is no point in waiting
// for the locality wait timeout (SPARK-4939).
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLaunchTime so that the next locality
// wait timer doesn't immediately expire
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
myLocalityLevels(currentLocalityIndex)
}
代碼有點小長,好在并不復雜,一些關鍵注釋在以上源碼中都有注明。
循環(huán)條件為while (currentLocalityIndex < myLocalityLevels.length - 1),
其中myLocalityLevels: Array[TaskLocality.TaskLocality]是當前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合,本地性越高的 locality level 在 myLocalityLevels 中的下標越小(具體請參見http://www.itdecent.cn/p/05034a9c8cae)
currentLocalityIndex 是 getAllowedLocalityLevel 前一次返回的 locality level 在 myLocalityLevels 中的索引(下標),若 getAllowedLocalityLevel 是第一次被調用,則 currentLocalityIndex 為0
整個循環(huán)體都在做這幾個事情:
判斷
myLocalityLevels(currentLocalityIndex)這個級別的本地性對應的待執(zhí)行 tasks 集合中是否還有待執(zhí)行的 task若無;則將
currentLocalityIndex += 1進行下一次循環(huán),即將 locality level 降低一級回到第1步若有,且當前時間與上次getAllowedLocalityLevel返回
myLocalityLevels(currentLocalityIndex)時間間隔小于myLocalityLevels(currentLocalityIndex)對應的延遲時間(通過spark.locality.wait.process或spark.locality.wait.node或spark.locality.wait.rack配置),則 currentLocalityIndex 不變,返回myLocalityLevels(currentLocalityIndex)。這里是延遲調度的關鍵,只要當前時間與上一次以某個 locality level 啟動 task 的時間只差小于配置的值,不管上次是否成功啟動了 task,這一次仍然以上次的 locality level 來啟動 task。說的更明白一些:比如上次以 localtyX 為 taskSetManager 啟動 task 失敗,說明taskSetManager 中 tasks 對應 localityX 的節(jié)點均沒有空閑資源來啟動 task,但 Spark 此時仍然會以 localityX 來為 taskSetManager 啟動 task。為什么要這樣做?一般來說,task 執(zhí)行耗時相對于網絡傳輸/文件IO 要小得多,調度器多等待1 2秒可能就可以以更好的本地性執(zhí)行 task,避免了更耗時的網絡傳輸或文件IO,task 整體執(zhí)行時間會降低-
若有,且當前時間與上次getAllowedLocalityLevel返回
myLocalityLevels(currentLocalityIndex)時間間隔大于myLocalityLevels(currentLocalityIndex)對應的延遲時間,則將currentLocalityIndex += 1進行下一次循環(huán),即將 locality level 降低一級回到第1步
下面為幫助理解代碼的部分說明
判斷是否還有當前 locality level 的 task 需要執(zhí)行
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
moreTasksToRunIn就不進行過多解釋了,主要作用有兩點:
- 對于不同等級的 locality level 的 tasks 列表,將已經成功執(zhí)行的或正在執(zhí)行的該 locality level 的 task 從對應的列表中移除
- 判斷對應的 locality level 的 task 是否還要等待執(zhí)行的,若有則返回 true,否則返回 false
以 myLocalityLevels(currentLocalityIndex) 等于 PROCESS_LOCAL 為例,這一段代碼用來判斷該 taskSetManager 中的 tasks 是否還有 task 的 locality levels 包含 PROCESS_LOCAL
if (!moreTasks)
若!moreTasks,則對currentLocalityIndex加1,即 locality level 變低一級,再次循環(huán)。
根據 http://www.itdecent.cn/p/05034a9c8cae 的分析我們知道,若一個 task 存在于某個 locality level 為 level1 待執(zhí)行 tasks 集合中,那么該 task 也一定存在于所有 locality level 低于 level1 的待執(zhí)行 tasks 集合。
從另一個角度看,對于每個 task,總是嘗試以最高的 locality level 去啟動,若啟動失敗且下次以該 locality 啟動時間與上次以該 locality level 啟動時間超過配置的值,則將 locality level 降低一級來嘗試啟動 task