批處理引擎MapReduce
MapReduce是一個(gè)經(jīng)典的分布式批處理計(jì)算引擎,被廣泛應(yīng)用于搜索引擎索引構(gòu)建、大規(guī)模數(shù)據(jù)處理等場(chǎng)景中,具有易于編程、良好的擴(kuò)展性與容錯(cuò)性以及高吞吐率等特點(diǎn)。
它主要由兩部分組成:編程模型和運(yùn)行時(shí)環(huán)境。其中,編程模型為用戶提供了非常易用的編程接口,用戶只需像編寫串行程序一樣實(shí)現(xiàn)幾個(gè)簡(jiǎn)單的函數(shù)即可實(shí)現(xiàn)一個(gè)分布式程序,而其他比較復(fù)雜的工作,如節(jié)點(diǎn)間的通信、節(jié)點(diǎn)失效、數(shù)據(jù)切分等,全部由MapReduce運(yùn)行時(shí)環(huán)境完成,用戶無(wú)需關(guān)心這些細(xì)節(jié)。
在本章中,我們將從產(chǎn)生背景、設(shè)計(jì)目標(biāo)、編程模型和基本架構(gòu)等方面對(duì)MapReduc引擎進(jìn)行介紹。
編程思想
MapReduce模型是對(duì)大量分布式處理問題的總結(jié)和抽象,它的核心思想是分而治之,即將一個(gè)分布式計(jì)算過程拆解成兩個(gè)階段:
第一階段:Map階段,由多個(gè)可并行執(zhí)行的Map Task構(gòu)成,主要功能是,將待處理數(shù)據(jù)集按照數(shù)據(jù)量大小切分成等大的數(shù)據(jù)分片,每個(gè)分片交由一個(gè)任務(wù)處理。
第二階段:Reduce階段,由多個(gè)可并行執(zhí)行的Reduce Task構(gòu)成,主要功能是,對(duì)前一階段中各任務(wù)產(chǎn)生的結(jié)果進(jìn)行規(guī)約,得到最終結(jié)果。
MapReduce的出現(xiàn),使得用戶可以把主要精力放在設(shè)計(jì)數(shù)據(jù)處理算法上,至于其他的分布式問題,包括節(jié)點(diǎn)間的通信、節(jié)點(diǎn)失效、數(shù)據(jù)切分、任務(wù)并行化等,全部由MapReduce運(yùn)行時(shí)環(huán)境完成,用戶無(wú)需關(guān)心這些細(xì)節(jié)。
以前面的wordcount為例,用戶只需編寫map()和reduce()兩個(gè)函數(shù),即可完成分布式程序的設(shè)計(jì),這兩個(gè)函數(shù)作用如下:
? map()函數(shù):獲取給定文件中一行字符串,對(duì)其分詞后,依次輸出這些單詞。
? reduce()函數(shù):將相同的詞聚集在一起,統(tǒng)計(jì)每個(gè)詞出現(xiàn)的總頻率,并將結(jié)果輸出。
以上兩個(gè)函數(shù)與“回調(diào)函數(shù)”類似,MapReduce框架將在合適的時(shí)機(jī)主動(dòng)調(diào)用它們,并處理與之相關(guān)的數(shù)據(jù)切分、數(shù)據(jù)讀取、任務(wù)并行化等復(fù)雜問題。
MapReduce編程組件
為了簡(jiǎn)化程序設(shè)計(jì),MapReduce首先對(duì)數(shù)據(jù)進(jìn)行了建模。MapReduce將待處理數(shù)據(jù)劃分成若干個(gè)InputSplit(簡(jiǎn)稱split),它是一個(gè)基本計(jì)算單位??紤]到HDFS以固定大小的block(默認(rèn)是128MB)為基本單位存儲(chǔ)數(shù)據(jù),split與block存在一定的對(duì)應(yīng)關(guān)系,具體如下圖所示。
split是一個(gè)邏輯概念,它只包含一些元數(shù)據(jù)信息,比如數(shù)據(jù)起始位置、數(shù)據(jù)長(zhǎng)度、數(shù)據(jù)所在節(jié)點(diǎn)等,它的劃分方法完全受用戶程序控制,默認(rèn)情況下,每個(gè)split對(duì)應(yīng)一個(gè)block。但需要注意的是,split的多少?zèng)Q定了map task的數(shù)目,因?yàn)槊總€(gè)split會(huì)交由一個(gè)map task處理。

數(shù)據(jù)在MapReduce引擎中是以<key, value>形式流動(dòng)的:
首先,每個(gè)split中的數(shù)據(jù)會(huì)被轉(zhuǎn)換成一系列<key, value>,交由用戶的map()函數(shù)處理,該函數(shù)進(jìn)一步產(chǎn)生另外一系列<key, value>,之后,經(jīng)(按照key)排序分組后,交給用戶編寫的reduce()函數(shù)處理,最終產(chǎn)生結(jié)果??偨Y(jié)起來(lái),MapReduce編程模型實(shí)際上是一種包含5個(gè)步驟的分布式計(jì)算方法:
- 迭代(iteration)遍歷輸入數(shù)據(jù),并將之解析成<key, value>對(duì)。
- 將輸入<key, value>對(duì)映射(map)成另外一些<key, value>對(duì)。
- 依據(jù)key對(duì)中間數(shù)據(jù)進(jìn)行分組(grouping)。
- 以組為單位對(duì)數(shù)據(jù)進(jìn)行歸約(reduce)。
- 迭代(iteration)將最終產(chǎn)生的<key, value>保存到輸出文件中。
MapReduce將計(jì)算過程分解成以上5個(gè)步驟帶來(lái)的最大好處是組件化與并行化。
為了實(shí)現(xiàn)MapReduce編程模型,Hadoop設(shè)計(jì)了一系列對(duì)外編程接口,用戶可通過實(shí)現(xiàn)這些接口完成應(yīng)用程序的開發(fā)。
Hadoop MapReducer對(duì)外提供了5個(gè)可編程組件,分別是InputFormat、Mapper、Partitioner、Reducer和OutputFormat,其中Mapper和Reducer跟應(yīng)用程序邏輯相關(guān),因此必須由用戶編寫(一個(gè)MapReduce程序可以只有Mapper沒有Reducer),至于其他幾個(gè)組件,MapReduce引擎內(nèi)置了默認(rèn)實(shí)現(xiàn),如果這些默認(rèn)實(shí)現(xiàn)能夠滿足用戶需求,則可以直接使用。
Mapper
Mapper中封裝了應(yīng)用程序的數(shù)據(jù)處理邏輯,為了簡(jiǎn)化接口,MapReduce要求所有存儲(chǔ)在底層分布式文件系統(tǒng)上的數(shù)據(jù)均要解釋成<key, value>的形式,并以迭代方式依次交給Mapper中的map函數(shù)處理,產(chǎn)生另外一些<key, value>。

在MapReduce中,key/value對(duì)象可能被寫入磁盤,或者通過網(wǎng)絡(luò)傳輸?shù)讲煌瑱C(jī)器上,因此它們必須是可序列化的。為簡(jiǎn)化用戶開發(fā)工作量,MapReduce對(duì)常用的基本類型進(jìn)行了封裝,使其變得可序列化,包括IntWritable、FloatWritable、LongWritable、BytesWritable、Text等。用戶可以通過繼承Writable類實(shí)現(xiàn)自己的可序列化類。
Reducer
Reducer主要作用是,基于Mapper產(chǎn)生的結(jié)果進(jìn)行規(guī)約操作,產(chǎn)生最終結(jié)果。Map階段產(chǎn)生的數(shù)據(jù),按照key分片后,被遠(yuǎn)程拷貝給不同的Reduce Task。Reduce Task按照key對(duì)其排序,進(jìn)而產(chǎn)生一系列以key為劃分單位的分組,它們迭代被Reducer函數(shù)處理,進(jìn)而產(chǎn)生最終的<key, value>對(duì)。
用戶編寫完MapReduce程序后,按照一定的規(guī)則指定程序的輸入和輸出目錄,并提交到Hadoop集群中。作業(yè)在Hadoop中執(zhí)行過程如圖所示,Hadoop會(huì)將輸入數(shù)據(jù)切分成若干個(gè)split,并將每個(gè)split交給一個(gè)Map Task處理:Map Task以迭代方式從對(duì)應(yīng)的split中解析出一系列<key, value>,并調(diào)用map()函數(shù)處理。待數(shù)據(jù)處理完后,Reduce Task將啟動(dòng)多線程遠(yuǎn)程拷貝各自對(duì)應(yīng)的數(shù)據(jù),然后使用基于排序的方法將key相同的數(shù)據(jù)聚集在一起,并調(diào)用reduce()函數(shù)處理,將結(jié)果輸出到文件中。

InputFormat
InputFormat主要用于描述輸入數(shù)據(jù)的格式,它提供以下兩個(gè)功能:
? 數(shù)據(jù)切分:按照某個(gè)策略將輸入數(shù)據(jù)切分成若干個(gè)split,以便確定Map Task個(gè)數(shù)以及對(duì)應(yīng)的split。
? 為Mapper提供輸入數(shù)據(jù):給定某個(gè)split,能將其解析成一系列<key, value>對(duì)。為了方便用戶編寫MapReduce程序,Hadoop自帶了一些針對(duì)數(shù)據(jù)庫(kù)和文件的InputFormat實(shí)現(xiàn)。
Partitioner
Partitioner的作用是對(duì)Mapper產(chǎn)生的中間結(jié)果進(jìn)行分片,以便將同一組的數(shù)據(jù)交給同一個(gè)Reducer處理,它直接影響Reduce階段的負(fù)載均衡。

MapReduce默認(rèn)采用了HashPartitioner,它實(shí)現(xiàn)了一種基于哈希值的分片方法,HashPartitioner能夠?qū)ey相同的所有<key, value>交給同一個(gè)Reduce Task處理,適用于絕大部分應(yīng)用場(chǎng)景,用戶也可按照自己的需求定制Partitioner。
OutputFormat
OutputFormat主要用于描述輸出數(shù)據(jù)的格式,它能夠?qū)⒂脩籼峁﹌ey/value對(duì)寫入特定格式的文件中。Hadoop自帶了很多OutputFormat實(shí)現(xiàn),它們與InputFormat實(shí)現(xiàn)相對(duì)應(yīng),所有基于文件的OutputFormat實(shí)現(xiàn)的基類為FileOutputFormat,并由此派生出一些基于文本文件格式、二進(jìn)制文件格式的或者多輸出的實(shí)現(xiàn)。
Combiner
除了前面講的5個(gè)可編程組件,MapReduce還允許用戶定制另外一個(gè)組件:Combiner,它是一個(gè)可選的性能優(yōu)化組件,可看作Map端的local reducer,如圖所示,它通常跟Reducer的邏輯是一樣的,運(yùn)行在Map Task中,主要作用是,對(duì)Mapper輸出結(jié)果做一個(gè)局部聚集,以減少本地磁盤寫入量和網(wǎng)絡(luò)數(shù)據(jù)傳輸量,并減少Reducer計(jì)算壓力。

MapReduce作業(yè)生命周期
MapReduce作業(yè)作為一種分布式應(yīng)用程序,可直接運(yùn)行在Hadoop資源管理系統(tǒng)YARN之上(MapReduce On YARN)。每個(gè)MapReduce應(yīng)用程序由一個(gè)MRAppMaster以及一系列MapTask和ReduceTask構(gòu)成,它們通過ResourceManager獲得資源,并由NodeManager啟動(dòng)運(yùn)行。

當(dāng)用戶向YARN中提交一個(gè)MapReduce應(yīng)用程序后,YARN將分兩個(gè)階段運(yùn)行該應(yīng)用程序:第一個(gè)階段是由ResourceManager啟動(dòng)MRAppMaster;第二個(gè)階段是由MRAppMaster創(chuàng)建應(yīng)用程序,為它申請(qǐng)資源,并監(jiān)控它的整個(gè)運(yùn)行過程,直到運(yùn)行成功。
YARN的工作流程分為以下幾個(gè)步驟:
- 用戶向YARN集群提交應(yīng)用程序,該應(yīng)用程序包括以下配置信息:MRAppMaster所在jar包、啟動(dòng)MRAppMaster的命令及其資源需求(CPU、內(nèi)存等)、用戶程序jar包等。
- ResourceManager為該應(yīng)用程序分配第一個(gè)Container,并與對(duì)應(yīng)的NodeManager通信,要求它在這個(gè)Container中啟動(dòng)應(yīng)用程序的MRAppMaster。
- MRAppMaster啟動(dòng)后,首先向ResourceManager注冊(cè)(告之所在節(jié)點(diǎn)、端口號(hào)以及訪問鏈接等),這樣,用戶可以直接通過ResourceManager查看應(yīng)用程序的運(yùn)行狀態(tài),之后,為內(nèi)部Map Task和Reduce Task申請(qǐng)資源并運(yùn)行它們,期間監(jiān)控它們的運(yùn)行狀態(tài),直到所有任務(wù)運(yùn)行結(jié)束,即重復(fù)步驟4~7。
- MRAppMaster采用輪詢的方式通過RPC協(xié)議向ResourceManager申請(qǐng)和領(lǐng)取資源。
- 一旦MRAppMaster申請(qǐng)到(部分)資源后,則通過一定的調(diào)度算法將資源分配給內(nèi)部的任務(wù),之后與對(duì)應(yīng)的NodeManager通信,要求它啟動(dòng)這些任務(wù)。
- NodeManager為任務(wù)準(zhǔn)備運(yùn)行環(huán)境(包括環(huán)境變量、jar包、二進(jìn)制程序等),并將任務(wù)執(zhí)行命令寫到一個(gè)shell腳本中,并通過運(yùn)行該腳本啟動(dòng)任務(wù)。
- 啟動(dòng)的Map Task或Reduce Task通過RPC協(xié)議向MRAppMaster匯報(bào)自己的狀態(tài)和進(jìn)度,以讓MRAppMaster隨時(shí)掌握各個(gè)任務(wù)的運(yùn)行狀態(tài),從而可以在任務(wù)失敗時(shí)觸發(fā)相應(yīng)的容錯(cuò)機(jī)制。在應(yīng)用程序運(yùn)行過程中,用戶可隨時(shí)通過RPC向MRAppMaster查詢應(yīng)用程序的當(dāng)前運(yùn)行狀態(tài)。
8.應(yīng)用程序運(yùn)行完成后,MRAppMaster通過RPC向ResourceManager注銷,并關(guān)閉自己。
ResourceManager、NodeManager、MRAppMaster以及MapTask/ReduceTask管理關(guān)系如下圖所示。ResourceManager為MRAppMaster分配資源,并告之NodeManager啟動(dòng)它,MRAppMaster啟動(dòng)后,會(huì)通過心跳維持與ResourceManager之間的聯(lián)系;MRAppMaster負(fù)責(zé)為MapTask/ReduceTask申請(qǐng)資源,并通知NodeManager啟動(dòng)它們,MapTask/ReduceTask啟動(dòng)后,會(huì)通過心跳維持與MRAppMaster之間的聯(lián)系,基于以上設(shè)計(jì)機(jī)制,接下來(lái)介紹MapReduce On YARN架構(gòu)的容錯(cuò)性。

? YARN:YARN本身具有高度容錯(cuò)性
? MRAppMaster:MRAppMaster由ResourceManager管理,一旦MRAppMaster因故障掛掉,ResourceManager會(huì)重新為它分配資源,并啟動(dòng)之。重啟后的MRAppMaster需借助上次運(yùn)行時(shí)記錄的信息恢復(fù)狀態(tài),包括未運(yùn)行、正在運(yùn)行和已運(yùn)行完成的任務(wù)。
? MapTask/ReduceTask:任務(wù)由MRAppMaster管理,一旦MapTask/ReduceTask因故障掛掉或因程序bug阻塞住,MRAppMaster會(huì)為之重新申請(qǐng)資源并啟動(dòng)之。
MapTask與ReduceTask
Map Task可以分解成Read、Map、Collect、Spill和Combine五個(gè)階段;
Reduce Task可以分解成Shuffle、Merge、Sort、Reduce和Write五個(gè)階段。
在MapReduce計(jì)算框架中,一個(gè)應(yīng)用程序被劃成Map和Reduce兩個(gè)計(jì)算階段,它們分別由一個(gè)或者多個(gè)Map Task和Reduce Task組成。
其中,每個(gè)Map Task處理輸入數(shù)據(jù)集合中的一片數(shù)據(jù)(split),產(chǎn)生若干數(shù)據(jù)片段,并將之寫到本地磁盤上;
而Reduce Task則從每個(gè)MapTask上遠(yuǎn)程拷貝一個(gè)數(shù)據(jù)片段,經(jīng)分組聚集和規(guī)約后,將結(jié)果寫到HDFS中。
Map Task與Reduce Task之間的數(shù)據(jù)傳輸采用了pull模型。為了提高容錯(cuò)性,Map Task將中間計(jì)算結(jié)果存放到本地磁盤上,而Reduce Task則通過HTTP協(xié)議從各個(gè)Map Task端拉?。╬ull)相應(yīng)的待處理數(shù)據(jù)。為了更好地支持大量Reduce Task并發(fā)從MapTask端拷貝數(shù)據(jù),Hadoop采用了Netty作為高性能網(wǎng)絡(luò)服務(wù)器。
MapTask詳細(xì)流程
Map Task的整體計(jì)算流程如圖所示,共分為5個(gè)階段,分別是:

- Read階段:Map Task通過InputFormat,從split中解析出一系列<key, value>。
- Map階段:將解析出的<key, value>依次交給用戶編寫的map()函數(shù)處理,并產(chǎn)生一系列新的<key, value>。
- Collect階段:在map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果,在該函數(shù)內(nèi)部,它將<key, value>劃分成若干個(gè)數(shù)據(jù)分片(通過調(diào)用Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
- Spill階段:即“溢寫”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce將數(shù)據(jù)寫到本地磁盤上,生成一個(gè)臨時(shí)文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并、壓縮等操作。
Map Task為何將處理結(jié)果寫入本地磁盤?
該問題實(shí)際上包含兩層含義,即處理結(jié)果為何不寫入內(nèi)存,或者直接發(fā)送給Reduce Task?
首先,Map Task不能夠?qū)?shù)據(jù)寫入內(nèi)存,因?yàn)橐粋€(gè)集群中可能會(huì)同時(shí)運(yùn)行多個(gè)作業(yè),且每個(gè)作業(yè)可能分多批運(yùn)行Map Task,顯然,將計(jì)算結(jié)果直接寫入內(nèi)存會(huì)耗光機(jī)器的內(nèi)存;
其次,MapReduce采用的是動(dòng)態(tài)調(diào)度策略,這意味著,一開始只有Map Task執(zhí)行,而Reduce Task則處于未調(diào)度狀態(tài),因此無(wú)法將Map Task計(jì)算結(jié)果直接發(fā)送給Reduce Task。
將Map Task寫入本地磁盤,使得Reduce Task執(zhí)行失敗時(shí)可直接從磁盤上再次讀取各個(gè)Map Task的結(jié)果,而無(wú)需讓所有Map Task重新執(zhí)行。
總之,MapTask將處理結(jié)果寫入本地磁盤主要目的是減少內(nèi)存存儲(chǔ)壓力和容錯(cuò)。
- Combine(整合,合并)階段:當(dāng)所有數(shù)據(jù)處理完成后,Map Task對(duì)所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件。
每個(gè)Map Task為何最終只產(chǎn)生一個(gè)數(shù)據(jù)文件?
如果每個(gè)Map Task產(chǎn)生多個(gè)數(shù)據(jù)文件(比如每個(gè)Map Task為每個(gè)Reduce Task產(chǎn)生一個(gè)文件),則會(huì)生成大量中間小文件,這將大大降低文件讀取性能,并嚴(yán)重影響系統(tǒng)擴(kuò)展性(M個(gè)Map Task和R個(gè)Reduce Task可能產(chǎn)生M*R個(gè)小文件)。
ReduceTask詳細(xì)流程
Reduce Task的整體計(jì)算流程如圖所示,共分為5個(gè)階段,分別是:

- Shuffle階段:也稱為Copy階段,Reduce Task從各個(gè)Map Task上遠(yuǎn)程拷貝一片數(shù)據(jù),并根據(jù)數(shù)據(jù)分片大小采取不同操作,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內(nèi)存中。
- Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí),Reduce Task啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用量過多或磁盤上文件數(shù)目過多。
- Sort階段:按照MapReduce語(yǔ)義,用戶編寫的reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略,由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此Reduce Task只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
- Reduce階段:在該階段中,Reduce Task將每組數(shù)據(jù)依次交給用戶編寫的reduce()函數(shù)處理。
- Write階段:將reduce()函數(shù)輸出結(jié)果寫到HDFS上。
小結(jié)
MapReduce最初源自于Google,主要被用于搜索引擎索引構(gòu)建,之后在Hadoop中得到開源實(shí)現(xiàn)。隨著開源社區(qū)的推進(jìn)和發(fā)展,已經(jīng)成為一個(gè)經(jīng)典的分布式批處理計(jì)算引擎,被廣泛應(yīng)用于搜索引擎索引構(gòu)建、大規(guī)模數(shù)據(jù)處理等場(chǎng)景中,具有易于編程、良好的擴(kuò)展性與容錯(cuò)性以及高吞吐率等特點(diǎn)。它為用戶提供了非常易用的編程接口,用戶只需像編寫串行程序一樣實(shí)現(xiàn)幾個(gè)簡(jiǎn)單的函數(shù)即可實(shí)現(xiàn)一個(gè)分布式程序,而其他比較復(fù)雜的工作,如節(jié)點(diǎn)間的通信、節(jié)點(diǎn)失效、數(shù)據(jù)切分等,全部由MapReduce運(yùn)行時(shí)環(huán)境完成,用戶無(wú)需關(guān)心這些細(xì)節(jié)。MapReduce為用戶提供了InputFormat、Mapper、Partitioner、Reducer和OutputFormat等可編程組件,用戶可通過實(shí)現(xiàn)這些組件完成分布式程序設(shè)計(jì)。為了方便非Java程序員編寫程序,MapReduce提供了Hadoop Streaming工具,用戶可使用任意語(yǔ)言開發(fā)Mapper和Reducer,大大提高了程序開發(fā)效率。