- ShuffleMapTask的runTask()方法
/**
 * Runs this shuffle map task: deserializes the RDD and its shuffle dependency from the
 * broadcast task binary, then writes this task's partition through a shuffle writer.
 *
 * @param context context of the running task; passed to the writer and to the RDD iterator
 * @return the MapStatus obtained by stopping the writer after a successful write
 */
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize (rdd, dependency) from the broadcast value and record how long it took.
  val deserializeStartTime = System.currentTimeMillis()
  val closureSer = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = closureSer.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val shuffleManager = SparkEnv.get.shuffleManager
    writer = shuffleManager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    val records = rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]
    writer.write(records)
    // The try block is the last expression of the method, so its value is the result.
    writer.stop(success = true).get
  } catch {
    case failure: Exception =>
      // Best-effort stop of the writer; the original failure is always rethrown below.
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case stopFailure: Exception =>
          log.debug("Could not stop writer", stopFailure)
      }
      throw failure
  }
}
首先得到shuffleManager,shuffleManager分為三種:SortShuffleManager、HashShuffleManager、UnsafeShuffleManager。這裡我們focus on UnsafeShuffleManager。得到shuffleManager後,再拿到UnsafeShuffleWriter,然後再調用UnsafeShuffleWriter的write()方法將數據寫入shuffle文件。
- UnsafeShuffleWriter的write()方法
/**
 * Inserts every record into the sorter, then closes the sorter and writes the final output.
 *
 * @param records iterator over the key/value records to write
 * @throws IOException propagated from record insertion or from writing the output
 */
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // A success flag with try/finally (rather than catch-and-rethrow) lets the cleanup run for
  // any kind of throwable.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    // closeAndWriteOutput() sets `sorter` to null before it merges the spills, so a failure
    // after that point would otherwise hit a NullPointerException here and mask the real error.
    if (!success && sorter != null) {
      sorter.cleanupAfterError();
    }
  }
}
write()方法調用insertRecordIntoSorter()方法。
/**
 * Serializes one record into the reusable buffer and passes the serialized bytes to the
 * sorter together with the partition chosen for the record's key.
 *
 * @param record the key/value pair to insert
 * @throws IOException propagated from serialization or from the sorter
 */
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  final K recordKey = record._1();
  final int targetPartition = partitioner.getPartition(recordKey);

  // Reset the buffer, then write the key and the value and flush the stream into it.
  serBuffer.reset();
  serOutputStream.writeKey(recordKey, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  final int recordLength = serBuffer.size();
  assert (recordLength > 0);

  sorter.insertRecord(
    serBuffer.getBuf(), PlatformDependent.BYTE_ARRAY_OFFSET, recordLength, targetPartition);
}
先將數據序列化,再調用insertRecord()方法將其插入到UnsafeShuffleExternalSorter中。
- UnsafeShuffleExternalSorter的insertRecord()方法
/**
 * Copies one serialized record into the current data page, prefixed by its length, and
 * passes the record's encoded address and partition id to `sorter`.
 *
 * @param recordBaseObject base object of the memory the record is copied from
 * @param recordBaseOffset offset of the record's first byte, relative to recordBaseObject
 * @param lengthInBytes length of the record in bytes
 * @param partitionId partition id handed to `sorter` along with the record's address
 */
public void insertRecord(
Object recordBaseObject,
long recordBaseOffset,
int lengthInBytes,
int partitionId) throws IOException {
// Need 4 bytes to store the record length.
final int totalSpaceRequired = lengthInBytes + 4;
// Ask for more space when haveSpaceForRecord() reports the record does not fit.
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);
}
// Encode the address before the position is advanced, so that it refers to the start of
// the length prefix rather than to the record bytes.
final long recordAddress =
memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object dataPageBaseObject = currentPage.getBaseObject();
// Write the length prefix as an int, then account for its 4 bytes.
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
currentPagePosition += 4;
freeSpaceInCurrentPage -= 4;
// Copy the record bytes directly after the length prefix.
PlatformDependent.copyMemory(
recordBaseObject,
recordBaseOffset,
dataPageBaseObject,
currentPagePosition,
lengthInBytes);
currentPagePosition += lengthInBytes;
freeSpaceInCurrentPage -= lengthInBytes;
// Register the encoded address, together with the partition id, in `sorter`.
sorter.insertRecord(recordAddress, partitionId);
}
先將數據存儲到page中,再在UnsafeShuffleInMemorySorter中插入數據的內存地址。在存儲到page時,如果內存達到threshold,會調用allocateSpaceForRecord()分配更多內存;如果內存不夠,則會spill()到磁盤。spill()函數會調用writeSortedFile(),先把數據排序再落盤。
- UnsafeShuffleInMemorySorter的insertRecord()方法
/**
 * Appends one record pointer, packed together with its partition id, to the pointer array,
 * expanding the array first when there is no space for another record.
 *
 * @param recordPointer pointer to the record
 * @param partitionId partition id packed together with the pointer
 * @throws IllegalStateException if there is no space left and the pointer array already has
 *         Integer.MAX_VALUE elements
 */
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    // The array cannot be expanded any further once it has Integer.MAX_VALUE elements.
    if (pointerArray.length == Integer.MAX_VALUE) {
      throw new IllegalStateException("Sort pointer array has reached maximum size");
    }
    expandPointerArray();
  }
  final long packedPointer = PackedRecordPointer.packPointer(recordPointer, partitionId);
  pointerArray[pointerArrayInsertPosition] = packedPointer;
  pointerArrayInsertPosition++;
}
PackedRecordPointer對象用一個64 bit的long型變量來記錄數據信息:
[24 bit partition number][13 bit memory page number][27 bit offset in page]。
這些信息用於數據排序。
- UnsafeShuffleWriter的closeAndWriteOutput()方法
/**
 * Inserts every record into the sorter, then closes the sorter and writes the final output.
 *
 * @param records iterator over the key/value records to write
 * @throws IOException propagated from record insertion or from writing the output
 */
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // A success flag with try/finally (rather than catch-and-rethrow) lets the cleanup run for
  // any kind of throwable.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    // closeAndWriteOutput() sets `sorter` to null before it merges the spills, so a failure
    // after that point would otherwise hit a NullPointerException here and mask the real error.
    if (!success && sorter != null) {
      sorter.cleanupAfterError();
    }
  }
}
/**
 * Closes the sorter, merges the spills it produced, deletes the spill files, and then writes
 * the index file and records the map status for this map output.
 *
 * @throws IOException propagated from closing the sorter, merging, or writing the index file
 */
void closeAndWriteOutput() throws IOException {
  // The serialization buffer and stream are no longer referenced from here on.
  serBuffer = null;
  serOutputStream = null;

  final SpillInfo[] spillInfos = sorter.closeAndGetSpills();
  sorter = null;

  final long[] lengthsPerPartition;
  try {
    lengthsPerPartition = mergeSpills(spillInfos);
  } finally {
    // Delete the spill files whether or not the merge succeeded.
    for (int i = 0; i < spillInfos.length; i++) {
      final SpillInfo current = spillInfos[i];
      if (current.file.exists()) {
        if (!current.file.delete()) {
          logger.error("Error while deleting spill file {}", current.file.getPath());
        }
      }
    }
  }

  shuffleBlockResolver.writeIndexFile(shuffleId, mapId, lengthsPerPartition);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), lengthsPerPartition);
}
closeAndWriteOutput()方法調用mergeSpills()方法將spilled的文件合併成一個文件,再調用writeIndexFile()將數據索引文件落盤。SpillInfo保存spilled文件的信息,最主要的是每個分區數據在文件中的起始位置和終止位置,這些信息有助於merge。