在簡書的第1篇文章,寫得不好多多包涵哦
partition的作用是把環(huán)形緩沖區(qū)中的map輸出分區(qū)存儲,以便分配給不同的reducer。
把內(nèi)部的實現(xiàn)寫下來,作為一個學(xué)習(xí)筆記
- 在map函數(shù),調(diào)用context.write()時,會去調(diào)用分區(qū)函數(shù),得到分區(qū)號,把分區(qū)號一塊寫進(jìn)keyvalue的元數(shù)據(jù)。
- 當(dāng)環(huán)形緩沖區(qū)達(dá)到溢寫磁盤時
- a) 對每個分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序
- b) 把每個分區(qū)內(nèi)的數(shù)據(jù)寫到磁盤
下面通過代碼來說明
1
context.write(K,V) -> MapTask.NewOutputCollector.write(K, V) -> MapOutputBuffer.collect(K, V, partion)
void MapTask.NewOutputCollector.write(K key, V value) {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions)); // 調(diào)用分區(qū)函數(shù)
}
MapOutputBuffer.collect(K, V, partion) {
...
kvmeta.put(kvindex + PARTITION, partition); // 把分區(qū)號一塊寫進(jìn)keyvalue元數(shù)據(jù)
...
}
2-a)
MapTask.MapOutputBuffer.flush()->MapTask.MapOutputBuffer.sortAndSpill()->IndexedSortable.compare(final int mi, final int mj)
void MapTask.MapOutputBuffer.sortAndSpill() {
...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); // 對數(shù)據(jù)進(jìn)行排序,默認(rèn)采用快速排序。調(diào)用了下面的compare()方法
...
}
// 比較 mi和mj所對應(yīng)的兩個key,這個方法先比較分區(qū)號,如果分區(qū)號相同,才有必要比較key,實現(xiàn)了按各個分區(qū)內(nèi)的key進(jìn)行排序
public int MapTask.MapOutputBuffer.compare(final int mi, final int mj) {
final int kvi = offsetFor(mi % maxRec);
final int kvj = offsetFor(mj % maxRec);
final int kvip = kvmeta.get(kvi + PARTITION); // 從keyvalue元數(shù)據(jù)取出mi的分區(qū)號
final int kvjp = kvmeta.get(kvj + PARTITION); // 從keyvalue元數(shù)據(jù)取出mj的分區(qū)號
// sort by partition
if (kvip != kvjp) { // 如果分區(qū)號不相同,直接比較分區(qū)號:分區(qū)號的大小決定了寫磁盤時的先后順序
return kvip - kvjp;
}
// sort by key // 分區(qū)號相同,再比較key,這個方法調(diào)用RawComparator.compare(buffer, s1, l1, s2, l2);
return comparator.compare(kvbuffer,
kvmeta.get(kvi + KEYSTART), // key1的開始位置
kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), // key1的結(jié)束位置
kvbuffer,
kvmeta.get(kvj + KEYSTART), //key2的開始位置
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); // key2的開始位置
}
2-b)
a和b都是在sortAndSpill()中
void MapTask.MapOutputBuffer.sortAndSpill() {
...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); // 對數(shù)據(jù)進(jìn)行排序,默認(rèn)采用快速排序。調(diào)用了下面的compare()方法
...
// 按分區(qū)號從小到大,一個分區(qū)一個分區(qū)寫進(jìn)磁盤
for (int i = 0; i < partitions; ++i) {
...
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { // 從元數(shù)據(jù)讀出kv分區(qū)號,如果是當(dāng)前正在寫磁盤的分區(qū)號,就把這個kv寫到磁盤
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value); // 把kv寫到磁盤
++spindex;
}
}
...
}
經(jīng)過上面這些步驟,環(huán)形緩沖區(qū)內(nèi)的kv,就按分區(qū)寫到磁盤,并且每個分區(qū)內(nèi)的數(shù)據(jù)是有序的。
當(dāng)然,這并不能保證同一個分區(qū)內(nèi),先后溢寫的數(shù)據(jù)是有序的。后面使用歸并排序?qū)Υ疟P上的分區(qū)數(shù)據(jù)再做一輪排序,這個以后再做分析。