spark提交
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
例如WordCount代碼
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by lancerlin on 2018/2/2.
*/
object WordCount {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hadoop");
//非常重要,是通向Spark集群的入口
val conf = new SparkConf().setAppName("WC")
val sc = new SparkContext(conf)
sc.textFile(args(0))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.saveAsTextFile(args(1))
sc.stop()
}
}
提交腳本
./bin/spark-submit \
--class WordCount \
--master master01:7070 \
--deploy-mode client \
--executor-memory 8G \
-- total-executor-cores 3 \
wordcount.jar \
hdfs://wordcount.txt
所以,想要分析程序提交的流程,必須從spark-submit腳本開始分析,--deploy-mode有兩種模式,client和cluster,client是client 跟driver都在客戶端啟動,同一個jvm中,一種是client跟driver分開啟動的方式,為什么叫client和driver,在源碼中我們會找到答案。
spark-submit

查看分析
org.apache.spark.deploy.SparkSubmit類



- 查看源碼,我們可以知道,
SparkSubmit類中的main方法中,調(diào)用Submit方法,submit方法調(diào)用doRunMain,doRunMain方法通過反射,實例化主程序,這里就是WordCount,在調(diào)用主程序的main方法,所以Submit的工作就完成了,接下來分析我們的主程序代碼。 -
總體流程如下圖所示
image.png
主程序
WordCount 代碼
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by lancerlin on 2018/2/2.
*/
object WordCount {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hadoop");
//非常重要,是通向Spark集群的入口
val conf = new SparkConf()
val sc = new SparkContext(conf)
sc.textFile(args(0))
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.saveAsTextFile(args(1))
sc.stop()
}
}
SparkContext
sparkContext是spark程序的入口,通過sc來連接集群
sparkEnv
272行代碼,創(chuàng)建了sparkDriverEnv,sparkEnv里面,翻譯文檔就是
保存正在運行的Spark實例(master或worker)的所有運行時環(huán)境對象, 包括序列化器,Akka ActorSystem,塊管理器,地圖輸出追蹤器等
這里最重要的是,前面我們分析master worker的時候,看到的ActorSystem



SparkContext.DAGScheduler

主要作用
主要就是計算生成stages,并跟蹤RDD跟stage的輸入輸出,將stage的tasks封裝成一個taskSet并提交,每個task的失敗重試,推測,都由DAGScheduler處理

DAGSchedulerEventProcessLoop
DAGScheduler里有個很重要的成員變量DAGSchedulerEventProcessLoop
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
可以學習下這個類的編程模式,設計模式中,這個叫做模板方法模式
父類EventLoop定義好了編程模型,子類DAGSchedulerEventProcessLoop重寫recieve方法

schedulerBackend, taskScheduler
創(chuàng)建了兩個非常重要的成員變量,schedulerBackend, taskScheduler,然后我們查看SparkContext.createTaskScheduler(this, master),這個方法的實現(xiàn)

根據(jù)傳入的master信息,來決定調(diào)度器的實現(xiàn),我們查看spark的模式實現(xiàn)
- 首先創(chuàng)建
TaskSchedulerImpl(sc),這是task任務的實現(xiàn)類 - 然后創(chuàng)建
SparkDeploySchedulerBackend,接著調(diào)用scheduler.initialize(backend)


scheduler.initialize(backend)方法中,創(chuàng)建了一個調(diào)度器,默認是FIFO調(diào)度器

381行代碼中,將剛才創(chuàng)建的
taskScheduler啟動了起來,backend是實現(xiàn)類是SparkDeploySchedulerBackend,查看SparkDeploySchedulerBackend的start方法

SparkDeploySchedulerBackend繼承了CoarseGrainedSchedulerBackend,spark-on-yarn的時候,backend的實現(xiàn)類就是這個CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend的start方法中,創(chuàng)建了一個driverActor,通過查看DriverActor類,結合查看master worker的源碼,我們知道,DriverActor會走生命周期方法,然而,CoarseGrainedSchedulerBackend只是創(chuàng)建了DriverActor,并沒有啟動,分析DriverActor,我們可以知道,他是發(fā)送task到executor上執(zhí)行的,所以等待分配好資源后,才啟動class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive




看回SparkDeploySchedulerBackend的start方法

重要的代碼,查看
AppClient
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
start方法中通過actorSystem創(chuàng)建了一個ClientActor,執(zhí)行preStart(),receiveWithLogging()生命周期方法




master接收到RegisterApplication(appDescription),保存app信息,告訴client注冊完畢,RegisteredApplication(app.id, masterUrl),然后調(diào)用schedule(),這個方法我們在分析master、worker的時候已經(jīng)看到過,后面我們會重點分析,主要就是mater指揮worker啟動executor

Master:schedule

這里是啟動executor的兩種方式,一種是盡量打散,默認的方式,一種是盡量集中,通過
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)配置
盡量打散
分析源碼我們可以看到盡量打散的源碼分析, 比如此時需要--executor-memory 4G --total-executor-cores 8
-
首先判斷內(nèi)存大于app資源,這里是4G的worker并且該worker沒有該app的executor,源碼里說的是,standalone的情況下,不允許app的兩個executor在同一個worker上
image.png 符合條件的worker按照剩余內(nèi)存降序,判斷cpu cores,如果app需要的cores > 符合條件workers的cores的總數(shù),則取小的,比如app需要10cores,而符合條件的worker一共只有9cores,那么就是用9cores
-
分配好每個worker的cores和memory后,
launchExecutor(usableWorkers(pos), exec)
比如現(xiàn)在有3個Worker來執(zhí)行WordCount,程序,按照盡量打散的邏輯,分配前后的executor如下圖所示
image.png

集中
- 找到內(nèi)存資源符合的worker,計算該worker的cores是否大于app需要的cores,取兩者的最小值
- 啟動Executor
launchExecutor(worker, exec)
image.png
launchExecutor
接下來重點分析如何啟動Executor,分析到這里,我們知道,SparkDeploySchedulerBackend start()方法中,將將app信息封裝成一個appDesc,然后通過ClientActor,將appDesc封裝成case class發(fā)送給Master,master接收到后,通過集中或者打散的規(guī)則給Worker分配需要啟動的Executor資源,調(diào)用launchExecutor,重點分析launchExecutor


master發(fā)送信息給Worker,LaunchExecutor,然后又給driver發(fā)送ExecutorAdded,需要分別查看Worker跟DirverActor

Worker接收到LaunchExecutor,首先創(chuàng)建executor工作目錄,然后啟動一個ExecutorRunner.start,start方法中創(chuàng)建了一個線程workerThread去啟動executor,因為啟動executor進程可能會消耗很多時間,需要異步處理,所以開啟線程去啟動。fetchAndRunExecutor是啟動executor的方法。通過fetchAndRunExecutor啟動的類是clientActor指定的,org.apache.spark.executor.CoarseGrainedExecutorBackend,所以此時,需要到org.apache.spark.executor.CoarseGrainedExecutorBackend類中查看main方法,啟動Executor后,worker會相應的減少cores和memory。



org.apache.spark.executor.CoarseGrainedExecutorBackend
Executor也是一個Actor,會走跟master、worker一樣的Actor生命周期方法。executor啟動包含dirverUrl,appId,cores,worker等信息
private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) extends Actor with ActorLogReceive with ExecutorBackend with Logging

run方法中,啟動了兩個actor
CoarseGrainedExecutorBackend, WorkerWatcher
org.apache.spark.executor.CoarseGrainedExecutorBackend.preStart & receiveWithLogging
向dirver發(fā)送信息注冊executor,dirver回復注冊executor成功,executor接收driver注冊成功后,創(chuàng)建一個Executor對象




Executor
構造了一個線城池threadPool,launchTask方法中,將接收到的task封裝成一個TaskRunner,然后將他提交給threadPool執(zhí)行


創(chuàng)建executorActor與driver交互

startDriverHeartbeater,查看代碼,這是executor與driver保持心跳的,默認是val interval = conf.getInt("spark.executor.heartbeatInterval", 10000),10秒鐘發(fā)送一次,但是在driverActor中,沒有找到跟他交互的代碼


CoarseGrainedExecutorBackend中還啟動了個WorkerWatcher,負責跟worker保持心跳


總結
說些了那么多,最終我們可以得到如下圖所示的關系,一切都準備好了,此時,就等待driver task的發(fā)送,
executor接收task,開始執(zhí)行task




