本文主要講解一下spark2.0版本Spark-StandAlone模式下executor的分配過程和分配機制。
跟蹤這一塊的源代碼應(yīng)該從SparkContext類開始。當用戶new SparkContext時,會執(zhí)行該類中定義在class body中的代碼。這部分代碼會執(zhí)行spark作業(yè)初始化之類的操作,比如校驗參數(shù)、初始化spark history server、初始化blockManager等等。我們需要關(guān)注的是他構(gòu)建TaskScheduler這一部分的代碼。這也是executor初始化的切入點,如下圖:

點開createTaskScheduler方法可以看到,他基于spark master的類型構(gòu)建了SchedulerBackend。而TaskScheduler的類型基本上都是TaskSchedulerImpl。

匹配spark master類型的正則表達式如下,感興趣的可以了解一下:

然后我們一路ctrl + alt +左箭頭返回SparkContext類,看到_taskScheduler.start(),點進去。然后看到backend.start()點進去。其中backend對象就是之前在createTaskScheduler方法中創(chuàng)建的StandaloneSchedulerBackend。
下圖中command對象封裝的是在worker端啟動executor的命令。而initialExecutorLimit是spark-standalone模式提供的dynamic allocation機制,他可以在executor閑置一段時間后就將其移除。對應(yīng)參數(shù)spark.dynamicAllocation.enabled,默認為false。

然后看一下client.start()方法,點進去。然后看到new ClientEndpoint,再點進去。

然后會調(diào)用到這個類中的onStart方法,有人會問為什么會調(diào)到這個方法呢,這里只是將其new出來了,并沒有執(zhí)行任何操作?。??答案就在前邊的rpcEnv.setupEndpoint方法。在該方法中通過回調(diào)機制會重新調(diào)用到onStart方法。點進去,跟到NettyRpcEnv的setupEndpoint方法

然后繼續(xù)跟蹤到dispatcher.registerRpcEndpoint方法,

然后點進new EndpointData方法,然后跟蹤到new Inbox。Inbox在通信機制中起到一個消息存取的作用。

在Inbox中的class body中有如下一段

然后看到OnStart event的處理

?
點開ClientEndpoint后就會看到我們接下來要執(zhí)行到的onStart()方法了。

然后再看這個類中的onStart()方法,registerWithMaster方法點進去,然后再看到tryRegisterAllMasters方法點進去。在這個方法中會看到構(gòu)建了一個與master通信的RPC終端,并發(fā)送了RegisterApplication事件,

在發(fā)送過程中,會將這條消息序列化,如下:

在整個spark core中搜索一下這個case class,然后在Master的receive方法中找到了對這類消息的處理邏輯如下:

在這部分代碼中createApplication方法會根據(jù)當前時間創(chuàng)建appId,appId格式為:
app-yyyyMMddHHmmss-nextAppNumber
nextAppNumber是隨著app的提交而遞增的。
除此之外還會構(gòu)建ApplicationInfo對象,需要注意的是該對象的desc.command中存放了在worker端啟動Executor的命令。
然后看一下scheduler()方法。該方法中實現(xiàn)了啟動driver的邏輯,以及啟動executor的入口方法。該部分代碼的邏輯請參考圖中的注釋。

然后看一下startExecutorsOnWorkers方法,該方法對多個app采用FIFO策略分配executor。首先選出內(nèi)存和cpu核數(shù)滿足條件的worker,作為候選worker。然后按照scheduleExecutorsOnWorkers方法實現(xiàn)的策略分配executor。然后通過allocateWorkerResourceToExecutors方法在指定的worker上通過事件啟動該executor。方法講解參考備注。

?
接下來我們重點的看一下scheduleExecutorsOnWorkers方法。該方法實現(xiàn)了在worker集群中分配executor的策略。
首先看一下局部方法canLaunchExecutor,該方法用來判斷指定序號的worker能否啟動executor。判斷的兩個主要因素是內(nèi)存和cpu core數(shù)是否滿足單個executor的最小要求,除此之外還會考慮app要求的總資源數(shù),以及單個worker上能否啟動多個executor的配置。具體的實現(xiàn)邏輯請參考截圖中的代碼注釋。

然后看一下scheduleExecutorsOnWorkers方法中的其他代碼:

這部分代碼是用來實現(xiàn)executor分配機制的。這段代碼比較清晰,master會逐個遍歷當前可用的worker,如果該worker可用,直接為其分配一個executor基數(shù)的core,然后會讀取一個系統(tǒng)參數(shù)spark.deploy.spreadOut。當該參數(shù)配置為false,master會將該worker上的core一直分配給這個app,直到當前worker沒有足夠資源,或者app的要求已經(jīng)滿足。如果配置為true,則每個worker在分配完一次資源后,會跳轉(zhuǎn)到下一個worker繼續(xù)再分配,直到下一次對整個workers集群的遍歷重新開始。這樣做的意義是盡可能的將executor分配到更多的worker上去執(zhí)行,有利于計算時的本地化計算,否則在計算時計算所需的數(shù)據(jù)不在當前節(jié)點,就需要占用網(wǎng)絡(luò)資源拉取數(shù)據(jù)。系統(tǒng)默認配置為true。

我在閱讀這段代碼時有一個疑問。Executor的分配結(jié)果不僅是core,還包含了mem和某個worker上啟動executor的個數(shù)。而方法的返回值只有某個worker上分配的core數(shù),那么如何判斷executor的個數(shù)呢。對于不了解計算邏輯的人會認為oneExecutorPerWorker模式下一個executor可能會被分配了minCorePerExecutor+個core,此時如果單純通過worker上實際分配的core個數(shù)除以minCorePerExecutor就無法正確計算出executor個數(shù)了。這時就會想為什么不在方法返回值中帶有executor個數(shù)呢?
繼續(xù)往下看就會找到答案,在實際啟動executor時,是根據(jù)用戶是否配置了coresPerExecutor來決定executor個數(shù)的,在沒有配置(也就是在OneExecutorPerWorker模式下)的情況下executor個數(shù)是固定為1的,其他情況下是用已分配core數(shù)/ coresPerExecutor計算的。

然后點進launchExecutor方法看一下,在master端會向worker端發(fā)送啟動executor的命令LaunchExecutor,命令包含在exec.application.desc的command中。創(chuàng)建完成后會向driver端發(fā)送ExecutorAdded event,driver接收到后會打印日志。


然后worker端在收到LaunchExecutor命令后,會實例化一個ExecutorRunner,然后調(diào)用其start方法,在該方法中啟動了一個workerThread線程,其run方法的實現(xiàn)邏輯為方法fetchAndRunExecutor。這個方法就比較直觀了,取出創(chuàng)建Executor的命令appDesc.command封裝成ProcessBuilder類,用它來執(zhí)行啟動Executor的命令。

沒錯,啟動的命令就是前文中提到的command對象:
