Structure of this article:
- YARN-Client workflow diagram
- YARN-Client workflow
- YARN-Client startup class diagram
- YARN-Client implementation details
- YARN-Client job execution call diagram
1. YARN-Client Workflow Diagram

2. YARN-Client Workflow
- The application is launched; its main method starts a SparkContext. During SparkContext initialization, a DAGScheduler is built, and reflection is used to construct a YarnScheduler (the TaskScheduler) and a YarnClientSchedulerBackend (the SchedulerBackend). The YarnClientSchedulerBackend in turn starts a YarnDriverEndpoint and a Client, and the Client asks the ResourceManager of the YARN cluster to launch an ApplicationMaster.
- Upon receiving the request, the RM selects a NodeManager in the cluster and allocates the application's first Container on it, in which the application's ApplicationMaster (AM) is launched. Unlike YARN-Cluster mode, in YARN-Client mode the AM does not run the SparkContext; it only communicates with the SparkContext to coordinate resource allocation.
- After the client-side SparkContext finishes starting, it establishes communication with the AM; the AM registers with the ResourceManager and requests resources (Containers) from the RM according to the task information.
- Once the AM obtains resources (i.e. Containers), it communicates with the corresponding NodeManagers, asking them to launch a CoarseGrainedExecutorBackend in each acquired Container. After starting, each CoarseGrainedExecutorBackend registers with the client-side SparkContext and waits to receive task sets.
- The client-side SparkContext assigns task sets to the CoarseGrainedExecutorBackends for execution. While running tasks, each CoarseGrainedExecutorBackend reports status and progress to the YarnDriverEndpoint, so the client always knows the state of every task and can restart a task if it fails.
- When the application finishes running, the client-side SparkContext asks the RM to unregister it and shuts itself down.
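The steps above can be sketched as an ordered message flow. The following minimal, self-contained Scala model is not Spark source; all types and names in it are illustrative, and it merely encodes who talks to whom at each step:

```scala
// A toy model of the YARN-Client startup/shutdown sequence described above.
// Each Msg records one interaction between two components.
object YarnClientFlowSketch {
  case class Msg(from: String, to: String, what: String)

  // The six workflow steps, expressed as message exchanges
  val flow: Seq[Msg] = Seq(
    Msg("Driver(SparkContext)", "ResourceManager", "submit application, request AM launch"),
    Msg("ResourceManager", "NodeManager", "allocate first Container, start ApplicationMaster"),
    Msg("ApplicationMaster", "ResourceManager", "register AM, request executor Containers"),
    Msg("ApplicationMaster", "NodeManager", "launch CoarseGrainedExecutorBackend in Container"),
    Msg("CoarseGrainedExecutorBackend", "Driver(SparkContext)", "register, run tasks, report status"),
    Msg("Driver(SparkContext)", "ResourceManager", "unregister application and shut down")
  )

  def main(args: Array[String]): Unit =
    flow.zipWithIndex.foreach { case (m, i) =>
      println(s"${i + 1}. ${m.from} -> ${m.to}: ${m.what}")
    }
}
```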
3. YARN-Client Startup Class Diagram
First, a rough class diagram of application startup in YARN-Client mode:

4. YARN-Client Implementation Details
- When SparkContext starts, createTaskScheduler builds two objects, a TaskScheduler and a SchedulerBackend. In YARN-Client mode specifically, the TaskScheduler implementation created is YarnScheduler and the SchedulerBackend implementation is YarnClientSchedulerBackend. YarnScheduler inherits everything from TaskSchedulerImpl, overriding only the getRackForHost() method. Let's look at SparkContext # createTaskScheduler:

```scala
...
case masterUrl =>
  // For YARN mode, this resolves to YarnClusterManager
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
...
```

Now let's look at createTaskScheduler and createSchedulerBackend in turn.
YarnClusterManager # createTaskScheduler:
```scala
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
  sc.deployMode match {
    case "cluster" => new YarnClusterScheduler(sc)
    // For YARN-Client, create a YarnScheduler instance
    case "client" => new YarnScheduler(sc)
    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}
```

YarnClusterManager # createSchedulerBackend:
```scala
override def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler): SchedulerBackend = {
  sc.deployMode match {
    case "cluster" =>
      new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case "client" =>
      new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}
```

The source above confirms that in YARN-Client mode, the two key classes created during SparkContext initialization are YarnScheduler (the TaskScheduler) and YarnClientSchedulerBackend (the SchedulerBackend).
After creating these two objects, SparkContext calls _taskScheduler.start(), which in turn calls backend.start(). In YarnClientSchedulerBackend's start() method, a Client is created, client.submitApplication() is called to request that an AM be launched, and super.start() is called to start the DriverEndpoint.
YarnClientSchedulerBackend # start:
```scala
override def start() {
  val driverHost = conf.get("spark.driver.host")
  val driverPort = conf.get("spark.driver.port")
  val hostport = driverHost + ":" + driverPort
  sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }

  val argsArrayBuf = new ArrayBuffer[String]()
  argsArrayBuf += ("--arg", hostport)

  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
  val args = new ClientArguments(argsArrayBuf.toArray)
  totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
  client = new Client(args, conf)
  bindToYarn(client.submitApplication(), None)

  // SPARK-8687: Ensure all necessary properties have already been set before
  // we initialize our driver scheduler backend, which serves these properties
  // to the executors
  super.start()
  waitForApplication()

  // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
  // reads the credentials from HDFS, just like the executors and updates its own credentials
  // cache.
  if (conf.contains("spark.yarn.credentials.file")) {
    YarnSparkHadoopUtil.startCredentialUpdater(conf)
  }
  monitorThread = asyncMonitorApplication()
  monitorThread.start()
}
```

In client.submitApplication(), the Client verifies that the cluster has enough resources; if so, it builds the environment for launching the ApplicationMaster and submits the application to the YARN cluster.
Client # submitApplication:
```scala
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    // Setup the credentials before doing anything else,
    // so we don't have issues at any point.
    setupCredentials()
    // Initialize yarnClient, used to communicate with the YARN cluster
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application id from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}
```
- When the ResourceManager receives the request, it selects a NodeManager in the cluster and starts ExecutorLauncher, which launches the ApplicationMaster during its initialization. That ExecutorLauncher is the class to launch is specified in Client.createContainerLaunchContext:
```scala
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
  ...
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
  ...
}
```
- After ExecutorLauncher starts, its main method calls ApplicationMaster.main, which calls the ApplicationMaster's run method; run calls runExecutorLauncher, which in turn calls registerAM. In registerAM, a message is sent via the ResourceManager client to the DriverEndpoint, notifying it that the ApplicationMaster has finished starting. The ApplicationMaster then requests resources (Containers) from the ResourceManager through YarnAllocator's allocateResources method.
ApplicationMaster # registerAM:
```scala
private def registerAM(
    _sparkConf: SparkConf,
    _rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    uiAddress: Option[String]) = {
  // Get the application id and attempt id
  val appId = client.getAttemptId().getApplicationId().toString()
  val attemptId = client.getAttemptId().getAttemptId().toString()
  val historyAddress = ApplicationMaster
    .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

  // Get a reference to the DriverEndpoint
  val driverUrl = RpcEndpointAddress(
    _sparkConf.get("spark.driver.host"),
    _sparkConf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl,
      "<executorId>", "<hostname>", executorMemory, executorCores, appId, securityMgr,
      localResources)
    dummyRunner.launchContextDebugInfo()
  }

  // Register with the ResourceManager, which notifies the DriverEndpoint that the AM has started
  allocator = client.register(driverUrl,
    driverRef,
    yarnConf,
    _sparkConf,
    uiAddress,
    historyAddress,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  // Request resources for the executors
  allocator.allocateResources()
  reporterThread = launchReporterThread()
}
```
- YarnAllocator.allocateResources obtains the available Containers. If any are obtained, YarnAllocator calls handleAllocatedContainers, which after some processing calls runAllocatedContainers. In that method, an ExecutorRunnable is executed on a thread pool for each Container; ExecutorRunnable's run method calls startContainer to start the Container, and what runs inside the Container is CoarseGrainedExecutorBackend (as can be seen from the prepareCommand method called by startContainer).
```scala
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    // Update the executor id counter
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString
    assert(container.getResource.getMemory >= resource.getMemory)
    logInfo(s"Launching container $containerId on host $executorHostname " +
      s"for executor with ID $executorId")

    def updateInternalState(): Unit = synchronized {
      runningExecutors.add(executorId)
      numExecutorsStarting.decrementAndGet()
      executorIdToContainer(executorId) = container
      containerIdToExecutorId(container.getId) = executorId

      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
        new HashSet[ContainerId])
      // Record the current Container in the host-to-containers map
      containerSet += containerId
      allocatedContainerToHostMap.put(containerId, executorHostname)
    }

    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(new Runnable {
          override def run(): Unit = {
            try {
              // ExecutorRunnable runs on the thread pool; its run method starts
              // the Container with CoarseGrainedExecutorBackend as its main class
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources
              ).run()
              updateInternalState()
            } catch {
              case e: Throwable =>
                numExecutorsStarting.decrementAndGet()
                if (NonFatal(e)) {
                  logError(s"Failed to launch executor $executorId on container $containerId", e)
                  // Assigned container should be released immediately
                  // to avoid unnecessary resource occupation.
                  amClient.releaseAssignedContainer(containerId)
                } else {
                  throw e
                }
            }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as running executors count: %d " +
        "reached target executors count: %d.").format(
        runningExecutors.size, targetNumExecutors))
    }
  }
}
```

After SparkContext receives the RegisterExecutor message from a CoarseGrainedExecutorBackend, it replies with a RegisteredExecutor message; on receiving the reply, the CoarseGrainedExecutorBackend creates an Executor and waits to receive task sets. SparkContext then assigns task sets to the Executors and tracks their execution, just as in Standalone mode.
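The RegisterExecutor/RegisteredExecutor handshake can be sketched with a tiny self-contained model. This is not Spark source: the real exchange happens over Spark's RPC layer and carries more fields; the types and logic below are a simplified illustration of the request/reply pattern only.

```scala
// Toy model of the executor registration handshake described above.
object RegistrationSketch {
  // Message the executor backend sends to the driver
  case class RegisterExecutor(executorId: String, hostname: String, cores: Int)

  // Possible replies from the driver
  sealed trait ExecutorMessage
  case object RegisteredExecutor extends ExecutorMessage
  case class RegisterExecutorFailed(reason: String) extends ExecutorMessage

  // Driver side: accept each executor id only once
  class Driver {
    private var registered = Set.empty[String]
    def receive(msg: RegisterExecutor): ExecutorMessage =
      if (registered.contains(msg.executorId))
        RegisterExecutorFailed(s"Duplicate executor ID: ${msg.executorId}")
      else {
        registered += msg.executorId
        RegisteredExecutor
      }
  }

  // Executor backend side: only a successful reply would lead to creating the Executor
  def register(driver: Driver, id: String, host: String, cores: Int): Boolean =
    driver.receive(RegisterExecutor(id, host, cores)) match {
      case RegisteredExecutor         => true  // would instantiate the Executor here
      case RegisterExecutorFailed(_)  => false
    }
}
```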
When the application finishes running, SparkContext asks the ResourceManager to unregister it and shuts down.
5. YARN-Client Job Execution Call Diagram
