Spark Shuffle(ExternalSorter)

1、Shuffle流程

spark的shuffle過程如下圖所示,和mapreduce中的類似,但在spark2.0及之后的版本中只存在SortShuffleManager而將原來的HashShuffleManager廢棄掉(但是shuffleWriter的子類BypassMergeSortShuffleWriter和已經(jīng)被廢棄掉的HashShuffleWriter類似)。這樣,每個mapTask在shuffle的sort階段只會生成一個結(jié)果文件,單個文件按照partitionId分成多個region。reducer階段根據(jù)partitionId來fetch對應(yīng)的region數(shù)據(jù)。
整個shuffle過程分為兩個階段,write(核心)和read階段,其中write階段比較重要的實現(xiàn)類為ExternalSorter(后面會重點分析該類)。

shuffle

2、Shuffle Write

  • BypassMergeSortShuffleWriter -
    這種方式是對partition(對應(yīng)的reduce)數(shù)量較少且不需要map-side aggregation的shuffle優(yōu)化,將每個partition的數(shù)據(jù)直接寫到對應(yīng)的文件,在所有數(shù)據(jù)都寫入完成后進行一次合并,下面是部分代碼:
[BypassMergeSortShuffleWriter]->write
public void write(Iterator<Product2<K, V>> records) throws IOException {

                                    ...

    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    /**
      為每個partition創(chuàng)建一個DiskWriter用于寫臨時文件
    **/
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
                        ...
    /**
      對每個record用對應(yīng)的writer進行文件寫入操作
    **/
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }
    //flush
    for (DiskBlockObjectWriter writer : partitionWriters) {
      writer.commitAndClose();
    }
    /**
        構(gòu)造最終的輸出文件實例,其中文件名為(reduceId為0):
        "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
         文件所在的local文件夾是根據(jù)該文件名的hash值確定。
        1、如果運行在yarn上,yarn在啟動的時候會根據(jù)配置項'LOCAL_DIRS'在本地創(chuàng)建
        文件夾
    **/
    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    //在實際結(jié)果文件名后加上uuid用于標(biāo)識文件正在寫入,結(jié)束后重命名
    File tmp = Utils.tempFileWith(output);
    try {
      //合并每個partition對應(yīng)的文件到一個文件中
      partitionLengths = writePartitionedFile(tmp);
      //將每個partition的offset寫入index文件方便reduce端fetch數(shù)據(jù)
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
  }
  • UnsafeShuffleWriter(詳見project tungsten)

該writer可將數(shù)據(jù)序列化后寫入到堆外內(nèi)存,只需要按照partitionid對地址進行排序,整個過程不涉及反序列化。
條件
1、使用的序列化類需要支持object relocation.目前只能使用kryoSerializer
2、不需要map side aggregate即不能定義aggregator
3、partition數(shù)量不能大于支持的上限(2^24)
內(nèi)存模型:
每條數(shù)據(jù)地址由一個64位的指針確定,其構(gòu)成為:[24 bit partition number][13 bit memory page number][27 bit offset in page]
在內(nèi)存為非8字節(jié)對齊的情況下,每個page的容量為227bits=128Mb,page總數(shù)為213,因此每個task可操作內(nèi)存總量為:227*213bits=1Tb,在內(nèi)存按字節(jié)對齊的情況下允許每個page的size有1g(即128*8,實際64位系統(tǒng)的內(nèi)存都是8字節(jié)對齊的)的容量,數(shù)據(jù)存放在off heap上。在地址中加入partitionID 是為了排序階段只需要對record的地址排序。

數(shù)據(jù)存儲格式:

4、Shuffle過程中涉及到的幾個參數(shù)

  • spark.shuffle.sort.bypassMergeThreshold
    當(dāng)partition的數(shù)量小于該值并且不需要進行map-side aggregation時使用BypassMergeSortShuffleWriter來進行shuffle的write操作,默認(rèn)值為200.
    [SortShuffleWriter]->shouldBypassMergeSort
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
}```
- *spark.shuffle.compress*、*spark.shuffle.file.buffer*
**[DiskBlockObjectWriter]->open**

def open(): DiskBlockObjectWriter = {
...
/**
'spark.shuffle.compress'-該參數(shù)決定是否對寫入文件的序列化數(shù)據(jù)進行壓縮。
'spark.shuffle.file.buffer'-設(shè)置buffer stream的buffersize,每writey
一個byte時會檢查當(dāng)前buffer容量,容量滿的時候則會flush到磁盤。該參數(shù)值在代碼中
會乘以1024轉(zhuǎn)換為字節(jié)長度。默認(rèn)值為'32k',該值太大可能導(dǎo)致內(nèi)存溢出。
**/
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
...
}```

  • spark.file.transferTo
    決定在使用BypassMergeWriter過程中,最后對文件進行合并時是否使用NIO方式進行file stream的copy。默認(rèn)為true,在為false的情況下合并文件效率比較低(創(chuàng)建一個大小為8192的字節(jié)數(shù)組作為buffer,從in stream中讀滿后寫入out stream,單線程讀寫),版本號為2.6.32的linux內(nèi)核在使用NIO方式會產(chǎn)生bug,需要將該參數(shù)設(shè)置為false。

  • spark.shuffle.spill.numElementsForceSpillThreshold
    在使用UnsafeShuffleWriter時,如果內(nèi)存中的數(shù)據(jù)超過這個值則對當(dāng)前內(nèi)存數(shù)據(jù)進行排序并寫入磁盤臨時文件。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容