
圖片摘自于Alexey Grishchenko博文
Map-side
InputFormat Class :
getSplits: the set of input data splits 返回一組輸入數(shù)據(jù)的拆分文件
-
getRecordReader:
iterableinterface for reading all the records from a single input? 從單個輸入文件提供可迭代的接口用于讀取輸入數(shù)據(jù)
文件塊大小取決于InputFormat(自定義輸入文件拆分需要繼承此FileInputFormat)
輸入文件是文本類型需要配置dfs.blocksize的大小(hdfs-site.xml)
gzip類型的壓縮文件,不可拆分需要用輸入整個文件等。
每個mapper處理一個輸入拆分塊文件,大多數(shù)時(shí)處理128MB大小文件若輸入文件是以GB或PB或更大。
map 函數(shù)運(yùn)用于輸入拆分文件的每對鍵值對(k,v),它們的每對鍵值對都有RecordReader 返回。
根據(jù)業(yè)務(wù)邏輯需求,在mapper對每對鍵值做處理輸出結(jié)果,將結(jié)果由Context文本類傳給reducer端。
負(fù)責(zé)收集map的輸出數(shù)據(jù)(如文件)是mapreduce.job.map.output.collector 屬性 (mapreduce-default.xml),默認(rèn)是由org.apache.hadoop.mapred.MapTask$MapOutputBuffer 實(shí)現(xiàn)。
Map 函數(shù)的輸出結(jié)果首先調(diào)用從Partitioner類getPartition方法。 getPartition需要傳入鍵值對(k,v) 還有reduce的任務(wù)數(shù)量(numReduceTasks),然后返回這些鍵值對匹配的分區(qū)。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
接著把輸出數(shù)據(jù)鍵值對和分區(qū)號一同寫入環(huán)形緩沖區(qū) (ring buffer), 此緩沖區(qū)大小由mapreduce.task.io.sort.mb (mapreduce-default.xml) 定義, 默認(rèn)為100MB, 最大map輸出數(shù)據(jù)信息允許在ring buffer占用的大小. 若輸出數(shù)據(jù)大小大于此值,數(shù)據(jù)會被溢出到(寫入)硬盤。
注意,map 輸出的環(huán)形緩沖區(qū)默認(rèn)是比輸入文件的拆分塊(默認(rèn)是128MB) 要小,所以多余的會被寫到硬盤中。
溢出的操作是由新的線程執(zhí)行,起初環(huán)形緩沖區(qū)的大小為0.8mapreduce.map.sort.spill.percent (mapreduce-default.xml), 所以最初的緩沖區(qū)大小為80MB。 map task的輸出溢出文件大小默認(rèn)是大于80MB的,多余的是會被寫到硬盤里。
溢出用開啟新線程來處理輸出文件數(shù)據(jù)是為了讓mapper在處理溢出同時(shí)也能繼續(xù)執(zhí)行處理輸入文件數(shù)據(jù)。
注意: 當(dāng)處理輸入數(shù)據(jù)(FileInputFormat)的數(shù)率比溢出的數(shù)率快時(shí),Mapper函數(shù)會停止工作 因?yàn)?內(nèi)存的環(huán)形緩沖區(qū)可能會達(dá)到100%。在這種情況,mapper函數(shù)會阻塞并等待溢出線程為下一批輸出數(shù)據(jù)處理清空一些內(nèi)存空間。
溢出線程會把環(huán)形緩沖區(qū)數(shù)據(jù)寫到mapper函數(shù)調(diào)用的服務(wù)器的本地的文件里。溢出線程寫出本地的路勁是由mapreduce.job.local.dir (mapred-default.xml)定義,此配置屬性包含了一組由集群上MapReduce任務(wù)用到的路徑來存儲零時(shí)數(shù)據(jù)。 文件夾被一個接一個使用。 寫入前,數(shù)據(jù)會以快速排序 進(jìn)行排序:comparator函數(shù)先對比partition分區(qū)號然后再對比key值,以至于先排分區(qū),再每個分區(qū)排序key值。
排序完成后,Combiner被調(diào)用用于減少輸入硬盤的數(shù)據(jù)量。Combiner的輸出會被寫入硬盤。 有個邊際情況是當(dāng)mapper產(chǎn)生的輸出數(shù)據(jù)過于大不能容入內(nèi)存時(shí)(大于輸出緩沖區(qū)大?。?,sorter和combiner時(shí)不會被調(diào)用; 這時(shí),mapper的輸出數(shù)據(jù)就會直接寫入硬盤。
無論mapper輸出數(shù)據(jù)是多大,輸出完成時(shí)“Spill”是肯定會至少調(diào)用一次。
(詳細(xì)信息查看:ShuffleHandler.class#sendMapOutput)
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, ? String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
同樣的map任務(wù)會執(zhí)行,如sort和combine,還有寫入本地磁盤文件。
每個由溢出線程溢出文件有個索引包含了每個溢出文件的分區(qū)信息:分區(qū)從哪開始于哪結(jié)束。 這些索引被存在內(nèi)存中,此內(nèi)存塊大小由mapreduce.task.index.cache.limit.bytes決定,默認(rèn)為1MB。 內(nèi)存不足時(shí),所有的下一批生成的溢出文件的索引會與溢出文件一起被寫入到磁盤。
當(dāng)mapper處理輸出文件與最后溢出結(jié)束時(shí),溢出線程完成結(jié)束而合并階段開始。 在合并時(shí),所有的溢出文件應(yīng)該合并一塊為單個map的輸出文件。一個合并過程默認(rèn)可以處理10個溢出文件(由mapreduce.task.io.sort.factor決定)。
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
<description>The number of streams to merge at once while sorting
files. This determines the number of open file handles.</description>
</property>
若溢出文件大于此屬性值, 剩余的文件會被合并為一個大文件。
合并期間,若被合并的文件大于(min.num.spills.for.combine , 默認(rèn)為3), 寫入磁盤前,combiner會在merge結(jié)果上執(zhí)行。
MapTask的結(jié)果是一個包含了所有的Mapper輸出結(jié)果與描述分區(qū)開始-結(jié)束的索引信息的輸出文件, 這些分區(qū)開始-結(jié)束索引信息便于ReduceTask能夠從磁盤中索取每個reducer任務(wù)的運(yùn)行相關(guān)數(shù)據(jù)。
Reduce-Side
Map的任務(wù)數(shù)量由拆分塊決定的,然而reduce的任務(wù)用戶自己設(shè)置的(mapreduce.job.reduces, 默認(rèn)為1)。 Reduce端的shuffle的實(shí)現(xiàn)由mapreduce.job.reduce.shuffle.consumer.plugin.class 屬性決定,默認(rèn)為org.apache.hadoop.mapreduce.task.reduce.Shuffle。
Reduce端做的第一件事就開啟”Event Fetcher" 線程,從Application Master得到Mapper的狀態(tài)并監(jiān)聽mapper的事件是否執(zhí)行結(jié)束。 當(dāng)mapper結(jié)束自己的shuffle過程,mapper的輸出文件數(shù)據(jù)傳送到多個“Fetcher”線程的其中一個。 “Fetcher”線程是由mapreduce.reduce.shuffle.parallelcopies 決定的,默認(rèn)為5個,這意味著單個reduce任務(wù), 有五個線程來從mapper端并行拷貝數(shù)據(jù)。Fetch的節(jié)點(diǎn)間的傳輸是通過HTTP或HTTPS的協(xié)議連接fetcher到相應(yīng)的DataNode URL。
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
<description>The default number of parallel transfers run by reduce
during the copy(shuffle) phase.
</description>
</property>
“Fetcher”從Mapper端下載的數(shù)據(jù)都被存在內(nèi)存中,此內(nèi)存大小占用reducer內(nèi)存比例由mapreduce.reduce.shuffle.input.buffer.percent決定,總的reducer內(nèi)存是mapreduce.reduce.memory.totalbytes。
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
<description>The usage threshold at which an in-memory merge will be
initiated, expressed as a percentage of the total memory allocated to
storing in-memory map outputs, as defined by
mapreduce.reduce.shuffle.input.buffer.percent.
</description>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>1024</value>
<description>The amount of memory to request from the scheduler for each
reduce task.
</description>
</property>
若這些內(nèi)存不夠容納,reducer會把數(shù)據(jù)存入reducer端的本地磁盤中的mapreduce.job.local.dir文件夾。
<property>
<name>mapreduce.cluster.local.dir</name>
<value>${hadoop.tmp.dir}/mapred/local</value>
<description>The local directory where MapReduce stores intermediate
data files. May be a comma-separated list of
directories on different devices in order to spread disk i/o.
Directories that do not exist are ignored.
</description>
</property>
fetcher索取相應(yīng)文件數(shù)據(jù)后,merger線程開啟工作。它們不會等待整個fetching過程完成而是開新線程與其并行執(zhí)行。 Hadoop有三種merger線程。
- InMemory merger (內(nèi)存線程)
- 不能關(guān)閉。 由Reduce任務(wù)索取MapTask的輸出數(shù)據(jù)占用的Reducer內(nèi)存緩存超出了總內(nèi)存允許的占用百分比
reduce.shuffle.merge.percent而啟動。 合并后執(zhí)行combiner。輸出寫入硬盤,總會被調(diào)用至少一次。
- 不能關(guān)閉。 由Reduce任務(wù)索取MapTask的輸出數(shù)據(jù)占用的Reducer內(nèi)存緩存超出了總內(nèi)存允許的占用百分比
- MemToMem merger (內(nèi)存到內(nèi)存)
- 默認(rèn)為關(guān)閉。 可由
reduce.merge.memtomem.enabled開啟。 此線程合并內(nèi)存中mapper的輸出文件數(shù)據(jù)并寫reduce輸出到內(nèi)存。當(dāng)不同的MapTask輸出文件大小達(dá)到mapreduce.reduce.merge.memtomem.threshold(默認(rèn)為1000), 線程會被啟動。
- 默認(rèn)為關(guān)閉。 可由
- OnDisk (硬盤)
- 在一次線程執(zhí)行中, 當(dāng)文件數(shù)量以(
2 * task.io.sort.factor - 1) 上升 , 但是合并不超過mapreduce.task.io.sort.factor的文件數(shù)量而啟動。 OnDisk Merger線程合并本地磁盤的文件。
- 在一次線程執(zhí)行中, 當(dāng)文件數(shù)量以(
最后個線程, finalMerge 在reducer的主線程中運(yùn)行,合并所有由InMemory和OnDisk在本地磁盤產(chǎn)生剩余的文件。 最終合并輸出結(jié)果分布在RAM和硬盤之間。RAM最大允許使用為reduce 輸入大小是由總的棧大小mapred.job.reduce.markreset.buffer.percent百分比來決定的,默認(rèn)為0.
在這些所有的線程啟動完成后,reducer會把輸出寫入HDFS文件系統(tǒng)中。
原文出自于,Alexey Grishchenko的Hadoop MapReduce Comprehensive Description
譯者: 邁大_阿李同學(xué)