Spark on YARN Startup Flow: Source Code Analysis

[TOC]
This chapter walks through yarn-cluster mode (--master yarn --deploy-mode cluster) in full, covering:

1) when the SparkContext is initialized;
2) how the ApplicationMaster starts executors;
3) once started, how executors communicate with the driver over RPC, and how tasks are assigned to them.

Overall yarn-cluster flow:

(Figure: overall yarn-cluster submission flow)

1) The Spark YARN client submits the application to YARN, including the ApplicationMaster program, the command to launch the ApplicationMaster, and the program to be run in the executors;

2) On receiving the request, the ResourceManager picks a NodeManager in the cluster and allocates the application's first Container on it, asking it to launch the application's ApplicationMaster in that Container; the ApplicationMaster performs the SparkContext initialization, among other setup;

3) The ApplicationMaster registers with the ResourceManager, so users can check the application's status directly through the ResourceManager; it then polls for resources for its tasks over the RPC protocol and monitors their status until they finish;

4) Once the ApplicationMaster has obtained resources (i.e., Containers), it contacts the corresponding NodeManagers and asks them to launch CoarseGrainedExecutorBackend in the acquired Containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests tasks. This matches Standalone mode, except that when the SparkContext is initialized in the Spark application, task scheduling uses CoarseGrainedSchedulerBackend together with YarnClusterScheduler, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl that adds logic such as waiting for executors;

5) The SparkContext in the ApplicationMaster assigns tasks to the CoarseGrainedExecutorBackends; each backend runs its tasks and reports status and progress to the ApplicationMaster, which therefore always knows each task's state and can relaunch a task when it fails;

6) When the application completes, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down.

SparkSubmit flow:

Submit the job with spark-submit.sh (shown here with --deploy-mode cluster; client mode swaps in --deploy-mode client):

#!/bin/sh
#LANG=zh_CN.utf8
#export LANG
export SPARK_KAFKA_VERSION=0.10
export LANG=zh_CN.UTF-8
jarspath=''
for file in `ls /home/dx/works/myapp001/sparks/*.jar`
do
jarspath=${file},$jarspath
done
jarspath=${jarspath%?}
echo $jarspath

spark-submit \
--jars $jarspath \
--properties-file ./conf/spark-properties-myapp001.conf \
--verbose \
--master yarn \
--deploy-mode cluster \
--name Streaming-$1-$2-$3-$4-$5-Agg-Parser \
--num-executors 16 \
--executor-memory 6G \
--executor-cores 2 \
--driver-memory 2G \
--driver-java-options "-XX:+TraceClassPaths" \
--class com.dx.myapp001.Main \
/home/dx/works/myapp001/lib/application-jar.jar $1 $2 $3 $4 $5
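The loop above joins the jar paths with trailing commas, and `${jarspath%?}` strips the last one; a minimal illustration of that parameter expansion:

```shell
#!/bin/sh
s='a.jar,b.jar,'
s=${s%?}            # %? drops the final character (the trailing comma)
echo "$s"           # prints: a.jar,b.jar
```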

Running spark-submit.sh actually executes the main method of org.apache.spark.deploy.SparkSubmit:

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

1) --master yarn --deploy-mode cluster

Submission is delegated to YarnClusterApplication.

YarnClusterApplication is a class defined alongside org.apache.spark.deploy.yarn.Client (in Client.scala); its start method creates a new Client and calls its run method:

private[spark] class YarnClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }
}

2) --master yarn --deploy-mode client [can be skipped]

The main function of application-jar.jar itself is invoked, via JavaMainApplication:

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala

As the implementation shows, JavaMainApplication's start method simply invokes the main function of application-jar.jar through reflection.
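That reflective dispatch can be reproduced in a few lines of plain Scala; Greeter below is a hypothetical stand-in for the user's main class (which in Spark would be loaded from application-jar.jar):

```scala
object ReflectMainDemo {
  // Hypothetical stand-in for the user's entry point; it records its greeting
  // so the effect of the reflective call is observable.
  object Greeter {
    var lastGreeting: String = ""
    def main(args: Array[String]): Unit = { lastGreeting = s"Hello, ${args(0)}" }
  }

  def run(): String = {
    // As in JavaMainApplication: look up main(Array[String]) and invoke it reflectively.
    val mainMethod = Greeter.getClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(Greeter, Array("Spark"))
    Greeter.lastGreeting
  }
}
```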

YarnClusterApplication execution flow

In yarn-cluster mode, YarnClusterApplication runs Client's run method; Client#run() implements the job submission flow:

/**
   * Submit an application to the ResourceManager.
   * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
   * reporting the application's status until the application has exited for any reason.
   * Otherwise, the client process will exit after submission.
   * If the application finishes with a failed, killed, or undefined status,
   * throw an appropriate SparkException.
   */
  def run(): Unit = {
    this.appId = submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report))
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
      if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
        diags.foreach { err =>
          logError(s"Application diagnostics message: $err")
        }
        throw new SparkException(s"Application $appId finished with failed status")
      }
      if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
        throw new SparkException(s"Application $appId is killed")
      }
      if (finalState == FinalApplicationStatus.UNDEFINED) {
        throw new SparkException(s"The final status of application $appId is undefined")
      }
    }
  }

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

The flow of run():
1) submitApplication() initializes YARN, hands resource management over to YARN, and starts the rest of the submission flow: allocate the driver container, launch the ApplicationMaster in that container, and initialize the SparkContext inside the ApplicationMaster.
2) On success, report progress and related information.
3) On failure, report the failure.
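Schematically, the success/failure handling in run() reduces to a mapping from YARN's reported states to an outcome; in this sketch the state names mirror YARN's enums but are simplified to strings:

```scala
object RunOutcomeDemo {
  // Condenses Client#run()'s checks on (YarnApplicationState, FinalApplicationStatus).
  def outcome(appState: String, finalStatus: String): String =
    (appState, finalStatus) match {
      case ("FAILED", _) | (_, "FAILED") => "throw: finished with failed status"
      case ("KILLED", _) | (_, "KILLED") => "throw: application is killed"
      case (_, "UNDEFINED")              => "throw: final status is undefined"
      case _                             => "ok: application succeeded"
    }
}
```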

The implementation of submitApplication():

/**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
   * creating applications and setting up the application submission context. This was not
   * available in the alpha API.
   */
  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

This code requests resources from the ResourceManager, launching a Container in which the ApplicationMaster is run.

Inside createContainerLaunchContext(newAppResponse), the branch that chooses the AM main class (amClass) is:

val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

So in yarn-cluster mode, Client#run() first calls Client#submitApplication() to request a container from YARN's ResourceManager in which the ApplicationMaster is launched.

An example of the resulting ApplicationMaster launch command:

[dx@hadoop143 bin]$ ps -ef | grep ApplicationMaster
# running as the yarn user:
/bin/bash -c /usr/java/jdk1.8.0_171-amd64/bin/java \
-server \
-Xmx2048m \
-Djava.io.tmpdir=/mnt/data3/yarn/nm/usercache/dx/appcache/application_1554704591622_0340/container_1554704591622_0340_01_000001/tmp \
-Dspark.yarn.app.container.log.dir=/mnt/data4/yarn/container-logs/application_1554704591622_0340/container_1554704591622_0340_01_000001 \
org.apache.spark.deploy.yarn.ApplicationMaster \
--class 'com.dx.myapp001.Main' \
--jar file:/home/dx/works/myapp001/lib/application-jar.jar \
--arg '-type' \
--arg '0' \
--properties-file /mnt/data3/yarn/nm/usercache/dx/appcache/application_1554704591622_0340/container_1554704591622_0340_01_000001/__spark_conf__/__spark_conf__.properties \
1> /mnt/data4/yarn/container-logs/application_1554704591622_0340/container_1554704591622_0340_01_000001/stdout \
2> /mnt/data4/yarn/container-logs/application_1554704591622_0340/container_1554704591622_0340_01_000001/stderr

ApplicationMaster execution flow

The ApplicationMaster process starts from the main method of the ApplicationMaster companion object, which executes:

private var master: ApplicationMaster = _

  def main(args: Array[String]): Unit = {
    SignalUtils.registerLogger(log)
    val amArgs = new ApplicationMasterArguments(args)
    master = new ApplicationMaster(amArgs)
    System.exit(master.run())
  }

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

ApplicationMasterArguments parses args; the parsed amArgs are passed to the ApplicationMaster constructor, and master#run() is then called to start the ApplicationMaster.

ApplicationMaster instantiation

When the ApplicationMaster class is instantiated, it sets up the following fields:

private val isClusterMode = args.userClass != null

  private val sparkConf = new SparkConf()
  if (args.propertiesFile != null) {
    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }

  private val securityMgr = new SecurityManager(sparkConf)

  private var metricsSystem: Option[MetricsSystem] = None

  // Set system properties for each config entry. This covers two use cases:
  // - The default configuration stored by the SparkHadoopUtil class
  // - The user application creating a new SparkConf in cluster mode
  //
  // Both cases create a new SparkConf object which reads these configs from system properties.
  sparkConf.getAll.foreach { case (k, v) =>
    sys.props(k) = v
  }

  private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  private val userClassLoader = {
    val classpath = Client.getUserClasspath(sparkConf)
    val urls = classpath.map { entry =>
      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
    }

    if (isClusterMode) {
      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      } else {
        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      }
    } else {
      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    }
  }

  private val client = doAsUser { new YarnRMClient() }
  private var rpcEnv: RpcEnv = null

  // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
  private val sparkContextPromise = Promise[SparkContext]()

Explanation of the ApplicationMaster fields:

  • isClusterMode: whether we are in cluster mode; true when userClass is set (from the ApplicationMaster argument --class 'com.dx.myapp001.Main')
  • sparkConf: the Spark runtime configuration, loaded from args.propertiesFile (from the ApplicationMaster argument --properties-file /mnt/data3/yarn/nm/usercache/dx/appcache/application_1554704591622_0340/container_1554704591622_0340_01_000001/__spark_conf__/__spark_conf__.properties)
  • securityMgr: the security manager, constructed from the sparkConf object
  • metricsSystem: the metrics system, used to record progress, resource usage, and similar information
  • client: a YarnRMClient instance, used to handle registering and unregistering the application with the YARN ResourceManager. https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
  • userClassLoader: the class loader for the user class; it takes the application jar path from sparkConf (which in turn comes from the ApplicationMaster arguments) so that the user's main can later be run through it
  • userClassThread: the thread that runs the application's main
  • rpcEnv: the RPC communication environment
  • sparkContextPromise: in cluster mode, used to tell the AM when the user application's SparkContext has been initialized

ApplicationMaster's run method

ApplicationMaster#run() calls ApplicationMaster#runImpl, which contains this important branch:

 if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }

Because args.userClass is not null, isClusterMode is true, so runDriver() is executed.

ApplicationMaster#runDriver is as follows:

private def runDriver(): Unit = {
    addAmIpFilter(None)
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
        registerAM(host, port, userConf, sc.ui.map(_.webUrl))

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        createAllocator(driverRef, userConf)
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    } catch {
      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
        logError(
          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
           "Please check earlier log output for errors. Failing the application.")
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_SC_NOT_INITED,
          "Timed out waiting for SparkContext.")
    } finally {
      resumeDriver()
    }
  }

Its flow:

1) userClassThread = startUserApplication() runs the user-defined code: it invokes, via reflection, the main function of application-jar.jar (the class given by --class on the spark-submit command line);

2) Wait for SparkContext initialization: the initialized SparkContext is obtained through sparkContextPromise, with a configurable maximum wait time.

a) This confirms that the driver runs inside the ApplicationMaster (the SparkContext effectively is the driver);
b) The actual SparkContext initialization happens in the application-jar.jar code, executed via reflection.

3) resumeDriver(): once SparkContext initialization is complete, resume the user thread.

4) userClassThread.join(): block until the reflectively-invoked main of application-jar.jar finishes.

Question: how does the ApplicationMaster main thread get hold of the SparkContext once it has been initialized?

When a job is submitted with spark-submit --master yarn --deploy-mode cluster, the SparkContext (the driver) is initialized in a child thread of the ApplicationMaster, declared here:

@volatile private var userClassThread: Thread = _
// In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
private val sparkContextPromise = Promise[SparkContext]()

The AM main thread, meanwhile, waits on the promise:

val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
       Duration(totalWaitTime, TimeUnit.MILLISECONDS))

So how does sparkContextPromise obtain the SparkContext instance created in userClassThread (the thread that runs the user code via reflection)?

Answer:

When the SparkContext initializes its TaskScheduler, yarn-cluster mode uses YarnClusterScheduler, which carries a post-start hook:

/**
 * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
 * ApplicationMaster, etc is done
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

}

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala

The hook calls ApplicationMaster.sparkContextInitialized(), which completes the Promise with the SparkContext instance:

private def sparkContextInitialized(sc: SparkContext) = {
   sparkContextPromise.synchronized {
       // Notify runDriver function that SparkContext is available
    sparkContextPromise.success(sc)
       // Pause the user class thread in order to make proper initialization in runDriver function.
    sparkContextPromise.wait()
    }
  }

userClassThread itself is created by startUserApplication(); after that comes the line quoted earlier:

val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
       Duration(totalWaitTime, TimeUnit.MILLISECONDS))

This line waits, up to the timeout, for sparkContextPromise's Future to yield the SparkContext instance.

The mechanism can be understood through the following mock code:

object App {
  def main(args: Array[String]): Unit = {
    val userClassThread = startUserApplication()
    val totalWaitTime = 15000
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        println("the sc has initialized")
        val rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    } catch {
      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
        println(
          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
            "Please check earlier log output for errors. Failing the application.")
    } finally {
      resumeDriver()
    }
  }

  /**
    * Start the user class, which contains the spark driver, in a separate Thread.
    * If the main routine exits cleanly or exits with System.exit(N) for any N
    * we assume it was successful, for all other cases we assume failure.
    *
    * Returns the user thread that was started.
    */
  private def startUserApplication(): Thread = {
    val userThread = new Thread {
      override def run() {
        try {
          val conf = new SparkConf().setMaster("local[*]").setAppName("appName")
          val sc = new SparkContext(conf)
          sparkContextInitialized(sc)
        } catch {
          case e: Exception =>
            sparkContextPromise.tryFailure(e.getCause())
        } finally {
          sparkContextPromise.trySuccess(null)
        }
      }
    }

    userThread.setName("Driver")
    userThread.start()
    userThread
  }

  // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
  private val sparkContextPromise = Promise[SparkContext]()

  private def resumeDriver(): Unit = {
    // When initialization in runDriver happened the user class thread has to be resumed.
    sparkContextPromise.synchronized {
      sparkContextPromise.notify()
    }
  }

  private def sparkContextInitialized(sc: SparkContext) = {
    sparkContextPromise.synchronized {
      // Notify runDriver function that SparkContext is available
      sparkContextPromise.success(sc)
      // Pause the user class thread in order to make proper initialization in runDriver function.
      sparkContextPromise.wait()
    }
  }
}
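Stripped of the Spark dependencies, the same Promise/wait/notify handshake can be run stand-alone. This is only a sketch: the string "sc" stands in for the SparkContext, and Await.result replaces ThreadUtils.awaitResult:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object HandshakeDemo {
  private val ctxPromise = Promise[String]()  // stands in for Promise[SparkContext]
  var handshakeDone = false

  def run(): String = {
    val userThread = new Thread {
      override def run(): Unit = ctxPromise.synchronized {
        ctxPromise.success("sc")  // sparkContextInitialized(sc): publish the context...
        ctxPromise.wait()         // ...then pause until the AM finishes registration
      }
    }
    userThread.setName("Driver")
    userThread.start()

    // The AM main thread blocks here until the child thread completes the promise.
    val sc = Await.result(ctxPromise.future, Duration(15000, TimeUnit.MILLISECONDS))
    // ... registerAM / createAllocator would run here ...
    ctxPromise.synchronized { ctxPromise.notify() }  // resumeDriver()
    userThread.join()
    handshakeDone = true
    sc
  }
}
```

Note the ordering guarantee: the child thread holds the promise's monitor from success() through wait(), so the main thread's notify() cannot slip in before the child is actually waiting.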

After SparkContext initialization: register the AM and request Containers to launch executors

The logic in ApplicationMaster#runDriver covers a lot of ground, so it gets its own section.

val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
        registerAM(host, port, userConf, sc.ui.map(_.webUrl))

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        createAllocator(driverRef, userConf)
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    } catch {
      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
        logError(
          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
           "Please check earlier log output for errors. Failing the application.")
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_SC_NOT_INITED,
          "Timed out waiting for SparkContext.")
    } finally {
      resumeDriver()
    }

• SparkContext initialization: startUserApplication() reflectively runs the main of application-jar.jar (the user code), which initializes the SparkContext.

• If SparkContext initialization succeeds:

i. rpcEnv is set to the rpcEnv of the initialized SparkContext's env object;
ii. userConf (a SparkConf), the driver host, the driver port, and sc.ui are read from sc and passed as arguments to registerAM (register the ApplicationMaster);
iii. the driver's RpcEndpointRef, driverRef, is obtained from the driver host, driver port, and the driver RPC server name YarnSchedulerBackend.ENDPOINT_NAME;
iv. createAllocator(driverRef, userConf) is called;
v. resumeDriver() releases the SparkContext-initialization thread (hands control back to the user thread);
vi. userClassThread.join() waits for the application-jar.jar program to finish.

• If SparkContext initialization fails, throw new IllegalStateException("User did not initialize spark context!") is raised.

Registering the AM with the RM (ResourceManager)

Successful SparkContext initialization returns sc (the SparkContext instance); userConf (a SparkConf), the driver host, the driver port, and sc.ui are then read from sc and passed as arguments to registerAM(), the method that registers the AM (ApplicationMaster):

private val client = doAsUser { new YarnRMClient() }

  private def registerAM(
      host: String,
      port: Int,
      _sparkConf: SparkConf,
      uiAddress: Option[String]): Unit = {
    val appId = client.getAttemptId().getApplicationId().toString()
    val attemptId = client.getAttemptId().getAttemptId().toString()
    val historyAddress = ApplicationMaster
      .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

    client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
    registered = true
  }

• client is private val client = doAsUser { new YarnRMClient() }

• Registering the ApplicationMaster means calling client#register(...), which takes the driver host, driver port, and historyAddress.

• Internally, client#register(...) registers the ApplicationMaster with the YARN ResourceManager via org.apache.hadoop.yarn.client.api.AMRMClient#registerApplicationMaster(driverHost, driverPort, trackingUrl).

In the code above, client is a YarnRMClient instance; its register method is implemented as follows:

/**
 * Handles registering and unregistering the application with the YARN ResourceManager.
 */
private[spark] class YarnRMClient extends Logging {

  private var amClient: AMRMClient[ContainerRequest] = _
  private var uiHistoryAddress: String = _
  private var registered: Boolean = false

  /**
   * Registers the application master with the RM.
   *
   * @param driverHost Host name where driver is running.
   * @param driverPort Port where driver is listening.
   * @param conf The Yarn configuration.
   * @param sparkConf The Spark configuration.
   * @param uiAddress Address of the SparkUI.
   * @param uiHistoryAddress Address of the application on the History Server.
   */
  def register(
      driverHost: String,
      driverPort: Int,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      uiAddress: Option[String],
      uiHistoryAddress: String): Unit = {
    amClient = AMRMClient.createAMRMClient()
    amClient.init(conf)
    amClient.start()
    this.uiHistoryAddress = uiHistoryAddress

    val trackingUrl = uiAddress.getOrElse {
      if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
    }

    logInfo("Registering the ApplicationMaster")
    synchronized {
      amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
      registered = true
    }
  }
  // ...
}

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

The registration process:
1) First initialize amClient, an AMRMClient[ContainerRequest], and call amClient#start();
2) Synchronously call amClient#registerApplicationMaster(driverHost, driverPort, trackingUrl) to register the ApplicationMaster with the YARN ResourceManager;
3) Registration passes a trackingUrl, the address at which the application's status can be followed through a UI.

Note: once the ApplicationMaster is registered with the ResourceManager, users can check the application's status directly through the ResourceManager; the AM then polls for task resources over RPC and monitors them until completion.

Requesting Container resources from the RM

Continuing through ApplicationMaster#runDriver: we have seen that SparkContext initialization returns sc and the AM registers with the RM. Next:

1) The driver's RpcEndpointRef, driverRef, is built from the driver host, driver port, and the driver RPC server name YarnSchedulerBackend.ENDPOINT_NAME, so the AM can talk to the driver. The driver's host and port are also passed along when executors are launched in Containers, so each executor can construct its own driver endpoint ref and communicate with the driver over RPC.

2) createAllocator(driverRef, userConf) uses the YarnRMClient to request Container resources from the RM and launch executors.

createAllocator(driverRef, userConf) is defined in ApplicationMaster as follows:

private val client = doAsUser { new YarnRMClient() }

  private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
    val appId = client.getAttemptId().getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

    // Before we initialize the allocator, let's log the information about how executors will
    // be run up front, to avoid printing this out for every single executor being launched.
    // Use placeholders for information that changes such as executor IDs.
    logInfo {
      val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
      val executorCores = _sparkConf.get(EXECUTOR_CORES)
      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
      dummyRunner.launchContextDebugInfo()
    }

    allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

    credentialRenewer.foreach(_.setDriverRef(driverRef))

    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
    // that when the driver sends an initial executor request (e.g. after an AM restart),
    // the allocator is ready to service requests.
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

    allocator.allocateResources()
    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
    ms.start()
    metricsSystem = Some(ms)
    reporterThread = launchReporterThread()
  }

This code requests containers through allocator, a YarnAllocator created by client (the YarnRMClient), and launches executors through ExecutorRunnable. Step by step:

1) client#createAllocator(yarnConf, _sparkConf, driverUrl, driverRef, securityMgr, localResources) creates allocator, a YarnAllocator:

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

2) An AMEndpoint (an inner class of ApplicationMaster) is set up for RPC communication with the driver.

Note that AMEndpoint is constructed with driverRef; in its onStart() method it calls driver.send(RegisterClusterManager(self)), and on the driver side the message is received by:

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

3) allocator.allocateResources() loops requesting containers and launches executors via ExecutorRunnable:

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

4) Metrics are reported, and the reporter thread (allocationThreadImpl()) collects error information and reacts to it.

AMEndpoint (an inner class of ApplicationMaster) was mentioned above; here is its implementation:

/**
   * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
   */
  private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
    extends RpcEndpoint with Logging {

    override def onStart(): Unit = {
      driver.send(RegisterClusterManager(self))
    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case r: RequestExecutors =>
        Option(allocator) match {
          case Some(a) =>
            if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
              r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
              resetAllocatorInterval()
            }
            context.reply(true)

          case None =>
            logWarning("Container allocator is not ready to request executors yet.")
            context.reply(false)
        }

      case KillExecutors(executorIds) =>
        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
        Option(allocator) match {
          case Some(a) => executorIds.foreach(a.killExecutor)
          case None => logWarning("Container allocator is not ready to kill executors yet.")
        }
        context.reply(true)

      case GetExecutorLossReason(eid) =>
        Option(allocator) match {
          case Some(a) =>
            a.enqueueGetLossReasonRequest(eid, context)
            resetAllocatorInterval()
          case None =>
            logWarning("Container allocator is not ready to find executor loss reasons yet.")
        }
    }

    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      // In cluster mode, do not rely on the disassociated event to exit
      // This avoids potentially reporting incorrect exit codes if the driver fails
      if (!isClusterMode) {
        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
      }
    }
  }

A few notes on this class:

1) In onStart(), it sends a RegisterClusterManager(self) request to the driver (the SparkContext instance). The intent is to tell the driver that cluster-manager duties are handed over to this ApplicationMaster; on the driver side, the AM reference is received in YarnSchedulerBackend (the schedulerBackend property of SparkContext).

2) receiveAndReply() handles three kinds of messages:

2.1) RequestExecutors: request allocation of executors;

2.2) KillExecutors: kill the specified executors;

2.3) GetExecutorLossReason: look up why an executor was lost.
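The three message types handled above are case classes defined in CoarseGrainedClusterMessages. As a minimal self-contained sketch (field names mirror the usage in receiveAndReply above, not necessarily the exact Spark 2.4 definitions), the protocol can be modeled like this:

```scala
// Simplified sketch of the AM-side message protocol. Field names follow the
// calls seen in AMEndpoint.receiveAndReply; the real definitions live in
// CoarseGrainedClusterMessages.
sealed trait AmMessage
case class RequestExecutors(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String]) extends AmMessage
case class KillExecutors(executorIds: Seq[String]) extends AmMessage
case class GetExecutorLossReason(executorId: String) extends AmMessage

// A toy dispatcher showing how receiveAndReply's PartialFunction matches:
def handle(msg: AmMessage): String = msg match {
  case r: RequestExecutors        => s"request total=${r.requestedTotal}"
  case KillExecutors(ids)         => s"kill ${ids.mkString(",")}"
  case GetExecutorLossReason(eid) => s"loss reason for $eid"
}
```

This mirrors the shape of the real handler: each case either forwards to the allocator or logs that the allocator is not ready yet.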

通過(guò)ExecutorRunable啟動(dòng)Executor

上邊講到調(diào)用ApplicationMaster.allocator.allocateResources()其內(nèi)部實(shí)現(xiàn)是循環(huán)申請(qǐng)container,并通過(guò)ExecutorRunnable啟動(dòng)executor。ExecutorRunnable是用來(lái)啟動(dòng)CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend其實(shí)是一個(gè)進(jìn)程,ExecutorRunnable包裝了CoarseGrainedExecutorBackend進(jìn)程啟動(dòng)腳本,并提供了通過(guò)nmClient(NameNode Client)啟動(dòng)Conatiner,啟動(dòng)container時(shí)附帶CoarseGrainedExecutorBackend進(jìn)程啟動(dòng)腳本。

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

  def launchContextDebugInfo(): String = {
    val commands = prepareCommand()
    val env = prepareEnvironment()

    s"""
    |===============================================================================
    |YARN executor launch context:
    |  env:
    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |  command:
    |    ${commands.mkString(" \\ \n      ")}
    |
    |  resources:
    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |===============================================================================""".stripMargin
  }

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }

  private def prepareCommand(): List[String] = {
    // Extra options for the JVM
    val javaOpts = ListBuffer[String]()

    // Set the JVM memory
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Set extra Java options for the executor, if defined
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Set the library path through a command prefix to append to the existing value of the
    // env variable.
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
    // The context is, default gc for server class machines end up using all cores to do gc - hence
    // if there are multiple containers in same node, spark gc effects all other containers
    // performance (which can also be other spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
    // of cores on a node.
    /*
        else {
          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
          // It might be possible that other modes/config is being done in
          // spark.executor.extraJavaOptions, so we don't want to mess with it.
          // In our expts, using (default) throughput collector has severe perf ramifications in
          // multi-tenant machines
          // The options are based on
          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
          javaOpts += "-XX:+UseConcMarkSweepGC"
          javaOpts += "-XX:+CMSIncrementalMode"
          javaOpts += "-XX:+CMSIncrementalPacing"
          javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
          javaOpts += "-XX:CMSIncrementalDutyCycle=10"
        }
    */

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    // lookup appropriate http scheme for container log urls
    val yarnHttpPolicy = conf.get(
      YarnConfiguration.YARN_HTTP_POLICY_KEY,
      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
    )
    val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"

    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }

    // Add log urls
    container.foreach { c =>
      sys.env.get("SPARK_USER").foreach { user =>
        val containerId = ConverterUtils.toString(c.getId)
        val address = c.getNodeHttpAddress
        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"

        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
      }
    }

    env
  }
}

ExecutorRunnable contains the following methods:

1) prepareEnvironment(): prepares the executor runtime environment.

2) prepareCommand(): generates the launch script for the CoarseGrainedExecutorBackend process.

3) startContainer():

 1) initializes the executor runtime environment;

 2) generates the CoarseGrainedExecutorBackend launch script;

 3) attaches that launch script to the container and calls nmClient.startContainer(container.get, ctx) to start the container, which runs the script and thereby starts the executor.

4) launchContextDebugInfo(): prints debug information about the launch context.

5) run(): creates nmClient via NMClient.createNMClient(), initializes and starts it, then calls startContainer().
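How run() gets invoked is not shown here; in Spark's YarnAllocator, each granted container is handed to a launcher thread pool which calls ExecutorRunnable.run(). The following is a hedged sketch of that launch pattern, with FakeExecutorRunnable standing in for the real class:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hedged sketch of how the allocator drives ExecutorRunnable: each granted
// container is submitted to a launcher thread pool, whose worker calls run()
// (which in the real class does nmClient.init/start + startContainer()).
// FakeExecutorRunnable is illustrative, not a Spark class.
class FakeExecutorRunnable(executorId: String, started: CountDownLatch) {
  def run(): Unit = {
    // real code: create/init/start NMClient, then startContainer()
    started.countDown()
  }
}

val containers = Seq("1", "2", "3")          // pretend container grants
val started = new CountDownLatch(containers.size)
val launcherPool = Executors.newCachedThreadPool()
containers.foreach { id =>
  launcherPool.execute(new Runnable {
    override def run(): Unit = new FakeExecutorRunnable(id, started).run()
  })
}
started.await(5, TimeUnit.SECONDS)           // wait for all launches
launcherPool.shutdown()
```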

啟動(dòng)CoarseGrainedExecutorBackend進(jìn)程腳本示例:

/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/launch_container.sh

Contents of launch_container.sh:

#!/bin/bash
...
exec /bin/bash -c "$JAVA_HOME/bin/java 
-server -Xmx6144m 
-Djava.io.tmpdir=$PWD/tmp
'-Dspark.driver.port=50365' 
'-Dspark.network.timeout=10000000' 
'-Dspark.port.maxRetries=32' 
-Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 
-XX:OnOutOfMemoryError='kill %p' 
org.apache.spark.executor.CoarseGrainedExecutorBackend
--driver-url spark://CoarseGrainedScheduler@CDH-143:50365 
--executor-id 2 
--hostname CDH-141 
--cores 2 
--app-id application_1559203334026_0010 
--user-class-path file:$PWD/__app__.jar 
--user-class-path file:$PWD/spark-sql-kafka-0-10_2.11-2.4.0.jar 
--user-class-path file:$PWD/spark-avro_2.11-3.2.0.jar  
--user-class-path file:$PWD/bijection-core_2.11-0.9.5.jar 
--user-class-path file:$PWD/bijection-avro_2.11-0.9.5.jar 
1>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stdout 
2>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stderr"
...

Launching CoarseGrainedExecutorBackend

CoarseGrainedExecutorBackend entry point

Like the SparkSubmit and ApplicationMaster companion objects, CoarseGrainedExecutorBackend also has a companion object that contains the main entry function. main() does the following:

def main(args: Array[String]) {
    var driverUrl: String = null
    var executorId: String = null
    var hostname: String = null
    var cores: Int = 0
    var appId: String = null
    var workerUrl: Option[String] = None
    val userClassPath = new mutable.ListBuffer[URL]()

    var argv = args.toList
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case Nil =>
        case tail =>
          // scalastyle:off println
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          // scalastyle:on println
          printUsageAndExit()
      }
    }

    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }

    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }

1) Parse the arguments passed in by ExecutorRunnable:

a) var driverUrl: String = null --- the driver's RPC endpoint URL;

b) var executorId: String = null --- the executor's numeric id (in YARN mode, assigned consecutively by the ApplicationMaster);

c) var hostname: String = null --- hostname of the cluster node the executor runs on;

d) var cores: Int = 0 --- number of vcores the executor may use;

e) var appId: String = null --- id of the current application;

f) var workerUrl: Option[String] = None --- the worker's RPC address, used only in standalone mode to enforce fate-sharing with the worker;

g) val userClassPath = new mutable.ListBuffer[URL]() --- paths of the jar containing the user application's main class and its dependency jars.

2) Pass the parsed arguments into run(), executing the companion object's run method.

private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Debug code
      Utils.checkHost(hostname)

      // Bootstrap to fetch the driver's Spark properties.
      val executorConf = new SparkConf
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        hostname,
        -1,
        executorConf,
        new SecurityManager(executorConf),
        clientMode = true)
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
      fetcher.shutdown()

      // Create SparkEnv using properties we fetched from the driver.
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

      cfg.hadoopDelegationCreds.foreach { tokens =>
        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
      }

      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
    }
  }

代碼執(zhí)行邏輯:

1)通過(guò)driverUrl與driver Endpoint建立通信,向driver需求Spark應(yīng)用程序的配置信息,并來(lái)創(chuàng)建driverConf對(duì)象。RetrieveSparkAppConfig類(lèi)型請(qǐng)求被driver的schedulerBackend屬性接收,接收代碼位置:https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

2)通過(guò)SparkEnv.createExecutorEnv() 方法創(chuàng)建SparkEnv對(duì)象env ,SparkEnv#createExecutorEnv 內(nèi)部會(huì)創(chuàng)建以下幾類(lèi)組件:RpcEnv,securityManager,broadcastManager,mapOutputTracker,shuffleManager,memoryManager,blockTransferService,blockManagerMaster,blockManager,metricsSystem,outputCommitCorrdinator,outputCommitCoordinatorRef等。

3)通過(guò)env.rpcEnv對(duì)象開(kāi)放RPC通信接口“Executor”,對(duì)應(yīng)RpcEndpoint類(lèi)型是CoarseGrainedExecutorBackend類(lèi)。

4)通過(guò)workerUrl開(kāi)發(fā)RPC通信接口“WorkerWatcher”,用來(lái)監(jiān)控worker運(yùn)行。WorkerWatcher的功能:連接到工作進(jìn)程并在連接斷開(kāi)時(shí)終止JVM的端點(diǎn);提供工作進(jìn)程及其關(guān)聯(lián)子進(jìn)程之間的命運(yùn)共享。

5)調(diào)用env.rpcEnv.awaitTermination()來(lái)阻塞程序,直到程序退出。

The CoarseGrainedExecutorBackend class

Its definition shows that it is an RpcEndpoint, i.e. a class that implements the handling of RPC messages.

private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  private[this] val stopping = new AtomicBoolean(false)
  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None

  // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
  // to be changed so that we don't share the serializer instance across threads
  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

  override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

  def extractLogUrls: Map[String, String] = {
    val prefix = "SPARK_LOG_URL_"
    sys.env.filterKeys(_.startsWith(prefix))
      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
  }

  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    case RegisterExecutorFailed(message) =>
      exitExecutor(1, "Slave registration failed: " + message)

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc)
      }

    case KillTask(taskId, _, interruptThread, reason) =>
      if (executor == null) {
        exitExecutor(1, "Received KillTask command but executor was null")
      } else {
        executor.killTask(taskId, interruptThread, reason)
      }

    case StopExecutor =>
      stopping.set(true)
      logInfo("Driver commanded a shutdown")
      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
      // a message to self to actually do the shutdown.
      self.send(Shutdown)

    case Shutdown =>
      stopping.set(true)
      new Thread("CoarseGrainedExecutorBackend-stop-executor") {
        override def run(): Unit = {
          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
          // Therefore, we put this line in a new thread.
          executor.stop()
        }
      }.start()

    case UpdateDelegationTokens(tokenBytes) =>
      logInfo(s"Received tokens of ${tokenBytes.length} bytes")
      SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
  }

  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
    if (stopping.get()) {
      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
    } else if (driver.exists(_.address == remoteAddress)) {
      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
        notifyDriver = false)
    } else {
      logWarning(s"An unknown ($remoteAddress) driver disconnected.")
    }
  }

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

  /**
   * This function can be overloaded by other child classes to handle
   * executor exits differently. For e.g. when an executor goes down,
   * back-end may not want to take the parent process down.
   */
  protected def exitExecutor(code: Int,
                             reason: String,
                             throwable: Throwable = null,
                             notifyDriver: Boolean = true) = {
    val message = "Executor self-exiting due to : " + reason
    if (throwable != null) {
      logError(message, throwable)
    } else {
      logError(message)
    }

    if (notifyDriver && driver.nonEmpty) {
      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
    }

    System.exit(code)
  }
}

包含的屬性包含4個(gè):

ü stopping ---標(biāo)記executor運(yùn)行狀態(tài)

ü executor ---存儲(chǔ)當(dāng)前CoarseGrainedExecutorBackend進(jìn)程中存儲(chǔ)的Executor對(duì)象。

ü driver---存儲(chǔ)與driver交互使用的RpcEndpointRef對(duì)象

ü ser---當(dāng)前序列化使用的序列化工具

Its methods, explained:

1) onStart(): overrides RpcEndpoint's onStart(). It asynchronously resolves the driver's RpcEndpointRef via rpcEnv.asyncSetupEndpointRefByURI(driverUrl), assigns it to the driver property, and sends RegisterExecutor(executorId, self, hostname, cores, extractLogUrls) to the driver (schedulerBackend).

In the current submission mode (yarn-cluster), the driver-side class that actually handles this message is CoarseGrainedSchedulerBackend: on receipt, CoarseGrainedSchedulerBackend#driverEndpoint#receiveAndReply(context: RpcCallContext) responds; inside receiveAndReply it obtains executorRef and uses it to send executorRef.send(RegisteredExecutor) back to the executor, which is received and handled by CoarseGrainedExecutorBackend's receive method.

2) receive(): overrides RpcEndpoint's receive() method, handling the following messages:

- RegisteredExecutor: the driver has registered this executor (recording executorId, executorAddress, etc.); only now does the executor side call executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) to start the Executor, which is responsible for running tasks and reporting task state, progress, and resource usage.

- RegisterExecutorFailed(message): registering the executor failed;

- LaunchTask(data): launch a task by calling executor.launchTask(this, taskDesc);

- KillTask(taskId, _, interruptThread, reason): kill a task;

- StopExecutor: stop the executor;

- Shutdown: shut down the executor;

- UpdateDelegationTokens(tokenBytes): update delegation tokens.

The message types above are defined in CoarseGrainedClusterMessages; their sender is the schedulerBackend (CoarseGrainedSchedulerBackend) under the driver-side SparkContext.

3) onDisconnected(remoteAddress: RpcAddress): overrides RpcEndpoint's onDisconnected() method; if the lost connection is the driver's, the executor exits.

4) statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): implements ExecutorBackend's statusUpdate(), forwarding a task's status to the driver.

5) exitExecutor(code: Int, reason: String, throwable: Throwable = null, notifyDriver: Boolean = true): logs the reason, optionally notifies the driver with RemoveExecutor, and terminates the JVM.

Driver-side RpcEndpoint initialization

In its overridden onStart() method, CoarseGrainedExecutorBackend obtains the driver's RpcEndpointRef via driverUrl and sends it the request:

ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))

The actual receiver is CoarseGrainedSchedulerBackend, and the message type is defined in CoarseGrainedClusterMessages.

Next, let's look at how this driver-side schedulerBackend (CoarseGrainedSchedulerBackend) referenced by CoarseGrainedExecutorBackend gets initialized.

Initializing schedulerBackend and taskScheduler

During SparkContext initialization, schedulerBackend and taskScheduler are created:

private var _schedulerBackend: SchedulerBackend = _
  private var _taskScheduler: TaskScheduler = _

  private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend

  private[spark] def taskScheduler: TaskScheduler = _taskScheduler
  private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
    _taskScheduler = ts
  }

  // 構(gòu)造函數(shù)中初始化賦值
  // Create and start the scheduler
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
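SparkContext.createTaskScheduler dispatches on the master URL to pick the (SchedulerBackend, TaskScheduler) pair. A hedged sketch of that dispatch, returning class names as plain strings for illustration (for "yarn" in cluster mode the real pair is YarnClusterSchedulerBackend plus YarnClusterScheduler, a thin wrapper over TaskSchedulerImpl):

```scala
// Illustrative dispatch only: the real createTaskScheduler instantiates the
// actual backend/scheduler classes and, for YARN, loads them via an external
// cluster manager rather than returning strings.
def pickScheduler(master: String, deployMode: String): (String, String) =
  master match {
    case "local" =>
      ("LocalSchedulerBackend", "TaskSchedulerImpl")
    case m if m.startsWith("spark://") =>
      ("StandaloneSchedulerBackend", "TaskSchedulerImpl")
    case "yarn" if deployMode == "cluster" =>
      ("YarnClusterSchedulerBackend", "YarnClusterScheduler")
    case "yarn" =>
      ("YarnClientSchedulerBackend", "YarnScheduler")
    case other =>
      sys.error(s"Could not parse Master URL: '$other'")
  }
```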
