繼續前一篇的內容。前一篇內容為:
Spark中Worker源碼分析(一)http://www.itdecent.cn/p/015b3ff0a5bf
4.receive方法,
receive方法中消息類型主要分為以下14種情況:
(1)worker向master注冊成功後,詳見代碼
(2)worker向master發送心跳消息,如果還沒有注冊到master上,該消息將被忽略,詳見代碼
(3)worker的工作空間的清理,詳見代碼
(4)更換master,詳見代碼
(5)worker注冊失敗,詳見代碼
(6)再次連接worker,詳見代碼
(7)創建executor,詳見代碼
(8)executor的狀態發生改變時,詳見代碼
(9)kill executor,詳見代碼
(10)創建driver,詳見代碼
(11)kill driver,詳見代碼
(12)driver的狀態發生變化時,詳見代碼
(13)將worker重新注冊到master上,詳見代碼
(14)app執行完畢,詳見代碼
worker與master相關的交互為(1)(2)(4)(5)(6)(13)
worker與driver相關的交互為(10)(11)(12)
worker與executor相關的交互為(3)(7)(8)(9)(14),需要說明的是(3)(14)它們的完成都與executor有著密切的聯繫。
<code>
/**
 * Message handler of this worker endpoint: a partial function that dispatches on the
 * incoming message and returns nothing.
 *
 * Handled messages, in the order they are matched:
 *  1. `RegisteredWorker`     - registration succeeded; start the periodic self-messages.
 *  2. `SendHeartbeat`        - forward a heartbeat to the master (only while `connected`).
 *  3. `WorkDirCleanup`       - delete stale application directories under `workDir`.
 *  4. `MasterChanged`        - switch to the new master and report local state to it.
 *  5. `RegisterWorkerFailed` - exit the process if we never managed to register.
 *  6. `ReconnectWorker`      - register with the master again.
 *  7. `LaunchExecutor`       - create and start an `ExecutorRunner`.
 *  8. `ExecutorStateChanged` - delegate to `handleExecutorStateChanged`.
 *  9. `KillExecutor`         - kill a known executor.
 * 10. `LaunchDriver`         - create and start a `DriverRunner`.
 * 11. `KillDriver`           - kill a known driver.
 * 12. `DriverStateChanged`   - delegate to `handleDriverStateChanged`.
 * 13. `ReregisterWithMaster` - delegate to `reregisterWithMaster()`.
 * 14. `ApplicationFinished`  - record the finished app and clean up its local files.
 */
override def receive: PartialFunction[Any, Unit] = {
  // (1) Registration with a master succeeded.
  case RegisteredWorker(masterRef, masterWebUiUrl) =>
    logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
    registered = true
    changeMaster(masterRef, masterWebUiUrl)
    // Post SendHeartbeat to ourselves right away and then every HEARTBEAT_MILLIS ms.
    // NOTE(review): the original note says the period is 15s - confirm against the
    // definition of HEARTBEAT_MILLIS.
    forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(SendHeartbeat)
      }
    }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
    if (CLEANUP_ENABLED) {
      logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
      // Post WorkDirCleanup to ourselves every CLEANUP_INTERVAL_MILLIS ms; the first one is
      // delayed by a full interval.
      // NOTE(review): the original note says the interval is 30 minutes - confirm against
      // the definition of CLEANUP_INTERVAL_MILLIS.
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(WorkDirCleanup)
        }
      }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
    }

  // (2) Heartbeat tick: forwarded to the master only while `connected` is true,
  // otherwise the tick is silently dropped.
  case SendHeartbeat =>
    if (connected) { sendToMaster(Heartbeat(workerId, self)) }

  // (3) Clean up the worker's work directory.
  case WorkDirCleanup =>
    // Snapshot the app ids of the current executors before creating the future, so the
    // cleanup closure works on an immutable copy. The deletion itself runs on the
    // cleanupThreadExecutor execution context rather than inside this handler.
    val appIds = executors.values.map(_.appId).toSet
    val cleanupFuture = concurrent.future {
      val appDirs = workDir.listFiles()
      if (appDirs == null) {
        // listFiles() returned null, so report the directory that could not be listed
        // (appDirs itself would only ever print as "null" here).
        throw new IOException("ERROR: Failed to list files in " + workDir)
      }
      appDirs.filter { dir =>
        // A directory is removable only if it is a directory, its name is not the id of
        // an app that still has an executor here, and
        // Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS) is false.
        val appIdFromDir = dir.getName
        val isAppStillRunning = appIds.contains(appIdFromDir)
        dir.isDirectory && !isAppStillRunning &&
          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
      }.foreach { dir =>
        logInfo(s"Removing directory: ${dir.getPath}")
        Utils.deleteRecursively(dir)
      }
    }(cleanupThreadExecutor)

    // A failed cleanup is logged, not rethrown.
    cleanupFuture.onFailure {
      case e: Throwable =>
        logError("App dir cleanup failed: " + e.getMessage, e)
    }(cleanupThreadExecutor)

  // (4) The master changed: adopt it and send it our executor descriptions and driver ids.
  case MasterChanged(masterRef, masterWebUiUrl) =>
    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    changeMaster(masterRef, masterWebUiUrl)

    val execs = executors.values.
      map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
    masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

  // (5) Registration failed. Fatal only if we never registered; a failure notice that
  // arrives after `registered` was set is ignored.
  case RegisterWorkerFailed(message) =>
    if (!registered) {
      logError("Worker registration failed: " + message)
      System.exit(1)
    }

  // (6) The master asked us to reconnect: register with the master again.
  case ReconnectWorker(masterUrl) =>
    logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
    registerWithMaster()

  // (7) Launch an executor. Requests from anything but the active master are rejected.
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
      // Key under which the runner is stored in `executors`; also the relative path of
      // the executor's working directory below `workDir`.
      val fullId = appId + "/" + execId
      try {
        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

        // Create the executor's working directory.
        val executorDir = new File(workDir, fullId)
        if (!executorDir.mkdirs()) {
          throw new IOException("Failed to create directory " + executorDir)
        }

        // Reuse the local directories already recorded for this application; otherwise
        // create one "executor"-prefixed directory under each root returned by
        // Utils.getOrCreateLocalRootDirs and pass it through Utils.chmod700.
        // NOTE(review): the original note says these are handed to the executor through
        // the SPARK_EXECUTOR_DIRS environment variable and deleted when the app
        // finishes - not visible in this method, confirm in ExecutorRunner.
        val appLocalDirs = appDirectories.getOrElse(appId, {
          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            appDir.getAbsolutePath()
          }.toSeq
        })
        appDirectories(appId) = appLocalDirs
        val manager = new ExecutorRunner(
          appId,
          execId,
          appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
          cores_,
          memory_,
          self,
          workerId,
          host,
          webUi.boundPort,
          publicAddress,
          sparkHome,
          executorDir,
          workerUri,
          conf,
          appLocalDirs, ExecutorState.LOADING)
        executors(fullId) = manager
        manager.start()
        // Account for the resources handed to the new executor.
        coresUsed += cores_
        memoryUsed += memory_
        sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
      } catch {
        case e: Exception => {
          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
          // If the runner was already recorded before the failure, kill and forget it.
          if (executors.contains(fullId)) {
            executors(fullId).kill()
            executors -= fullId
          }
          sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
            Some(e.toString), None))
        }
      }
    }

  // (8) An executor changed state.
  case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    handleExecutorStateChanged(executorStateChanged)

  // (9) Kill an executor. Requests from anything but the active master are rejected.
  case KillExecutor(masterUrl, appId, execId) =>
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
    } else {
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }
    }

  // (10) Launch a driver and account for the cores and memory it was given.
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    drivers(driverId) = driver
    driver.start()

    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem

  // (11) Kill a driver; an unknown driver id is only logged.
  case KillDriver(driverId) =>
    logInfo(s"Asked to kill driver $driverId")
    drivers.get(driverId) match {
      case Some(runner) =>
        runner.kill()
      case None =>
        logError(s"Asked to kill unknown driver $driverId")
    }

  // (12) A driver changed state.
  case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
    handleDriverStateChanged(driverStateChanged)

  // (13) Register this worker with the master again.
  case ReregisterWithMaster =>
    reregisterWithMaster()

  // (14) An application finished: remember its id, then let maybeCleanupApplication
  // decide whether the local files it created can be removed.
  case ApplicationFinished(id) =>
    finishedApps += id
    maybeCleanupApplication(id)
}
</code>