Spark executor 模塊② - AppClient 向 Master 注冊(cè) Application

本文為 Spark 2.0 源碼分析筆記,由于源碼只包含 standalone 模式下完整的 executor 相關(guān)代碼,所以本文主要針對(duì) standalone 模式下的 executor 模塊,文中內(nèi)容若不特意說(shuō)明均為 standalone 模式內(nèi)容

前一篇文章簡(jiǎn)要介紹了 Spark 執(zhí)行模塊中幾個(gè)主要的類(lèi)以及 AppClient 是如何被創(chuàng)建的,這篇文章將詳細(xì)的介紹 AppClient 向 Master 注冊(cè) Application 的過(guò)程,將主要從以下幾個(gè)方面進(jìn)行說(shuō)明:

  • 注冊(cè) Application 時(shí)機(jī)
  • 注冊(cè) Application 的重試機(jī)制
  • 注冊(cè)行為細(xì)節(jié)

注冊(cè) Application 時(shí)機(jī)

簡(jiǎn)單來(lái)說(shuō),AppClient 向 Master 注冊(cè) Application 是在 SparkContext 構(gòu)造時(shí)發(fā)生的,也就是 driver 一開(kāi)始運(yùn)行就立馬向 Master 注冊(cè) Application。更具體的步驟可以如下圖表示:

注冊(cè) Application 的重試機(jī)制

StandaloneAppClient 中有兩個(gè)成員,分別是:private val REGISTRATION_TIMEOUT_SECONDS = 20private val REGISTRATION_RETRIES = 3。 其中,REGISTRATION_RETRIES 代表注冊(cè) Application 的最大重試次數(shù),為3次;而 REGISTRATION_TIMEOUT_SECONDS 代表 StandaloneAppClient 在執(zhí)行注冊(cè)之后隔多少秒去獲取注冊(cè)結(jié)果,具體的流程如下:

  1. ClientEndpoint 實(shí)例通過(guò)發(fā)送 RegisterApplication 消息給 Master 來(lái)向 Master 注冊(cè) Application
  2. REGISTRATION_TIMEOUT_SECONDS 秒后檢測(cè) registered 標(biāo)記,若其對(duì)應(yīng)值為 true,則表明注冊(cè)成功;否則,表明注冊(cè)失敗
    • Master 會(huì)在注冊(cè) Application 后向 AppClient 響應(yīng) RegisteredApplication 消息,AppClient 收到該消息會(huì)置 registered 對(duì)應(yīng)值為 true
    • 若 Master 沒(méi)有響應(yīng)該消息,則 registered 一直為 false)
  3. 若注冊(cè)成功,注冊(cè)流程結(jié)束;若注冊(cè)失?。?
    • 已嘗試注冊(cè)次數(shù)小于 REGISTRATION_RETRIES,返回第一步再來(lái)一次
    • 已嘗試注冊(cè)次數(shù)等于 REGISTRATION_RETRIES,結(jié)束注冊(cè)流程,將 Application 標(biāo)記為 dead,通過(guò)回調(diào)通知 SchedulerBackend Application dead

上面這一小段即時(shí)注冊(cè) Application 的重試機(jī)制,下面再來(lái)看看注冊(cè)的一些細(xì)節(jié)

注冊(cè)行為的細(xì)節(jié)

注冊(cè)行為可以主要分為以下三步:

  1. AppClient 發(fā)起注冊(cè)
  2. Master 接收并處理注冊(cè)消息
  3. AppClient 處理 Master 的注冊(cè)響應(yīng)消息

Step1:AppClient 發(fā)起注冊(cè)

AppClient 是通過(guò)向 Master 發(fā)送 RegisterApplication 消息進(jìn)行注冊(cè)的。該消息定義為一個(gè) case class,其中 appDescription: ApplicationDescription 成員描述了要注冊(cè)并啟動(dòng)一個(gè)怎么樣的 Application(主要包含屬性及資源信息),其定義如下:

private[spark] case class ApplicationDescription(
    name: String,                               //< Application 的名字
    maxCores: Option[Int],                      //< application 總共能用的最大 cores 數(shù)量
    memoryPerExecutorMB: Int,                   //< 每個(gè) executor 分配的內(nèi)存
    command: Command,                           //< 啟動(dòng) executor 的 ClassName、所需參數(shù)、環(huán)境信息等啟動(dòng)一個(gè) Java 進(jìn)程的所有需要的信息;在 Standalone 模式下,類(lèi)名就是 CoarseGrainedExecutorBackend
    appUiUrl: String,                           //< Application 的 web ui 的 host:port
    eventLogDir: Option[URI] = None,            //< Spark事件日志記錄的目錄。在這個(gè)基本目錄下,Spark為每個(gè) Application 創(chuàng)建一個(gè)子目錄。各個(gè)應(yīng)用程序記錄日志到相應(yīng)的目錄。常設(shè)置為 hdfs 目錄以便于 history server 訪問(wèn)來(lái)重構(gòu) web ui的目錄
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,       //< 每個(gè) executor 使用的 cores 數(shù)量
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}

private[spark] case class Command(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}

除了 Application 的描述,注冊(cè)時(shí)還會(huì)帶上 ClientEndpoint 對(duì)應(yīng)的 rpcEndpointRef,以便 Master 能通過(guò)該 rpcEndpointRef 給自身發(fā)送消息。

構(gòu)造該消息實(shí)例后,ClientEndpoint 就會(huì)通過(guò) master rpcEndpointRef 給 Master 發(fā)送該注冊(cè)消息

Step2:Master 接收并處理注冊(cè)消息

Master 接收到注冊(cè)消息后的主要處理流程如下圖所示:

在向 driver 發(fā)送 RegisteredApplication 消息后,其實(shí)已經(jīng)完成了注冊(cè)流程,從上面的流程圖可以看出,只要接收到 AppClient 的注冊(cè)請(qǐng)求,Master 都能成功注冊(cè) Application 并響應(yīng)消息。這之后的調(diào)度都做了什么呢?我們繼續(xù)跟進(jìn) Master#schedule() 方法。

schedule() 的流程如下:

  1. 打散(shuffle)所有狀態(tài)為 ALIVE 的 workders
  2. 對(duì)于每一個(gè)處于 WAITTING 狀態(tài)的 driver,都要遍歷所有的打散的 alive works
    • 如果 worker 的 free memory 和 free cores 都大于等于 driver 要求的值,則通過(guò)給該 worker 發(fā)送 LaunchDriver 消息來(lái)啟動(dòng) driver 并把該 driver 從 WAITTING driver 中除名
  3. startExecutorsOnWorkers():在 workers 上啟動(dòng) executors(當(dāng)前,只實(shí)現(xiàn)了簡(jiǎn)單的 FIFO 調(diào)度,先滿足第一個(gè) app,然后再滿足第二個(gè) app,以此類(lèi)推)
    1. 從 waitingApps 中取出一個(gè) app(app.coresLeft > 0)
    2. 對(duì)于該 app,從所有可用的 workers 中篩選出 free memory和 free cores 滿足 app executor 需求的 worker,為 usableWorkers
    3. 調(diào)用 scheduleExecutorsOnWorkers 方法來(lái)在 usableWorkers 上分配 executors,有兩種模式:
      • 一種是盡量把一個(gè) app 的 executors 分配到盡可能多的 workers 上
      • 另一種是盡量把一個(gè) app 的 executors 分配到盡量少的 workers 上
  4. 上一步得到了要在每個(gè) workers 上使用多少個(gè) cores,這一步就要來(lái)分配這些了:
    • 調(diào)用 allocateWorkerResourceToExecutors 進(jìn)行分配:
      • 分配一個(gè) worker 的資源給一個(gè)或多個(gè) executors
      • 調(diào)用 launchExecutor(worker, exec) 啟動(dòng) executor
        • 對(duì)應(yīng)的 WorkerInfo 增加剛分配的 ExecutorDesc
        • 給 worker 發(fā)送 LaunchExecutor 消息,以要求其啟動(dòng)指定信息的 executor
        • 給 driver 發(fā)送 ExecutorAdded 消息,以通知其有新的 Executor 添加了
      • 置 app 的狀態(tài)為 RUNNING

Step3:AppClient 處理 Master 的注冊(cè)響應(yīng)消息

Master 若成功處理了注冊(cè)請(qǐng)求,會(huì)響應(yīng)給 AppClient 一個(gè) RegisteredApplication 消息,AppClient 在接收到該響應(yīng)消息后,會(huì)進(jìn)行一些簡(jiǎn)單的操作,主要包括:

  • 設(shè)置 appId
  • 至 registered 為 true
  • 通知 SchedulerBackend 已成功注冊(cè) Application
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容