Spark Client Source Code Analysis (Part 1)

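In Spark Standalone, the work of what we call the Client is actually shared between AppClient and DriverClient. AppClient is the intermediary that lets an app (the client) talk to the Spark cluster. It takes a master URL, the app's description, and a listener for cluster events whose callbacks are invoked when those events occur, and it mainly exchanges app-related information with the Master. DriverClient mainly exchanges driver-related information with the Master, such as launching, stopping, and run status. This article covers AppClient first.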

1. The main fields and methods of the AppClient class are as follows:

[Figure: AppClient class members]

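As the figure above shows, ClientEndpoint exists as a private class inside AppClient.
(1) The stop method, shown below, is mainly used to send a message to the master to stop and unregister the app.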
<code>
def stop() {
  if (endpoint != null) {
    try {
      // Get the RPC ask timeout (120s by default)
      val timeout = RpcUtils.askRpcTimeout(conf)
      // Send the request that stops the client and unregisters the app from the master;
      // if no reply arrives within the timeout, an RpcTimeoutException is thrown
      timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
    } catch {
      case e: TimeoutException =>
        logInfo("Stop request to Master timed out; it may already be shut down.")
    }
    endpoint = null
  }
}</code>
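The "wait for a reply, but only up to a timeout" pattern in stop can be tried out without Spark. The following is a minimal sketch using only the Scala standard library: `StopSketch` and `awaitStop` are hypothetical names, and `Await.result` stands in for Spark's `timeout.awaitResult`, so it illustrates the idea rather than Spark's actual implementation.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object StopSketch {
  // Hypothetical helper: waits for `reply` for at most `timeout` and returns
  // true only if a positive acknowledgement arrived in time.
  def awaitStop(reply: Future[Boolean], timeout: FiniteDuration): Boolean =
    try {
      Await.result(reply, timeout)
    } catch {
      case _: TimeoutException =>
        println("Stop request timed out; the other side may already be shut down.")
        false
    }

  def main(args: Array[String]): Unit = {
    // A reply that is already available.
    println(awaitStop(Future.successful(true), 1.second))
    // A reply that never arrives, because the promise is never completed.
    println(awaitStop(Promise[Boolean]().future, 100.millis))
  }
}
```

As in stop, a timeout here is logged and swallowed instead of being propagated, since an unresponsive peer is assumed to be shut down already.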
Next we focus on ClientEndpoint, which is thread-safe.

2.ClientEndpoint

2.1 Fields

(1) // A boolean flag used to avoid calling listener.disconnected() more than once
private var alreadyDisconnected = false
(2) // Thread pool the app uses to register with the masters. Because registering with a master is a blocking operation, the maximum pool size is "masterRpcAddresses.size", so the app can attempt to register with all masters at the same time
private val registerMasterThreadPool = new ThreadPoolExecutor(
  0,
  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
  60L, TimeUnit.SECONDS,
  new SynchronousQueue[Runnable](),
  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
(3) A daemon single-thread scheduled executor used to retry registration
private val registrationRetryThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
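To see why a pool with 0 core threads, a maximum of N threads, and a SynchronousQueue lets N blocking calls run at the same time, here is a small sketch using only java.util.concurrent. `RegisterPoolSketch` and `runBlockingTasks` are hypothetical names, and a latch stands in for the blocking registration call.

```scala
import java.util.concurrent.{CountDownLatch, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object RegisterPoolSketch {
  // Runs `n` blocking tasks on a pool configured like the one above and
  // returns the largest number of threads the pool held at once.
  def runBlockingTasks(n: Int): Int = {
    val pool = new ThreadPoolExecutor(
      0, n, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
    val allStarted = new CountDownLatch(n)
    for (_ <- 1 to n) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          allStarted.countDown()
          // Block until every task has started; this stands in for a blocking call.
          allStarted.await()
        }
      })
    }
    allStarted.await(5, TimeUnit.SECONDS)
    val largest = pool.getLargestPoolSize
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    largest
  }

  def main(args: Array[String]): Unit =
    println(runBlockingTasks(3))
}
```

A SynchronousQueue holds no tasks, so each submission either goes to an idle thread or starts a new one, up to the maximum. With the default rejection policy, one more task submitted while all N threads are busy would be rejected, which is why the maximum matches the number of masters.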

2.2 Methods

(1) The constructor is ClientEndpoint's primary constructor.
(2) The onStart method, used to register the app with all masters
<code>
override def onStart(): Unit = {
  try {
    // "1" is the attempt number (at most 3 attempts are made); this is the nth
    // attempt to register with the masters
    registerWithMaster(1) // see ① below
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      // Notify the listener of the disconnection and set the boolean flag to true
      markDisconnected()
      // Stop the RpcEndpoint
      stop()
  }
}
</code>
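① The registerWithMaster method, shown below, registers with all masters asynchronously. As long as the retry limit (3 attempts) has not been reached, it is called again every 20s to retry registration. Once registration succeeds, all pending registration work and futures are cancelled.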
<code>
private def registerWithMaster(nthRetry: Int) {
  // Try to register with every master asynchronously
  registerMasterFutures = tryRegisterAllMasters()
  registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      Utils.tryOrExit {
        if (registered) {
          // Registered: cancel outstanding attempts and shut the pool down
          registerMasterFutures.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          // Not registered yet: cancel this round and try again
          registerMasterFutures.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
</code>
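The bounded retry logic can be sketched without Spark as follows. This is a simplification under stated assumptions, not the Spark code: `RetrySketch`, `registerWithRetries`, and `tryRegister` are hypothetical names, each attempt is a synchronous call that reports success or failure, and a one-shot `schedule` is used in place of `scheduleAtFixedRate`.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object RetrySketch {
  // Calls `tryRegister(n)` for attempt n = 1, 2, ... with `intervalMillis`
  // between attempts. Returns the attempt number that succeeded, or -1 if
  // all `maxAttempts` attempts failed.
  def registerWithRetries(maxAttempts: Int, intervalMillis: Long)(tryRegister: Int => Boolean): Int = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(1)
    val result = new AtomicInteger(-1)

    def attempt(nth: Int): Unit = {
      // An exception in an attempt is treated as a failed attempt.
      val ok = try tryRegister(nth) catch { case _: Exception => false }
      if (ok) {
        result.set(nth)
        done.countDown()
      } else if (nth >= maxAttempts) {
        done.countDown() // give up; result stays -1
      } else {
        scheduler.schedule(new Runnable {
          override def run(): Unit = attempt(nth + 1)
        }, intervalMillis, TimeUnit.MILLISECONDS)
      }
    }

    attempt(1)
    done.await()
    scheduler.shutdownNow()
    result.get
  }

  def main(args: Array[String]): Unit = {
    // Succeeds on the second attempt.
    println(registerWithRetries(3, 50)(n => n == 2))
    // Never succeeds, so it gives up after three attempts.
    println(registerWithRetries(3, 50)(_ => false))
  }
}
```

The three branches mirror the method above: stop on success, give up once the attempt count reaches the limit, otherwise schedule the next attempt.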
(3) The onStop method, shown below, releases resources.
<code>
override def onStop(): Unit = {
  if (registrationRetryTimer != null) {
    registrationRetryTimer.cancel(true)
  }
  registrationRetryThread.shutdownNow()
  registerMasterFutures.foreach(_.cancel(true))
  registerMasterThreadPool.shutdownNow()
}
</code>
(4) The receive method. The messages it receives fall into 5 kinds:

  • (1) The app has been registered by the master successfully; the listener is notified that the registered app is connected
    <code>
    case RegisteredApplication(appId_, masterRef) =>
      appId = appId_
      registered = true
      master = Some(masterRef)
      listener.connected(appId)
    </code>
  • (2) The app has been removed; stop the RpcEndpoint
    <code>
    case ApplicationRemoved(message) =>
      markDead("Master removed our application: %s".format(message))
      stop()
    </code>
  • (3) The master has added an executor for the app; report the executor state back to the master and notify the listener
    <code>
    case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      val fullId = appId + "/" + id
      logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
      sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
      listener.executorAdded(fullId, workerId, hostPort, cores, memory)
    </code>
  • (4) An executor's state has changed; log it, and if the executor has finished, tell the listener it was removed
    <code>
    case ExecutorUpdated(id, state, message, exitStatus) =>
      val fullId = appId + "/" + id
      val messageText = message.map(s => " (" + s + ")").getOrElse("")
      logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
      if (ExecutorState.isFinished(state)) {
        listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
      }
    </code>
  • (5) HA mechanism: switch the app to a new master
    <code>
    case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      master = Some(masterRef)
      alreadyDisconnected = false
      masterRef.send(MasterChangeAcknowledged(appId))
    </code>
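The dispatch style of receive, one `case` per message type, can be illustrated with a self-contained sketch. Everything here is hypothetical and heavily simplified: the message classes only borrow their names from the cases above, masters are plain strings, and events are recorded in a vector instead of being passed to a real listener.

```scala
object ReceiveSketch {
  // Simplified, hypothetical messages modeled on the five cases above.
  sealed trait Message
  final case class RegisteredApplication(appId: String) extends Message
  final case class ApplicationRemoved(reason: String) extends Message
  final case class ExecutorAdded(id: Int, workerId: String, cores: Int) extends Message
  final case class ExecutorUpdated(id: Int, state: String, message: Option[String]) extends Message
  final case class MasterChanged(masterUrl: String) extends Message

  // Minimal client state that records events instead of calling a listener.
  final class Client {
    var appId: String = null
    var registered = false
    var master: Option[String] = None
    var events: Vector[String] = Vector.empty

    def receive(msg: Message): Unit = msg match {
      case RegisteredApplication(id) =>
        appId = id
        registered = true
        events :+= s"connected $id"
      case ApplicationRemoved(reason) =>
        registered = false
        events :+= s"dead: $reason"
      case ExecutorAdded(id, workerId, cores) =>
        events :+= s"executor added $appId/$id on $workerId with $cores cores"
      case ExecutorUpdated(id, state, message) =>
        val text = message.map(s => " (" + s + ")").getOrElse("")
        events :+= s"executor updated $appId/$id is now $state$text"
      case MasterChanged(url) =>
        master = Some(url)
        events :+= s"master changed to $url"
    }
  }

  def main(args: Array[String]): Unit = {
    val client = new Client
    client.receive(RegisteredApplication("app-1"))
    client.receive(ExecutorAdded(0, "worker-1", 4))
    client.receive(ExecutorUpdated(0, "EXITED", Some("done")))
    client.events.foreach(println)
  }
}
```

Because the messages form a sealed hierarchy, the compiler can warn when a `match` leaves a message type unhandled, which is one reason this style suits endpoint message handling.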