在SortShuffleWriter中調(diào)用ExternalSorter的兩個方法insertAll和writePartitionedFile
1】、blockManager
2】、diskBlockManager
3】、serializerManager
4】、fileBufferSize
spark.shuffle.file.buffer=32k
5】、serializerBatchSize
spark.shuffle.spill.batchSize=10000
6】、map(PartitionedAppendOnlyMap)
private var data = new Array[AnyRef](2 * capacity)
即消耗的并不是Storage的內(nèi)存
7】、buffer(PartitionedPairBuffer)
8】、forceSpillFiles(ArrayBuffer[SpilledFile])
PartitionedAppendOnlyMap 放不下,要落地,那么不能硬生生的寫磁盤,所以需要個buffer,然后把buffer再一次性寫入磁盤文件,buffer的大小由fileBufferSize決定
9】、spills(ArrayBuffer[SpilledFile])
10】、insertAll
insertAll方法將數(shù)據(jù)存儲在緩沖區(qū)
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
val shouldCombine = aggregator.isDefined
//shouldCombine為true則緩沖區(qū)為PartitionedAppendOnlyMap反之為PartitionedPairBuffer
if (shouldCombine) {
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
//緩存區(qū)中對于新放入數(shù)據(jù)的更新策略,如果緩沖區(qū)中沒有key則新建一個Combiner,反之將合并value
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
//map中放入一條數(shù)據(jù),計數(shù)加一,用于判斷是否進行spill(當緩沖區(qū)快滿的時候需要將緩沖區(qū)的數(shù)據(jù)以一個臨時文件的方式存放到磁盤,這個過程就是spill),詳情見下文的maybeSpillCollection以及maybeSpill方法
addElementsRead()
kv = records.next()
// 數(shù)據(jù)通過partitioner獲取到partitionID,按照partitionID,key將數(shù)據(jù)存放在內(nèi)存AppendOnlyMap對象中
map.changeValue((getPartition(kv._1), kv._1), update)
//判斷是否需要進行spill
maybeSpillCollection(usingMap = true)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
我們看一下上段代碼中中提到的PartitionedAppendOnlyMap.changeValue方法:
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
super.afterUpdate()
newValue
}
當被混入的集合的每次update操作以后,需要執(zhí)行SizeTracker的afterUpdate方法,afterUpdate會判斷這是第幾次更新,需要的話就會使用SizeEstimator的estimate方法來估計下集合的大小。由于SizeEstimator的調(diào)用開銷比較大,注釋上說會是數(shù)毫秒,所以不能頻繁調(diào)用。所以SizeTracker會記錄更新的次數(shù),發(fā)生estimate的次數(shù)是指數(shù)級增長的,基數(shù)是1.1,所以調(diào)用estimate時更新的次數(shù)會是1.1, 1.1 * 1.1, 1.1 * 1.1 *1.1, ....
這是指數(shù)的初始增長是很慢的, 1.1的96次方會是1w, 1.1 ^ 144次方是100w,即對于1w次update,它會執(zhí)行96次estimate,對10w次update執(zhí)行120次estimate, 對100w次update執(zhí)行144次estimate,對1000w次update執(zhí)行169次。
11】、maybeSpillCollection
我們看一下上段代碼中中提到的maybeSpillCollection方法,他的作用是檢查一次PartitionedAppendOnlyMap是否需要spill,所以每放一條記錄就會檢查一次PartitionedAppendOnlyMap是否需要spill。
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
從代碼中我們可以看到如果需要spill,緩沖區(qū)會被釋放,重新new一個緩沖區(qū),那么新的緩沖區(qū)的內(nèi)存空間會有多大呢,這個在maybeSpill方法中有詳細的解釋
12】、maybeSpill
如果每放一條記錄就檢查一次PartitionedAppendOnlyMap的內(nèi)存,假設(shè)檢查一次內(nèi)存1ms, 1kw 就不得了的時間了。所以肯定是不行的,所以 estimateSize其實是使用采樣算法來做的。
(1)、放入數(shù)據(jù)每32次且currentMemory (estimatedSize)大于myMemoryThreshold
(2)、滿足1,則向 shuffleMemoryManager 申請新的緩沖區(qū)內(nèi)存,新的內(nèi)存大于要 2 * currentMemory - myMemoryThreshold 的內(nèi)存
(3)、滿足1、2或者內(nèi)存中放入的記錄大于numElementsForceSpillThreshold時可以進行spill
注:
currentMemory 通過map的estimatedSize獲取
myMemoryThreshold可設(shè)置spark.shuffle.spill.initialMemoryThreshold配置,默認5 * 1024 * 1024
shuffleMemoryManager 可分配的內(nèi)存是ExecutorHeapMemeory * 0.2 * 0.8
numElementsForceSpillThreshold通過spark.shuffle.spill.numElementsForceSpillThreshold配置,默認值Long.MaxValue
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
13】、writePartitionedFile
將in memory(map)以及spillFiles真實的寫入文件
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
if (spills.isEmpty) {
// 如果map過程中沒有數(shù)據(jù)落地到磁盤則對緩沖區(qū)的數(shù)據(jù)進行排序即可
val collection = if (aggregator.isDefined) map else buffer
//destructiveSortedWritablePartitionedIterator調(diào)用partitionedDestructiveSortedIterator對map進行排序,如果map階段不需要combiner的操作則無需對數(shù)據(jù)按照key值排序,只需要按照partitionID排序即可
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
//如果有spill文件則需要獲取到各個partition的iterator,逐個讀取記錄寫入磁盤,下文有詳細介紹怎么獲取到各個partition的iterator
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
destructiveSortedWritablePartitionedIterator調(diào)用partitionedDestructiveSortedIterator對map進行排序,首先構(gòu)建comparator,如果傳入key比較器則進行partitionID排序之后進行key排序,反之僅僅按照partitionID排序。接著調(diào)用destructiveSortedIteratordestructiveSortedIterator是真正的排序器,根據(jù)傳入comparator的comparator以破壞map特性為代價使對map排序時不需要占用額外空間
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
destructiveSortedIterator(comparator)
并返回一個 WritablePartitionedIterator對象。WritablePartitionedIterator可以使用 BlockObjectWriter來寫入它的元素。
14】、partitionedIterator
partitionedIterator返回一個對應(yīng)partitionID的iterators
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
if (spills.isEmpty) {
//如果spillFile為空則只需要返回緩沖區(qū)排序后的iterator
if (!ordering.isDefined) {
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// 如果spillFile不為空則需要將緩沖區(qū)以及spillFile合并排序后返回各個partitions的iterator,這里使用的是對外排序的思想,下文中詳細介紹了merge方法,注意此時傳參spills和緩沖區(qū)的的iterator都是已經(jīng)進行過排序的
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
15】、merge
將in memory(map)的數(shù)據(jù)以及spillFiles中的數(shù)據(jù)按照partitionID讀取到內(nèi)存中合并并按照comparator重新排序(堆外排序),返回一個對應(yīng)partitionID的iterators
傳入?yún)?shù)
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
val readers = spills.map(new SpillReader(_))
val inMemBuffered = inMemory.buffered
(0 until numPartitions).iterator.map { p =>
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
//將spills以及緩沖區(qū)取出當前partition的數(shù)據(jù)合并后按需排序?qū)懭胛募? val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
if (aggregator.isDefined) {
// Perform partial aggregation across partitions
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
} else if (ordering.isDefined) {
(p, mergeSort(iterators, ordering.get))
} else {
(p, iterators.iterator.flatten)
}
}
}
16】、mergeWithAggregation
將傳入的iterator根據(jù)是否需要排序,返回一個對應(yīng)partitionID的iterator。如果iterator不需要排序,則在next時進行combine需要多做一些工作,而如果iterator進行過排序,則在直接combine時直接combine下一個key值相同的value即可
17】、mergeSort
將傳入的iterators按照comparator排序的具體實現(xiàn)
19】、上文提到過spills文件是已經(jīng)排過序的了,那他們是在何時進行排序的呢?
在maybespill方法中有這么一段代碼:
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
下面我們來詳細看一下spill方法:
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}
collection.destructiveSortedWritablePartitionedIterator(comparator)將緩沖區(qū)數(shù)據(jù)進行排序,上文中出現(xiàn)過,將數(shù)據(jù)寫入最終文件時也是使用destructiveSortedWritablePartitionedIterator,該方法返回一個可寫的iterator,該方法其實是調(diào)用了partitionedDestructiveSortedIterator(keyComparator)進行排序的,上下文出現(xiàn)的需要對map進行拍的的地方其實都是在調(diào)用partitionedDestructiveSortedIterator方法
spillMemoryIteratorToDisk方法則是將緩沖區(qū)數(shù)據(jù)寫入磁盤上的臨時文件上
------------------------------------------------------------------------------------------------------------------------------------------------
shuffleMemoryManager 是被Executor 所有正在運行的Task(Core) 共享的,能夠分配出去的內(nèi)存是:ExecutorHeapMemeory * 0.2 * 0.8
上面的數(shù)字可通過下面兩個配置來更改:
spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8
PartitionedAppendOnlyMap 放不下,要落地先寫入buffer,然后把buffer再一次性寫入磁盤文件。這個buffer是由參數(shù)fileBufferSize決定,通過下面配置來更改:
spark.shuffle.file.buffer=32k
數(shù)據(jù)獲取的過程中,序列化反序列化,也是需要空間的,所以Spark 對數(shù)量做了限制,通過如下參數(shù)serializerBatchSize決定,通過下面配置來更改:
spark.shuffle.spill.batchSize=10000
假設(shè)一個Executor的可使用的Core為 C個,那么對應(yīng)需要的內(nèi)存消耗為:
C * 32k + C * 10000個Record + C * PartitionedAppendOnlyMap
這么看來
,寫文件的buffer不是問題,而序列化的batchSize也不是問題,幾萬或者十幾萬個Record 而已。那C * PartitionedAppendOnlyMap 到底會有多大呢?我先給個結(jié)論:C * PartitionedAppendOnlyMap <shuffleManager可分配的內(nèi)存空間
PartitionedAppendOnlyMap 通過map.estimateSize()獲取占用內(nèi)存大小,而map.estimateSize()是近似估計,所以會出現(xiàn)oom的情況
如果你內(nèi)存開的比較大,其實反倒風險更高,因為estimateSize 并不是每次都去真實的算緩存。它是通過采樣來完成的,而采樣的周期不是固定的,而是指數(shù)增長的,比如第一次采樣完后,PartitionedAppendOnlyMap 要經(jīng)過1.1次的update/insert操作之后才進行第二次采樣,然后經(jīng)過1.1*.1.1次之后進行第三次采樣,以此遞推,假設(shè)你內(nèi)存開的大,那PartitionedAppendOnlyMap可能要經(jīng)過幾十萬次更新之后之后才會進行一次采樣,然后才能計算出新的大小,這個時候幾十萬次更新帶來的新的內(nèi)存壓力,可能已經(jīng)讓你的GC不堪重負了。
ExternalSorter的關(guān)鍵調(diào)用
注:下文中的buffer/map都是指存放inmemery obj的buffer
- insertAll【將數(shù)據(jù)放入buffer,注意此處并不占用storage內(nèi)存】
- ---changeValue(map的方法)
- ---maybeSpillCollection
- ---maybeSpill
- ---insert(buffer的方法)
- ---maybeSpillCollection
- ---maybeSpill
- writePartitionedFile【將buffer以及spillfile中的文件合并為一個文件】
- ---destructiveSortedWritablePartitionedIterator(map的方法)
- ---partitionedDestructiveSortedIterator(map的方法)
- ---destructiveSortedIterator(map的方法)
- ---WritablePartitionedIterator (map的方法)
- ---partitionedIterator
- ---groupByPartition
- ---destructiveSortedIterator
- ---groupByPartition
- ---destructiveSortedIterator
- ---merge
- ---destructiveSortedIterator
- ---mergeWithAggregation
- ------mergeSort
- ---mergeSort
參考文章: