Spark Source Code: The Worker
Worker Overview
The Worker is a work node; in general, Drivers and Executors are launched on Workers.
Worker Code Overview
- Worker extends ThreadSafeRpcEndpoint, so it is itself a message loop and can communicate with other components directly;
- It encapsulates a set of data structures that record Driver, Executor, and Application information;
- It maintains bookkeeping of its own resources (cores and memory);
- It defines the communication handlers for talking to other components;
- It cooperates with the Master (registration, heartbeats, etc.).
See the source code below:
// Extends ThreadSafeRpcEndpoint, making the Worker a message loop
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    systemName: String,
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {

  // Data structures used to record Drivers, Executors, and Applications
  var workDir: File = null
  val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
  val drivers = new HashMap[String, DriverRunner]
  val executors = new HashMap[String, ExecutorRunner]
  val finishedDrivers = new LinkedHashMap[String, DriverRunner]
  val appDirectories = new HashMap[String, Seq[String]]
  val finishedApps = new HashSet[String]

  // Bookkeeping of the Worker's own resources
  var coresUsed = 0
  var memoryUsed = 0
  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed

  // Communication handlers
  override def receive: PartialFunction[Any, Unit] = synchronized {......}
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {......}

  // Cooperation with the Master
  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {......}
  private def tryRegisterAllMasters(): Array[JFuture[_]] = {......}
  private def reregisterWithMaster(): Unit = {......}
  ......
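The resource bookkeeping above is nothing more than two mutable counters plus derived free-capacity values. As a minimal, self-contained sketch (the `WorkerResources` class and `tryReserve`/`release` methods are hypothetical, not Spark code), the gating logic a scheduler relies on looks like this:

```scala
// Hypothetical sketch of the Worker's resource accounting (not Spark code):
// two mutable counters, with free capacity derived on demand.
class WorkerResources(val cores: Int, val memory: Int) {
  private var coresUsed = 0
  private var memoryUsed = 0

  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed

  // Try to reserve resources for a new executor; succeed only if both fit.
  def tryReserve(c: Int, m: Int): Boolean = synchronized {
    if (c <= coresFree && m <= memoryFree) {
      coresUsed += c; memoryUsed += m; true
    } else false
  }

  // Release resources when an executor or driver finishes.
  def release(c: Int, m: Int): Unit = synchronized {
    coresUsed -= c; memoryUsed -= m
  }
}

val res = new WorkerResources(cores = 8, memory = 16384)
assert(res.tryReserve(4, 8192))   // first executor fits
assert(!res.tryReserve(6, 1024))  // rejected: only 4 cores left
res.release(4, 8192)              // executor exits, capacity restored
```

The real Worker mutates `coresUsed`/`memoryUsed` directly inside its message handlers; the point is that `coresFree`/`memoryFree` are always derived, never stored, so they cannot drift out of sync.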
Worker Internals
As before, we start from the Worker's initialization and startup, shown in the code below:
// In the Worker companion object
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {
  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}
Next, look at the Worker's onStart() method. Because the Worker registers itself with the Master, registration is triggered directly inside this startup method:
override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  createWorkDir()
  shuffleService.startIfEnabled()
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  // Register with the Master
  registerWithMaster()
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
Step into the registerWithMaster() method:
private def registerWithMaster() {
  // onDisconnected may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
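The Option-based match on registrationRetryTimer is what makes this method safe to call repeatedly: a retry timer is created only when none is outstanding. A stripped-down sketch of the same guard pattern using only java.util.concurrent (all names here are hypothetical stand-ins, not Spark code):

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the registrationRetryTimer guard (not Spark code).
val scheduler = Executors.newSingleThreadScheduledExecutor()
val attempts = new AtomicInteger(0)  // stand-in for sending ReregisterWithMaster
var retryTimer: Option[ScheduledFuture[_]] = None

def scheduleRegistrationRetry(): Unit = retryTimer match {
  case None =>
    // No timer outstanding: start one that fires periodically.
    retryTimer = Some(scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = attempts.incrementAndGet() },
      0, 50, TimeUnit.MILLISECONDS))
  case Some(_) =>
    // A retry is already scheduled; calling again is a no-op (idempotent).
    ()
}

scheduleRegistrationRetry()
scheduleRegistrationRetry()  // second call does nothing thanks to the guard
Thread.sleep(200)
retryTimer.foreach(_.cancel(true))
scheduler.shutdown()
```

Without this guard, each onDisconnected event would stack another timer and the Master would be flooded with duplicate registration messages.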
Continuing down, we reach tryRegisterAllMasters(). Why does this method exist? If the Master runs in HA mode there can be multiple Masters, so the Worker must register its information with every one of them. As the code below shows, the Worker uses the registerMasterThreadPool thread pool, spawning one task per Master address to communicate with it:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          val masterEndpoint =
            rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
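The fan-out pattern above, mapping every master address to a task on a shared pool and collecting the returned futures, can be sketched with just the JDK executor API (the address strings and the `contacted` queue are hypothetical; the real code opens an RPC connection instead):

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, Future => JFuture}

// Hypothetical sketch of registering with every HA master in parallel (not Spark code).
val masterAddresses = Seq("master1:7077", "master2:7077", "master3:7077")
val registerPool = Executors.newFixedThreadPool(masterAddresses.size)
val contacted = new ConcurrentLinkedQueue[String]()

// One submitted task per master; the returned futures let the caller
// cancel outstanding attempts once any master answers.
val futures: Seq[JFuture[_]] = masterAddresses.map { addr =>
  registerPool.submit(new Runnable {
    override def run(): Unit = {
      try {
        // Stand-in for rpcEnv.setupEndpointRef(...) + sending RegisterWorker.
        contacted.add(addr)
      } catch {
        case e: Exception => println(s"Failed to connect to master $addr: $e")
      }
    }
  })
}

futures.foreach(_.get())  // wait for all registration attempts to run
registerPool.shutdown()
```

Keeping the futures (Spark stores them in registerMasterFutures) is what later allows cancelLastRegistrationRetry() to abort attempts against the Masters that did not win the election.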
Next, step into the overloaded registerWithMaster(masterEndpoint) method. Here you can see the actual registration request sent to the Master, as well as how the response status is handled.
The Master-side handling of Worker registration was described in detail in [Spark Source Code: Master], so it is not repeated here:
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
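The ask-then-onComplete shape here is ordinary scala.concurrent.Future composition: ask returns a Future of the typed reply, and the callback branches on Success/Failure. A self-contained sketch of the same shape (the sealed response trait mirrors Spark's message types, but the `ask` function below is a simulated stand-in, not Spark's RpcEndpointRef.ask):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Failure}

// Hypothetical mirror of Spark's registration reply messages.
sealed trait RegisterWorkerResponse
case class RegisteredWorker(masterUrl: String) extends RegisterWorkerResponse
case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse

// Simulated ask(): the real one sends an RPC and returns the typed reply.
def ask(msg: String): Future[RegisterWorkerResponse] =
  Future(RegisteredWorker("spark://master:7077"))

var registeredWith: Option[String] = None
val reply = ask("RegisterWorker")
reply.onComplete {
  case Success(RegisteredWorker(url))    => registeredWith = Some(url)
  case Success(RegisterWorkerFailed(m))  => println(s"registration failed: $m")
  case Failure(e)                        => println(s"cannot reach master: $e")
}
Await.ready(reply, 2.seconds)
Thread.sleep(200)  // give the onComplete callback time to run
```

Spark passes ThreadUtils.sameThread as the ExecutionContext because the callback is cheap; a heavyweight handler on the RPC thread would stall other messages.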
Step into handleRegisterResponse() to see how the Master's reply to the registration request is handled. The method covers three cases:
- Registration succeeded: the Worker calls changeMaster(masterRef, masterWebUiUrl) to save the current Master's information in the Worker's internal state, then schedules heartbeats (and, if enabled, periodic work-dir cleanup);
- Registration failed;
- The Master is in standby.
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      registered = true
      changeMaster(masterRef, masterWebUiUrl)
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
    case RegisterWorkerFailed(message) =>
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}

private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
  // activeMasterUrl it's a valid Spark url since we receive it from master.
  activeMasterUrl = masterRef.address.toSparkURL
  activeMasterWebUiUrl = uiUrl
  master = Some(masterRef)
  connected = true
  // Cancel any outstanding re-registration attempts because we found a new master
  cancelLastRegistrationRetry()
}
Other Worker Duties
The Worker also tracks Driver and Executor state changes, as shown below:
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {......}
private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged): Unit = {......}
......
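These state-change handlers tie together the data structures from the code overview: when an executor finishes, its entry moves from the live map to the finished map and its resources return to the pool. A self-contained sketch of that bookkeeping (the `ExecRecord` case class and helper functions are hypothetical simplifications of Spark's ExecutorRunner handling):

```scala
import scala.collection.mutable.{HashMap, LinkedHashMap}

// Hypothetical sketch of the bookkeeping behind handleExecutorStateChanged
// (not Spark code): finished executors move from the live map to the
// finished map, and their cores/memory are returned to the pool.
case class ExecRecord(appId: String, cores: Int, memory: Int)

val executors = new HashMap[String, ExecRecord]
val finishedExecutors = new LinkedHashMap[String, ExecRecord]
var coresUsed = 0
var memoryUsed = 0

def launch(id: String, rec: ExecRecord): Unit = {
  executors(id) = rec
  coresUsed += rec.cores
  memoryUsed += rec.memory
}

def onExecutorFinished(id: String): Unit = executors.remove(id).foreach { rec =>
  finishedExecutors(id) = rec   // kept around so the web UI can show history
  coresUsed -= rec.cores        // free the resources it held
  memoryUsed -= rec.memory
}

launch("app-1/0", ExecRecord("app-1", 2, 2048))
onExecutorFinished("app-1/0")
```

A LinkedHashMap is used for the finished side so that entries stay in completion order, which is what the Worker web UI displays.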
That concludes the walkthrough of the Worker source code! For the flow of Driver and Executor handling inside the Worker, see the figure below:

(image: Driver and Executor handling flow inside the Worker)