Mastering Spark 2.x: Source Code Analysis of the Master's Message-Processing Loop (Part 2)

WeChat official account: 大数据开发运维架构 (Big Data Development, Operations and Architecture)



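The previous article, Mastering Spark 2.x: Source Code Analysis of the Master's Message-Processing Loop (Part 1), mainly covered:

1. the recovery flow for apps, drivers, and executors after a Master failover;

2. how the Master handles the messages that Apps and Workers send back once recovery completes.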


This article continues with the remaining message types handled by the receive() function in Master.scala:

1. Leadership revoked: the current Master has lost its leadership, so it logs an error and terminates its own process, leaving a new leader to be elected.

case RevokedLeadership =>
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)

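2. Registering a Worker with the Master. While handling this message the Master sends reply messages back to the Worker. The Worker-side handling is not analyzed here; a later article will cover the Worker's message-handling functions separately: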

case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // This Master is in STANDBY: just reply with MasterInStandby,
    // which is handled in Worker.scala.
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // A Worker with this ID is already registered: reply with RegisterWorkerFailed.
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Build a WorkerInfo from the host, port, cores, and memory sent by the Worker.
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // registerWorker() is the core registration function.
    if (registerWorker(worker)) {
      // Save the registered Worker in the configured persistence engine
      // (typically ZooKeeper).
      persistenceEngine.addWorker(worker)
      // Tell the Worker that registration succeeded; the RegisteredWorker
      // message is handled in Worker.scala.
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // A new Worker has joined, so reschedule.
      schedule()
    } else {
      // Registration failed: log a warning and reply with RegisterWorkerFailed,
      // which is handled in Worker.scala.
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
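The three outcomes of this handler (standby, duplicate, accepted) can be sketched with a small self-contained model. This is only an illustration under simplifying assumptions: `MiniMaster`, `handleRegister`, and the string-based IDs and addresses are hypothetical stand-ins, not Spark classes, and the reply types only borrow the names of the real messages.

```scala
import scala.collection.mutable

object RegisterWorkerSketch {
  // Hypothetical reply types that mirror the message names used by the Master.
  sealed trait Reply
  case object MasterInStandby extends Reply
  final case class RegisterWorkerFailed(reason: String) extends Reply
  case object RegisteredWorker extends Reply

  // A toy Master that tracks workers by ID and by address.
  final class MiniMaster(var standby: Boolean) {
    private val idToWorker = mutable.HashMap.empty[String, String]      // id -> address
    private val addressToWorker = mutable.HashMap.empty[String, String] // address -> id

    def handleRegister(id: String, address: String): Reply = {
      if (standby) {
        MasterInStandby
      } else if (idToWorker.contains(id)) {
        RegisterWorkerFailed("Duplicate worker ID")
      } else if (addressToWorker.contains(address)) {
        RegisterWorkerFailed("Attempted to re-register worker at same address: " + address)
      } else {
        idToWorker(id) = address
        addressToWorker(address) = id
        RegisteredWorker
      }
    }
  }
}
```

In this sketch the address check is folded into the handler for brevity, whereas the Master delegates it to registerWorker().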

Let's take a closer look at the core registration function called in the handler above, registerWorker(worker):

private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  // If a Worker registers at an address whose existing entry is in the UNKNOWN state,
  // the Worker must have restarted during recovery. The old entry (same address) is
  // treated as dead and removed first.
  // Reminder: after a leader election, recovery first sets each Worker's state to
  // UNKNOWN and only later to ALIVE.
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  // Record the Worker in the Master's bookkeeping collections.
  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  // Registration complete.
  true
}
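The state-dependent part of this function is easy to misread, so here is a minimal sketch of it, assuming a much simpler worker record. `Registry`, `Worker`, `markDead`, and the three states are hypothetical stand-ins; `markDead` only imitates the effect described above, where a dead worker stays in `workers` but no longer owns its address.

```scala
import scala.collection.mutable

object RegisterWorkerStateSketch {
  sealed trait State
  case object Alive extends State
  case object Dead extends State
  case object Unknown extends State

  // Hypothetical, simplified worker record (identity-based equality).
  final class Worker(val id: String, val address: String, var state: State)

  final class Registry {
    val workers = mutable.HashSet.empty[Worker]
    val addressToWorker = mutable.HashMap.empty[String, Worker]

    // Stand-in for removing a worker: keep it in `workers`, free its address.
    def markDead(w: Worker): Unit = {
      w.state = Dead
      addressToWorker -= w.address
    }

    def register(w: Worker): Boolean = {
      // Drop stale DEAD entries that share the new worker's address.
      val dead = workers.filter(o => o.address == w.address && o.state == Dead)
      workers --= dead

      val accept = addressToWorker.get(w.address) match {
        case Some(old) if old.state == Unknown =>
          // Restarted during recovery: replace the old entry.
          workers -= old
          addressToWorker -= old.address
          true
        case Some(_) => false // a live worker already owns this address
        case None => true
      }
      if (accept) {
        workers += w
        addressToWorker(w.address) = w
      }
      accept
    }
  }
}
```

The sketch computes the decision as a value instead of using an early `return`, which is a stylistic choice and not how the Master code above is written.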

3. Registering an Application. The code first:

case RegisterApplication(description, driver) =>
  // If this Master is in STANDBY, ignore the message entirely.
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // description is an ApplicationDescription, which carries the detailed launch
    // parameters of the app (name, maxCores, memoryPerExecutorMB, ...). The
    // ApplicationInfo created here holds a reference to it.
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the app with the configured persistence engine.
    persistenceEngine.addApplication(app)
    // Registration is done: reply with the app ID and the Master's RpcEndpointRef.
    // This message is handled by receive() in StandaloneAppClient.scala.
    driver.send(RegisteredApplication(app.id, self))
    // A new Application has joined, so reschedule.
    schedule()
  }

Let's take a closer look at the core registration function called in the handler above, registerApplication(app):

private def registerApplication(app: ApplicationInfo): Unit = {
  // Get the driver's address.
  val appAddress = app.driver.address
  // If an app at this address is already registered, ignore the re-registration.
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  // Metrics-system registration; not discussed here.
  applicationMetricsSystem.registerSource(app.appSource)
  // Record the app in the Master's bookkeeping collections.
  apps += app
  idToApp(app.id) = app
  // endpointToApp is a HashMap[RpcEndpointRef, ApplicationInfo] that maps each
  // message sender to its app.
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Put the app on the waiting queue for later scheduling.
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}
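The bookkeeping here depends on Scala's map update syntax, where `map(key) = value` is sugar for `map.update(key, value)`, while `map(key)` alone is only a lookup. A minimal sketch, assuming a hypothetical `App` record and `AppRegistry` class; unlike the Master's function, `register` returns a Boolean so the outcome is easy to check:

```scala
import scala.collection.mutable

object RegisterAppSketch {
  // Hypothetical, simplified application record.
  final case class App(id: String, driverAddress: String)

  final class AppRegistry {
    val apps = mutable.HashSet.empty[App]
    val idToApp = mutable.HashMap.empty[String, App]
    val addressToApp = mutable.HashMap.empty[String, App]
    val waitingApps = mutable.ArrayBuffer.empty[App]

    // Returns false when an app at the same driver address already exists.
    def register(app: App): Boolean = {
      if (addressToApp.contains(app.driverAddress)) {
        false
      } else {
        apps += app
        idToApp(app.id) = app                 // same as idToApp.update(app.id, app)
        addressToApp(app.driverAddress) = app
        waitingApps += app                    // queued until the scheduler picks it up
        true
      }
    }
  }
}
```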

4. Handling Executor state-change messages:

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Find the app this executor belongs to, then look up the executor's descriptor
  // in app.executors. The code below uses the descriptor's state, application,
  // worker, and fullId members.
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // The executor is known to the Master.
    case Some(exec) =>
      // Fetch the app info, keep the previous state in oldState, apply the new state.
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      // If the new state is RUNNING, the previous state must have been LAUNCHING;
      // the app's retry count is then reset.
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }

      // Notify the driver with ExecutorUpdated; the message is handled in
      // StandaloneAppClient.scala.
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

      // The executor has finished; its state may be KILLED, FAILED, LOST, or EXITED.
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        // In short: on an abnormal exit, once the app's retry count reaches
        // MAX_EXECUTOR_RETRIES and none of its executors is RUNNING, the app is removed
        // via removeApplication(appInfo, ApplicationState.FAILED). That function is not
        // analyzed here; it deletes the app from the Master's bookkeeping, adds it to the
        // completed apps, and updates what the web UI shows.
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      // The executor's state changed, so reschedule.
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
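The retry condition in this handler combines three checks, and the order matters because the counter is only incremented on abnormal exits. A minimal sketch of just that decision, assuming a hypothetical `AppRetries` class whose `maxRetries` parameter plays the role of MAX_EXECUTOR_RETRIES:

```scala
object ExecutorRetrySketch {
  // Hypothetical helper that isolates the "should the app be removed?" decision.
  final class AppRetries(maxRetries: Int) {
    private var retryCount = 0

    // Called when an executor reaches RUNNING.
    def reset(): Unit = {
      retryCount = 0
    }

    // Called when an executor finishes; returns true if the app should be removed.
    def onExecutorFinished(exitStatus: Option[Int], anyExecutorRunning: Boolean): Boolean = {
      val normalExit = exitStatus.contains(0)
      if (normalExit) {
        false // normal exits never count as retries
      } else {
        retryCount += 1
        // A negative limit disables removal; a running executor also prevents it.
        retryCount >= maxRetries && maxRetries >= 0 && !anyExecutorRunning
      }
    }
  }
}
```

With a limit of 2, for example, the first abnormal exit returns false, and a later abnormal exit returns true only if no executor of the app is still running.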

5. Handling Driver state-change messages. There is little code here, so a brief look is enough:

case DriverStateChanged(driverId, state, exception) =>
  state match {
    // If the state is ERROR, FINISHED, KILLED, or FAILED, remove the driver.
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    // Any other state is unexpected here.
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }

Here is the removeDriver() function called in the handler above:

private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]): Unit = {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory drivers HashSet.
      drivers -= driver
      // Cap the completed-driver history: once it reaches RETAINED_DRIVERS,
      // drop the oldest entries (a tenth of the limit, at least one).
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // Remove the driver from the configured persistence engine.
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the Worker it ran on; see removeDriver()
      // in WorkerInfo.scala.
      driver.worker.foreach(w => w.removeDriver(driver))
      // Reschedule after the removal.
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
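The trimming of completedDrivers is a small bounded-history pattern: when the buffer is full, drop the oldest tenth (at least one entry) before appending. A minimal sketch, assuming a hypothetical helper `addCompleted`; it uses `ArrayBuffer.remove(0, n)` to drop leading elements where the Master code calls `trimStart`, and it adds a guard for an empty buffer that the original does not have:

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedHistorySketch {
  // Appends `item`, first dropping the oldest entries if the buffer is full.
  def addCompleted[A](history: ArrayBuffer[A], item: A, retained: Int): Unit = {
    if (history.size >= retained) {
      // Drop a tenth of the limit, at least one, and never more than we hold.
      val toRemove = math.min(math.max(retained / 10, 1), history.size)
      history.remove(0, toRemove)
    }
    history += item
  }
}
```

Dropping a batch instead of a single element means the trim runs only once every few insertions when the limit is large.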

A few more message handlers remain. Their code is simple, so I won't go through them one by one; here is a brief summary:

1). Heartbeat

    If the sending Worker is already held in the Master's memory, the Master updates that Worker's last-heartbeat time. If the Master has no record of the Worker, it sends the Worker a ReconnectWorker message.

2). WorkerLatestState

    For each executor reported by the Worker that the Master cannot find in memory, the Master sends the Worker a KillExecutor message. For each reported driver it cannot find, it sends a KillDriver message.

3). UnregisterApplication

    Removes the app and driver information from the Master, removes the app's executors and sends KillExecutor messages to the Workers, deletes the app from the persistence engine, and finally calls schedule() to reschedule resources.
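As a rough illustration of the Heartbeat case summarized above, the following sketch models only the two outcomes described there. `HeartbeatTable`, `Updated`, and `AskReconnect` are hypothetical names, and timestamps are passed in explicitly so the behaviour is deterministic; the real handler has more detail than this.

```scala
import scala.collection.mutable

object HeartbeatSketch {
  sealed trait Action
  case object Updated extends Action      // known worker: timestamp refreshed
  case object AskReconnect extends Action // unknown worker: ask it to re-register

  final class HeartbeatTable {
    private val lastHeartbeat = mutable.HashMap.empty[String, Long]

    def register(workerId: String, now: Long): Unit = {
      lastHeartbeat(workerId) = now
    }

    def onHeartbeat(workerId: String, now: Long): Action = {
      if (lastHeartbeat.contains(workerId)) {
        lastHeartbeat(workerId) = now
        Updated
      } else {
        AskReconnect
      }
    }

    def lastSeen(workerId: String): Option[Long] = lastHeartbeat.get(workerId)
  }
}
```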

That covers most of the Master's message-handling functions.
