SparkContext內(nèi)部執(zhí)行的時(shí)序圖
對(duì)于這個(gè)時(shí)序圖的具體描述如下:
1.SparkSubmit在main()方法中執(zhí)行,然后根據(jù)提交的類(lèi)型調(diào)用相應(yīng)的方法,這里是"Submit",調(diào)用submit()方法,submit()里面進(jìn)行一些判斷后,
使用反射Class.forName(childMainClass, true, loader),然后調(diào)用invoke()方法來(lái)調(diào)用程序員自己寫(xiě)的類(lèi),我們這里是WordCount。
2.在WordCount類(lèi)中,main()方法里有調(diào)用SparkContext,SparkContext構(gòu)造器使用createSparkEnv()方法,
這個(gè)方法使用SparkEnv.createDriverEnv(conf, isLocal, listenerBus)方法創(chuàng)建SparkEnv對(duì)象;
在SparkEnv類(lèi),調(diào)用create()方法來(lái)進(jìn)行創(chuàng)建SparkEnv,在這個(gè)方法內(nèi)部,有一個(gè)
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)的調(diào)用過(guò)程,
主要用來(lái)產(chǎn)生Akka中的ActorSystem以及得到綁定的端口號(hào)。
3.在創(chuàng)建SparkEnv對(duì)象后,SparkContext構(gòu)造器使用代碼SparkContext.createTaskScheduler(this, master)創(chuàng)建TaskScheduler對(duì)象,
這里根據(jù)實(shí)際的提交模式來(lái)進(jìn)行創(chuàng)建TaskScheduler對(duì)象,提交模式有:local、Mesos、Zookeeper、Simr、Spark,
這里模們主要分析Spark集群下的模式;然后還需要?jiǎng)?chuàng)建一個(gè)SparkDeploySchedulerBackend對(duì)象;
在創(chuàng)建TaskScheduler對(duì)象調(diào)用initialize()方法,這里選擇調(diào)度模式,主要有兩種模式,F(xiàn)IFO和FAIR,默認(rèn)的調(diào)度模式;
最后調(diào)用taskScheduler的start()方法,里面主要調(diào)用SparkDeploySchedulerBackend對(duì)象的start()方法,
首先調(diào)用父類(lèi)的start()方法產(chǎn)生一個(gè)用于和Executor通信的DriverActor對(duì)象,然后里面主要?jiǎng)?chuàng)建一個(gè)AppClient對(duì)象內(nèi)部有ClientActor類(lèi)對(duì)象,
用于Driver和Master進(jìn)行RPC通信。
SparkContext源碼分析流程
1.SparkSubmit半生對(duì)象的源碼
1.1SparkSubmit的main()函數(shù)在SparkSubmit半生對(duì)象的104行左右,這個(gè)是程序的主要入口:
接下來(lái)主要進(jìn)入submit()方法,下面是submit()方法
1.2SparkSubmit的submit()方法,代碼大約在142行左右, 這個(gè)方法的主要作用是根據(jù)不同的模式使用runMain()方法:
1.3SparkSubmit的runMain()方法,代碼大約在505行左右,這個(gè)方法主要的主要作用是通過(guò)反射獲取自定義類(lèi),這里我們主要的是WordCount,然后通過(guò)invoke方法調(diào)用main?這里是方法的重要代碼:
調(diào)用WordCount的main()方法后,接下來(lái)就要看SparkContext的內(nèi)部了。
2.SparkContext內(nèi)部源碼分析
很重要:SparkContext是Spark提交任務(wù)到集群的入口
我們看一下SparkContext的主構(gòu)造器
1.調(diào)用createSparkEnv方法創(chuàng)建SparkEnv,里面有一個(gè)非常重要的對(duì)象ActorSystem
2.創(chuàng)建TaskScheduler -> 根據(jù)提交任務(wù)的URL進(jìn)行匹配 -> TaskSchedulerImpl -> SparkDeploySchedulerBackend(里面有兩個(gè)Actor)
3.創(chuàng)建DAGScheduler
2.1創(chuàng)建SparkEnv獲取ActorSystem,代碼大約在275行左右,這一步的主要的作用是創(chuàng)建ActorSystem對(duì)象以后根據(jù)這個(gè)對(duì)象來(lái)創(chuàng)建相應(yīng)的Actor
主要調(diào)用SparkEnv類(lèi)的createDriverEnv()方法獲取SparkEnv對(duì)象,createDriverEnv()主要調(diào)用SparkEnv的create()方法,這里代碼大約
在SparkEnv的154行,代碼具體如下:
createDriverEnv()內(nèi)部主要調(diào)用create()方法,代碼大約在202行,重要的代碼如下:
這個(gè)方法的主要作用是調(diào)用AkkaUtils這個(gè)工具類(lèi)創(chuàng)建ActorSystem。
2.2創(chuàng)建TaskScheduler,代碼大約在374行,重要的代碼如下:
這里調(diào)用createTaskScheduler()方法,這個(gè)類(lèi)主要的作用是根據(jù)提交的類(lèi)型創(chuàng)建相應(yīng)的TaskScheduler(),這里主要分析Spark集群下,主要的代碼如下:
這里進(jìn)行模式匹配,以上代碼大約在SparkContext的2159行,主要的作用是創(chuàng)建TaskSchedulerImpl對(duì)象,然后初始化調(diào)度器這里,需要看的是initialize(),主要的實(shí)現(xiàn)是
TaskSchedulerImpl類(lèi),這里我們將會(huì)深入TaskSchedulerImpl類(lèi)的initialize()方法,下面是該方法的實(shí)現(xiàn):
主要用于調(diào)度的模式,調(diào)度模式主要分為FIFO和FAIR。在進(jìn)行創(chuàng)建了TaskScheduler對(duì)象后,我們?cè)賮?lái)看一下主要的代碼:
上述代碼中,這里主要用于創(chuàng)建一個(gè)HeartbeatReceiver對(duì)象來(lái)進(jìn)行心跳,用于Executors和DriverActor的心跳。
然后創(chuàng)建DAGScheduler對(duì)象,這個(gè)對(duì)象的主要作用是用來(lái)劃分Stage。
2.3TaskScheduler進(jìn)行啟動(dòng),代碼大約在395行,重要的代碼如下:
由于這里是TaskScheduler的主要的實(shí)現(xiàn)類(lèi)是TaskScheduler是TaskSchedulerImpl類(lèi),我們要進(jìn)入的源碼是:
主要調(diào)用了SparkDeploySchedulerBackend的start()方法,接下來(lái)我們需要看SparkDeploySchedulerBackend內(nèi)部實(shí)現(xiàn)。
以下是SparkDeploySchedulerBackend的構(gòu)造器函數(shù),這個(gè)代碼大約在SparkDeploySchedulerBackend的45行重要的代碼如下:
從上面的代碼可以看出首先調(diào)用父類(lèi)(CoarseGrainedSchedulerBackend)的start()方法,然后對(duì)于一些重要的參數(shù)進(jìn)行封裝,這里最重要的參數(shù)是
CoarseGrainedExecutorBackend類(lèi),還有一些driverUrl和WORKER_URL等參數(shù)的封裝,將CoarseGrainedExecutorBackend
封裝成Command,這是一個(gè)樣例類(lèi),不知道樣例類(lèi)請(qǐng)點(diǎn)擊這里,將這個(gè)參數(shù)封裝成為一個(gè)
ApplicationDescription對(duì)象,創(chuàng)建一個(gè)AppClient對(duì)象,這個(gè)對(duì)象主要用于Driver和Master之間的通信,以下我們分析start()方法后再分析client.start()。
可以從上面的代碼看出,?這里主要?jiǎng)?chuàng)建一個(gè)DriverActor,這個(gè)Actor的主要的作用是Driver進(jìn)程和Executor進(jìn)程進(jìn)行RPC通信
在分析完以上的CoarseGrainedSchedulerBackend的start()方法后,這里主要進(jìn)行的源碼分析是client.start()方法這里創(chuàng)建一個(gè)ClientActor,準(zhǔn)確來(lái)說(shuō)是這個(gè)
ClientActor來(lái)和Master通信。
現(xiàn)在,這里就調(diào)用ClientActor的生命周期方法,對(duì)于Akka通信不了解的,請(qǐng)點(diǎn)擊這里進(jìn)行了解Actor的生命周期方法。
Akka的Actor的生命周期方法主要從preStart()方法開(kāi)始,這行代碼大約在67行左右,主要的內(nèi)容如下:
在preStart()方法中主要做的事情是向Master注冊(cè),registerWithMaster()的主要內(nèi)容是:
這個(gè)方法主要是向活著的Master進(jìn)行注冊(cè),將以前所有的參數(shù)封裝的appDescription發(fā)送給Master,然后啟動(dòng)定時(shí)調(diào)度器進(jìn)行和Master的注冊(cè),因?yàn)橛锌赡苓M(jìn)行多次通信。
Master收到通過(guò)樣例類(lèi)的模式匹配,對(duì)于Driver向Master注冊(cè)Application,主要的作用是持久化Driver傳遞的參數(shù)和進(jìn)行資源調(diào)度.
主要的代碼大約在Master類(lèi)的315行,主要的代碼如下:
這段代碼的意義是:持久化信息,告知ClientActor發(fā)送注冊(cè)成功的信息,然后適使用schedule()進(jìn)行資源的調(diào)度。
對(duì)于schedule()方法,代碼大約在533行,這里的主要作用是進(jìn)行資源調(diào)度,主要的是兩種資源調(diào)度的方法,一種是盡量打散的分配資源,還有一種是盡量集中。
對(duì)于盡量打散的方式:將每個(gè)app分配到盡可能多的worker中執(zhí)行
App調(diào)度時(shí)會(huì)為app分配滿足條件的資源-----Worker(State是Alive,其上并沒(méi)有該Application的executor,可用內(nèi)存滿足要求(spark.executor.memory指定,默認(rèn)512),
核滿足要求(spark.cores.max, 最大可用core數(shù),若未指定,則為全部資源)),然后通知Woker啟動(dòng)Excutor. 及向AppClient發(fā)送ExecutorAdded消息。
進(jìn)行調(diào)度時(shí),調(diào)度程序會(huì)根據(jù)配制SpreadOutApps = spark.deploy.spreadOut情況決定資源分配方式。
1 從列表中取下一app,根據(jù)CPU情況找出合適的woker,按核從小到大排序
2 如果worker節(jié)點(diǎn)存在可以分配的core 則進(jìn)行預(yù)分配處理(輪循一次分一個(gè)直至滿足app需求),并在分配列表(assigned = ArrayInt)中計(jì)數(shù)。
3根據(jù)assinged列表中的預(yù)分配信息,進(jìn)行分配Executor(真實(shí)分配)
4 啟動(dòng)Executor并設(shè)置app.state = ApplicationState.RUNNING
盡情集中的方式: 將每個(gè)app分配到盡可能少的worker中執(zhí)行。?1 從可用的worker列表中取下一work. (worker <- workers if worker.coresFree > 0)
2 遍歷waitingApps 找到滿足app運(yùn)行條件的app,進(jìn)行分配
3啟動(dòng)Executor(launchExecutor(w,e))并設(shè)置app.state = ApplicationState.RUNNING
其中:launchExcutor(worker, exec) 具體內(nèi)容如下:
向executor分配給worker
通知worker啟動(dòng)executor
由分配過(guò)程可知, 分配的Excutor個(gè)數(shù)與CPU核心數(shù)有關(guān)。當(dāng)指定完Worker節(jié)點(diǎn)后,會(huì)在Worker節(jié)點(diǎn)創(chuàng)建ExecutorRunner,并啟動(dòng),執(zhí)行App中的Command
去創(chuàng)建并啟動(dòng)CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend啟動(dòng)后,會(huì)首先通過(guò)傳入的driverUrl這個(gè)參數(shù)向
在CoarseGrainedSchedulerBackend::DriverActor(用于與Master通信,及調(diào)度任務(wù))發(fā)送RegisterExecutor(executorId, hostPort, cores),
DriverActor會(huì)創(chuàng)建executorData(executor信息)加入executorDataMap供后續(xù)task使用,并回復(fù)RegisteredExecutor,
此時(shí)CoarseGrainedExecutorBackend會(huì)創(chuàng)建一個(gè)org.apache.spark.executor.Executor。至此,Executor創(chuàng)建完畢。
Executor是直接用于task執(zhí)行, 是集群中的直接勞動(dòng)者。至此,資源分配結(jié)束。
百度腦圖關(guān)于作業(yè)提交以及SparkContext的示意圖
注意:這里的SparkContext和Master是兩個(gè)獨(dú)立的類(lèi),由于Baidu腦圖不能獨(dú)立劃分,所以看起來(lái)像父子類(lèi)關(guān)系。
總結(jié)
在SparkContext(這里基于Spark的版本是1.3.1)主要做的工作是:
1.創(chuàng)建SparkEnv,里面有一個(gè)很重要的對(duì)象ActorSystem
2.創(chuàng)建TaskScheduler,這里是根據(jù)提交的集群來(lái)創(chuàng)建相應(yīng)的TaskScheduler
3.對(duì)于TaskScheduler,主要的任務(wù)調(diào)度模式有FIFO和FAIR
4.在SparkContext中創(chuàng)建了兩個(gè)Actor,一個(gè)是DriverActor,這里主要用于Driver和Executor之間的通信;還有一個(gè)是ClientActor,主要用于Driver和Master之間的通信。
5.創(chuàng)建DAGScheduler,其實(shí)這個(gè)是用于Stage的劃分
6.調(diào)用taskScheduler.start()方法啟動(dòng),進(jìn)行資源調(diào)度,有兩種資源分配方法,一種是盡量打散;一種是盡量集中
7.Driver向Master注冊(cè),發(fā)送了一些信息,其中一個(gè)重要的類(lèi)是CoarseGrainedExecutorBackend,這個(gè)類(lèi)以后用于創(chuàng)建Executor進(jìn)程。