本文為 Spark 2.0 源碼分析筆記,由于源碼只包含 standalone 模式下完整的 executor 相關(guān)代碼,所以本文主要針對(duì) standalone 模式下的 executor 模塊,文中內(nèi)容若不特意說(shuō)明均為 standalone 模式內(nèi)容
前一篇文章簡(jiǎn)要介紹了 Spark 執(zhí)行模塊中幾個(gè)主要的類(lèi)以及 AppClient 是如何被創(chuàng)建的,這篇文章將詳細(xì)的介紹 AppClient 向 Master 注冊(cè) Application 的過(guò)程,將主要從以下幾個(gè)方面進(jìn)行說(shuō)明:
- 注冊(cè) Application 時(shí)機(jī)
- 注冊(cè) Application 的重試機(jī)制
- 注冊(cè)行為細(xì)節(jié)
注冊(cè) Application 時(shí)機(jī)
簡(jiǎn)單來(lái)說(shuō),AppClient 向 Master 注冊(cè) Application 是在 SparkContext 構(gòu)造時(shí)發(fā)生的,也就是 driver 一開(kāi)始運(yùn)行就立馬向 Master 注冊(cè) Application。更具體的步驟可以如下圖表示:

注冊(cè) Application 的重試機(jī)制
StandaloneAppClient 中有兩個(gè)成員,分別是:private val REGISTRATION_TIMEOUT_SECONDS = 20 和 private val REGISTRATION_RETRIES = 3。 其中,REGISTRATION_RETRIES 代表注冊(cè) Application 的最大重試次數(shù),為3次;而 REGISTRATION_TIMEOUT_SECONDS 代表 StandaloneAppClient 在執(zhí)行注冊(cè)之后隔多少秒去獲取注冊(cè)結(jié)果,具體的流程如下:
- ClientEndpoint 實(shí)例通過(guò)發(fā)送
RegisterApplication消息給 Master 來(lái)向 Master 注冊(cè) Application - 隔
REGISTRATION_TIMEOUT_SECONDS秒后檢測(cè) registered 標(biāo)記,若其對(duì)應(yīng)值為 true,則表明注冊(cè)成功;否則,表明注冊(cè)失敗- Master 會(huì)在注冊(cè) Application 后向 AppClient 響應(yīng)
RegisteredApplication消息,AppClient 收到該消息會(huì)置 registered 對(duì)應(yīng)值為 true - 若 Master 沒(méi)有響應(yīng)該消息,則 registered 一直為 false)
- Master 會(huì)在注冊(cè) Application 后向 AppClient 響應(yīng)
- 若注冊(cè)成功,注冊(cè)流程結(jié)束;若注冊(cè)失?。?
- 已嘗試注冊(cè)次數(shù)小于
REGISTRATION_RETRIES,返回第一步再來(lái)一次 - 已嘗試注冊(cè)次數(shù)等于
REGISTRATION_RETRIES,結(jié)束注冊(cè)流程,將 Application 標(biāo)記為 dead,通過(guò)回調(diào)通知 SchedulerBackend Application dead
- 已嘗試注冊(cè)次數(shù)小于
上面這一小段即時(shí)注冊(cè) Application 的重試機(jī)制,下面再來(lái)看看注冊(cè)的一些細(xì)節(jié)
注冊(cè)行為的細(xì)節(jié)
注冊(cè)行為可以主要分為以下三步:
- AppClient 發(fā)起注冊(cè)
- Master 接收并處理注冊(cè)消息
- AppClient 處理 Master 的注冊(cè)響應(yīng)消息
Step1:AppClient 發(fā)起注冊(cè)
AppClient 是通過(guò)向 Master 發(fā)送 RegisterApplication 消息進(jìn)行注冊(cè)的。該消息定義為一個(gè) case class,其中 appDescription: ApplicationDescription 成員描述了要注冊(cè)并啟動(dòng)一個(gè)怎么樣的 Application(主要包含屬性及資源信息),其定義如下:
private[spark] case class ApplicationDescription(
name: String, //< Application 的名字
maxCores: Option[Int], //< application 總共能用的最大 cores 數(shù)量
memoryPerExecutorMB: Int, //< 每個(gè) executor 分配的內(nèi)存
command: Command, //< 啟動(dòng) executor 的 ClassName、所需參數(shù)、環(huán)境信息等啟動(dòng)一個(gè) Java 進(jìn)程的所有需要的信息;在 Standalone 模式下,類(lèi)名就是 CoarseGrainedExecutorBackend
appUiUrl: String, //< Application 的 web ui 的 host:port
eventLogDir: Option[URI] = None, //< Spark事件日志記錄的目錄。在這個(gè)基本目錄下,Spark為每個(gè) Application 創(chuàng)建一個(gè)子目錄。各個(gè)應(yīng)用程序記錄日志到相應(yīng)的目錄。常設(shè)置為 hdfs 目錄以便于 history server 訪問(wèn)來(lái)重構(gòu) web ui的目錄
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None, //< 每個(gè) executor 使用的 cores 數(shù)量
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
override def toString: String = "ApplicationDescription(" + name + ")"
}
private[spark] case class Command(
mainClass: String,
arguments: Seq[String],
environment: Map[String, String],
classPathEntries: Seq[String],
libraryPathEntries: Seq[String],
javaOpts: Seq[String]) {
}
除了 Application 的描述,注冊(cè)時(shí)還會(huì)帶上 ClientEndpoint 對(duì)應(yīng)的 rpcEndpointRef,以便 Master 能通過(guò)該 rpcEndpointRef 給自身發(fā)送消息。
構(gòu)造該消息實(shí)例后,ClientEndpoint 就會(huì)通過(guò) master rpcEndpointRef 給 Master 發(fā)送該注冊(cè)消息
Step2:Master 接收并處理注冊(cè)消息
Master 接收到注冊(cè)消息后的主要處理流程如下圖所示:

在向 driver 發(fā)送 RegisteredApplication 消息后,其實(shí)已經(jīng)完成了注冊(cè)流程,從上面的流程圖可以看出,只要接收到 AppClient 的注冊(cè)請(qǐng)求,Master 都能成功注冊(cè) Application 并響應(yīng)消息。這之后的調(diào)度都做了什么呢?我們繼續(xù)跟進(jìn) Master#schedule() 方法。
schedule() 的流程如下:
- 打散(shuffle)所有狀態(tài)為 ALIVE 的 workders
- 對(duì)于每一個(gè)處于 WAITTING 狀態(tài)的 driver,都要遍歷所有的打散的 alive works
- 如果 worker 的 free memory 和 free cores 都大于等于 driver 要求的值,則通過(guò)給該 worker 發(fā)送
LaunchDriver消息來(lái)啟動(dòng) driver 并把該 driver 從 WAITTING driver 中除名
- 如果 worker 的 free memory 和 free cores 都大于等于 driver 要求的值,則通過(guò)給該 worker 發(fā)送
-
startExecutorsOnWorkers():在 workers 上啟動(dòng) executors(當(dāng)前,只實(shí)現(xiàn)了簡(jiǎn)單的 FIFO 調(diào)度,先滿足第一個(gè) app,然后再滿足第二個(gè) app,以此類(lèi)推)- 從 waitingApps 中取出一個(gè) app(app.coresLeft > 0)
- 對(duì)于該 app,從所有可用的 workers 中篩選出 free memory和 free cores 滿足 app executor 需求的 worker,為 usableWorkers
- 調(diào)用
scheduleExecutorsOnWorkers方法來(lái)在 usableWorkers 上分配 executors,有兩種模式:- 一種是盡量把一個(gè) app 的 executors 分配到盡可能多的 workers 上
- 另一種是盡量把一個(gè) app 的 executors 分配到盡量少的 workers 上
- 上一步得到了要在每個(gè) workers 上使用多少個(gè) cores,這一步就要來(lái)分配這些了:
- 調(diào)用
allocateWorkerResourceToExecutors進(jìn)行分配:- 分配一個(gè) worker 的資源給一個(gè)或多個(gè) executors
- 調(diào)用
launchExecutor(worker, exec)啟動(dòng) executor- 對(duì)應(yīng)的 WorkerInfo 增加剛分配的 ExecutorDesc
- 給 worker 發(fā)送 LaunchExecutor 消息,以要求其啟動(dòng)指定信息的 executor
- 給 driver 發(fā)送 ExecutorAdded 消息,以通知其有新的 Executor 添加了
- 置 app 的狀態(tài)為 RUNNING
- 調(diào)用
Step3:AppClient 處理 Master 的注冊(cè)響應(yīng)消息
Master 若成功處理了注冊(cè)請(qǐng)求,會(huì)響應(yīng)給 AppClient 一個(gè) RegisteredApplication 消息,AppClient 在接收到該響應(yīng)消息后,會(huì)進(jìn)行一些簡(jiǎn)單的操作,主要包括:
- 設(shè)置 appId
- 至 registered 為 true
- 通知 SchedulerBackend 已成功注冊(cè) Application