shuffle及Spark shuffle歷史簡(jiǎn)介
shuffle,中文意譯“洗牌”,是所有采用map-reduce思想的大數(shù)據(jù)計(jì)算框架的必經(jīng)階段,也是最重要的階段。它處在map與reduce之間,又可以分為兩個(gè)子階段:
- shuffle write:map任務(wù)寫(xiě)上游計(jì)算產(chǎn)生的中間數(shù)據(jù);
- shuffle read:reduce任務(wù)讀map任務(wù)產(chǎn)生的中間數(shù)據(jù),用于下游計(jì)算。
下圖示出在Hadoop MapReduce框架中,shuffle發(fā)生的時(shí)機(jī)和細(xì)節(jié)。

Spark的shuffle機(jī)制雖然也采用MR思想,但Spark是基于RDD進(jìn)行計(jì)算的,實(shí)現(xiàn)方式與Hadoop有差異,并且中途經(jīng)歷了比較大的變動(dòng),簡(jiǎn)述如下:
- 在久遠(yuǎn)的Spark 0.8版本及之前,只有最簡(jiǎn)單的hash shuffle,后來(lái)引入了consolidation機(jī)制;
- 1.1版本新加入sort shuffle機(jī)制,但默認(rèn)仍然使用hash shuffle;
- 1.2版本開(kāi)始默認(rèn)使用sort shuffle;
- 1.4版本引入了tungsten-sort shuffle,是基于普通sort shuffle創(chuàng)新的序列化shuffle方式;
- 1.6版本將tungsten-sort shuffle與sort shuffle合并,由Spark自動(dòng)決定采用哪一種方式;
- 2.0版本之后,hash shuffle機(jī)制被刪除,只保留sort shuffle機(jī)制至今。
下面的代碼分析致力于對(duì)Spark shuffle先有一個(gè)大致的了解。
shuffle機(jī)制的最頂層:ShuffleManager特征
鑒于shuffle的重要性,shuffle機(jī)制的初始化在Spark執(zhí)行環(huán)境初始化時(shí)就會(huì)進(jìn)行。查看SparkEnv.create()方法:
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
// 通過(guò)反射創(chuàng)建ShuffleManager
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
上面的代碼可以印證hash shuffle已經(jīng)成為歷史了。另外,還可以通過(guò)spark.shuffle.manager參數(shù)手動(dòng)指定shuffle機(jī)制,不過(guò)意義不大。
o.a.s.shuffle.ShuffleManager是一個(gè)Scala特征(相當(dāng)于Java接口的增強(qiáng)版)。其中定義的核心方法有3個(gè):
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** Get a writer for a given partition. Called on executors by map tasks. */
def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
- registerShuffle()方法用于注冊(cè)一種shuffle機(jī)制,并返回對(duì)應(yīng)的ShuffleHandle(類(lèi)似于句柄),handle內(nèi)會(huì)存儲(chǔ)shuffle依賴(lài)信息。根據(jù)該handle可以進(jìn)一步確定采用的ShuffleWriter/ShuffleReader的種類(lèi)。
- getWriter()方法用于獲取ShuffleWriter。它是executor執(zhí)行map任務(wù)時(shí)調(diào)用的。
- getReader()方法用于獲取ShuffleReader。它是executor執(zhí)行reduce任務(wù)時(shí)調(diào)用的。
sort shuffle機(jī)制概況:SortShuffleManager類(lèi)
在hash shuffle取消后,o.a.s.shuffle.sort.SortShuffleManager就是ShuffleManager目前唯一的實(shí)現(xiàn)類(lèi)。來(lái)看它對(duì)上面提到的三個(gè)方法的具體實(shí)現(xiàn)。
registerShuffle()方法
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
可以看出,根據(jù)條件的不同,會(huì)返回3種不同的handle,對(duì)應(yīng)3種shuffle機(jī)制。從上到下來(lái)分析一下:
- 檢查是否符合SortShuffleWriter.shouldBypassMergeSort()方法的條件:
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
false
} else {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
也就是說(shuō),如果同時(shí)滿(mǎn)足以下兩個(gè)條件:
- 該shuffle依賴(lài)中沒(méi)有map端聚合操作(如groupByKey()算子)
- 分區(qū)數(shù)不大于參數(shù)
spark.shuffle.sort.bypassMergeThreshold規(guī)定的值(默認(rèn)200)
那么會(huì)返回BypassMergeSortShuffleHandle,啟用bypass merge-sort shuffle機(jī)制。
- 如果不啟用上述bypass機(jī)制,那么繼續(xù)檢查是否符合canUseSerializedShuffle()方法的條件:
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(/*...*/)
false
} else if (dependency.aggregator.isDefined) {
log.debug(/*...*/)
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(/*...*/)
false
} else {
log.debug(/*...*/)
true
}
}
}
也就是說(shuō),如果同時(shí)滿(mǎn)足以下三個(gè)條件:
- 使用的序列化器支持序列化對(duì)象的重定位(如KryoSerializer)
- shuffle依賴(lài)中完全沒(méi)有聚合操作
- 分區(qū)數(shù)不大于常量
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值(最大分區(qū)ID號(hào)+1,即2^24=16777216)
那么會(huì)返回SerializedShuffleHandle,啟用序列化sort shuffle機(jī)制(也就是tungsten-sort)。
- 如果既不用bypass也不用tungsten-sort,那么就返回默認(rèn)的BaseShuffleHandle,采用基本的sort shuffle機(jī)制。
getWriter()方法
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
根據(jù)不同的handle,獲取不同的ShuffleWriter。對(duì)于tungsten-sort會(huì)使用UnsafeShuffleWriter,bypass會(huì)使用BypassMergeSortShuffleWriter,普通的sort則使用SortShuffleWriter。它們都繼承自ShuffleWriter抽象類(lèi),并且都實(shí)現(xiàn)了write()方法。
getReader()方法
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
ShuffleReader比較簡(jiǎn)單,只有一種,即BlockStoreShuffleReader。它繼承自ShuffleReader特征,并實(shí)現(xiàn)了read()方法。
總結(jié)
