目錄
前言
繼事件總線之后,SparkContext第二個(gè)初始化的主要組件是SparkEnv,即Spark執(zhí)行環(huán)境。Driver和Executor的正常運(yùn)行都依賴SparkEnv提供的環(huán)境作為支持。SparkEnv初始化成功之后,與Spark存儲(chǔ)、計(jì)算、監(jiān)控等相關(guān)的底層功能才會(huì)真正準(zhǔn)備好,可見(jiàn)它幾乎與SparkContext同等重要。
SparkEnv內(nèi)部也包含了很多種組件,比起SparkContext的組件會(huì)稍微接地氣一點(diǎn)。我們采用與研究SparkContext近似的方式來(lái)研究它。
SparkEnv的入口
在文章#2的代碼#2.5~#2.6中,我們已經(jīng)得知Driver執(zhí)行環(huán)境是通過(guò)調(diào)用SparkEnv.createDriverEnv()方法來(lái)創(chuàng)建的,這個(gè)方法位于SparkEnv類的伴生對(duì)象中。同理,也有createExecutorEnv()方法。我們從這兩個(gè)方法入手來(lái)看一下代碼。
代碼#7.1 - o.a.s.SparkEnv.createDriverEnv()與createExecutorEnv()方法
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
None
}
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
hostname,
hostname,
None,
isLocal,
numCores,
ioEncryptionKey
)
SparkEnv.set(env)
env
}
可見(jiàn),它們都是調(diào)用伴生對(duì)象內(nèi)的create()方法來(lái)創(chuàng)建SparkEnv的。這個(gè)方法很長(zhǎng),所以先來(lái)看一看它的聲明。
代碼#7.2 - o.a.s.SparkEnv.create()方法的聲明
private def create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { /*...*/ }
其中有幾個(gè)參數(shù)需要說(shuō)明一下。
- executorId:Executor的唯一標(biāo)識(shí)。如果是Driver的話,值就是字符串"driver"。
- bindAddress/advertiseAddress:分別是監(jiān)聽(tīng)Socket綁定的地址,與RPC端點(diǎn)的地址。
- isLocal:是否為本地模式。
- numUsableCores:分配給Driver或Executor的CPU核心數(shù)。
- ioEncryptionKey:I/O加密的密鑰,當(dāng)spark.io.encryption.enabled配置項(xiàng)啟用時(shí)才有效。
SparkEnv初始化的組件
我們按照create()方法中的代碼順序,對(duì)SparkEnv內(nèi)涉及到的組件做簡(jiǎn)要介紹。
SecurityManager
SecurityManager即安全管理器。它負(fù)責(zé)通過(guò)共享密鑰的方式進(jìn)行認(rèn)證,以及基于ACL(Access Control List,訪問(wèn)控制列表)管理Spark內(nèi)部的賬號(hào)和權(quán)限。其初始化代碼如下。
代碼#7.3 - create()方法中SecurityManager的初始化
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}
ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
}
RpcEnv
RpcEnv即RPC環(huán)境。在前面已經(jīng)講過(guò),Spark的各個(gè)實(shí)體間必然會(huì)涉及大量的網(wǎng)絡(luò)通信,這些通信實(shí)體在Spark的RPC體系中會(huì)抽象為RPC端點(diǎn)(RpcEndpoint)及其引用(RpcEndpointRef)。RpcEnv為RPC端點(diǎn)提供處理消息的環(huán)境,并負(fù)責(zé)RPC端點(diǎn)的注冊(cè),端點(diǎn)之間消息的路由,以及端點(diǎn)的銷毀等。RPC環(huán)境的初始化代碼如下。
代碼#7.4 - create()方法中RpcEnv的初始化
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
}
代碼#7.5 - o.a.s.rpc.RpcEnv.create()方法
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}
Spark的RPC底層是利用Netty實(shí)現(xiàn)的,NettyRpcEnv目前也是RpcEnv唯一的實(shí)現(xiàn)類。RPC的內(nèi)部細(xì)節(jié)很多,之后會(huì)用多篇文章來(lái)詳細(xì)分析。
SerializerManager
SerializerManager即序列化管理器。在Spark存儲(chǔ)或交換數(shù)據(jù)時(shí),往往先需要將數(shù)據(jù)序列化或反序列化,為了節(jié)省空間可能還要對(duì)數(shù)據(jù)進(jìn)行壓縮,SerializerManager就是負(fù)責(zé)這些工作的組件。其初始化代碼如下。
代碼#7.6 - create()方法中SerializerManager的初始化
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
val closureSerializer = new JavaSerializer(conf)
instantiateClassFromConf()方法是create()方法內(nèi)定義的,它調(diào)用了工具類Utils的classForName()方法,通過(guò)反射創(chuàng)建類的實(shí)例。序列化器的類型可以用SparkConf配置項(xiàng)spark.serializer指定,其默認(rèn)值是org.apache.spark.serializer.JavaSerializer。我們?cè)谌粘i_發(fā)中常用的還有KryoSerializer。
序列化器有兩個(gè),serializer是數(shù)據(jù)的序列化器,closureSerializer則是閉包的序列化器。后者在調(diào)度邏輯(如DAGScheduler、TaskSetManager)中經(jīng)常用到,其類型固定為JavaSerializer,不能修改。
BroadcastManager
BroadcastManager即廣播管理器,它在前面的代碼#4.3中已經(jīng)出現(xiàn)過(guò)。它除了為用戶提供廣播共享數(shù)據(jù)的功能之外,在Spark Core內(nèi)部也有廣泛的應(yīng)用,如共享通用配置項(xiàng)或通用數(shù)據(jù)結(jié)構(gòu)等等。其初始化代碼只有一句,不再貼了。
MapOutputTracker
MapOutputTracker即Map輸出跟蹤器。在Shuffle過(guò)程中,Map任務(wù)通過(guò)Shuffle Write階段產(chǎn)生了中間數(shù)據(jù),Reduce任務(wù)進(jìn)行Shuffle Read時(shí)需要知道哪些數(shù)據(jù)位于哪個(gè)節(jié)點(diǎn)上,以及Map輸出的狀態(tài)等信息。MapOutputTracker就負(fù)責(zé)維護(hù)這些信息,其初始化代碼如下。
代碼#7.7 - create()方法中MapOutputTracker的初始化
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
可見(jiàn)是按照當(dāng)前實(shí)體是Driver或Executor分為兩種情況處理的。創(chuàng)建完MapOutputTracker實(shí)例之后,還會(huì)調(diào)用registerOrLookupEndpoint()方法,注冊(cè)(Driver情況)或查找(Executor情況)對(duì)應(yīng)的RPC端點(diǎn),并返回其引用。
ShuffleManager
ShuffleManager即Shuffle管理器。顧名思義,它負(fù)責(zé)管理Shuffle階段的機(jī)制,并提供Shuffle方法的具體實(shí)現(xiàn)。其初始化代碼如下。
代碼#7.8 - create()方法中ShuffleManager的初始化
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
ShuffleManager的種類可以通過(guò)配置項(xiàng)spark.shuffle.manager設(shè)置,默認(rèn)為sort,即SortShuffleManager。取得對(duì)應(yīng)的ShuffleManager類名之后,通過(guò)反射構(gòu)建其實(shí)例。Shuffle是Spark計(jì)算過(guò)程中非常重要的一環(huán),之后會(huì)深入地研究它。
MemoryManager
MemoryManager即內(nèi)存管理器。顧名思義,它負(fù)責(zé)Spark集群節(jié)點(diǎn)內(nèi)存的分配、利用和回收。Spark作為一個(gè)內(nèi)存優(yōu)先的大數(shù)據(jù)處理框架,內(nèi)存管理機(jī)制是非常精細(xì)的,主要涉及存儲(chǔ)和執(zhí)行兩大方面。其初始化代碼如下。
代碼#7.9 - create()方法中MemoryManager的初始化
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
MemoryManager有兩種實(shí)現(xiàn),可以使用spark.memory.useLegacyMode配置項(xiàng)控制使用哪種。舊版的內(nèi)存管理器是StaticMemoryManager,即靜態(tài)內(nèi)存管理器。新版(1.6.0版本之后)的內(nèi)存管理器是UnifiedMemoryManager,即統(tǒng)一內(nèi)存管理器,它也是當(dāng)前的默認(rèn)實(shí)現(xiàn),相對(duì)于靜態(tài)內(nèi)存管理而言也更為先進(jìn)。在之后講解涉及存儲(chǔ)和計(jì)算方面的細(xì)節(jié)時(shí),會(huì)一同探究MemoryManager的具體實(shí)現(xiàn)。
BlockManager
BlockManager即塊管理器。塊作為Spark內(nèi)部數(shù)據(jù)的基本單位,與操作系統(tǒng)中的“塊”和HDFS中的“塊”都不太相同。它可以存在于堆內(nèi)內(nèi)存,也可以存在于堆外內(nèi)存和外存(磁盤)中,是Spark數(shù)據(jù)的通用表示方式。BlockManager就負(fù)責(zé)管理塊的存儲(chǔ)、讀寫流程和狀態(tài)信息,其初始化代碼如下。
代碼#7.10 - create()方法中BlockManager的初始化
val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
在初始化BlockManager之前,還需要先初始化塊傳輸服務(wù)BlockTransferService,以及BlockManager的主節(jié)點(diǎn)BlockManagerMaster。BlockManager也是采用主從結(jié)構(gòu)設(shè)計(jì)的,Driver上存在主RPC端點(diǎn)BlockManagerMasterEndpoint,而各個(gè)Executor上都存在從RPC端點(diǎn)BlockManagerSlaveEndpoint。
BlockManager是整個(gè)Spark存儲(chǔ)子系統(tǒng)的基石,之后會(huì)先于上面的MemoryManager做介紹。
MetricsSystem
MetricsSystem即度量系統(tǒng)。它是Spark監(jiān)控體系的后端部分,負(fù)責(zé)收集與輸出度量(也就是各類監(jiān)控指標(biāo))數(shù)據(jù)。度量系統(tǒng)由系統(tǒng)實(shí)例Instance、度量數(shù)據(jù)源Source、度量輸出目的地Sink三部分組成。其在SparkEnv里的初始化代碼如下。
代碼7.11 - create()方法中MetricsSystem的初始化
val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
這里也是分兩種情況處理的。在Driver端初始化MetricsSystem時(shí),需要依賴TaskScheduler初始化完畢后生成的Application ID,故不會(huì)馬上啟動(dòng)它,可以參見(jiàn)代碼#2.7。在Executor端初始化時(shí)就不用等待,因?yàn)镋xecutor ID已經(jīng)存在了。
OutputCommitCoordinator
OutputCommitCoordinator即輸出提交協(xié)調(diào)器。如果需要將Spark作業(yè)的結(jié)果數(shù)據(jù)持久化到外部存儲(chǔ)(最常見(jiàn)的就是HDFS),就需要用到它來(lái)判定作業(yè)的每個(gè)Stage是否有權(quán)限提交。其初始化代碼如下。
代碼#7.12 - create()方法中OutputCommitCoordinator的初始化
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
可見(jiàn),在Driver上還注冊(cè)了其RPC端點(diǎn)OutputCommitCoordinatorEndpoint,各個(gè)Executor會(huì)通過(guò)其引用來(lái)訪問(wèn)它。
SparkEnv的創(chuàng)建與保存
在create()方法的最后,會(huì)構(gòu)建SparkEnv類的實(shí)例,創(chuàng)建Driver端的臨時(shí)文件夾,并返回該實(shí)例。
代碼#7.13 - SparkEnv.create()方法的結(jié)尾
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
}
envInstance
SparkEnv的全部初始化流程都在伴生對(duì)象中,其類中反而沒(méi)有太多東西,主要是控制SparkEnv停止的相關(guān)邏輯,不再贅述。
如同SparkContext一樣,SparkEnv在伴生對(duì)象中也會(huì)將已創(chuàng)建的實(shí)例保存起來(lái),避免重復(fù)創(chuàng)建,也保證在同一節(jié)點(diǎn)上執(zhí)行環(huán)境的一致性。get()與set()方法的代碼非常簡(jiǎn)單,就不貼出來(lái)了。
總結(jié)
本文從SparkEnv的初始化方法入手,按順序簡(jiǎn)述了十余個(gè)與Spark執(zhí)行環(huán)境相關(guān)的內(nèi)部組件及其初始化邏輯。這些組件與Spark框架的具體執(zhí)行流程息息相關(guān),我們之后也會(huì)深入研究其中的一部分,特別重要的如RPC環(huán)境RpcEnv、Shuffle管理器ShuffleManager、內(nèi)存管理器MemoryManager、塊管理器BlockManager等。
最后仍然用一張簡(jiǎn)圖來(lái)概括。

下一篇文章計(jì)劃研究RPC環(huán)境。它比前面講過(guò)的事件總線更加底層,因此也有更多的細(xì)節(jié)等著我們?nèi)ヌ剿鳌?/p>