Spark Core源碼精讀計(jì)劃#7:Spark執(zhí)行環(huán)境的初始化

目錄

前言

繼事件總線之后,SparkContext第二個(gè)初始化的主要組件是SparkEnv,即Spark執(zhí)行環(huán)境。Driver和Executor的正常運(yùn)行都依賴SparkEnv提供的環(huán)境作為支持。SparkEnv初始化成功之后,與Spark存儲(chǔ)、計(jì)算、監(jiān)控等相關(guān)的底層功能才會(huì)真正準(zhǔn)備好,可見(jiàn)它幾乎與SparkContext同等重要。

SparkEnv內(nèi)部也包含了很多種組件,比起SparkContext的組件會(huì)稍微接地氣一點(diǎn)。我們采用與研究SparkContext近似的方式來(lái)研究它。

SparkEnv的入口

在文章#2的代碼#2.5~#2.6中,我們已經(jīng)得知Driver執(zhí)行環(huán)境是通過(guò)調(diào)用SparkEnv.createDriverEnv()方法來(lái)創(chuàng)建的,這個(gè)方法位于SparkEnv類的伴生對(duì)象中。同理,也有createExecutorEnv()方法。我們從這兩個(gè)方法入手來(lái)看一下代碼。

代碼#7.1 - o.a.s.SparkEnv.createDriverEnv()與createExecutorEnv()方法

  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get("spark.driver.port").toInt
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

  private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      numCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      hostname,
      hostname,
      None,
      isLocal,
      numCores,
      ioEncryptionKey
    )
    SparkEnv.set(env)
    env
  }

可見(jiàn),它們都是調(diào)用伴生對(duì)象內(nèi)的create()方法來(lái)創(chuàng)建SparkEnv的。這個(gè)方法很長(zhǎng),所以先來(lái)看一看它的聲明。

代碼#7.2 - o.a.s.SparkEnv.create()方法的聲明

  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Option[Int],
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { /*...*/ }

其中有幾個(gè)參數(shù)需要說(shuō)明一下。

  • executorId:Executor的唯一標(biāo)識(shí)。如果是Driver的話,值就是字符串"driver"。
  • bindAddress/advertiseAddress:分別是監(jiān)聽(tīng)Socket綁定的地址,與RPC端點(diǎn)的地址。
  • isLocal:是否為本地模式。
  • numUsableCores:分配給Driver或Executor的CPU核心數(shù)。
  • ioEncryptionKey:I/O加密的密鑰,當(dāng)spark.io.encryption.enabled配置項(xiàng)啟用時(shí)才有效。

SparkEnv初始化的組件

我們按照create()方法中的代碼順序,對(duì)SparkEnv內(nèi)涉及到的組件做簡(jiǎn)要介紹。

SecurityManager

SecurityManager即安全管理器。它負(fù)責(zé)通過(guò)共享密鑰的方式進(jìn)行認(rèn)證,以及基于ACL(Access Control List,訪問(wèn)控制列表)管理Spark內(nèi)部的賬號(hào)和權(quán)限。其初始化代碼如下。

代碼#7.3 - create()方法中SecurityManager的初始化

    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    if (isDriver) {
      securityManager.initializeAuth()
    }

    ioEncryptionKey.foreach { _ =>
      if (!securityManager.isEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }

RpcEnv

RpcEnv即RPC環(huán)境。在前面已經(jīng)講過(guò),Spark的各個(gè)實(shí)體間必然會(huì)涉及大量的網(wǎng)絡(luò)通信,這些通信實(shí)體在Spark的RPC體系中會(huì)抽象為RPC端點(diǎn)(RpcEndpoint)及其引用(RpcEndpointRef)。RpcEnv為RPC端點(diǎn)提供處理消息的環(huán)境,并負(fù)責(zé)RPC端點(diǎn)的注冊(cè),端點(diǎn)之間消息的路由,以及端點(diǎn)的銷毀等。RPC環(huán)境的初始化代碼如下。

代碼#7.4 - create()方法中RpcEnv的初始化

    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    }

代碼#7.5 - o.a.s.rpc.RpcEnv.create()方法

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    new NettyRpcEnvFactory().create(config)
  }

Spark的RPC底層是利用Netty實(shí)現(xiàn)的,NettyRpcEnv目前也是RpcEnv唯一的實(shí)現(xiàn)類。RPC的內(nèi)部細(xì)節(jié)很多,之后會(huì)用多篇文章來(lái)詳細(xì)分析。

SerializerManager

SerializerManager即序列化管理器。在Spark存儲(chǔ)或交換數(shù)據(jù)時(shí),往往先需要將數(shù)據(jù)序列化或反序列化,為了節(jié)省空間可能還要對(duì)數(shù)據(jù)進(jìn)行壓縮,SerializerManager就是負(fù)責(zé)這些工作的組件。其初始化代碼如下。

代碼#7.6 - create()方法中SerializerManager的初始化

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)

instantiateClassFromConf()方法是create()方法內(nèi)定義的,它調(diào)用了工具類Utils的classForName()方法,通過(guò)反射創(chuàng)建類的實(shí)例。序列化器的類型可以用SparkConf配置項(xiàng)spark.serializer指定,其默認(rèn)值是org.apache.spark.serializer.JavaSerializer。我們?cè)谌粘i_發(fā)中常用的還有KryoSerializer。

序列化器有兩個(gè),serializer是數(shù)據(jù)的序列化器,closureSerializer則是閉包的序列化器。后者在調(diào)度邏輯(如DAGScheduler、TaskSetManager)中經(jīng)常用到,其類型固定為JavaSerializer,不能修改。

BroadcastManager

BroadcastManager即廣播管理器,它在前面的代碼#4.3中已經(jīng)出現(xiàn)過(guò)。它除了為用戶提供廣播共享數(shù)據(jù)的功能之外,在Spark Core內(nèi)部也有廣泛的應(yīng)用,如共享通用配置項(xiàng)或通用數(shù)據(jù)結(jié)構(gòu)等等。其初始化代碼只有一句,不再貼了。

MapOutputTracker

MapOutputTracker即Map輸出跟蹤器。在Shuffle過(guò)程中,Map任務(wù)通過(guò)Shuffle Write階段產(chǎn)生了中間數(shù)據(jù),Reduce任務(wù)進(jìn)行Shuffle Read時(shí)需要知道哪些數(shù)據(jù)位于哪個(gè)節(jié)點(diǎn)上,以及Map輸出的狀態(tài)等信息。MapOutputTracker就負(fù)責(zé)維護(hù)這些信息,其初始化代碼如下。

代碼#7.7 - create()方法中MapOutputTracker的初始化

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

可見(jiàn)是按照當(dāng)前實(shí)體是Driver或Executor分為兩種情況處理的。創(chuàng)建完MapOutputTracker實(shí)例之后,還會(huì)調(diào)用registerOrLookupEndpoint()方法,注冊(cè)(Driver情況)或查找(Executor情況)對(duì)應(yīng)的RPC端點(diǎn),并返回其引用。

ShuffleManager

ShuffleManager即Shuffle管理器。顧名思義,它負(fù)責(zé)管理Shuffle階段的機(jī)制,并提供Shuffle方法的具體實(shí)現(xiàn)。其初始化代碼如下。

代碼#7.8 - create()方法中ShuffleManager的初始化

   val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

ShuffleManager的種類可以通過(guò)配置項(xiàng)spark.shuffle.manager設(shè)置,默認(rèn)為sort,即SortShuffleManager。取得對(duì)應(yīng)的ShuffleManager類名之后,通過(guò)反射構(gòu)建其實(shí)例。Shuffle是Spark計(jì)算過(guò)程中非常重要的一環(huán),之后會(huì)深入地研究它。

MemoryManager

MemoryManager即內(nèi)存管理器。顧名思義,它負(fù)責(zé)Spark集群節(jié)點(diǎn)內(nèi)存的分配、利用和回收。Spark作為一個(gè)內(nèi)存優(yōu)先的大數(shù)據(jù)處理框架,內(nèi)存管理機(jī)制是非常精細(xì)的,主要涉及存儲(chǔ)和執(zhí)行兩大方面。其初始化代碼如下。

代碼#7.9 - create()方法中MemoryManager的初始化

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

MemoryManager有兩種實(shí)現(xiàn),可以使用spark.memory.useLegacyMode配置項(xiàng)控制使用哪種。舊版的內(nèi)存管理器是StaticMemoryManager,即靜態(tài)內(nèi)存管理器。新版(1.6.0版本之后)的內(nèi)存管理器是UnifiedMemoryManager,即統(tǒng)一內(nèi)存管理器,它也是當(dāng)前的默認(rèn)實(shí)現(xiàn),相對(duì)于靜態(tài)內(nèi)存管理而言也更為先進(jìn)。在之后講解涉及存儲(chǔ)和計(jì)算方面的細(xì)節(jié)時(shí),會(huì)一同探究MemoryManager的具體實(shí)現(xiàn)。

BlockManager

BlockManager即塊管理器。塊作為Spark內(nèi)部數(shù)據(jù)的基本單位,與操作系統(tǒng)中的“塊”和HDFS中的“塊”都不太相同。它可以存在于堆內(nèi)內(nèi)存,也可以存在于堆外內(nèi)存和外存(磁盤)中,是Spark數(shù)據(jù)的通用表示方式。BlockManager就負(fù)責(zé)管理塊的存儲(chǔ)、讀寫流程和狀態(tài)信息,其初始化代碼如下。

代碼#7.10 - create()方法中BlockManager的初始化

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

在初始化BlockManager之前,還需要先初始化塊傳輸服務(wù)BlockTransferService,以及BlockManager的主節(jié)點(diǎn)BlockManagerMaster。BlockManager也是采用主從結(jié)構(gòu)設(shè)計(jì)的,Driver上存在主RPC端點(diǎn)BlockManagerMasterEndpoint,而各個(gè)Executor上都存在從RPC端點(diǎn)BlockManagerSlaveEndpoint。

BlockManager是整個(gè)Spark存儲(chǔ)子系統(tǒng)的基石,之后會(huì)先于上面的MemoryManager做介紹。

MetricsSystem

MetricsSystem即度量系統(tǒng)。它是Spark監(jiān)控體系的后端部分,負(fù)責(zé)收集與輸出度量(也就是各類監(jiān)控指標(biāo))數(shù)據(jù)。度量系統(tǒng)由系統(tǒng)實(shí)例Instance、度量數(shù)據(jù)源Source、度量輸出目的地Sink三部分組成。其在SparkEnv里的初始化代碼如下。

代碼7.11 - create()方法中MetricsSystem的初始化

    val metricsSystem = if (isDriver) {
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

這里也是分兩種情況處理的。在Driver端初始化MetricsSystem時(shí),需要依賴TaskScheduler初始化完畢后生成的Application ID,故不會(huì)馬上啟動(dòng)它,可以參見(jiàn)代碼#2.7。在Executor端初始化時(shí)就不用等待,因?yàn)镋xecutor ID已經(jīng)存在了。

OutputCommitCoordinator

OutputCommitCoordinator即輸出提交協(xié)調(diào)器。如果需要將Spark作業(yè)的結(jié)果數(shù)據(jù)持久化到外部存儲(chǔ)(最常見(jiàn)的就是HDFS),就需要用到它來(lái)判定作業(yè)的每個(gè)Stage是否有權(quán)限提交。其初始化代碼如下。

代碼#7.12 - create()方法中OutputCommitCoordinator的初始化

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

可見(jiàn),在Driver上還注冊(cè)了其RPC端點(diǎn)OutputCommitCoordinatorEndpoint,各個(gè)Executor會(huì)通過(guò)其引用來(lái)訪問(wèn)它。

SparkEnv的創(chuàng)建與保存

在create()方法的最后,會(huì)構(gòu)建SparkEnv類的實(shí)例,創(chuàng)建Driver端的臨時(shí)文件夾,并返回該實(shí)例。

代碼#7.13 - SparkEnv.create()方法的結(jié)尾

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    if (isDriver) {
      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      envInstance.driverTmpDir = Some(sparkFilesDir)
    }

    envInstance

SparkEnv的全部初始化流程都在伴生對(duì)象中,其類中反而沒(méi)有太多東西,主要是控制SparkEnv停止的相關(guān)邏輯,不再贅述。

如同SparkContext一樣,SparkEnv在伴生對(duì)象中也會(huì)將已創(chuàng)建的實(shí)例保存起來(lái),避免重復(fù)創(chuàng)建,也保證在同一節(jié)點(diǎn)上執(zhí)行環(huán)境的一致性。get()與set()方法的代碼非常簡(jiǎn)單,就不貼出來(lái)了。

總結(jié)

本文從SparkEnv的初始化方法入手,按順序簡(jiǎn)述了十余個(gè)與Spark執(zhí)行環(huán)境相關(guān)的內(nèi)部組件及其初始化邏輯。這些組件與Spark框架的具體執(zhí)行流程息息相關(guān),我們之后也會(huì)深入研究其中的一部分,特別重要的如RPC環(huán)境RpcEnv、Shuffle管理器ShuffleManager、內(nèi)存管理器MemoryManager、塊管理器BlockManager等。

最后仍然用一張簡(jiǎn)圖來(lái)概括。

圖#7.1 - SparkEnv初始化順序

下一篇文章計(jì)劃研究RPC環(huán)境。它比前面講過(guò)的事件總線更加底層,因此也有更多的細(xì)節(jié)等著我們?nèi)ヌ剿鳌?/p>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容