WeChat official account: 大数据开发运维架构 (Big Data Development, Operations & Architecture)
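The previous article, "Mastering Spark 2.x: Source Code Analysis of the Master's Message-Handling Loop (Part 1)", mainly covered:
1. after a Master failover, how apps, drivers and executors are recovered;
2. once recovery is complete, how the Master handles the responses returned by apps and Workers.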
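This article continues with the remaining message types handled by receive() in Master.scala:
1. RevokedLeadership: this Master has lost leadership in the leader election, so the current Master process simply exits.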
case RevokedLeadership =>
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)
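2. RegisterWorker: a Worker registers with the Master. While handling it, the Master sends acknowledgement messages back to the Worker; how the Worker handles those replies is not covered here and will get its own article on the Worker-side message handlers.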
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  // If this Master is in STANDBY, reply with MasterInStandby
  // (handled in Worker.scala)
  if (state == RecoveryState.STANDBY) {
    workerRef.send(MasterInStandby)
  }
  // Otherwise, if a Worker with this ID is already registered, reply with RegisterWorkerFailed
  else if (idToWorker.contains(id)) {
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Build a WorkerInfo from the host, port, cores and memory sent by the Worker
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // registerWorker() does the actual registration work
    if (registerWorker(worker)) {
      // Persist the Worker with the configured persistence engine (usually ZooKeeper)
      persistenceEngine.addWorker(worker)
      // Tell the Worker that registration succeeded (RegisteredWorker, handled in Worker.scala)
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // A new Worker has joined, so reschedule
      schedule()
    } else {
      // Registration failed: log it and reply with RegisterWorkerFailed (handled in Worker.scala)
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
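The reply logic of this handler boils down to a three-way decision. The sketch below models only that decision as a pure function, under stated assumptions: RegisterWorkerSketch, its MasterState and Reply types and the by-name registerWorker parameter are hypothetical stand-ins rather than Spark classes, and the persistence and schedule() side effects of the success branch are left out.

```scala
// Hypothetical model of the RegisterWorker reply decision; not Spark code.
object RegisterWorkerSketch {
  sealed trait MasterState
  case object Alive extends MasterState
  case object Standby extends MasterState

  sealed trait Reply
  case object MasterInStandbyReply extends Reply
  final case class RegisterWorkerFailedReply(reason: String) extends Reply
  case object RegisteredWorkerReply extends Reply

  /** Mirrors the if / else-if / else chain of the handler.
    * `registerWorker` is by-name, so it only runs when the first two checks pass;
    * it should return false when a live worker already uses `address`. */
  def reply(
      state: MasterState,
      knownIds: Set[String],
      id: String,
      address: String,
      registerWorker: => Boolean): Reply =
    if (state == Standby) MasterInStandbyReply
    else if (knownIds.contains(id)) RegisterWorkerFailedReply("Duplicate worker ID")
    else if (registerWorker) RegisteredWorkerReply // real Master also persists and schedules
    else RegisterWorkerFailedReply("Attempted to re-register worker at same address: " + address)
}
```

Passing registerWorker by name means it is evaluated only after the STANDBY and duplicate-ID checks pass, matching the order of the original branches.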
Let's look more closely at registerWorker(worker), the core registration function called by this handler:
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }
  // If a Worker registers from an address whose old entry is UNKNOWN, the Worker was
  // restarted during recovery: treat the old one (same address) as dead, remove it first.
  // Recall that during recovery after a leader election, the Master first marks Workers
  // UNKNOWN and then sets them back to ALIVE.
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }
  // Record the Worker in the Master's in-memory structures
  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  // Registration complete
  true
}
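To see how workers, idToWorker and addressToWorker stay consistent, here is a minimal sketch of the same bookkeeping outside Spark. Worker, WorkerState, MiniMaster and the simplified removeWorker are hypothetical; in particular, the real removeWorker does much more (relaunching the Worker's executors and drivers, updating the persistence engine). The early return is replaced by a match expression.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of Master.registerWorker's bookkeeping; not Spark code.
object RegisterWorkerBookkeeping {
  object WorkerState extends Enumeration {
    val ALIVE, DEAD, UNKNOWN = Value
  }

  final class Worker(val id: String, val host: String, val port: Int) {
    var state: WorkerState.Value = WorkerState.ALIVE
    def address: String = s"$host:$port"
  }

  final class MiniMaster {
    val workers = mutable.HashSet.empty[Worker]
    val idToWorker = mutable.HashMap.empty[String, Worker]
    val addressToWorker = mutable.HashMap.empty[String, Worker]

    // Greatly simplified: mark DEAD and drop the lookups; the DEAD entry stays in
    // `workers` until a later registration from the same host:port filters it out.
    private def removeWorker(w: Worker): Unit = {
      w.state = WorkerState.DEAD
      idToWorker -= w.id
      addressToWorker -= w.address
    }

    def registerWorker(worker: Worker): Boolean = {
      // Drop DEAD entries left behind on the same host:port
      workers.filter(w => w.host == worker.host && w.port == worker.port &&
        w.state == WorkerState.DEAD).foreach(workers -= _)
      // Same address already registered: accept only if the old entry is UNKNOWN
      val accepted = addressToWorker.get(worker.address) match {
        case Some(old) if old.state == WorkerState.UNKNOWN =>
          removeWorker(old)
          true
        case Some(_) => false
        case None => true
      }
      if (accepted) {
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(worker.address) = worker
      }
      accepted
    }
  }
}
```

For example, a second Worker on the same host:port is rejected while the first is ALIVE, but accepted once the first has been marked UNKNOWN, at which point the old entry is removed from both lookup maps.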
3. RegisterApplication: registering an Application. First, the code:
case RegisterApplication(description, driver) =>
  // If this Master is in STANDBY, ignore the message
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // description is an ApplicationDescription holding the app's launch parameters
    // (name, maxCores, memoryPerExecutorMB, ...); the new ApplicationInfo keeps a reference to it
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the app with the configured persistence engine
    persistenceEngine.addApplication(app)
    // Reply with the result: the appId and the Master's RpcEndpointRef.
    // This message is handled in receive() of StandaloneAppClient.scala
    driver.send(RegisteredApplication(app.id, self))
    // A new Application has arrived, so reschedule
    schedule()
  }
Now let's look at registerApplication(app), the core registration function called by this handler:
private def registerApplication(app: ApplicationInfo): Unit = {
  // Address of the driver
  val appAddress = app.driver.address
  // An app from this address is already registered: ignore the re-registration and return
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // Register the app's metrics source with the metrics system (not covered here)
  applicationMetricsSystem.registerSource(app.appSource)
  // Record the app in the Master's in-memory structures
  apps += app
  idToApp(app.id) = app
  // endpointToApp is a HashMap[RpcEndpointRef, ApplicationInfo]:
  // it maps each message sender (the driver endpoint) to its app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Queue the app in waitingApps until schedule() allocates resources to it
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}
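A minimal sketch of the same bookkeeping, assuming hypothetical DriverRef and App classes in place of RpcEndpointRef and ApplicationInfo. Unlike the real method, it returns a Boolean so that the duplicate-address case is observable, and it skips the metrics and reverse-proxy steps.

```scala
import scala.collection.mutable

// Hypothetical sketch of registerApplication's bookkeeping; not Spark code.
object RegisterApplicationSketch {
  final case class DriverRef(address: String) // stand-in for RpcEndpointRef
  final class App(val id: String, val driver: DriverRef) // stand-in for ApplicationInfo

  final class AppRegistry {
    val apps = mutable.HashSet.empty[App]
    val idToApp = mutable.HashMap.empty[String, App]
    val endpointToApp = mutable.HashMap.empty[DriverRef, App]
    val addressToApp = mutable.HashMap.empty[String, App]
    val waitingApps = mutable.ArrayBuffer.empty[App]

    /** Returns false (changing nothing) if an app from the same driver address exists. */
    def registerApplication(app: App): Boolean = {
      val appAddress = app.driver.address
      if (addressToApp.contains(appAddress)) false
      else {
        apps += app
        // Store the app under each key so it can be found by id, endpoint or address
        idToApp(app.id) = app
        endpointToApp(app.driver) = app
        addressToApp(appAddress) = app
        waitingApps += app // picked up later by scheduling
        true
      }
    }
  }
}
```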
4. ExecutorStateChanged: handling executor state changes
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Look up the app this executor belongs to, then the executor itself.
  // app.executors maps execId to an ExecutorDesc, which records the executor's id,
  // its ApplicationInfo, the WorkerInfo it runs on, its cores and memory, and its state
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // The executor is known
    case Some(exec) =>
      // Fetch the app and keep the previous state in oldState
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state
      // RUNNING is only legal coming from LAUNCHING; it also resets the app's retry count
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Send ExecutorUpdated to the driver (handled in StandaloneAppClient.scala)
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      // The executor has finished: its state is KILLED, FAILED, LOST or EXITED
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        // In short: on an abnormal exit, once the app's retry count reaches
        // MAX_EXECUTOR_RETRIES and none of its executors is still RUNNING, the app is removed
        // via removeApplication(appInfo, ApplicationState.FAILED). That function (not analyzed
        // here) deletes the app's records from the Master, adds it to the completed apps and
        // updates the web UI.
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      // An executor changed state, so reschedule
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
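The retry rule is the subtle part of this handler, so here is a sketch that isolates it. ExecState and AppRetries are hypothetical; ExecState.isFinished uses the four finished states listed above, and maxRetries plays the role of MAX_EXECUTOR_RETRIES. As in the original condition, the counter is incremented on every abnormal exit even when the limit is disabled with a negative value, because incrementRetryCount() is evaluated before the MAX_EXECUTOR_RETRIES >= 0 check.

```scala
// Hypothetical sketch of the executor-retry rule in ExecutorStateChanged; not Spark code.
object ExecutorRetrySketch {
  object ExecState extends Enumeration {
    val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
    // The four "finished" states named in the handler
    def isFinished(s: Value): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(s)
  }

  /** `maxRetries` plays the role of MAX_EXECUTOR_RETRIES (< 0 disables removal). */
  final class AppRetries(val maxRetries: Int) {
    var retryCount = 0

    private def incrementRetryCount(): Int = { retryCount += 1; retryCount }

    /** Returns true if the app should now be removed as FAILED.
      * `anyRunning` says whether any of the app's executors is still RUNNING. */
    def onStateChange(
        newState: ExecState.Value,
        exitStatus: Option[Int],
        anyRunning: Boolean): Boolean =
      if (newState == ExecState.RUNNING) { retryCount = 0; false } // like resetRetryCount()
      else if (!ExecState.isFinished(newState)) false
      else {
        val normalExit = exitStatus.contains(0)
        // Same short-circuit order as the original condition
        !normalExit && incrementRetryCount() >= maxRetries && maxRetries >= 0 && !anyRunning
      }
  }
}
```

A RUNNING update resets the counter, so only consecutive abnormal exits without a successful launch in between push an app toward removal.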
5. DriverStateChanged: handling driver state changes. There is little code here, so just a brief look:
case DriverStateChanged(driverId, state, exception) =>
  state match {
    // ERROR, FINISHED, KILLED or FAILED: remove the driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    // Any other state is unexpected here
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
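The same dispatch can be sketched in isolation. DriverStateSketch, its DriverState enumeration (SUBMITTED and RUNNING are assumed here as examples of non-terminal states) and the Removal result type are hypothetical; the real handler calls removeDriver directly instead of returning a value.

```scala
// Hypothetical sketch of the DriverStateChanged dispatch; not Spark code.
object DriverStateSketch {
  object DriverState extends Enumeration {
    // SUBMITTED and RUNNING stand in for non-terminal states
    val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
  }
  import DriverState._

  final case class Removal(driverId: String, finalState: DriverState.Value)

  /** Terminal states lead to a removal; any other state is rejected, as in the handler. */
  def handle(driverId: String, state: DriverState.Value): Removal = state match {
    case ERROR | FINISHED | KILLED | FAILED => Removal(driverId, state)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
```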
Next, let's look at the removeDriver() function called in the match above:
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]): Unit = {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory drivers HashSet
      drivers -= driver
      // Keep at most RETAINED_DRIVERS completed drivers (the list the web UI shows):
      // when full, first drop the oldest max(RETAINED_DRIVERS / 10, 1) entries
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // Remove it from the persistence engine
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // Remove it from the Worker it ran on (see removeDriver() in WorkerInfo.scala)
      driver.worker.foreach(w => w.removeDriver(driver))
      // Resources have been freed, so reschedule
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
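The trimming of completedDrivers is a simple bounded-history policy: when the buffer is full, it evicts roughly the oldest 10% in one go, so trimming happens only about once every retained / 10 inserts rather than on every insert. A generic sketch, where RetainedHistorySketch.addCompleted and its retained parameter are hypothetical names standing in for the inline code and RETAINED_DRIVERS:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical generic version of the completedDrivers trimming in removeDriver.
object RetainedHistorySketch {
  /** Appends `finished`; if `completed` already holds `retained` entries, first evicts
    * the oldest max(retained / 10, 1) of them. */
  def addCompleted[A](completed: ArrayBuffer[A], finished: A, retained: Int): Unit = {
    require(retained > 0, "retained must be positive")
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.trimStart(toRemove)
    }
    completed += finished
  }
}
```

With retained = 20 and a full buffer holding 0 to 19, adding 20 evicts 0 and 1, leaving 19 entries from 2 to 20.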
There are a few more message handlers. Their code is simple, so rather than walk through each one in detail I'll just summarize them:
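1) Heartbeat
   If the sending Worker is registered in the Master's memory (idToWorker), its last heartbeat time is updated. If it is no longer in idToWorker but still present in the workers set, the Master sends it a ReconnectWorker message so that it re-registers; a heartbeat from a Worker that was never registered is ignored.
2) WorkerLatestState
   For each executor the Worker reports that the Master cannot find in memory, the Master sends the Worker a KillExecutor message; likewise, for each driver it cannot find, it sends a KillDriver message.
3) UnregisterApplication
   Removes the app and its driver information from the Master; removes the app's executors and sends KillExecutor to their Workers; deletes the app from the persistence engine; and finally calls schedule() to reschedule resources.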
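The WorkerLatestState check is essentially a set difference between what the Worker reports and what the Master has recorded for that Worker. A sketch with hypothetical names: reconcile, KillExecutorCmd and KillDriverCmd are stand-ins (Spark's real KillExecutor and KillDriver messages carry different fields), and executors are identified here by an (appId, execId) pair.

```scala
// Hypothetical sketch of the WorkerLatestState reconciliation; not Spark code.
object WorkerLatestStateSketch {
  sealed trait Command
  final case class KillExecutorCmd(appId: String, execId: Int) extends Command
  final case class KillDriverCmd(driverId: String) extends Command

  /** Emits a kill command for every executor or driver the Worker reports that the
    * Master has not recorded for that Worker. Executors are keyed by (appId, execId). */
  def reconcile(
      reportedExecutors: Seq[(String, Int)],
      reportedDrivers: Seq[String],
      knownExecutors: Set[(String, Int)],
      knownDrivers: Set[String]): Seq[Command] = {
    val executorKills: Seq[Command] = reportedExecutors.filterNot(knownExecutors.contains).map {
      case (appId, execId) => KillExecutorCmd(appId, execId)
    }
    val driverKills: Seq[Command] =
      reportedDrivers.filterNot(knownDrivers.contains).map(KillDriverCmd(_))
    executorKills ++ driverKills
  }
}
```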
That essentially covers the Master's message handlers. If you found this helpful, please share it. Thanks for reading!