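In Spark Standalone, what we call the Client is actually implemented jointly by AppClient and DriverClient. AppClient is an intermediary that lets an app (the Client) communicate with a Spark cluster. It takes the master URLs, the app's description and a listener for cluster events, and it calls back the listener when those events occur. Its main job is to exchange app-related information with the Master. DriverClient exchanges driver-related information with the Master, such as launching and killing drivers and reporting their status. This article covers AppClient first.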
1. The main fields and methods of the AppClient class are as follows:

As the figure above shows, ClientEndpoint is a private inner class of AppClient.
(1) The stop method, shown below, asks the client's endpoint to stop the app, and the endpoint in turn unregisters the app from the Master.
<code>
def stop() {
if (endpoint != null) {
try {
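//Timeout for RPC ask requests (spark.rpc.askTimeout, 120s by default)
val timeout = RpcUtils.askRpcTimeout(conf)
//Ask the endpoint to stop the app, which unregisters it from the Master; if no reply arrives within the timeout, an RpcTimeoutException (a subclass of TimeoutException) is thrown
timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))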
} catch {
case e: TimeoutException =>
logInfo("Stop request to Master timed out; it may already be shut down.")
}
endpoint = null
}
}</code>
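The core of stop is a bounded wait: send a request, wait at most `timeout` for the reply, and treat a timeout as harmless. The Scala sketch below reproduces only that pattern with the standard library's `Await.result` in place of Spark's `RpcTimeout.awaitResult`. `askStop` is a hypothetical stand-in for `endpoint.ask[Boolean](StopAppClient)`, and the timeouts are shortened so the example runs quickly.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for endpoint.ask[Boolean](StopAppClient).
// None simulates a peer that never answers (for example, a Master that is already down).
def askStop(reply: Option[Boolean]): Future[Boolean] = reply match {
  case Some(v) => Future.successful(v)
  case None    => Promise[Boolean]().future // never completed
}

// Same shape as AppClient.stop(): wait for the reply, and turn a timeout into a
// log-style message instead of propagating the exception.
def stopWithTimeout(reply: Option[Boolean], timeout: FiniteDuration): String =
  try {
    val ok = Await.result(askStop(reply), timeout)
    s"stopped: $ok"
  } catch {
    case _: TimeoutException => "Stop request timed out; it may already be shut down."
  }

println(stopWithTimeout(Some(true), 1.second))
println(stopWithTimeout(None, 100.millis))
```

Catching `TimeoutException` rather than `RpcTimeoutException` is enough in the real code as well, because the latter extends the former.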
Next we focus on ClientEndpoint. It is thread-safe: as a ThreadSafeRpcEndpoint, it processes incoming messages one at a time.
2.ClientEndpoint
2.1 Fields
(1) A boolean flag that prevents listener.disconnected() from being called more than once:
<code>
private var alreadyDisconnected = false
</code>
(2) The thread pool used to register the app with the Masters. Registering with a Master is a blocking operation, so the pool allows up to masterRpcAddresses.size threads; this lets the app register with all Masters at the same time:
<code>
private val registerMasterThreadPool = new ThreadPoolExecutor(
0,
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
60L, TimeUnit.SECONDS,
new SynchronousQueue[Runnable](),
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
</code>
(3) A single daemon thread that schedules the registration retries:
<code>
private val registrationRetryThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
</code>
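Why a SynchronousQueue together with a maximum of masterRpcAddresses.size threads? A SynchronousQueue stores no tasks, so every submission must be handed directly to a thread. With a core size of 0, the pool therefore grows to at most one thread per Master and rejects anything beyond that. The sketch below checks both behaviors with plain java.util.concurrent classes; the number of Masters (3) is a hypothetical value.

```scala
import java.util.concurrent.{CountDownLatch, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Hypothetical number of Masters (stands in for masterRpcAddresses.size).
val numMasters = 3

// Same configuration as registerMasterThreadPool: no core threads, at most one thread
// per Master, idle threads reclaimed after 60 s, and a queue that buffers nothing.
val pool = new ThreadPoolExecutor(
  0, numMasters, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

val release = new CountDownLatch(1)          // keeps the simulated registrations blocked
val started = new CountDownLatch(numMasters) // counts registrations that are running

// One blocking task per Master: all of them get their own thread and run concurrently.
(1 to numMasters).foreach { _ =>
  pool.execute(() => { started.countDown(); release.await() })
}
val allStarted = started.await(5, TimeUnit.SECONDS)

// One more task finds no free thread and no queue slot, so the pool rejects it.
val extraRejected =
  try { pool.execute(() => ()); false }
  catch { case _: RejectedExecutionException => true }

release.countDown()
pool.shutdown()
println(s"all started: $allStarted, extra task rejected: $extraRejected")
```

In AppClient the extra-task case never arises, since one registration task is submitted per Master; the sketch only shows why the sizing guarantees that all of them can block at once.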
2.2 Methods
(1) The constructor is ClientEndpoint's primary constructor.
(2) The onStart method registers the app with all Masters:
<code>
override def onStart(): Unit = {
try {
//The argument is the attempt number: this is the 1st attempt to register with the Masters, and at most 3 attempts are made
registerWithMaster(1) //see ① below
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
//Notify the listener of the disconnection (only once) and set the boolean flag to true
markDisconnected()
//Stop the RpcEndpoint
stop()
}
}
</code>
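The failure path in onStart relies on markDisconnected() being idempotent, which is what the alreadyDisconnected flag is for. The sketch below assumes markDisconnected checks the flag, calls listener.disconnected() and then sets the flag; CountingListener and EndpointSketch are hypothetical names used only for illustration.

```scala
// Hypothetical listener that only counts disconnect notifications.
class CountingListener {
  var disconnectedCalls = 0
  def disconnected(): Unit = disconnectedCalls += 1
}

// Minimal stand-in for the endpoint's failure handling.
class EndpointSketch(listener: CountingListener) {
  private var alreadyDisconnected = false

  // The flag turns repeated calls into a single listener callback.
  def markDisconnected(): Unit = {
    if (!alreadyDisconnected) {
      listener.disconnected()
      alreadyDisconnected = true
    }
  }

  // Mirrors onStart(): if registration throws, report the disconnection.
  def onStart(register: () => Unit): Unit = {
    try register()
    catch { case _: Exception => markDisconnected() }
  }
}

val listener = new CountingListener
val endpoint = new EndpointSketch(listener)
endpoint.onStart(() => throw new RuntimeException("Failed to connect to master"))
endpoint.markDisconnected() // a later call, e.g. from another failure path
println(listener.disconnectedCalls)
```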
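① The registerWithMaster method, shown below, registers with all Masters asynchronously. Every 20s it checks the result: if registration has succeeded, it cancels the outstanding registration futures and shuts down the registration thread pool; if the retry limit (3 attempts) has been reached, it marks the app dead and gives up; otherwise it cancels the outstanding futures and calls itself again for the next attempt.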
<code>
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures = tryRegisterAllMasters()
registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
Utils.tryOrExit {
if (registered) {
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}
}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
</code>
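The retry logic of registerWithMaster can be simulated without a cluster. This is a simplified sketch, not Spark's code: it schedules a one-shot check with ScheduledExecutorService.schedule for each attempt instead of scheduleAtFixedRate, replaces tryRegisterAllMasters with a hypothetical attempt counter, and waits 50 ms instead of 20 s. The parameter succeedOn is hypothetical and picks the attempt on which a Master answers (0 means none ever does).

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

// Returns the outcome and the number of registration attempts made.
def runRegistration(succeedOn: Int, retries: Int = 3, timeoutMs: Long = 50L): (String, Int) = {
  val retryThread = Executors.newSingleThreadScheduledExecutor()
  val registered = new AtomicBoolean(false)
  val attempts = new AtomicInteger(0)
  val outcome = new AtomicReference[String]("")
  val done = new CountDownLatch(1)

  // Stand-in for tryRegisterAllMasters(): on attempt `succeedOn` it behaves as if
  // RegisteredApplication had arrived and set the registered flag.
  def tryRegisterAllMasters(): Unit =
    if (attempts.incrementAndGet() == succeedOn) registered.set(true)

  def finish(result: String): Unit = { outcome.set(result); done.countDown() }

  def registerWithMaster(nthRetry: Int): Unit = {
    tryRegisterAllMasters()
    // After the timeout: stop on success, give up at the limit, otherwise retry.
    retryThread.schedule(new Runnable {
      override def run(): Unit =
        if (registered.get) finish("registered")
        else if (nthRetry >= retries) finish("All masters are unresponsive! Giving up.")
        else registerWithMaster(nthRetry + 1)
    }, timeoutMs, TimeUnit.MILLISECONDS)
  }

  registerWithMaster(1)
  done.await(5, TimeUnit.SECONDS)
  retryThread.shutdownNow()
  (outcome.get, attempts.get)
}

println(runRegistration(succeedOn = 2))
println(runRegistration(succeedOn = 0))
```

With three allowed attempts, a run in which no Master answers makes exactly three registration attempts before giving up, matching the nthRetry >= REGISTRATION_RETRIES check above.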
(3) The onStop method releases resources:
<code>
override def onStop(): Unit = {
if (registrationRetryTimer != null) {
registrationRetryTimer.cancel(true)
}
registrationRetryThread.shutdownNow()
registerMasterFutures.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
}
</code>
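onStop releases resources in a fixed order: cancel the retry timer, stop the retry thread, cancel the in-flight registration futures (cancel(true) interrupts an attempt that is running), then stop the registration pool. The sketch below reproduces that sequence with hypothetical stand-ins for the endpoint's fields built from plain Executors pools.

```scala
import java.util.concurrent.{Executors, Future, ScheduledFuture, TimeUnit}

// Hypothetical stand-ins for the endpoint's fields.
val registrationRetryThread = Executors.newSingleThreadScheduledExecutor()
val registerMasterThreadPool = Executors.newFixedThreadPool(2)

// A pending retry timer, as registerWithMaster would leave behind.
val registrationRetryTimer: ScheduledFuture[_] =
  registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = ()
  }, 20, 20, TimeUnit.SECONDS)

// Two registration attempts that would otherwise block for a long time.
val registerMasterFutures: Seq[Future[_]] = Seq.fill(2)(
  registerMasterThreadPool.submit(new Runnable {
    override def run(): Unit = Thread.sleep(60000)
  }))

// Same order as onStop(): timer, retry thread, in-flight attempts, registration pool.
def onStop(): Unit = {
  if (registrationRetryTimer != null) {
    registrationRetryTimer.cancel(true)
  }
  registrationRetryThread.shutdownNow()
  registerMasterFutures.foreach(_.cancel(true))
  registerMasterThreadPool.shutdownNow()
}

onStop()
println(registerMasterFutures.forall(_.isCancelled))
```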
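(4) The receive method handles five kinds of messages:
- (1) RegisteredApplication: the Master has registered the app; record the appId and the Master, and notify the listener that the client is connected
<code>
case RegisteredApplication(appId_, masterRef) =>
appId = appId_
registered = true
master = Some(masterRef)
listener.connected(appId)
</code>
- (2) ApplicationRemoved: the Master has removed the app; mark it dead and stop the RpcEndpoint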
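<code>
case ApplicationRemoved(message) =>
markDead("Master removed our application: %s".format(message))
stop()
</code>
- (3) ExecutorAdded: the Master has launched an executor for the app; report the executor's state to the Master as RUNNING and notify the listener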
<code>
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
</code>
- (4) ExecutorUpdated: an executor's state has changed; log it, and if the executor has finished, notify the listener that it was removed
<code>
case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
</code>
- (5) MasterChanged: part of the HA mechanism; switch the app to the new Master, reset the alreadyDisconnected flag, and acknowledge the change
<code>
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
master = Some(masterRef)
alreadyDisconnected = false
masterRef.send(MasterChangeAcknowledged(appId))
</code>
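Taken together, the five cases form a single message handler. The sketch below is a cluster-free model of that dispatch, not Spark's code: the message classes are simplified hypothetical versions (Strings instead of RpcEndpointRef, a String instead of ExecutorState), the set of terminal states is an assumption modeled on ExecutorState.isFinished, and two buffers record what would be listener callbacks and messages sent to the Master.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified, hypothetical message types modeled on the five cases above.
sealed trait Msg
final case class RegisteredApplication(appId: String, master: String) extends Msg
final case class ApplicationRemoved(message: String) extends Msg
final case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) extends Msg
final case class ExecutorUpdated(id: Int, state: String, message: Option[String], exitStatus: Option[Int]) extends Msg
final case class MasterChanged(master: String) extends Msg

// Assumed terminal states, modeled on Spark's ExecutorState.isFinished.
val finishedStates = Set("KILLED", "FAILED", "LOST", "EXITED")

class ClientSketch {
  var appId: String = null
  var registered = false
  var master: Option[String] = None
  var alreadyDisconnected = false
  var stopped = false
  val events = ArrayBuffer.empty[String]       // stands in for listener callbacks
  val sentToMaster = ArrayBuffer.empty[String] // stands in for messages sent to the Master

  def receive: PartialFunction[Msg, Unit] = {
    case RegisteredApplication(id, m) =>
      appId = id; registered = true; master = Some(m)
      events += s"connected $appId"
    case ApplicationRemoved(message) =>
      events += s"dead: Master removed our application: $message"
      stopped = true
    case ExecutorAdded(id, _, _, _, _) =>
      sentToMaster += s"ExecutorStateChanged($appId, $id, RUNNING)"
      events += s"executorAdded $appId/$id"
    case ExecutorUpdated(id, state, _, _) =>
      if (finishedStates.contains(state)) events += s"executorRemoved $appId/$id"
    case MasterChanged(m) =>
      master = Some(m); alreadyDisconnected = false
      sentToMaster += s"MasterChangeAcknowledged($appId)"
  }
}

val client = new ClientSketch
Seq[Msg](
  RegisteredApplication("app-1", "spark://master-a:7077"),
  ExecutorAdded(0, "worker-1", "host1:40000", 4, 1024),
  ExecutorUpdated(0, "EXITED", Some("Command exited with code 0"), Some(0)),
  MasterChanged("spark://master-b:7077")
).foreach(client.receive)
client.events.foreach(println)
```

Because the endpoint is thread-safe and handles one message at a time, the handler can update its plain vars without extra locking, which is also why the sketch uses ordinary vars.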