spark源碼閱讀之ExternalSorter

在SortShuffleWriter中調(diào)用ExternalSorter的兩個方法insertAll和writePartitionedFile

1】、blockManager

2】、diskBlockManager

3】、serializerManager

4】、fileBufferSize

spark.shuffle.file.buffer=32k

5】、serializerBatchSize

spark.shuffle.spill.batchSize=10000

6】、map(PartitionedAppendOnlyMap)

private var data = new Array[AnyRef](2 * capacity)

即消耗的并不是Storage的內(nèi)存

7】、buffer(PartitionedPairBuffer)

8】、forceSpillFiles(ArrayBuffer[SpilledFile])

PartitionedAppendOnlyMap 放不下,要落地,那么不能硬生生的寫磁盤,所以需要個buffer,然后把buffer再一次性寫入磁盤文件,buffer的大小由fileBufferSize決定

9】、spills(ArrayBuffer[SpilledFile])

10】、insertAll
insertAll方法將數(shù)據(jù)存儲在緩沖區(qū)

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
   
    val shouldCombine = aggregator.isDefined
     //shouldCombine為true則緩沖區(qū)為PartitionedAppendOnlyMap反之為PartitionedPairBuffer
    if (shouldCombine) {
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      //緩存區(qū)中對于新放入數(shù)據(jù)的更新策略,如果緩沖區(qū)中沒有key則新建一個Combiner,反之將合并value
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        //map中放入一條數(shù)據(jù),計數(shù)加一,用于判斷是否進行spill(當緩沖區(qū)快滿的時候需要將緩沖區(qū)的數(shù)據(jù)以一個臨時文件的方式存放到磁盤,這個過程就是spill),詳情見下文的maybeSpillCollection以及maybeSpill方法
        addElementsRead()
        kv = records.next()
        // 數(shù)據(jù)通過partitioner獲取到partitionID,按照partitionID,key將數(shù)據(jù)存放在內(nèi)存AppendOnlyMap對象中
        map.changeValue((getPartition(kv._1), kv._1), update)
        //判斷是否需要進行spill
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

我們看一下上段代碼中中提到的PartitionedAppendOnlyMap.changeValue方法:

override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

當被混入的集合的每次update操作以后,需要執(zhí)行SizeTracker的afterUpdate方法,afterUpdate會判斷這是第幾次更新,需要的話就會使用SizeEstimator的estimate方法來估計下集合的大小。由于SizeEstimator的調(diào)用開銷比較大,注釋上說會是數(shù)毫秒,所以不能頻繁調(diào)用。所以SizeTracker會記錄更新的次數(shù),發(fā)生estimate的次數(shù)是指數(shù)級增長的,基數(shù)是1.1,所以調(diào)用estimate時更新的次數(shù)會是1.1, 1.1 * 1.1, 1.1 * 1.1 *1.1, ....

這是指數(shù)的初始增長是很慢的, 1.1的96次方會是1w, 1.1 ^ 144次方是100w,即對于1w次update,它會執(zhí)行96次estimate,對10w次update執(zhí)行120次estimate, 對100w次update執(zhí)行144次estimate,對1000w次update執(zhí)行169次。

11】、maybeSpillCollection
我們看一下上段代碼中中提到的maybeSpillCollection方法,他的作用是檢查一次PartitionedAppendOnlyMap是否需要spill,所以每放一條記錄就會檢查一次PartitionedAppendOnlyMap是否需要spill。

private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }

從代碼中我們可以看到如果需要spill,緩沖區(qū)會被釋放,重新new一個緩沖區(qū),那么新的緩沖區(qū)的內(nèi)存空間會有多大呢,這個在maybeSpill方法中有詳細的解釋
12】、maybeSpill

如果每放一條記錄就檢查一次PartitionedAppendOnlyMap的內(nèi)存,假設(shè)檢查一次內(nèi)存1ms, 1kw 就不得了的時間了。所以肯定是不行的,所以 estimateSize其實是使用采樣算法來做的。

(1)、放入數(shù)據(jù)每32次且currentMemory (estimatedSize)大于myMemoryThreshold

(2)、滿足1,則向 shuffleMemoryManager 申請新的緩沖區(qū)內(nèi)存,新的內(nèi)存大于要 2 * currentMemory - myMemoryThreshold 的內(nèi)存

(3)、滿足1、2或者內(nèi)存中放入的記錄大于numElementsForceSpillThreshold時可以進行spill

注:

currentMemory 通過map的estimatedSize獲取

myMemoryThreshold可設(shè)置spark.shuffle.spill.initialMemoryThreshold配置,默認5 * 1024 * 1024

shuffleMemoryManager 可分配的內(nèi)存是ExecutorHeapMemeory * 0.2 * 0.8

numElementsForceSpillThreshold通過spark.shuffle.spill.numElementsForceSpillThreshold配置,默認值Long.MaxValue

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

13】、writePartitionedFile

將in memory(map)以及spillFiles真實的寫入文件

def writePartitionedFile(
      blockId: BlockId,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
      context.taskMetrics().shuffleWriteMetrics)

    if (spills.isEmpty) {
      // 如果map過程中沒有數(shù)據(jù)落地到磁盤則對緩沖區(qū)的數(shù)據(jù)進行排序即可
      val collection = if (aggregator.isDefined) map else buffer
      //destructiveSortedWritablePartitionedIterator調(diào)用partitionedDestructiveSortedIterator對map進行排序,如果map階段不需要combiner的操作則無需對數(shù)據(jù)按照key值排序,只需要按照partitionID排序即可
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        val segment = writer.commitAndGet()
        lengths(partitionId) = segment.length
      }
    } else {
      //如果有spill文件則需要獲取到各個partition的iterator,逐個讀取記錄寫入磁盤,下文有詳細介紹怎么獲取到各個partition的iterator
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          val segment = writer.commitAndGet()
          lengths(id) = segment.length
        }
      }
    }

destructiveSortedWritablePartitionedIterator調(diào)用partitionedDestructiveSortedIterator對map進行排序,首先構(gòu)建comparator,如果傳入key比較器則進行partitionID排序之后進行key排序,反之僅僅按照partitionID排序。接著調(diào)用destructiveSortedIteratordestructiveSortedIterator是真正的排序器,根據(jù)傳入comparator的comparator以破壞map特性為代價使對map排序時不需要占用額外空間

val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
destructiveSortedIterator(comparator)

并返回一個 WritablePartitionedIterator對象。WritablePartitionedIterator可以使用 BlockObjectWriter來寫入它的元素。

14】、partitionedIterator

partitionedIterator返回一個對應(yīng)partitionID的iterators

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
    val usingMap = aggregator.isDefined
    val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
    if (spills.isEmpty) {
      //如果spillFile為空則只需要返回緩沖區(qū)排序后的iterator
      if (!ordering.isDefined) {
        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      } else {
        groupByPartition(destructiveIterator(
          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
      }
    } else {
      // 如果spillFile不為空則需要將緩沖區(qū)以及spillFile合并排序后返回各個partitions的iterator,這里使用的是對外排序的思想,下文中詳細介紹了merge方法,注意此時傳參spills和緩沖區(qū)的的iterator都是已經(jīng)進行過排序的
      merge(spills, destructiveIterator(
        collection.partitionedDestructiveSortedIterator(comparator)))
    }
  }

15】、merge

將in memory(map)的數(shù)據(jù)以及spillFiles中的數(shù)據(jù)按照partitionID讀取到內(nèi)存中合并并按照comparator重新排序(堆外排序),返回一個對應(yīng)partitionID的iterators
傳入?yún)?shù)

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      //將spills以及緩沖區(qū)取出當前partition的數(shù)據(jù)合并后按需排序?qū)懭胛募?      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        // Perform partial aggregation across partitions
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

16】、mergeWithAggregation

將傳入的iterator根據(jù)是否需要排序,返回一個對應(yīng)partitionID的iterator。如果iterator不需要排序,則在next時進行combine需要多做一些工作,而如果iterator進行過排序,則在直接combine時直接combine下一個key值相同的value即可

17】、mergeSort

將傳入的iterators按照comparator排序的具體實現(xiàn)

19】、上文提到過spills文件是已經(jīng)排過序的了,那他們是在何時進行排序的呢?
在maybespill方法中有這么一段代碼:

if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }

下面我們來詳細看一下spill方法:

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    spills += spillFile
  }

collection.destructiveSortedWritablePartitionedIterator(comparator)將緩沖區(qū)數(shù)據(jù)進行排序,上文中出現(xiàn)過,將數(shù)據(jù)寫入最終文件時也是使用destructiveSortedWritablePartitionedIterator,該方法返回一個可寫的iterator,該方法其實是調(diào)用了partitionedDestructiveSortedIterator(keyComparator)進行排序的,上下文出現(xiàn)的需要對map進行拍的的地方其實都是在調(diào)用partitionedDestructiveSortedIterator方法
spillMemoryIteratorToDisk方法則是將緩沖區(qū)數(shù)據(jù)寫入磁盤上的臨時文件上

------------------------------------------------------------------------------------------------------------------------------------------------

shuffleMemoryManager 是被Executor 所有正在運行的Task(Core) 共享的,能夠分配出去的內(nèi)存是:ExecutorHeapMemeory * 0.2 * 0.8

上面的數(shù)字可通過下面兩個配置來更改:

spark.shuffle.memoryFraction=0.2

spark.shuffle.safetyFraction=0.8

PartitionedAppendOnlyMap 放不下,要落地先寫入buffer,然后把buffer再一次性寫入磁盤文件。這個buffer是由參數(shù)fileBufferSize決定,通過下面配置來更改:

spark.shuffle.file.buffer=32k

數(shù)據(jù)獲取的過程中,序列化反序列化,也是需要空間的,所以Spark 對數(shù)量做了限制,通過如下參數(shù)serializerBatchSize決定,通過下面配置來更改:

spark.shuffle.spill.batchSize=10000

假設(shè)一個Executor的可使用的Core為 C個,那么對應(yīng)需要的內(nèi)存消耗為:

C * 32k + C * 10000個Record + C * PartitionedAppendOnlyMap

這么看來

,寫文件的buffer不是問題,而序列化的batchSize也不是問題,幾萬或者十幾萬個Record 而已。那C * PartitionedAppendOnlyMap 到底會有多大呢?我先給個結(jié)論:C * PartitionedAppendOnlyMap <shuffleManager可分配的內(nèi)存空間

PartitionedAppendOnlyMap 通過map.estimateSize()獲取占用內(nèi)存大小,而map.estimateSize()是近似估計,所以會出現(xiàn)oom的情況

如果你內(nèi)存開的比較大,其實反倒風險更高,因為estimateSize 并不是每次都去真實的算緩存。它是通過采樣來完成的,而采樣的周期不是固定的,而是指數(shù)增長的,比如第一次采樣完后,PartitionedAppendOnlyMap 要經(jīng)過1.1次的update/insert操作之后才進行第二次采樣,然后經(jīng)過1.1*.1.1次之后進行第三次采樣,以此遞推,假設(shè)你內(nèi)存開的大,那PartitionedAppendOnlyMap可能要經(jīng)過幾十萬次更新之后之后才會進行一次采樣,然后才能計算出新的大小,這個時候幾十萬次更新帶來的新的內(nèi)存壓力,可能已經(jīng)讓你的GC不堪重負了。


ExternalSorter的關(guān)鍵調(diào)用

注:下文中的buffer/map都是指存放inmemery obj的buffer

  1. insertAll【將數(shù)據(jù)放入buffer,注意此處并不占用storage內(nèi)存】
  2. ---changeValue(map的方法)
  3. ---maybeSpillCollection
  4. ---maybeSpill
  5. ---insert(buffer的方法)
  6. ---maybeSpillCollection
  7. ---maybeSpill
  8. writePartitionedFile【將buffer以及spillfile中的文件合并為一個文件】
  9. ---destructiveSortedWritablePartitionedIterator(map的方法)
  10. ---partitionedDestructiveSortedIterator(map的方法)
  11. ---destructiveSortedIterator(map的方法)
  12. ---WritablePartitionedIterator (map的方法)
  13. ---partitionedIterator
  14. ---groupByPartition
  15. ---destructiveSortedIterator
  16. ---groupByPartition
  17. ---destructiveSortedIterator
  18. ---merge
  19. ---destructiveSortedIterator
  20. ---mergeWithAggregation
  21. ------mergeSort
  22. ---mergeSort

參考文章:

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容