Spark Core源碼精讀計(jì)劃#25:UnifiedMemoryManager——統(tǒng)一內(nèi)存管理機(jī)制

目錄

前言

在前文的末尾,我們分析了靜態(tài)內(nèi)存管理器StaticMemoryManager的優(yōu)缺點(diǎn),并指出統(tǒng)一內(nèi)存管理器UnifiedMemoryManager能夠彌補(bǔ)它的缺點(diǎn),同時(shí)也是目前Spark內(nèi)存管理的事實(shí)標(biāo)準(zhǔn)。本文盡可能深入地剖析UnifiedMemoryManager的具體實(shí)現(xiàn)。

統(tǒng)一內(nèi)存管理器UnifiedMemoryManager

UnifiedMemoryManager與StaticMemoryManager相比,主要有兩點(diǎn)改進(jìn):

  • 存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存不再是靠比例定死的,而是在一定條件下可以相互借用,更加靈活;
  • 存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存都可以在堆外分配了。

按照慣例,我們?nèi)匀粡钠錁?gòu)造開(kāi)始看起。

構(gòu)造方法

代碼#25.1 - o.a.s.memory.UnifiedMemoryManager類(lèi)的構(gòu)造

private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {

  private def assertInvariants(): Unit = {
    assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
    assert(
      offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
  }

  assertInvariants()

  // ......
}

其構(gòu)造方法參數(shù)與StaticMemoryManager相比有微小的變化,需要傳入堆內(nèi)內(nèi)存總量maxHeapMemory,以及堆內(nèi)存儲(chǔ)內(nèi)存空間的量onHeapStorageRegionSize,堆內(nèi)執(zhí)行內(nèi)存空間的量就是兩者之差。另外,還會(huì)校驗(yàn)堆內(nèi)、堆外內(nèi)存池的大小,保證它們與規(guī)定的內(nèi)存總量對(duì)的上。

但是,在代碼#24.1初始化MemoryManager實(shí)現(xiàn)時(shí),調(diào)用的UnifiedMemoryManager構(gòu)造方法只有兩個(gè)參數(shù),這是因?yàn)槠浒樯鷮?duì)象實(shí)現(xiàn)了apply()方法。如果看官對(duì)Scala不太熟的話(huà),可以去翻翻Scala官方文檔,其中講述了apply()方法的具體作用。下面還是來(lái)看代碼。

計(jì)算內(nèi)存量

代碼#25.2 - o.a.s.memory.UnifiedMemoryManager.apply()方法

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

可見(jiàn),Spark可利用的內(nèi)存總量(為避免混淆,下面叫“統(tǒng)一內(nèi)存”)是調(diào)用getMaxMemory()方法計(jì)算出來(lái)的,存儲(chǔ)內(nèi)存占統(tǒng)一內(nèi)存的初始比例(因?yàn)榭梢越栌?,所以是初始比例)由配置?xiàng)spark.memory.storageFraction決定,默認(rèn)值0.5。剩下的(1 - spark.memory.storageFraction)比例的內(nèi)存就是執(zhí)行內(nèi)存了。

getMaxMemory()方法的代碼如下。

代碼#25.3 - o.a.s.memory.UnifiedMemoryManager.getMaxMemory()方法

  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }

    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }

該方法的執(zhí)行流程是:

  1. 通過(guò)Runtime.maxMemory()這個(gè)native方法取得當(dāng)前JVM可用的最大內(nèi)存(堆內(nèi)存)。spark.testing.memory參數(shù)是測(cè)試參數(shù),幾乎不用。
  2. 取得保留內(nèi)存的大小,由常量RESERVED_SYSTEM_MEMORY_BYTES定義,大小是300MB。這個(gè)量可以通過(guò)spark.testing.reservedMemory參數(shù)來(lái)改,但同樣幾乎不用。
  3. 以保留內(nèi)存的1.5倍(也就是450MB)作為Driver和Executor的最小內(nèi)存并校驗(yàn)之。
  4. 用堆內(nèi)存量減去保留內(nèi)存量,得到可用內(nèi)存。spark.memory.fraction配置項(xiàng)指定了統(tǒng)一內(nèi)存能實(shí)際利用的可用內(nèi)存比例,默認(rèn)值為0.6(60%),最終返回可用內(nèi)存與spark.memory.fraction的乘積。

可見(jiàn),UnifiedMemoryManager的堆內(nèi)內(nèi)存布局其實(shí)比StaticMemoryManager要簡(jiǎn)單。因?yàn)榇鎯?chǔ)內(nèi)存和執(zhí)行內(nèi)存之間的邊界是浮動(dòng)的,所以展開(kāi)內(nèi)存的比例以及安全比例都不再需要了。下面仍然先用一幅框圖來(lái)形象地表示出來(lái)。

統(tǒng)一內(nèi)存管理布局圖示

圖中Storage和Execution區(qū)域的界限是虛線(xiàn),并且有上下箭頭,表示它們之間的邊界是浮動(dòng)的。

#25.1 - Spark的統(tǒng)一內(nèi)存管理布局

然后,我們來(lái)看看申請(qǐng)內(nèi)存的方法的實(shí)現(xiàn),其中也包含了借用內(nèi)存的邏輯。

申請(qǐng)/借用存儲(chǔ)內(nèi)存

以下是重寫(xiě)的acquireStorageMemory()方法代碼。

代碼#25.4 - o.a.s.memory.UnifiedMemoryManager.acquireStorageMemory()方法

  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapStorageMemory)
    }
    if (numBytes > maxMemory) {
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    }
    if (numBytes > storagePool.memoryFree) {
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
        numBytes - storagePool.memoryFree)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storagePool.acquireMemory(blockId, numBytes)
  }

該方法首先根據(jù)MemoryMode決定是在堆內(nèi)還是在堆外申請(qǐng)存儲(chǔ)內(nèi)存。如果申請(qǐng)量沒(méi)有超過(guò)存儲(chǔ)內(nèi)存池的空閑量,就可以直接調(diào)用StorageMemoryPool.acquireMemory()方法申請(qǐng)內(nèi)存。但若存儲(chǔ)池中剩余的內(nèi)存不夠分配,就會(huì)試圖向執(zhí)行池借用內(nèi)存,借用的量為當(dāng)前執(zhí)行池空閑量與(塊大小 - 當(dāng)前存儲(chǔ)池空閑量)兩個(gè)量之間的較小者。然后會(huì)調(diào)用decrementPoolSize()方法縮小執(zhí)行池,調(diào)用incrementPoolSize()方法擴(kuò)大存儲(chǔ)池。

由上面的描述可以看出,借用內(nèi)存的過(guò)程是比較保守的,也就是一次只會(huì)借用當(dāng)時(shí)不足的內(nèi)存量,不會(huì)多借。并且借到的內(nèi)存有可能仍然無(wú)法滿(mǎn)足需求,這時(shí)就只能把原先存儲(chǔ)的一部分塊淘汰掉了,這部分邏輯之前提到過(guò),參見(jiàn)代碼#23.4。

在統(tǒng)一內(nèi)存管理機(jī)制下,展開(kāi)內(nèi)存雖然仍屬于存儲(chǔ)內(nèi)存的一部分,但不再有邊界,所以申請(qǐng)展開(kāi)內(nèi)存的方法與申請(qǐng)存儲(chǔ)內(nèi)存完全相同。

代碼#25.5 - o.a.s.memory.UnifiedMemoryManager.acquireUnrollMemory()方法

  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    acquireStorageMemory(blockId, numBytes, memoryMode)
  }

申請(qǐng)/借用執(zhí)行內(nèi)存

這個(gè)流程就比較復(fù)雜一些了。

代碼#25.6 - o.a.s.memory.UnifiedMemoryManager.acquireExecutionMemory()方法

  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        onHeapStorageRegionSize,
        maxHeapMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        offHeapStorageMemory,
        maxOffHeapMemory)
    }

    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        val memoryReclaimableFromStorage = math.max(
          storagePool.memoryFree,
          storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) {
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
          storagePool.decrementPoolSize(spaceToReclaim)
          executionPool.incrementPoolSize(spaceToReclaim)
        }
      }
    }

    def computeMaxExecutionPoolSize(): Long = {
      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
    }

    executionPool.acquireMemory(
      numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
  }

來(lái)看其內(nèi)部嵌套定義的方法maybeGrowExecutionPool(),它負(fù)責(zé)在執(zhí)行內(nèi)存不夠用時(shí),向存儲(chǔ)內(nèi)存池借用內(nèi)存,邏輯是:

  1. 計(jì)算有多少內(nèi)存能夠從存儲(chǔ)內(nèi)存池回收回來(lái)。該大小為存儲(chǔ)池的空閑空間與之前存儲(chǔ)池向執(zhí)行池借用過(guò)的內(nèi)存量(注意這個(gè)描述)的較大值。
  2. 如果有內(nèi)存可以回收,就調(diào)用存儲(chǔ)池的freeSpaceToShrinkPool()方法,淘汰掉一部分存儲(chǔ)塊。淘汰掉的塊所占內(nèi)存是理論可回收量與實(shí)際需要的執(zhí)行內(nèi)存之間的較小值。
  3. 調(diào)用decrementPoolSize()方法縮小存儲(chǔ)池,調(diào)用incrementPoolSize()方法擴(kuò)大執(zhí)行池。

從這個(gè)邏輯可以看出,執(zhí)行內(nèi)存不夠用時(shí),并不太像是“借用”(borrow)存儲(chǔ)內(nèi)存,而是“占用”或者“回收”(reclaim)。也就是說(shuō),執(zhí)行池可以“要求”存儲(chǔ)池淘汰掉自身持有的塊來(lái)歸還曾經(jīng)借用的空間,而存儲(chǔ)池并不會(huì)反過(guò)來(lái)要求執(zhí)行池也同樣歸還。其原因在于,存儲(chǔ)內(nèi)存中的塊可以方便地持久化到磁盤(pán),而執(zhí)行內(nèi)存中的塊大多為中間數(shù)據(jù)(比如Shuffle數(shù)據(jù)),比較難持久化,并且一旦淘汰掉這些中間數(shù)據(jù),整個(gè)Task很可能就會(huì)直接失敗,重算成本太高。

下圖示出默認(rèn)情況下(spark.memory.storageFraction=0.5),內(nèi)存借用的流程。

圖#25.2 - 內(nèi)存借用示意

在代碼#25.6中還有一個(gè)嵌套定義的方法computeMaxExecutionPoolSize(),它用于獲得執(zhí)行內(nèi)存池的最大可能大小,比較簡(jiǎn)單,不再多說(shuō)。

這兩個(gè)嵌套方法都會(huì)當(dāng)做執(zhí)行內(nèi)存池的acquireMemory()方法的參數(shù),作為函數(shù)傳進(jìn)去。我們?cè)谇懊媛匀チ薊xecutionMemoryPool類(lèi)的解釋過(guò)程,所以現(xiàn)在只是大致瞅一眼與申請(qǐng)內(nèi)存有關(guān)的代碼。

代碼#25.7 - o.a.s.memory.ExecutionMemoryPool.acquireMemory()方法

  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      lock.notifyAll()
    }

    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      maybeGrowPool(numBytes - memoryFree)

      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      val toGrant = math.min(maxToGrant, memoryFree)

      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L
  }

可見(jiàn),ExecutionMemoryPool申請(qǐng)內(nèi)存時(shí)是循環(huán)申請(qǐng)的,每次都調(diào)用參數(shù)中的maybeGrowPool()函數(shù)(實(shí)際上就是上面講的maybeGrowExecutionPool()方法)來(lái)檢查是否需要從StorageMemoryPool回收空間。如果分配到的內(nèi)存比實(shí)際申請(qǐng)的少,或者該Task分配完畢之后的內(nèi)存仍然小于每個(gè)Task應(yīng)獲得內(nèi)存的最小值(即池子的大小除以當(dāng)前活動(dòng)Task數(shù)的兩倍),就調(diào)用MemoryManager對(duì)象的wait()方法阻塞,直到有其他Task釋放內(nèi)存為止,再進(jìn)入下一波循環(huán),直到申請(qǐng)到足夠的內(nèi)存。

關(guān)于ExecutionMemoryPool,之后還會(huì)詳細(xì)地解釋?zhuān)@里只要有個(gè)印象就行。

總結(jié)

本文詳細(xì)閱讀了UnifiedMemoryManager的相關(guān)源碼,對(duì)Spark的統(tǒng)一內(nèi)存管理機(jī)制有了深入的了解。當(dāng)然,統(tǒng)一內(nèi)存管理雖然先進(jìn),但也不代表萬(wàn)事無(wú)憂(yōu)。比如當(dāng)程序中cache了大量RDD并且不及時(shí)釋放時(shí),很多存儲(chǔ)內(nèi)存中的塊都無(wú)法被淘汰,會(huì)造成Shuffle階段頻繁Full GC,作業(yè)執(zhí)行變慢。關(guān)于Spark作業(yè)故障定位和內(nèi)存調(diào)優(yōu)的事情,不屬于這個(gè)系列的范疇,筆者會(huì)專(zhuān)門(mén)挑個(gè)時(shí)間寫(xiě)一篇全面的總結(jié)(就像之前寫(xiě)的兩篇Hive調(diào)優(yōu)總結(jié)一樣),最近實(shí)在是太忙了。

看官也可能會(huì)問(wèn),講內(nèi)存管理講了這么長(zhǎng)時(shí)間,結(jié)果都是在做bookkeeping的工作,內(nèi)存的實(shí)際分配和釋放邏輯到底在哪里呢?這個(gè)由內(nèi)存存儲(chǔ)類(lèi)MemoryStore及其附屬類(lèi)來(lái)實(shí)現(xiàn),下一篇文章就會(huì)講到了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容