Spark:Dynamic Resource Allocation【動(dòng)態(tài)資源分配】

1. 問題背景
2. 原理分析
   2.1 Executor生命周期
   2.2 ExecutorAllocationManager上下游調(diào)用關(guān)系
3. 總結(jié)與反思
4. Community Feedback

1.問題背景

用戶提交Spark應(yīng)用到Y(jié)arn上時(shí),可以通過spark-submit的num-executors參數(shù)顯示地指定executor個(gè)數(shù),隨后,ApplicationMaster會(huì)為這些executor申請(qǐng)資源,每個(gè)executor作為一個(gè)Container在Yarn上運(yùn)行。Spark調(diào)度器會(huì)把Task按照合適的策略分配到executor上執(zhí)行。所有任務(wù)執(zhí)行完后,executor被殺死,應(yīng)用結(jié)束。在job運(yùn)行的過程中,無論executor是否領(lǐng)取到任務(wù),都會(huì)一直占有著資源不釋放。很顯然,這在任務(wù)量小且顯示指定大量executor的情況下會(huì)很容易造成資源浪費(fèi)。

在探究Spark如何實(shí)現(xiàn)之前,首先思考下如果自己來解決這個(gè)問題,需要考慮哪些因素?大致的方案很容易想到:如果executor在一段時(shí)間內(nèi)一直處于空閑狀態(tài),那么就可以kill該executor,釋放其占用的資源。當(dāng)然,一些細(xì)節(jié)及邊界條件需要考慮到:

  • executor動(dòng)態(tài)調(diào)整的范圍?無限減少?無限制增加?
  • executor動(dòng)態(tài)調(diào)整速率?線性增減?指數(shù)增減?
  • 何時(shí)移除Executor?
  • 何時(shí)新增Executor了?只要由新提交的Task就新增Executor嗎?
  • Spark中的executor不僅僅提供計(jì)算能力,還可能存儲(chǔ)持久化數(shù)據(jù),這些數(shù)據(jù)在宿主executor被kill后,該如何訪問?
  • 。。。

2.原理分析

2.1 Executor生命周期

首先,先簡單分析下Spark靜態(tài)資源分配中Executor的生命周期,以spark-shell中的wordcount為例,執(zhí)行命令如下:

# 以yarn模式執(zhí)行,并指定executor個(gè)數(shù)為1
$ spark-shell --master=yarn --num-executors=1

# 提交Job1 wordcount
scala> sc.textFile("file:///etc/hosts").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# 提交Job2 wordcount
scala> sc.textFile("file:///etc/profile").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();

# Ctrl+C Kill JVM

上述的Spark應(yīng)用中,以yarn模式啟動(dòng)spark-shell,并順序執(zhí)行兩次wordcount,最后Ctrl+C退出spark-shell。此例中Executor的生命周期如下圖:


static-allocation

從上圖可以看出,Executor在整個(gè)應(yīng)用執(zhí)行過程中,其狀態(tài)一直處于Busy(執(zhí)行Task)或Idle(空等)。處于Idle狀態(tài)的Executor造成資源浪費(fèi)這個(gè)問題已經(jīng)在上面提到。下面重點(diǎn)看下開啟Spark動(dòng)態(tài)資源分配功能后,Executor如何運(yùn)作。


spark_dynamic_allocation_executor_lifecycle

下面分析下上圖中各個(gè)步驟:

  1. spark-shell Start:啟動(dòng)spark-shell應(yīng)用,并通過--num-executor指定了1個(gè)執(zhí)行器。
  2. Executor1 Start:啟動(dòng)執(zhí)行器Executor1。注意:Executor啟動(dòng)前存在一個(gè)AM向ResourceManager申請(qǐng)資源的過程,所以啟動(dòng)時(shí)機(jī)略微滯后與Driver。
  3. Job1 Start:提交第一個(gè)wordcount作業(yè),此時(shí),Executor1處于Busy狀態(tài)。
  4. Job1 End:作業(yè)1結(jié)束,Executor1又處于Idle狀態(tài)。
  5. Executor1 timeout:Executor1空閑一段時(shí)間后,超時(shí)被Kill。
  6. Job2 Submit:提交第二個(gè)wordcount,此時(shí),沒有Active的Executor可用。Job2處于Pending狀態(tài)。
  7. Executor2 Start:檢測到有Pending的任務(wù),此時(shí)Spark會(huì)啟動(dòng)Executor2。
  8. Job2 Start:此時(shí),已經(jīng)有Active的執(zhí)行器,Job2會(huì)被分配到Executor2上執(zhí)行。
  9. Job2 End:Job2結(jié)束。
  10. Executor2 End:Ctrl+C 殺死Driver,Executor2也會(huì)被RM殺死。

上述流程中需要重點(diǎn)關(guān)注的幾個(gè)問題:

  • Executor超時(shí):當(dāng)Executor不執(zhí)行任何任務(wù)時(shí),會(huì)被標(biāo)記為Idle狀態(tài)。空閑一段時(shí)間后即被認(rèn)為超時(shí),會(huì)被kill。該空閑時(shí)間由spark.dynamicAllocation.executorIdleTimeout決定,默認(rèn)值60s。對(duì)應(yīng)上圖中:Job1 End到Executor1 timeout之間的時(shí)間。
  • 資源不足時(shí),何時(shí)新增Executor:當(dāng)有Task處于pending狀態(tài),意味著資源不足,此時(shí)需要增加Executor。這段時(shí)間由spark.dynamicAllocation.schedulerBacklogTimeout控制,默認(rèn)1s。對(duì)應(yīng)上述step6和step7之間的時(shí)間。
  • 該新增多少Executor:新增Executor的個(gè)數(shù)主要依據(jù)是當(dāng)前負(fù)載情況,即running和pending任務(wù)數(shù)以及當(dāng)前Executor個(gè)數(shù)決定。用maxNumExecutorsNeeded代表當(dāng)前實(shí)際需要的最大Executor個(gè)數(shù),maxNumExecutorsNeeded和當(dāng)前Executor個(gè)數(shù)的差值即是潛在的新增Executor的個(gè)數(shù)。注意:之所以說潛在的個(gè)數(shù),是因?yàn)樽罱K新增的Executor個(gè)數(shù)還有別的因素需要考慮,后面會(huì)有分析。下面是maxNumExecutorsNeeded計(jì)算方法:
  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
              tasksPerExecutorForFullParallelism)
      .toInt
  }
  • 其中numRunningOrPendingTasks為當(dāng)前running和pending任務(wù)數(shù)之和。
  • executorAllocationRatio:最理想的情況下,有多少待執(zhí)行的任務(wù),那么我們就新增多少個(gè)Executor,從而達(dá)到最大的任務(wù)并發(fā)度。但是這也有副作用,如果當(dāng)前任務(wù)都是小任務(wù),那么這一策略就會(huì)造成資源浪費(fèi)。可能最后申請(qǐng)的Executor還沒啟動(dòng),這些小任務(wù)已經(jīng)被執(zhí)行完了。該值是一個(gè)系數(shù)值,范圍[0~1]。默認(rèn)1.
  • tasksPerExecutorForFullParallelism:每個(gè)Executor的最大并發(fā)數(shù),簡單理解為:cpu核心數(shù)(spark.executor.cores)/ 每個(gè)任務(wù)占用的核心數(shù)(spark.task.cpus)。
問題1:executor動(dòng)態(tài)調(diào)整的范圍?無限減少?無限制增加?調(diào)整速率?

要實(shí)現(xiàn)資源的動(dòng)態(tài)調(diào)整,那么限定調(diào)整范圍是最先考慮的事情,Spark通過下面幾個(gè)參數(shù)實(shí)現(xiàn):

  • spark.dynamicAllocation.minExecutors:Executor調(diào)整下限。(默認(rèn)值:0)
  • spark.dynamicAllocation.maxExecutors:Executor調(diào)整上限。(默認(rèn)值:Integer.MAX_VALUE)
  • spark.dynamicAllocation.initialExecutors:Executor初始數(shù)量(默認(rèn)值:minExecutors)。

三者的關(guān)系必須滿足:minExecutors <= initialExecutors <= maxExecutors

注意:如果顯示指定了num-executors參數(shù),那么initialExecutors就是num-executor指定的值。

問題2:Spark中的Executor既提供計(jì)算能力,也提供存儲(chǔ)能力。這些因超時(shí)被殺死的Executor中持久化的數(shù)據(jù)如何處理?

如果Executor中緩存了數(shù)據(jù),那么該Executor的Idle-timeout時(shí)間就不是由executorIdleTimeout決定,而是用spark.dynamicAllocation.cachedExecutorIdleTimeout控制,默認(rèn)值:Integer.MAX_VALUE。如果手動(dòng)設(shè)置了該值,當(dāng)這些緩存數(shù)據(jù)的Executor被kill后,我們可以通過NodeManannger的External Shuffle Server來訪問這些數(shù)據(jù)。這就要求NodeManager中spark.shuffle.service.enabled必須開啟。

2.2 ExecutorAllocationManager上下游調(diào)用關(guān)系

Spark動(dòng)態(tài)分配的主要邏輯由ExecutorAllocationManager類實(shí)現(xiàn),首先分析下與其交互的上下游關(guān)系,如下圖所示:


spark_dynamic_allocation

主要的邏輯很簡單:ExecutorAllocationManager中啟動(dòng)一個(gè)周期性任務(wù),監(jiān)控當(dāng)前Executor是否超時(shí),如果超時(shí)就將其移除。當(dāng)然Executor狀態(tài)的收集主要依賴于Spark提供的SparkListener機(jī)制。周期性任務(wù)邏輯如下:

private[spark] class ExecutorAllocationManager {

  // Executor that handles the scheduling task.
  private val executor =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

  def start(): Unit = {
    。。。
    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {...}
      }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    。。。
  }
  
  private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis
    // 同步當(dāng)前所需要的Executor數(shù)
    updateAndSyncNumExecutorsTarget(now)

    val executorIdsToBeRemoved = ArrayBuffer[String]()
    // removeTimes是<executorId, expireTime>的映射。
    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        initializing = false
        executorIdsToBeRemoved += executorId
      }
      !expired
    }
    // 移除所有超時(shí)的Executor
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }
}

以上就是對(duì)于Spark的動(dòng)態(tài)資源分配的原理分析,相關(guān)源碼可以參考Apache Spark:ExecutorAllocationManager。完整的配置參數(shù)見:Spark Configuration: Dynamic Allocation

3.總結(jié)與反思

  1. Pascal之父Nicklaus Wirth曾經(jīng)說過一句名言:程序=算法+數(shù)據(jù)結(jié)構(gòu)。對(duì)于Spark動(dòng)態(tài)資源分配來說,我們應(yīng)更加關(guān)注算法方面,即其動(dòng)態(tài)行為。如何分配?如何伸縮?上下游關(guān)系如何?等等。
  2. 回饋社區(qū):回饋是一種輸出,就迫使我們輸入的質(zhì)量要足夠高。這是一種很有效的技能提升方式。萬事開頭難,從最簡單的typo fix/docs improvement起步。

4. Community Feedback

  1. 完善Executor相關(guān)參數(shù)的文檔說明。SPARK-26446: Add cachedExecutorIdleTimeout docs at ExecutorAllocationManager
  2. fix bug:SPARK-26588:Idle executor should properly be killed when no job is submitted

參考

我的博客即將同步至騰訊云+社區(qū),邀請(qǐng)大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=2zo9qc4727c4s

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容