一、Mapreduce簡(jiǎn)介
??Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架。
??Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)hadoop集群上。
二、MapReduce并行處理的基本過(guò)程
Hadoop2.0之前和Hadoop2.0之后的區(qū)別:
2.0之前只有MapReduce的運(yùn)行框架,那么它里面有只有兩種節(jié)點(diǎn),一個(gè)是master,一個(gè)是worker。
master既做資源調(diào)度又做程序調(diào)度,worker只是用來(lái)參與計(jì)算的。
在2.0之后加入了YARN集群,Yarn集群的主節(jié)點(diǎn)承擔(dān)了資源調(diào)度,Yarn集群的從節(jié)點(diǎn)中會(huì)選出一個(gè)節(jié)點(diǎn)(這個(gè)由redourcemanager決定)。
用作類似于2.0之前的master的工作,來(lái)進(jìn)行應(yīng)用程序的調(diào)度。
資源調(diào)度: 處理程序所需要的cpu、內(nèi)存資源,以及存儲(chǔ)數(shù)據(jù)所需要的硬盤資源都是resourcemanager去分配的。

??一切都是從最上方的user program開始的,user program鏈接了MapReduce庫(kù),實(shí)現(xiàn)了最基本的Map函數(shù)和Reduce函數(shù)。
圖中執(zhí)行的順序都用數(shù)字標(biāo)記了。
1)MapReduce庫(kù)先把user program的輸入文件劃分為M份(M為用戶定義),如圖左方所示分成了split0~4;然后使用fork將用戶進(jìn)程拷貝到集群內(nèi)其它機(jī)器上。
2)user program的副本中有一個(gè)稱為master,其余稱為worker,master是負(fù)責(zé)調(diào)度的,為空閑worker分配作業(yè)(Map作業(yè)或者Reduce作業(yè)),worker的數(shù)量也是可以由用戶指定的。
3)被分配了Map作業(yè)的worker,開始讀取對(duì)應(yīng)分片的輸入數(shù)據(jù),Map作業(yè)數(shù)量是由M決定的,和split一一對(duì)應(yīng);Map作業(yè)從輸入數(shù)據(jù)中抽取出鍵值對(duì),每一個(gè)鍵值對(duì)都作為參數(shù)傳遞給map函數(shù),map函數(shù)產(chǎn)生的中間鍵值對(duì)被緩存在內(nèi)存中。
4)緩存的中間鍵值對(duì)會(huì)被定期寫入本地磁盤,而且被分為R個(gè)區(qū),R的大小是由用戶定義的,將來(lái)每個(gè)區(qū)會(huì)對(duì)應(yīng)一個(gè)Reduce作業(yè);這些中間鍵值對(duì)的位置會(huì)被通報(bào)給master,master負(fù)責(zé)將信息轉(zhuǎn)發(fā)給Reduce worker。
5)master通知分配了Reduce作業(yè)的worker它負(fù)責(zé)的分區(qū)在什么位置(肯定不止一個(gè)地方,每個(gè)Map作業(yè)產(chǎn)生的中間鍵值對(duì)都可能映射到所有R個(gè)不同分區(qū)),當(dāng)Reduce worker把所有它負(fù)責(zé)的中間鍵值對(duì)都讀過(guò)來(lái)后,先對(duì)它們進(jìn)行排序,使得相同鍵的鍵值對(duì)聚集在一起。因?yàn)椴煌逆I可能會(huì)映射到同一個(gè)分區(qū)也就是同一個(gè)Reduce作業(yè),所以排序是必須的。
6)reduce worker遍歷排序后的中間鍵值對(duì),對(duì)于每個(gè)唯一的鍵,都將鍵與關(guān)聯(lián)的值傳遞給reduce函數(shù),reduce函數(shù)產(chǎn)生的輸出會(huì)添加到這個(gè)分區(qū)的輸出文件中。
7)當(dāng)所有的Map和Reduce作業(yè)都完成了,master喚醒正版的user program,MapReduce函數(shù)調(diào)用返回user program的代碼。
8)所有執(zhí)行完畢后,MapReduce輸出放在了R個(gè)分區(qū)的輸出文件中(分別對(duì)應(yīng)一個(gè)Reduce作業(yè))。用戶通常并不需要合并這R個(gè)文件,而是將其作為輸入交給另一個(gè)MapReduce程序處理。整個(gè)過(guò)程中,輸入數(shù)據(jù)是來(lái)自底層分布式文件系統(tǒng)(GFS)的,中間數(shù)據(jù)是放在本地文件系統(tǒng)的,最終輸出數(shù)據(jù)是寫入底層分布式文件系統(tǒng)(GFFS)的。而且我們要注意Map/Reduce作業(yè)和map/reduce函數(shù)的區(qū)別:Map作業(yè)處理一個(gè)輸入數(shù)據(jù)的分片,可能需要調(diào)用多次map函數(shù)來(lái)處理每個(gè)輸入鍵值對(duì);Reduce作業(yè)處理一個(gè)分區(qū)的中間鍵值對(duì),期間要對(duì)每個(gè)不同的鍵調(diào)用一次reduce函數(shù),Reduce作業(yè)最終也對(duì)應(yīng)一個(gè)輸出文件。

三、MapRrduce輸入與輸出問(wèn)題
??Map/Reduce框架運(yùn)轉(zhuǎn)在<key, value>鍵值對(duì)上,也就是說(shuō),框架把作業(yè)的輸入看為是一組<key, value>鍵值對(duì),同樣也產(chǎn)出一組 <key, value>鍵值對(duì)做為作業(yè)的輸出,這兩組鍵值對(duì)的類型可能不同。
??框架需要對(duì)key和value的類(classes)進(jìn)行序列化操作,因此,這些類需要實(shí)現(xiàn)Writable接口。另外,為了方便框架執(zhí)行排序操作,key類必須實(shí)現(xiàn) WritableComparable接口。
??注意:不管是哪里的序列化,最主要的作用就是持久化存儲(chǔ)或者是用于網(wǎng)絡(luò)傳輸
??一個(gè)Map/Reduce作業(yè)的輸入和輸出類型如下所示:
??(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。
??其實(shí)在前面講解Hadoop IO的時(shí)候已經(jīng)知道了解了Writale接口:
??Writable接口是一個(gè)實(shí)現(xiàn)了序列化協(xié)議的序列化對(duì)象。
??在Hadoop中定義一個(gè)結(jié)構(gòu)化對(duì)象都要實(shí)現(xiàn)Writable接口,使得該結(jié)構(gòu)化對(duì)象可以序列化為字節(jié)流,字節(jié)流也可以反序列化為結(jié)構(gòu)化對(duì)象。
四、MapReduce實(shí)際處理流程
??mapreduce 其實(shí)是分治算法的一種現(xiàn),所謂分治算法就是“就是分而治之 ,將大的問(wèn)題分解為相同類型的子問(wèn)題(最好具有相同的規(guī)模),對(duì)子問(wèn)題進(jìn)行求解,然后合并成大問(wèn)題的解。
mapreduce就是分治法的一種,將輸入進(jìn)行分片,然后交給不同的task進(jìn)行處理,然后合并成最終的解。
mapreduce實(shí)際的處理過(guò)程可以理解為:Input->Map->Sort->Combine->Partition->Reduce->Output。
1)Input階段
數(shù)據(jù)以一定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以設(shè)置,也可以自定義分片函數(shù)。
2)map階段
對(duì)輸入的(key,value)進(jìn)行處理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass進(jìn)行設(shè)置。
3)Sort階段
對(duì)于Mapper的輸出進(jìn)行排序,使用Job.setOutputKeyComparatorClass進(jìn)行設(shè)置,然后定義排序規(guī)則。
4)Combine階段
這個(gè)階段對(duì)于Sort之后又相同key的結(jié)果進(jìn)行合并,使用Job.setCombinerClass進(jìn)行設(shè)置,也可以自定義Combine Class類。
5)Partition階段
將Mapper的中間結(jié)果按照key的范圍劃分為R份(Reduce作業(yè)的個(gè)數(shù)),默認(rèn)使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定義劃分的函數(shù)。
使用Job.setPartitionClass設(shè)置。
6)Reduce階段
對(duì)于Mapper階段的結(jié)果進(jìn)行進(jìn)一步處理,Job.setReducerClass進(jìn)行設(shè)置自定義的Reduce類。
7)Output階段
Reducer輸出數(shù)據(jù)的格式。
五、一個(gè)job的運(yùn)行流程
??一個(gè)mapreduce作業(yè)的執(zhí)行流程是:作業(yè)提交->作業(yè)初始化->任務(wù)分配->任務(wù)執(zhí)行->更新任務(wù)執(zhí)行進(jìn)度和狀態(tài)->作業(yè)完成。

一個(gè)完整的mapreduce作業(yè)流程,包括4個(gè)獨(dú)立的實(shí)體:
客戶端:client,編寫mapreduce程序,配置作業(yè),提交作業(yè)。
JobTracker:協(xié)調(diào)這個(gè)作業(yè)的運(yùn)行,分配作業(yè),初始化作業(yè),與TaskTracker進(jìn)行通信。
TaskTracker:負(fù)責(zé)運(yùn)行作業(yè),保持與JobTracker進(jìn)行通信。
HDFS:分布式文件系統(tǒng),保持作業(yè)的數(shù)據(jù)和結(jié)果。
- 提交作業(yè)
JobClient使用runjob方法創(chuàng)建一個(gè)JobClient實(shí)例,然后調(diào)用submitJob()方法進(jìn)行作業(yè)的提交,提交作業(yè)的具體過(guò)程如下:
1)通過(guò)調(diào)用JobTracker對(duì)象的getNewJobId()方法從JobTracker處獲得一個(gè)作業(yè)ID。
2)檢查作業(yè)的相關(guān)路徑。如果輸出路徑存在,作業(yè)將不會(huì)被提交(保護(hù)上一個(gè)作業(yè)運(yùn)行結(jié)果)。
3)計(jì)算作業(yè)的輸入分片,如果無(wú)法計(jì)算,例如輸入路徑不存在,作業(yè)將不被提交,錯(cuò)誤返回給mapreduce程序。
4)將運(yùn)行作業(yè)所需資源(作業(yè)jar文件,配置文件和計(jì)算得到的分片)復(fù)制到HDFS上。
5)告知JobTracker作業(yè)準(zhǔn)備執(zhí)行(使用JobTracker對(duì)象的submitJob()方法來(lái)真正提交作業(yè))。 - 作業(yè)初始化
當(dāng)JobTracker收到Job提交的請(qǐng)求后,將Job保存在一個(gè)內(nèi)部隊(duì)列,并讓Job Scheduler(作業(yè)調(diào)度器)處理并初始化。初始化涉及到創(chuàng)建一個(gè)封裝了其tasks的job對(duì)象,并保持對(duì)task的狀態(tài)和進(jìn)度的跟蹤(step 5)。當(dāng)創(chuàng)建要運(yùn)行的一系列task對(duì)象后,Job Scheduler首先開始從文件系統(tǒng)中獲取由JobClient計(jì)算的input splits(step 6),然后再為每個(gè)split創(chuàng)建map task。 - 任務(wù)的分配
??TaskTracker和JobTracker之間的通信和任務(wù)分配是通過(guò)心跳機(jī)制完成的。TaskTracker作為一個(gè)單獨(dú)的JVM,它執(zhí)行一個(gè)簡(jiǎn)單的循環(huán),主要實(shí)現(xiàn)每隔一段時(shí)間向JobTracker
??發(fā)送心跳,告訴JobTracker此TaskTracker是否存活,是否準(zhǔn)備執(zhí)行新的任務(wù)。如果有待分配的任務(wù),它就會(huì)為TaskTracker分配一個(gè)任務(wù)。 - 任務(wù)的執(zhí)行
??TTaskTracker申請(qǐng)到新的任務(wù)之后,就要在本地運(yùn)行了。首先,是將任務(wù)本地化(包括運(yùn)行任務(wù)所需的數(shù)據(jù)、配置信息、代碼等),即從HDFS復(fù)制到本地。調(diào)用localizeJob()完成的。
??T對(duì)于使用Streaming和Pipes創(chuàng)建Map或者Reduce程序的任務(wù),Java會(huì)把key/value傳遞給外部進(jìn)程,然后通過(guò)用戶自定義的Map或者Reduce進(jìn)行處理,然后把key/value傳回到j(luò)ava中。
??T其中就好像是TaskTracker的子進(jìn)程在處理Map和Reduce代碼一樣。 - 更新任務(wù)的執(zhí)行進(jìn)度和狀態(tài)
??進(jìn)度和狀態(tài)是通過(guò)heartbeat(心跳機(jī)制)來(lái)更新和維護(hù)的。對(duì)于Map Task,進(jìn)度就是已處理數(shù)據(jù)和所有輸入數(shù)據(jù)的比例。對(duì)于Reduce Task,情況就有點(diǎn)復(fù)雜,包括3部分,拷貝中間結(jié)果文件、排序、reduce調(diào)用,每部分占1/3。 - 任務(wù)完成
??當(dāng)Job完成后,JobTracker會(huì)收一個(gè)Job Complete的通知,并將當(dāng)前的Job狀態(tài)更新為successful,同時(shí)JobClient也會(huì)輪循獲知提交的Job已經(jīng)完成,將信息顯示給用戶。
??最后,JobTracker會(huì)清理和回收該Job的相關(guān)資源,并通知TaskTracker進(jìn)行相同的操作(比如刪除中間結(jié)果文件)
六、MapTask并行度決定機(jī)制
maptask的并行度決定map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)job的處理速度。
那么,mapTask并行實(shí)例是否越多越好呢?其并行度又是如何決定呢?
- mapTask并行度的決定機(jī)制
一個(gè)job的map階段并行度由客戶端在提交job時(shí)決定而客戶端對(duì)map階段并行度的規(guī)劃的基本邏輯為:
將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split),然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理。
默認(rèn)128M一塊。
2.FileInputFormat切片機(jī)制
1)FileInputFormat切片機(jī)制切片定義在InputFormat類中的getSplit()方法
2)FileInputFormat中默認(rèn)的切片機(jī)制:
簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片
切片大小,默認(rèn)等于block大小
切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片
比如待處理數(shù)據(jù)有兩個(gè)文件:
file1.txt 320M
file2.txt 10M
經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
3)FileInputFormat中切片的大小的參數(shù)配置
通過(guò)分析源碼,在FileInputFormat中,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個(gè)值來(lái)運(yùn)算決定
minsize:默認(rèn)值:1
配置參數(shù): mapreduce.input.fileinputformat.split.minsize
maxsize:默認(rèn)值:Long.MAXValue
配置參數(shù):mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默認(rèn)情況下,切片大小=blocksize
maxsize(切片最大值):
參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小,而且就等于配置的這個(gè)參數(shù)的值
minsize (切片最小值):
參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大
選擇并發(fā)數(shù)的影響因素:
運(yùn)算節(jié)點(diǎn)的硬件配置
運(yùn)算任務(wù)的類型:CPU密集型還是IO密集型
運(yùn)算任務(wù)的數(shù)據(jù)量
- ReduceTask并行度的決定
reducetask的并行度同樣影響整個(gè)job的執(zhí)行并發(fā)度和執(zhí)行效率,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同,Reducetask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
//默認(rèn)值是1,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);
如果數(shù)據(jù)分布不均勻,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜
注意: reducetask數(shù)量并不是任意設(shè)置,還要考慮業(yè)務(wù)邏輯需求,有些情況下,需要計(jì)算全局匯總結(jié)果,就只能有1個(gè)reducetask
盡量不要運(yùn)行太多的reduce task。對(duì)大多數(shù)job來(lái)說(shuō),最好rduce的個(gè)數(shù)最多和集群中的reduce持平,或者比集群的 reduce slots小。這個(gè)對(duì)于小集群而言,尤其重要。
- mapreduce的shuffle機(jī)制
mapreduce中,map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是mapreduce框架中最關(guān)鍵的一個(gè)流程,這個(gè)流程就叫shuffle。
shuffle: 洗牌、發(fā)牌——(核心機(jī)制:數(shù)據(jù)分區(qū),排序,緩存)。
具體來(lái)說(shuō):就是將maptask輸出的處理結(jié)果數(shù)據(jù),分發(fā)給reducetask,并在分發(fā)的過(guò)程中,對(duì)數(shù)據(jù)按key進(jìn)行了分區(qū)和排序。
分區(qū)partition(確定哪個(gè)數(shù)據(jù)進(jìn)入哪個(gè)reduce)
Sort根據(jù)key排序
詳細(xì)流程
1、 maptask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中
2、 從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件
3、 多個(gè)溢出文件會(huì)被合并成大的溢出文件
4、 在溢出過(guò)程中,及合并的過(guò)程中,都要調(diào)用partitoner進(jìn)行分組和針對(duì)key進(jìn)行排序
5、 reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
6、 reducetask會(huì)取到同一個(gè)分區(qū)的來(lái)自不同maptask的結(jié)果文件,reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)
7、 合并成大文件后,shuffle的過(guò)程也就結(jié)束了,后面進(jìn)入reducetask的邏輯運(yùn)算過(guò)程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)group,調(diào)用用戶自定義的reduce()方法)
Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率,原則上說(shuō),緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快
緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整, 參數(shù):io.sort.mb 默認(rèn)100MCombiner進(jìn)行局部value的合并。
七、MapReduce與YARN
Yarn是一個(gè)資源調(diào)度平臺(tái),負(fù)責(zé)為運(yùn)算程序提供服務(wù)器運(yùn)算資源,相當(dāng)于一個(gè)分布式的操作系統(tǒng)平臺(tái),而mapreduce等運(yùn)算程序則相當(dāng)于運(yùn)行于操作系統(tǒng)之上的應(yīng)用程序。
YARN中的重要概念
1) yarn并不清楚用戶提交的程序的運(yùn)行機(jī)制
2) yarn只提供運(yùn)算資源的調(diào)度(用戶程序向yarn申請(qǐng)資源,yarn就負(fù)責(zé)分配資源)
3) yarn中的主管角色叫ResourceManager
4) yarn中具體提供運(yùn)算資源的角色叫NodeManager
5) 這樣一來(lái),yarn其實(shí)就與運(yùn)行的用戶程序完全解耦,就意味著yarn上可以運(yùn)行各種類型的分布式運(yùn)算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
6) 所以,spark、storm等運(yùn)算框架都可以整合在yarn上運(yùn)行,只要他們各自的框架中有符合yarn規(guī)范的資源請(qǐng)求機(jī)制即可
7) Yarn就成為一個(gè)通用的資源調(diào)度平臺(tái),從此,企業(yè)中以前存在的各種運(yùn)算集群都可以整合在一個(gè)物理集群上,提高資源利用率,方便數(shù)據(jù)共享。