[Spark源碼剖析]Spark 延遲調度策略

本文旨在說明 Spark 的延遲調度及其是如何工作的

什么是延遲調度

在 Spark 中,若 task 與其輸入數據在同一個 jvm 中,我們稱 task 的本地性為 PROCESS_LOCAL,這種本地性(locality level)是最優(yōu)的,避免了網絡傳輸及文件 IO,是最快的;其次是 task 與輸入數據在同一節(jié)點上的 NODE_LOCAL,數據在哪都一樣的 NO_PREF,數據與 task 在同一機架不同節(jié)點的 RACK_LOCAL 及最糟糕的不在同一機架的 ANY。

本地性越好,對于 task 來說,花在網絡傳輸及文件 IO 的時間越少,整個 task 執(zhí)行耗時也就更少。而對于很多 task 來說,執(zhí)行 task 的時間往往會比網絡傳輸/文件 IO 的耗時要短的多。所以 Spark 希望盡量以更優(yōu)的本地性啟動 task。延遲調度就是為此而存在的。

Spark的位置優(yōu)先(1): TaskSetManager 的有效 Locality Levels這篇文章中,我們可以知道,假設一個 task 的最優(yōu)本地性為 N,那么該 task 同時也具有其他所有本地性比 N 差的本地性。

假設調度器上一次以 locality level(本地性) M 為某個 taskSetManager 啟動 task 失敗,則說明該 taskSetManager 中包含本地性 M 的 tasks 的本地性 M 對應的所有節(jié)點均沒有空閑資源。此時,只要當期時間與上一次以 M 為 taskSetManager 啟動 task 時間差小于配置的值,調度器仍然會以 locality level M 來為 taskSetManager 啟動 task

延時調度如何工作

函數TaskSetManager#getAllowedLocalityLevel是實現延時調度最關鍵的地方,用來返回當前該 taskSetManager 中未執(zhí)行的 tasks 的最高可能 locality level。以下為其實現

/**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        currentLocalityIndex += 1
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

代碼有點小長,好在并不復雜,一些關鍵注釋在以上源碼中都有注明。

循環(huán)條件為while (currentLocalityIndex < myLocalityLevels.length - 1)
其中myLocalityLevels: Array[TaskLocality.TaskLocality]是當前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合,本地性越高的 locality level 在 myLocalityLevels 中的下標越小(具體請參見http://www.itdecent.cn/p/05034a9c8cae

currentLocalityIndex 是 getAllowedLocalityLevel 前一次返回的 locality level 在 myLocalityLevels 中的索引(下標),若 getAllowedLocalityLevel 是第一次被調用,則 currentLocalityIndex 為0

整個循環(huán)體都在做這幾個事情:

  1. 判斷 myLocalityLevels(currentLocalityIndex) 這個級別的本地性對應的待執(zhí)行 tasks 集合中是否還有待執(zhí)行的 task

  2. 若無;則將 currentLocalityIndex += 1 進行下一次循環(huán),即將 locality level 降低一級回到第1步

  3. 若有,且當前時間與上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) 時間間隔小于 myLocalityLevels(currentLocalityIndex) 對應的延遲時間(通過spark.locality.wait.process或spark.locality.wait.node或spark.locality.wait.rack配置),則 currentLocalityIndex 不變,返回myLocalityLevels(currentLocalityIndex)。這里是延遲調度的關鍵,只要當前時間與上一次以某個 locality level 啟動 task 的時間只差小于配置的值,不管上次是否成功啟動了 task,這一次仍然以上次的 locality level 來啟動 task。說的更明白一些:比如上次以 localtyX 為 taskSetManager 啟動 task 失敗,說明taskSetManager 中 tasks 對應 localityX 的節(jié)點均沒有空閑資源來啟動 task,但 Spark 此時仍然會以 localityX 來為 taskSetManager 啟動 task。為什么要這樣做?一般來說,task 執(zhí)行耗時相對于網絡傳輸/文件IO 要小得多,調度器多等待1 2秒可能就可以以更好的本地性執(zhí)行 task,避免了更耗時的網絡傳輸或文件IO,task 整體執(zhí)行時間會降低

  4. 若有,且當前時間與上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) 時間間隔大于 myLocalityLevels(currentLocalityIndex) 對應的延遲時間,則將 currentLocalityIndex += 1 進行下一次循環(huán),即將 locality level 降低一級回到第1步


下面為幫助理解代碼的部分說明

判斷是否還有當前 locality level 的 task 需要執(zhí)行

val moreTasks = myLocalityLevels(currentLocalityIndex) match {
    case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
    case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
    case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
    case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
  }

moreTasksToRunIn就不進行過多解釋了,主要作用有兩點:

  1. 對于不同等級的 locality level 的 tasks 列表,將已經成功執(zhí)行的或正在執(zhí)行的該 locality level 的 task 從對應的列表中移除
  2. 判斷對應的 locality level 的 task 是否還要等待執(zhí)行的,若有則返回 true,否則返回 false

myLocalityLevels(currentLocalityIndex) 等于 PROCESS_LOCAL 為例,這一段代碼用來判斷該 taskSetManager 中的 tasks 是否還有 task 的 locality levels 包含 PROCESS_LOCAL

if (!moreTasks)

若!moreTasks,則對currentLocalityIndex加1,即 locality level 變低一級,再次循環(huán)。

根據 http://www.itdecent.cn/p/05034a9c8cae 的分析我們知道,若一個 task 存在于某個 locality level 為 level1 待執(zhí)行 tasks 集合中,那么該 task 也一定存在于所有 locality level 低于 level1 的待執(zhí)行 tasks 集合。

從另一個角度看,對于每個 task,總是嘗試以最高的 locality level 去啟動,若啟動失敗且下次以該 locality 啟動時間與上次以該 locality level 啟動時間超過配置的值,則將 locality level 降低一級來嘗試啟動 task

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容