【Spark】Spark Runtime Architecture -- YARN-Client

Outline of this post:

  • YARN-Client workflow diagram
  • YARN-Client workflow
  • YARN-Client mode startup class diagram
  • YARN-Client implementation details
  • YARN-Client job execution call diagram

I. YARN-Client Workflow Diagram

[Figure: YARN-Client workflow diagram]

II. YARN-Client Workflow

  1. The application is launched; its main method creates a SparkContext. During SparkContext initialization, a DAGScheduler is built, and YarnScheduler (the TaskScheduler) and YarnClientSchedulerBackend (the SchedulerBackend) are constructed via reflection. YarnClientSchedulerBackend internally starts a YarnDriverEndpoint and a Client, and the Client then asks the YARN cluster's ResourceManager to launch an ApplicationMaster (a minimal submission sketch follows this list).
  2. On receiving the request, the RM selects a NodeManager in the cluster and allocates the application's first Container, in which the application's ApplicationMaster (AM) is started. Unlike YARN-Cluster mode, in YARN-Client mode the AM does not run the SparkContext; it only communicates with the SparkContext to handle resource allocation.
  3. After the client-side SparkContext finishes starting, it establishes communication with the AM; the AM registers with the ResourceManager and requests resources (Containers) from the RM according to the job's requirements.
  4. Once the AM has obtained resources (Containers), it communicates with the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in the allocated Containers. After starting, each CoarseGrainedExecutorBackend registers with the client-side SparkContext and waits to receive task sets.
  5. The client-side SparkContext assigns task sets to the CoarseGrainedExecutorBackend processes, which run the tasks and report status and progress to the YarnDriverEndpoint, so the client always knows how each task is doing and can relaunch a task when it fails.
  6. When the application finishes, the client-side SparkContext asks the RM to unregister it and then shuts itself down.
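
To make the flow concrete, here is a minimal sketch of an application run in this mode. It assumes HADOOP_CONF_DIR points at the cluster, and the app name and input path are made-up placeholders; in practice the same effect is usually achieved by launching the jar with spark-submit --master yarn --deploy-mode client. The point is that the driver (and thus the SparkContext) lives in this JVM, while YARN only hosts the AM and the executors.

    import org.apache.spark.sql.SparkSession

    object YarnClientDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("yarn-client-demo")                     // placeholder app name
          .master("yarn")                                  // step 1: SparkContext starts in this JVM
          .config("spark.submit.deployMode", "client")
          .getOrCreate()

        val counts = spark.sparkContext
          .textFile("hdfs:///tmp/wordcount/input.txt")     // hypothetical input path
          .flatMap(_.split("\\s+"))
          .map(word => (word, 1))
          .reduceByKey(_ + _)

        counts.take(10).foreach(println)                   // tasks run inside YARN containers

        spark.stop()                                       // step 6: unregister from the RM
      }
    }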

III. YARN-Client Mode Startup Class Diagram

A rough class diagram of application startup in YARN-Client mode:

[Figure: YARN-Client mode startup class diagram]

IV. YARN-Client Implementation Details

  1. When SparkContext starts, it builds the TaskScheduler and SchedulerBackend objects through createTaskScheduler. In YARN-Client mode the TaskScheduler implementation created is YarnScheduler and the SchedulerBackend implementation is YarnClientSchedulerBackend. YarnScheduler inherits everything from TaskSchedulerImpl and only overrides getRackForHost(). Let's look at SparkContext # createTaskScheduler:

    ...
    case masterUrl =>
      // In YARN mode, this returns YarnClusterManager
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
    ...
    

    Let's look at createTaskScheduler and createSchedulerBackend in turn:

    YarnClusterManager # createTaskScheduler:

    override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
      sc.deployMode match {
        case "cluster" => new YarnClusterScheduler(sc)
        // In YARN-Client mode, create a YarnScheduler
        case "client" => new YarnScheduler(sc)
        case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
      }
    }
    

    YarnClusterManager # createSchedulerBackend:

    override def createSchedulerBackend(sc: SparkContext,
        masterURL: String,
        scheduler: TaskScheduler): SchedulerBackend = {
      sc.deployMode match {
        case "cluster" =>
          new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
        case "client" =>
          new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
        case  _ =>
          throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
      }
    }
    

    The source above confirms that in YARN-Client mode, the two key classes created during SparkContext initialization are YarnScheduler (the TaskScheduler) and YarnClientSchedulerBackend (the SchedulerBackend).
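
    Incidentally, getClusterManager above locates YarnClusterManager through the ExternalClusterManager SPI (a java.util.ServiceLoader lookup): YarnClusterManager is chosen because its canCreate method accepts the "yarn" master URL. The following self-contained sketch mimics that selection pattern; the trait and object names are illustrative, not Spark's.

    // A self-contained sketch of the selection pattern behind getClusterManager.
    // Spark's real SPI is ExternalClusterManager, discovered via java.util.ServiceLoader;
    // the names below are illustrative only.
    trait MiniClusterManager {
      def canCreate(masterURL: String): Boolean
    }

    object MiniYarnManager extends MiniClusterManager {
      // YarnClusterManager answers true exactly for the "yarn" master URL
      override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
    }

    object ClusterManagerLookup {
      // Stand-in for the ServiceLoader scan over registered implementations
      private val registered: Seq[MiniClusterManager] = Seq(MiniYarnManager)

      def getClusterManager(masterURL: String): Option[MiniClusterManager] =
        registered.find(_.canCreate(masterURL))

      def main(args: Array[String]): Unit = {
        println(getClusterManager("yarn"))      // Some(MiniYarnManager$...)
        println(getClusterManager("local[2]"))  // None -> handled by SparkContext's built-in cases
      }
    }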

    After creating these two objects, SparkContext calls _taskScheduler.start(), which in turn calls backend.start(). In YarnClientSchedulerBackend's start() method a Client is created, client.submitApplication() is called to request that the AM be launched, and super.start() is called to start the DriverEndpoint.

    YarnClientSchedulerBackend # start:

    override def start() {
      val driverHost = conf.get("spark.driver.host")
      val driverPort = conf.get("spark.driver.port")
      val hostport = driverHost + ":" + driverPort
      sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
    
      val argsArrayBuf = new ArrayBuffer[String]()
      argsArrayBuf += ("--arg", hostport)
    
      logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
      val args = new ClientArguments(argsArrayBuf.toArray)
      totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
      client = new Client(args, conf)
      bindToYarn(client.submitApplication(), None)
    
      // SPARK-8687: Ensure all necessary properties have already been set before
      // we initialize our driver scheduler backend, which serves these properties
      // to the executors
      super.start()
      waitForApplication()
    
      // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
      // reads the credentials from HDFS, just like the executors and updates its own credentials
      // cache.
      if (conf.contains("spark.yarn.credentials.file")) {
        YarnSparkHadoopUtil.startCredentialUpdater(conf)
      }
      monitorThread = asyncMonitorApplication()
      monitorThread.start()
    }
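
    As an aside, totalExpectedExecutors above is derived from configuration. A rough sketch of that derivation follows; it is simplified relative to SchedulerBackendUtils.getInitialTargetExecutorNumber, which also clamps the value against the dynamic-allocation min/max bounds, and the default of 2 executors follows Spark 2.x conventions.

    // A rough, simplified sketch of how the initial executor target is derived.
    object InitialExecutorsSketch {
      def initialTargetExecutors(conf: Map[String, String]): Int = {
        val dynamicEnabled = conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
        if (dynamicEnabled) {
          conf.getOrElse("spark.dynamicAllocation.initialExecutors",
            conf.getOrElse("spark.dynamicAllocation.minExecutors", "0")).toInt
        } else {
          conf.getOrElse("spark.executor.instances", "2").toInt
        }
      }

      def main(args: Array[String]): Unit = {
        println(initialTargetExecutors(Map("spark.executor.instances" -> "10")))  // 10
        println(initialTargetExecutors(Map(
          "spark.dynamicAllocation.enabled" -> "true",
          "spark.dynamicAllocation.initialExecutors" -> "4")))                    // 4
      }
    }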
    

    client.submitApplication() checks whether the YARN cluster has enough resources; if it does, it builds the context for launching the ApplicationMaster and submits the application to the YARN cluster.

    Client # submitApplication:

    def submitApplication(): ApplicationId = {
      var appId: ApplicationId = null
      try {
        launcherBackend.connect()
        // Setup the credentials before doing anything else,
        // so we have don't have issues at any point.
        setupCredentials()
        // Initialize yarnClient, which is used to communicate with the YARN cluster
        yarnClient.init(hadoopConf)
        yarnClient.start()
    
        logInfo("Requesting a new application from cluster with %d NodeManagers"
          .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
    
        // Get a new application from our RM
        // Ask the RM for a new application ID
        val newApp = yarnClient.createApplication()
        val newAppResponse = newApp.getNewApplicationResponse()
        appId = newAppResponse.getApplicationId()
    
        new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
          Option(appId.toString)).setCurrentContext()
    
        // Verify whether the cluster has enough resources for our AM
        // i.e. check the AM container fits within the cluster's maximum allocation
        verifyClusterResources(newAppResponse)
    
        // Set up the appropriate contexts to launch our AM
        // i.e. build the ContainerLaunchContext and ApplicationSubmissionContext for the AM
        val containerContext = createContainerLaunchContext(newAppResponse)
        val appContext = createApplicationSubmissionContext(newApp, containerContext)
    
        // Finally, submit and monitor the application
        logInfo(s"Submitting application $appId to ResourceManager")
        // Hand the application off to the RM
        yarnClient.submitApplication(appContext)
        launcherBackend.setAppId(appId.toString)
        reportLauncherState(SparkAppHandle.State.SUBMITTED)
    
        appId
      } catch {
        case e: Throwable =>
          if (appId != null) {
            cleanupStagingDir(appId)
          }
          throw e
      }
    }
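
    The interesting check above is verifyClusterResources: the requested AM and executor memory, plus their overhead, must fit inside YARN's per-container maximum (yarn.scheduler.maximum-allocation-mb). Below is a simplified sketch of that arithmetic, assuming Spark 2.x's default overhead rule of max(10%, 384 MB); the exact constants and rounding differ across versions.

    // A simplified sketch of the check done in Client.verifyClusterResources.
    object ResourceCheckSketch {
      private val MinOverheadMb    = 384
      private val OverheadFraction = 0.10

      def overhead(memoryMb: Int): Int =
        math.max(MinOverheadMb, (memoryMb * OverheadFraction).toInt)

      /** Returns an error message when the container would not fit, None otherwise. */
      def check(role: String, memoryMb: Int, maxAllocationMb: Int): Option[String] = {
        val total = memoryMb + overhead(memoryMb)
        if (total > maxAllocationMb) {
          Some(s"$role needs $total MB ($memoryMb MB + ${overhead(memoryMb)} MB overhead), " +
            s"more than the cluster maximum of $maxAllocationMb MB per container")
        } else {
          None
        }
      }

      def main(args: Array[String]): Unit = {
        println(check("Executor", 8192, 8192)) // Some(...) -> 8192 + 819 exceeds 8192
        println(check("AM", 512, 8192))        // None      -> fits
      }
    }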
    
  2. When the ResourceManager receives the request, it selects a NodeManager in the cluster and launches ExecutorLauncher on it; during ExecutorLauncher initialization the ApplicationMaster is started. The choice of ExecutorLauncher as the class to launch is made in Client.createContainerLaunchContext:

    private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
      : ContainerLaunchContext = {
      ...
      val amClass =
        if (isClusterMode) {
          Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
        } else {
          Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
        }
    ...
    }
    
  3. After ExecutorLauncher starts, its main method calls ApplicationMaster.main, which calls ApplicationMaster's run method; run calls runExecutorLauncher, which in turn calls registerAM. In registerAM the AM registers with the ResourceManager, and the DriverEndpoint is sent a message notifying it that the ApplicationMaster has finished starting. The ApplicationMaster then requests resources (Containers) from the ResourceManager through YarnAllocator's allocateResources method.

    ApplicationMaster # registerAM:

    private def registerAM(
        _sparkConf: SparkConf,
        _rpcEnv: RpcEnv,
        driverRef: RpcEndpointRef,
        uiAddress: Option[String]) = {
      // Get the application ID and attempt ID
      val appId = client.getAttemptId().getApplicationId().toString()
      val attemptId = client.getAttemptId().getAttemptId().toString()
      val historyAddress = ApplicationMaster
        .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
      
      // Build the URL used to reach the DriverEndpoint
      val driverUrl = RpcEndpointAddress(
        _sparkConf.get("spark.driver.host"),
        _sparkConf.get("spark.driver.port").toInt,
        CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    
      // Before we initialize the allocator, let's log the information about how executors will
      // be run up front, to avoid printing this out for every single executor being launched.
      // Use placeholders for information that changes such as executor IDs.
      logInfo {
        val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
        val executorCores = sparkConf.get(EXECUTOR_CORES)
        val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
          "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
        dummyRunner.launchContextDebugInfo()
      }
    
      // Register with the ResourceManager and let the DriverEndpoint know the AM has started
      allocator = client.register(driverUrl,
        driverRef,
        yarnConf,
        _sparkConf,
        uiAddress,
        historyAddress,
        securityMgr,
        localResources)
    
      // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
      // that when the driver sends an initial executor request (e.g. after an AM restart),
      // the allocator is ready to service requests.
      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
    
      // Request resources (Containers) for the executors
      allocator.allocateResources()
      reporterThread = launchReporterThread()
    }
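
    The driverUrl assembled above is the address executors will later use to reach the scheduler. A tiny sketch of the string RpcEndpointAddress produces for it; the host and port are made-up values, and "CoarseGrainedScheduler" is CoarseGrainedSchedulerBackend.ENDPOINT_NAME.

    // A tiny sketch of the driverUrl string handed to executors as --driver-url.
    object DriverUrlSketch {
      val EndpointName = "CoarseGrainedScheduler"

      def driverUrl(host: String, port: Int): String = s"spark://$EndpointName@$host:$port"

      def main(args: Array[String]): Unit =
        println(driverUrl("driver-host.example.com", 35000))
        // spark://CoarseGrainedScheduler@driver-host.example.com:35000
    }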
    
  4. YarnAllocator.allocateResources obtains the available Containers. When Containers are obtained, YarnAllocator calls handleAllocatedContainers; after some bookkeeping it calls runAllocatedContainers, which submits an ExecutorRunnable to a thread pool for each Container. In ExecutorRunnable.run, startContainer launches the Container, and what runs inside the Container is CoarseGrainedExecutorBackend, as can be seen from the prepareCommand method that startContainer calls (a simplified sketch of that command follows the code below).

    private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
      for (container <- containersToUse) {
        // Bump the executor ID counter
        executorIdCounter += 1
        val executorHostname = container.getNodeId.getHost
        val containerId = container.getId
        val executorId = executorIdCounter.toString
        assert(container.getResource.getMemory >= resource.getMemory)
        logInfo(s"Launching container $containerId on host $executorHostname " +
          s"for executor with ID $executorId")
    
        def updateInternalState(): Unit = synchronized {
          runningExecutors.add(executorId)
          numExecutorsStarting.decrementAndGet()
          executorIdToContainer(executorId) = container
          containerIdToExecutorId(container.getId) = executorId
          val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
            new HashSet[ContainerId])
          // Record this Container in the host-to-Containers map
          containerSet += containerId
          allocatedContainerToHostMap.put(containerId, executorHostname)
        }
    
        if (runningExecutors.size() < targetNumExecutors) {
          numExecutorsStarting.incrementAndGet()
          if (launchContainers) {
            launcherPool.execute(new Runnable {
              override def run(): Unit = {
                try {
                  // ExecutorRunnable is executed on the launcher thread pool; its run()
                  // starts the Container with CoarseGrainedExecutorBackend as the main class
                  new ExecutorRunnable(
                    Some(container),
                    conf,
                    sparkConf,
                    driverUrl,
                    executorId,
                    executorHostname,
                    executorMemory,
                    executorCores,
                    appAttemptId.getApplicationId.toString,
                    securityMgr,
                    localResources
                  ).run()
                  updateInternalState()
                } catch {
                  case e: Throwable =>
                    numExecutorsStarting.decrementAndGet()
                    if (NonFatal(e)) {
                      logError(s"Failed to launch executor $executorId on container $containerId", e)
                      // Assigned container should be released immediately
                      // to avoid unnecessary resource occupation.
                      amClient.releaseAssignedContainer(containerId)
                    } else {
                      throw e
                    }
                }
              }
            })
          } else {
            // For test only
            updateInternalState()
          }
        } else {
          logInfo(("Skip launching executorRunnable as running executors count: %d " +
            "reached target executors count: %d.").format(
            runningExecutors.size, targetNumExecutors))
        }
      }
    }
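
    As referenced above, here is a simplified sketch of the container start command that ExecutorRunnable.prepareCommand assembles. JVM memory options, the classpath, and log redirection are omitted, the argument values are placeholders, and the flag names match CoarseGrainedExecutorBackend in Spark 2.x.

    // A simplified sketch of the command that launches CoarseGrainedExecutorBackend in a Container.
    object ExecutorLaunchCommandSketch {
      def command(driverUrl: String, executorId: String, hostname: String,
          cores: Int, appId: String): Seq[String] = Seq(
        "{{JAVA_HOME}}/bin/java", "-server",
        "org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", driverUrl,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", cores.toString,
        "--app-id", appId)

      def main(args: Array[String]): Unit =
        println(command("spark://CoarseGrainedScheduler@driver-host:35000",
          "1", "nm-host-01", 4, "application_1500000000000_0001").mkString(" "))
    }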
    
  5. When the SparkContext receives the RegisterExecutor registration message sent by a CoarseGrainedExecutorBackend, it replies with RegisteredExecutor; on receiving the reply, the CoarseGrainedExecutorBackend creates an Executor and waits for task sets. The SparkContext then assigns task sets to the Executors and tracks their execution, exactly as in Standalone mode (a simplified sketch of this handshake follows the list).

  6. When the application finishes running, the SparkContext asks the ResourceManager to unregister it and shuts down.
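
The registration handshake in step 5 can be modeled, very roughly, as follows. Spark's real messages live in CoarseGrainedClusterMessages and carry more fields (RPC endpoint references, log URLs, resource information); only the shape of the exchange is kept here.

    // A highly simplified model of the executor registration handshake.
    object RegistrationHandshakeSketch {
      sealed trait Msg
      final case class RegisterExecutor(executorId: String, hostname: String, cores: Int) extends Msg
      case object RegisteredExecutor extends Msg
      final case class RegisterExecutorFailed(reason: String) extends Msg

      // Roughly what the driver side does when an executor backend registers.
      def handleRegistration(known: Set[String], msg: RegisterExecutor): (Set[String], Msg) =
        if (known.contains(msg.executorId)) {
          (known, RegisterExecutorFailed(s"Duplicate executor ID: ${msg.executorId}"))
        } else {
          (known + msg.executorId, RegisteredExecutor)
        }

      def main(args: Array[String]): Unit = {
        val (known, reply) = handleRegistration(Set.empty, RegisterExecutor("1", "nm-host-01", 4))
        println(reply)                                                                // RegisteredExecutor
        println(handleRegistration(known, RegisterExecutor("1", "nm-host-01", 4))._2) // duplicate -> failed
      }
    }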

V. YARN-Client Job Execution Call Diagram

[Figure: YARN-Client job execution call diagram]