An Overview of the Spark Shuffle Mechanism

A Brief History of Shuffle and Spark Shuffle

Shuffle is a mandatory stage, and arguably the most important one, in every big data computing framework built on the map-reduce paradigm. It sits between the map and reduce phases and can itself be divided into two sub-phases:

  • shuffle write: map tasks write the intermediate data produced by the upstream computation;
  • shuffle read: reduce tasks read the intermediate data produced by the map tasks, feeding the downstream computation.
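
To make this concrete in Spark terms, here is a minimal sketch (assuming an existing SparkContext sc; the input path is hypothetical). reduceByKey() forces a shuffle: the tasks producing the (word, 1) pairs perform shuffle write, and the tasks computing the summed counts perform shuffle read.

  // Minimal word-count sketch; the HDFS path is hypothetical.
  val words = sc.textFile("hdfs:///input/words.txt").flatMap(_.split(" "))
  // reduceByKey introduces the shuffle boundary: its upstream tasks do
  // shuffle write, its downstream tasks do shuffle read.
  val counts = words.map(w => (w, 1)).reduceByKey(_ + _)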

The figure below shows when and how shuffle happens in the Hadoop MapReduce framework.

[Figure: the shuffle process in Hadoop MapReduce]

Spark's shuffle mechanism follows the same map-reduce idea, but since Spark computes on RDDs, its implementation differs from Hadoop's and has gone through some major changes along the way. In brief:

  • In Spark 0.8 and earlier, there was only the simplest hash shuffle; the consolidation mechanism was introduced later;
  • Version 1.1 added the sort shuffle mechanism, but hash shuffle remained the default;
  • Version 1.2 made sort shuffle the default;
  • Version 1.4 introduced tungsten-sort shuffle, a serialized shuffle scheme built on top of the ordinary sort shuffle;
  • Version 1.6 merged tungsten-sort shuffle into sort shuffle, with Spark automatically deciding which variant to use;
  • From version 2.0 onward, hash shuffle was removed entirely, leaving sort shuffle as the only mechanism to this day.

The code walkthrough below aims to build a first, high-level understanding of Spark shuffle.

The top of the shuffle hierarchy: the ShuffleManager trait

Given how important shuffle is, the shuffle machinery is initialized as soon as the Spark execution environment is set up. See the SparkEnv.create() method:

  val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
  // Instantiate the ShuffleManager via reflection
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

This code confirms that hash shuffle is indeed history: both short names map to SortShuffleManager. It also shows that the shuffle implementation can be specified manually via the spark.shuffle.manager parameter, although there is little practical reason to do so.
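
For illustration, here is a minimal sketch of setting the parameter explicitly (the app name is hypothetical; both short names resolve to the same class, as the mapping above shows):

  // A minimal sketch: explicitly choosing the shuffle manager. "sort" is
  // already the default, and "tungsten-sort" maps to the same class.
  val conf = new org.apache.spark.SparkConf()
    .setAppName("shuffle-demo") // hypothetical app name
    .set("spark.shuffle.manager", "sort") // or "tungsten-sort"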

o.a.s.shuffle.ShuffleManager is a Scala trait (roughly an enhanced version of a Java interface). It defines three core methods:

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  • The registerShuffle() method registers a shuffle with the manager and returns the corresponding ShuffleHandle (akin to a handle in other systems), which stores the shuffle dependency information. The handle then determines which kind of ShuffleWriter/ShuffleReader will be used (see the sketch after this list).
  • The getWriter() method obtains a ShuffleWriter. It is called on executors by map tasks.
  • The getReader() method obtains a ShuffleReader. It is called on executors by reduce tasks.
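
To make the handle concrete, here is a simplified sketch of the handle classes as they appear in the Spark 2.x source (private[spark] modifiers omitted):

  import org.apache.spark.ShuffleDependency

  // Simplified from the Spark 2.x source: a handle is an opaque,
  // serializable token identifying a shuffle.
  abstract class ShuffleHandle(val shuffleId: Int) extends Serializable

  // The basic handle also carries the number of map tasks and the full
  // ShuffleDependency, from which writer/reader behavior is derived.
  class BaseShuffleHandle[K, V, C](
      shuffleId: Int,
      val numMaps: Int,
      val dependency: ShuffleDependency[K, V, C])
    extends ShuffleHandle(shuffleId)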

An overview of the sort shuffle mechanism: the SortShuffleManager class

With hash shuffle gone, o.a.s.shuffle.sort.SortShuffleManager is currently the only implementation of ShuffleManager. Let's look at its concrete implementations of the three methods above.

The registerShuffle() method

  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

Depending on the conditions, one of three different handles is returned, corresponding to three shuffle flavors. Going through them top to bottom:

  1. First, check whether the conditions of SortShuffleWriter.shouldBypassMergeSort() are met:

  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }

In other words, if both of the following conditions are satisfied:

  • the shuffle dependency requires no map-side aggregation (mapSideCombine is false; this holds for the groupByKey() operator, whereas reduceByKey() sets it to true), and
  • the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (default 200),

then a BypassMergeSortShuffleHandle is returned, enabling the bypass merge-sort shuffle mechanism, as illustrated below.
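
A quick, hypothetical illustration of these two conditions (assuming an existing SparkContext sc and the default threshold of 200):

  // Hypothetical example: with default settings, the first shuffle below
  // takes the bypass path, the second cannot.
  val pairs = sc.parallelize(1 to 1000).map(i => (i % 10, i))

  // groupByKey: mapSideCombine = false, and 100 <= 200 partitions -> bypass
  val grouped = pairs.groupByKey(100)

  // reduceByKey: mapSideCombine = true -> bypass is impossible,
  // regardless of the partition count
  val reduced = pairs.reduceByKey(_ + _, 100)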

  2. If the bypass mechanism is not enabled, continue by checking whether the conditions of canUseSerializedShuffle() are met:

  def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(/*...*/)
      false
    } else if (dependency.aggregator.isDefined) {
      log.debug(/*...*/)
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(/*...*/)
      false
    } else {
      log.debug(/*...*/)
      true
    }
  }

In other words, if all three of the following conditions are satisfied:

  • the serializer in use supports relocation of serialized objects (e.g. KryoSerializer),
  • the shuffle dependency involves no aggregation at all,
  • the number of partitions does not exceed the constant MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (the maximum partition ID plus 1, i.e. 2^24 = 16777216),

then a SerializedShuffleHandle is returned, enabling the serialized sort shuffle mechanism (i.e. tungsten-sort). A configuration sketch follows below.
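
In practice, the first condition usually means switching to Kryo. A minimal sketch (the default JavaSerializer does not support relocation, while KryoSerializer does):

  // A minimal sketch: KryoSerializer supports relocation of serialized
  // objects, a prerequisite for the serialized (tungsten-sort) path.
  val conf = new org.apache.spark.SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")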

  3. If neither bypass nor tungsten-sort applies, the default BaseShuffleHandle is returned, and the basic sort shuffle mechanism is used.

The getWriter() method

  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

A different ShuffleWriter is returned depending on the handle: tungsten-sort uses UnsafeShuffleWriter, bypass uses BypassMergeSortShuffleWriter, and ordinary sort uses SortShuffleWriter. All three extend the ShuffleWriter abstract class and implement its write() method.
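
For reference, the contract they share is small. Simplified from the Spark 2.x source (private[spark] modifiers omitted; MapStatus lives in org.apache.spark.scheduler), it looks roughly like this:

  // Simplified from the Spark 2.x source: the abstract class that all
  // three writers extend.
  abstract class ShuffleWriter[K, V] {
    /** Write a sequence of records to this task's output. */
    def write(records: Iterator[Product2[K, V]]): Unit

    /** Close this writer, passing along whether the map task succeeded. */
    def stop(success: Boolean): Option[MapStatus]
  }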

The getReader() method

  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }

The reader side is simpler: there is only one ShuffleReader implementation, BlockStoreShuffleReader. It extends the ShuffleReader trait and implements its read() method.
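
Its parent trait is equally small; simplified from the Spark 2.x source:

  // Simplified from the Spark 2.x source: a reader returns the combined
  // key-value pairs for this reduce task's range of partitions.
  trait ShuffleReader[K, C] {
    /** Read the combined key-values for this reduce task. */
    def read(): Iterator[Product2[K, C]]
  }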

Summary

[Figure: a simple UML class diagram centered on ShuffleManager]