MapReduce簡介
在Hadoop MapReduce中,框架會確保reduce收到的輸入數(shù)據(jù)是根據(jù)key排序過的。數(shù)據(jù)從Mapper輸出到Reducer接收,是一個很復雜的過程,框架處理了所有問題,并提供了很多配置項及擴展點。一個MapReduce的大致數(shù)據(jù)流如下圖:

更詳細的MapReduce介紹參考Hadoop MapReduce技術(shù)內(nèi)幕
Mapper的輸出排序、然后傳送到Reducer的過程,稱為shuffle。
深入理解這個過程對于MapReduce調(diào)優(yōu)至關(guān)重要,某種程度上說,shuffle過程是MapReduce的核心內(nèi)容。
Map端:
當map函數(shù)通過`context.write()`開始輸出數(shù)據(jù)時,不是單純地將數(shù)據(jù)寫入到磁盤。為了性能,map輸出的數(shù)據(jù)會寫入到緩沖區(qū),并進行預排序的一些工作,整個過程如下圖:

環(huán)形Buffer數(shù)據(jù)結(jié)構(gòu)
每一個map任務(wù)有一個環(huán)形Buffer,map將輸出寫入到這個Buffer。環(huán)形Buffer是內(nèi)存中的一種首尾相連的[數(shù)據(jù)結(jié)構(gòu)](http://lib.csdn.net/base/datastructure "算法與數(shù)據(jù)結(jié)構(gòu)知識庫"),專門用來存儲Key-Value格式的數(shù)據(jù):

Hadoop中,環(huán)形緩沖其實就是一個字節(jié)數(shù)組:
// MapTask.java
private byte[] kvbuffer;
// main output buffer
kvbuffer = new byte[maxMemUsage - recordCapacity];
kvbuffer包含數(shù)據(jù)區(qū)和索引區(qū),這兩個區(qū)是相鄰不重疊的區(qū)域,用一個分界點來標識。分界點不是永恒不變的,每次Spill之后都會更新一次。初始分界點為0,數(shù)據(jù)存儲方向為向上增長,索引存儲方向向下:

bufferindex一直往上增長,例如最初為0,寫入一個int類型的key之后變?yōu)?,寫入一個int類型的value之后變成8。
索引是對key-value在kvbuffer中的索引,是個四元組,占用四個Int長度,包括:
- value的起始位置
- key的起始位置
- partition值
- value的長度
private static final int VALSTART = 0;
// val offset in acct
private static final int KEYSTART = 1;
// key offset in acct
private static final int PARTITION = 2;
// partition offset in acct
private static final int VALLEN = 3;
// length of value
private static final int NMETA = 4;
// num meta ints
private static final int METASIZE = NMETA * 4;
// size in bytes // write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
kvmeta的存放指針kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數(shù)據(jù)。比如kvindex初始位置是-4,當?shù)谝粋€key-value寫完之后,(kvindex+0)的位置存放value的起始位置、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值、(kvindex+3)的位置存放value的長度,然后kvindex跳到-8位置。
緩沖區(qū)的大小默認為100M,但是可以通過mapreduce.task.io.sort.mb這個屬性來配置。
Spill溢寫過程:
map將輸出不斷寫入到這個緩沖區(qū)中,當緩沖區(qū)使用量達到一定比例之后,一個后臺線程開始把緩沖區(qū)的數(shù)據(jù)寫入磁盤,這個寫入的過程叫spill。開始spill的Buffer比例默認為0.80,可以通過mapreduce.map.sort.spill.percent配置。在后臺線程寫入的同時,map繼續(xù)將輸出寫入這個環(huán)形緩沖,如果緩沖池寫滿了,map會阻塞直到spill過程完成,而不會覆蓋緩沖池中的已有的數(shù)據(jù)。
在寫入之前,后臺線程把數(shù)據(jù)按照他們將送往的reducer進行劃分,通過調(diào)用Partitioner的getPartition()方法就能知道該輸出要送往哪個Reducer。默認的Partitioner使用Hash算法來分區(qū),即通過key.hashCode() mode R來計算,R為Reducer的個數(shù)。getPartition返回Partition事實上是個整數(shù),例如有10個Reducer,則返回0-9的整數(shù),每個Reducer會對應(yīng)到一個Partition。map輸出的鍵值對,與partition一起存在緩沖中(即前面提到的kvmeta中)。假設(shè)作業(yè)有2個reduce任務(wù),則數(shù)據(jù)在內(nèi)存中被劃分為reduce1和reduce2:

并且針對每部分數(shù)據(jù),使用快速排序算法(QuickSort)對key排序。
如果設(shè)置了Combiner,則在排序的結(jié)果上運行combine。
排序后的數(shù)據(jù)被寫入到mapreduce.cluster.local.dir配置的目錄中的其中一個,使用round robin fashion的方式輪流。注意寫入的是本地文件目錄,而不是HDFS。Spill文件名像sipll0.out,spill1.out等。
不同Partition的數(shù)據(jù)都放在同一個文件,通過索引來區(qū)分partition的邊界和起始位置。索引是一個三元組結(jié)構(gòu),包括起始位置、數(shù)據(jù)長度、壓縮后的數(shù)據(jù)長度,對應(yīng)IndexRecord類:
public class IndexRecord {
public long startOffset; public long rawLength;
public long partLength;
public IndexRecord() { }
public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
}
}
每個mapper也有對應(yīng)的一個索引環(huán)形Buffer,默認為1KB,可以通過mapreduce.task.index.cache.limit.bytes來配置,索引如果足夠小則存在內(nèi)存中,如果內(nèi)存放不下,需要寫入磁盤。
Spill文件索引名稱類似這樣 spill110.out.index, spill111.out.index。
Spill文件的索引事實上是 org.apache.hadoop.mapred.SpillRecord的一個數(shù)組,每個Map任務(wù)(源碼中的MapTask.Java類)維護一個這樣的列表:
final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();
創(chuàng)建一個SpillRecord時,會分配(Number_Of_Reducers * 24)Bytes緩沖:
public SpillRecord(int numPartitions) {
buf = ByteBuffer.allocate( numPartitions*MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
entries = buf.asLongBuffer();
}
numPartitions是Partition的個數(shù),其實也就是Reducer的個數(shù):
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; partitions = jobContext.getNumReduceTasks();final SpillRecord spillRec = new SpillRecord(partitions);
默認的索引緩沖為1KB,即10241024 Bytes,假設(shè)有2個Reducer,則每個Spill文件的索引大小為224=48 Bytes,當Spill文件超過21845.3時,索引文件就需要寫入磁盤。
索引及spill文件如下圖示意:

Spill的過程至少需要運行一次,因為Mapper的輸出結(jié)果必須要寫入磁盤,供Reducer進一步處理。
合并Spill文件
在整個map任務(wù)中,一旦緩沖達到設(shè)定的閾值,就會觸發(fā)spill操作,寫入spill文件到磁盤,因此最后可能有多個spill文件。在map任務(wù)結(jié)束之前,這些文件會根據(jù)情況合并到一個大的分區(qū)的、排序的文件中,排序是在內(nèi)存排序的基礎(chǔ)上進行全局排序。下圖是合并過程的簡單示意:

相對應(yīng)的索引文件也會被合并,以便在Reducer請求對應(yīng)Partition的數(shù)據(jù)的時候能夠快速讀取。
另外,如果spill文件數(shù)量大于mapreduce.map.combiner.minspills配置的數(shù),則在合并文件寫入之前,會再次運行combiner。如果spill文件數(shù)量太少,運行combiner的收益可能小于調(diào)用的代價。
mapreduce.task.io.sort.factor屬性配置每次最多合并多少個文件,默認為10,即一次最多合并10個spill文件。最后,多輪合并之后,所有的輸出文件被合并為唯一一個大文件,以及相應(yīng)的索引文件(可能只在內(nèi)存中存在)。
壓縮:
在數(shù)據(jù)量大的時候,對map輸出進行壓縮通常是個好主意。要啟用壓縮,將mapreduce.map.output.compress設(shè)為true,并使用mapreduce.map.output.compress.codec設(shè)置使用的壓縮算法。
通過HTTP拷貝Map輸出結(jié)果到Reduce中:
map輸出數(shù)據(jù)完成之后,通過運行一個HTTP Server暴露出來,供reduce端獲取。用來相應(yīng)reduce數(shù)據(jù)請求的線程數(shù)量可以配置,默認情況下為機器內(nèi)核數(shù)量的兩倍,如需自己配置,通過mapreduce.shuffle.max.threads屬性來配置,注意該配置是針對NodeManager配置的,而不是每個作業(yè)配置。
同時,Map任務(wù)完成后,也會通知Application Master,以便Reducer能夠及時來拉取數(shù)據(jù)。
通過緩沖、劃分(partition)、排序、combiner、合并、壓縮等過程之后,map端的工作就算完畢:

Reduce端:
各個map任務(wù)運行完之后,輸出寫入運行任務(wù)的機器磁盤中。Reducer需要從各map任務(wù)中提取自己的那一部分數(shù)據(jù)(對應(yīng)的partition)。每個map任務(wù)的完成時間可能是不一樣的,reduce任務(wù)在map任務(wù)結(jié)束之后會盡快取走輸出結(jié)果,這個階段叫copy。
Reducer是如何知道要去哪些機器去數(shù)據(jù)呢?一旦map任務(wù)完成之后,就會通過常規(guī)心跳通知應(yīng)用程序的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有數(shù)據(jù)(如何知道提取完?)。
數(shù)據(jù)被reduce提走之后,map機器不會立刻刪除數(shù)據(jù),這是為了預防reduce任務(wù)失敗需要重做。因此map輸出數(shù)據(jù)是在整個作業(yè)完成之后才被刪除掉的。
reduce維護幾個copier線程,并行地從map任務(wù)機器提取數(shù)據(jù)。默認情況下有5個copy線程,可以通過mapreduce.reduce.shuffle.parallelcopies配置。
如果map輸出的數(shù)據(jù)足夠小,則會被拷貝到reduce任務(wù)的JVM內(nèi)存中。mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆內(nèi)存的多少比例可以用于存放map任務(wù)的輸出結(jié)果。如果數(shù)據(jù)太大容不下,則被拷貝到reduce的機器磁盤上。
Reduce中的數(shù)據(jù)Merge:
內(nèi)存中合并:
當緩沖中數(shù)據(jù)達到配置的閾值時,這些數(shù)據(jù)在內(nèi)存中被合并、寫入機器磁盤。閾值有2種配置方式:
- 配置內(nèi)存比例: 前面提到reduce JVM堆內(nèi)存的一部分用于存放來自map任務(wù)的輸入,在這基礎(chǔ)之上配置一個開始合并數(shù)據(jù)的比例。假設(shè)用于存放map輸出的內(nèi)存為500M,
mapreduce.reduce.shuffle.merger.percent配置為0.80,則當內(nèi)存中的數(shù)據(jù)達到400M的時候,會觸發(fā)合并寫入。 - 配置map輸出數(shù)量: 通過
mapreduce.reduce.merge.inmem.threshold配置。
在合并的過程中,會對被合并的文件做全局的排序。如果作業(yè)配置了Combiner,則會運行combine函數(shù),減少寫入磁盤的數(shù)據(jù)量。
Copy過程中磁盤合并:
在copy過來的數(shù)據(jù)不斷寫入磁盤的過程中,一個后臺線程會把這些文件合并為更大的、有序的文件。如果map的輸出結(jié)果進行了壓縮,則在合并過程中,需要在內(nèi)存中解壓后才能給進行合并。這里的合并只是為了減少最終合并的工作量,也就是在map輸出還在拷貝時,就開始進行一部分合并工作。合并的過程一樣會進行全局排序。
最終磁盤中合并:
當所有map輸出都拷貝完畢之后,所有數(shù)據(jù)被最后合并成一個排序的文件,作為reduce任務(wù)的輸入。這個合并過程是一輪一輪進行的,最后一輪的合并結(jié)果直接推送給reduce作為輸入,節(jié)省了磁盤操作的一個來回。最后(所以map輸出都拷貝到reduce之后)進行合并的map輸出可能來自合并后寫入磁盤的文件,也可能來及內(nèi)存緩沖,在最后寫入內(nèi)存的map輸出可能沒有達到閾值觸發(fā)合并,所以還留在內(nèi)存中。
每一輪合并并不一定合并平均數(shù)量的文件數(shù),指導原則是使用整個合并過程中寫入磁盤的數(shù)據(jù)量最小,為了達到這個目的,則需要最終的一輪合并中合并盡可能多的數(shù)據(jù),因為最后一輪的數(shù)據(jù)直接作為reduce的輸入,無需寫入磁盤再讀出。因此我們讓最終的一輪合并的文件數(shù)達到最大,即合并因子的值,通過mapreduce.task.io.sort.factor來配置。
假設(shè)現(xiàn)在有50個map輸出文件,合并因子配置為10,則需要5輪的合并。最終的一輪確保合并10個文件,其中包括4個來自前4輪的合并結(jié)果,因此原始的50個中,再留出6個給最終一輪。所以最后的5輪合并可能情況如下:

前4輪合并后的數(shù)據(jù)都是寫入到磁盤中的,注意到最后的2格顏色不一樣,是為了標明這些數(shù)據(jù)可能直接來自于內(nèi)存
MemToMem合并:
除了內(nèi)存中合并和磁盤中合并外,Hadoop還定義了一種MemToMem合并,這種合并將內(nèi)存中的map輸出合并,然后再寫入內(nèi)存。這種合并默認關(guān)閉,可以通過reduce.merge.memtomem.enabled打開,當map輸出文件達到reduce.merge.memtomem.threshold時,觸發(fā)這種合并。
最后一次合并后傳遞給reduce方法:
合并后的文件作為輸入傳遞給Reducer,Reducer針對每個key及其排序的數(shù)據(jù)調(diào)用reduce函數(shù)。產(chǎn)生的reduce輸出一般寫入到HDFS,reduce輸出的文件第一個副本寫入到當前運行reduce的機器,其他副本選址原則按照常規(guī)的HDFS數(shù)據(jù)寫入原則來進行。
性能調(diào)優(yōu):
如果能夠根據(jù)情況對shuffle過程進行調(diào)優(yōu),對于提供MapReduce性能很有幫助。相關(guān)的參數(shù)配置列在后面的表格中。
一個通用的原則是給shuffle過程分配盡可能大的內(nèi)存,當然你需要確保map和reduce有足夠的內(nèi)存來運行業(yè)務(wù)邏輯。因此在實現(xiàn)Mapper和Reducer時,應(yīng)該盡量減少內(nèi)存的使用,例如避免在Map中不斷地疊加。
運行map和reduce任務(wù)的JVM,內(nèi)存通過mapred.child.java.opts屬性來設(shè)置,盡可能設(shè)大內(nèi)存。容器的內(nèi)存大小通過mapreduce.map.memory.mb和mapreduce.reduce.memory.mb來設(shè)置,默認都是1024M。
map優(yōu)化:
在map端,避免寫入多個spill文件可能達到最好的性能,一個spill文件是最好的。通過估計map的輸出大小,設(shè)置合理的mapreduce.task.io.sort.*屬性,使得spill文件數(shù)量最小。例如盡可能調(diào)大mapreduce.task.io.sort.mb。
map端相關(guān)的屬性如下表:
| 屬性名 | 值類型 | 默認值 | 說明 |
|---|---|---|---|
| mapreduce.task.io.sort.mb | int | 100 | 用于map輸出排序的內(nèi)存大小 |
| mapreduce.map.sort.spill.percent | float | 0.80 | 開始spill的緩沖池閾值 |
| mapreduce.task.io.sort.factor | int | 10 | 合并文件數(shù)最大值,與reduce共用 |
| mapreduce.map.combine.minspills | int | 3 | 運行combiner的最低spill文件數(shù) |
| mapreduce.map.out.compress | boolean | false | 輸出是否壓縮 |
| mapreduce.map.out.compress | 類名 | DefaultCodec | 壓縮算法 |
| mapreduce.shuffle.max.threads | int | 0 | 服務(wù)于reduce提取結(jié)果的線程數(shù)量 |
Reduce優(yōu)化:
在reduce端,如果能夠讓所有數(shù)據(jù)都保存在內(nèi)存中,可以達到最佳的性能。通常情況下,內(nèi)存都保留給reduce函數(shù),但是如果reduce函數(shù)對內(nèi)存需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發(fā)合并的map輸出文件數(shù))設(shè)為0,mapreduce.reduce.input.buffer.percent(用于保存map輸出文件的堆內(nèi)存比例)設(shè)為1.0,可以達到很好的性能提升。
| 屬性名 | 值類型 | 默認值 | 說明 |
|---|---|---|---|
| mapreduce.reduce.shuffle.parallelcopies | int | 5 | 提取map輸出的copier線程數(shù) |
| mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | 提取map輸出最大嘗試次數(shù),超出后報錯 |
| mapreduce.task.io.sort.factor | int | 10 | 合并文件數(shù)最大值,與map共用 |
| mapreduce.reduce.shuffle.input.buffer.percent | float | 0.70 | copy階段用于保存map輸出的堆內(nèi)存比例 |
| mapreduce.reduce.shuffle.merge.percent | float | 0.66 | 開始spill的緩沖池比例閾值 |
| mapreduce.reduce.shuffle.inmem.threshold | int | 1000 | 開始spill的map輸出文件數(shù)閾值,小于等于0表示沒有閾值,此時只由緩沖池比例來控制 |
| mapreduce.reduce.input.buffer.percent | float | 0.0 | reduce函數(shù)開始運行時,內(nèi)存中的map輸出所占的堆內(nèi)存比例不得高于這個值,默認情況內(nèi)存都用于reduce函數(shù),也就是map輸出都寫入到磁盤 |
通用優(yōu)化:
Hadoop默認使用4KB作為緩沖,這個算是很小的,可以通過io.file.buffer.size來調(diào)高緩沖池大小
原創(chuàng) https://blog.csdn.net/u012151684/article/details/72589302