Spark1.3.1源碼分析 Spark job 提交流程

spark提交

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

例如WordCount代碼

import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by lancerlin on 2018/2/2. 
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop");

    //非常重要,是通向Spark集群的入口
    val conf = new SparkConf().setAppName("WC")
    val sc = new SparkContext(conf)

    sc.textFile(args(0)) 
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}

提交腳本

./bin/spark-submit \
  --class WordCount  \
  --master master01:7070 \
  --deploy-mode client \
  --executor-memory 8G \ 
  -- total-executor-cores 3  \
  wordcount.jar \
  hdfs://wordcount.txt

所以,想要分析程序提交的流程,必須從spark-submit腳本開始分析,--deploy-mode有兩種模式,clientcluster,client是client 跟driver都在客戶端啟動,同一個jvm中,一種是client跟driver分開啟動的方式,為什么叫client和driver,在源碼中我們會找到答案。

spark-submit

image.png

查看分析org.apache.spark.deploy.SparkSubmit
image.png

image.png

image.png

image.png

  • 查看源碼,我們可以知道,SparkSubmit類中的main方法中,調(diào)用Submit方法,submit方法調(diào)用doRunMain,doRunMain方法通過反射,實例化主程序,這里就是WordCount,在調(diào)用主程序的main方法,所以Submit的工作就完成了,接下來分析我們的主程序代碼。
  • 總體流程如下圖所示


    image.png

主程序

WordCount 代碼

import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by lancerlin on 2018/2/2. 
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop");

    //非常重要,是通向Spark集群的入口
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    sc.textFile(args(0)) 
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}

SparkContext

sparkContext是spark程序的入口,通過sc來連接集群

sparkEnv

272行代碼,創(chuàng)建了sparkDriverEnv,sparkEnv里面,翻譯文檔就是
保存正在運行的Spark實例(master或worker)的所有運行時環(huán)境對象, 包括序列化器,Akka ActorSystem,塊管理器,地圖輸出追蹤器等
這里最重要的是,前面我們分析master worker的時候,看到的ActorSystem

image.png

image.png

image.png

SparkContext.DAGScheduler

image.png

主要作用
主要就是計算生成stages,并跟蹤RDD跟stage的輸入輸出,將stage的tasks封裝成一個taskSet并提交,每個task的失敗重試,推測,都由DAGScheduler處理


image.png

DAGSchedulerEventProcessLoop

DAGScheduler里有個很重要的成員變量DAGSchedulerEventProcessLoop
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
可以學習下這個類的編程模式,設計模式中,這個叫做模板方法模式
父類EventLoop定義好了編程模型,子類DAGSchedulerEventProcessLoop重寫recieve方法

image.png

schedulerBackend, taskScheduler

創(chuàng)建了兩個非常重要的成員變量,schedulerBackend, taskScheduler,然后我們查看SparkContext.createTaskScheduler(this, master),這個方法的實現(xiàn)

image.png

根據(jù)傳入的master信息,來決定調(diào)度器的實現(xiàn),我們查看spark的模式實現(xiàn)

  • 首先創(chuàng)建TaskSchedulerImpl(sc),這是task任務的實現(xiàn)類
  • 然后創(chuàng)建SparkDeploySchedulerBackend,接著調(diào)用scheduler.initialize(backend)

image.png

image.png

scheduler.initialize(backend)方法中,創(chuàng)建了一個調(diào)度器,默認是FIFO調(diào)度器
image.png

image.png

381行代碼中,將剛才創(chuàng)建的taskScheduler啟動了起來,backend是實現(xiàn)類是SparkDeploySchedulerBackend,查看SparkDeploySchedulerBackendstart方法
image.png

image.png

SparkDeploySchedulerBackend繼承了CoarseGrainedSchedulerBackend,spark-on-yarn的時候,backend的實現(xiàn)類就是這個CoarseGrainedSchedulerBackend

image.png

CoarseGrainedSchedulerBackendstart方法中,創(chuàng)建了一個driverActor,通過查看DriverActor類,結合查看master worker的源碼,我們知道,DriverActor會走生命周期方法,然而,CoarseGrainedSchedulerBackend只是創(chuàng)建了DriverActor,并沒有啟動,分析DriverActor,我們可以知道,他是發(fā)送task到executor上執(zhí)行的,所以等待分配好資源后,才啟動
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive
image.png

image.png

image.png

image.png

image.png

看回SparkDeploySchedulerBackend的start方法

image.png

重要的代碼,查看AppClient

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

start方法中通過actorSystem創(chuàng)建了一個ClientActor,執(zhí)行preStart(),receiveWithLogging()生命周期方法

image.png

image.png

image.png

image.png

master接收到RegisterApplication(appDescription),保存app信息,告訴client注冊完畢,RegisteredApplication(app.id, masterUrl),然后調(diào)用schedule(),這個方法我們在分析master、worker的時候已經(jīng)看到過,后面我們會重點分析,主要就是mater指揮worker啟動executor

image.png

Master:schedule

image.png

這里是啟動executor的兩種方式,一種是盡量打散,默認的方式,一種是盡量集中,通過
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)配置
image.png

盡量打散

分析源碼我們可以看到盡量打散的源碼分析, 比如此時需要--executor-memory 4G --total-executor-cores 8

  • 首先判斷內(nèi)存大于app資源,這里是4G的worker并且該worker沒有該app的executor,源碼里說的是,standalone的情況下,不允許app的兩個executor在同一個worker上


    image.png
  • 符合條件的worker按照剩余內(nèi)存降序,判斷cpu cores,如果app需要的cores > 符合條件workers的cores的總數(shù),則取小的,比如app需要10cores,而符合條件的worker一共只有9cores,那么就是用9cores

  • 分配好每個worker的cores和memory后,launchExecutor(usableWorkers(pos), exec)
    比如現(xiàn)在有3個Worker來執(zhí)行WordCount,程序,按照盡量打散的邏輯,分配前后的executor如下圖所示

    image.png

image.png
集中
  • 找到內(nèi)存資源符合的worker,計算該worker的cores是否大于app需要的cores,取兩者的最小值
  • 啟動ExecutorlaunchExecutor(worker, exec)
    image.png
launchExecutor

接下來重點分析如何啟動Executor,分析到這里,我們知道,SparkDeploySchedulerBackend start()方法中,將將app信息封裝成一個appDesc,然后通過ClientActor,將appDesc封裝成case class發(fā)送給Master,master接收到后,通過集中或者打散的規(guī)則給Worker分配需要啟動的Executor資源,調(diào)用launchExecutor,重點分析launchExecutor

image.png

image.png

master發(fā)送信息給Worker,LaunchExecutor,然后又給driver發(fā)送ExecutorAdded,需要分別查看Worker跟DirverActor

image.png

Worker接收到LaunchExecutor,首先創(chuàng)建executor工作目錄,然后啟動一個ExecutorRunner.start,start方法中創(chuàng)建了一個線程workerThread去啟動executor,因為啟動executor進程可能會消耗很多時間,需要異步處理,所以開啟線程去啟動。fetchAndRunExecutor是啟動executor的方法。通過fetchAndRunExecutor啟動的類是clientActor指定的,org.apache.spark.executor.CoarseGrainedExecutorBackend,所以此時,需要到org.apache.spark.executor.CoarseGrainedExecutorBackend類中查看main方法,啟動Executor后,worker會相應的減少cores和memory。

image.png

image.png

image.png

org.apache.spark.executor.CoarseGrainedExecutorBackend

Executor也是一個Actor,會走跟master、worker一樣的Actor生命周期方法。executor啟動包含dirverUrl,appId,cores,worker等信息
private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) extends Actor with ActorLogReceive with ExecutorBackend with Logging

image.png

run方法中,啟動了兩個actorCoarseGrainedExecutorBackend, WorkerWatcher
image.png

org.apache.spark.executor.CoarseGrainedExecutorBackend.preStart & receiveWithLogging

向dirver發(fā)送信息注冊executor,dirver回復注冊executor成功,executor接收driver注冊成功后,創(chuàng)建一個Executor對象

image.png

image.png

image.png

image.png

Executor

構造了一個線城池threadPoollaunchTask方法中,將接收到的task封裝成一個TaskRunner,然后將他提交給threadPool執(zhí)行

image.png

image.png

創(chuàng)建executorActor與driver交互


image.png

startDriverHeartbeater,查看代碼,這是executor與driver保持心跳的,默認是val interval = conf.getInt("spark.executor.heartbeatInterval", 10000),10秒鐘發(fā)送一次,但是在driverActor中,沒有找到跟他交互的代碼

image.png

image.png

CoarseGrainedExecutorBackend中還啟動了個WorkerWatcher,負責跟worker保持心跳

image.png

image.png

總結

說些了那么多,最終我們可以得到如下圖所示的關系,一切都準備好了,此時,就等待driver task的發(fā)送,
executor接收task,開始執(zhí)行task


image.png
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容