一.MapReduce簡介
最簡單的WordCount采用MR算法:

MapReduce是一種用于數(shù)據(jù)處理的編程模型 。該模型非常簡單 。同一個(gè)程序Hadoop可以運(yùn)行用各種語言編寫的Map Reduce程序 。最重要的是, MapReduce 程序本 質(zhì)上是并行的 ,因此可以將大規(guī)模的數(shù)據(jù)分析交給任何一個(gè)擁有足夠多機(jī)器的運(yùn)營商。MapReduce的優(yōu)勢在于處理大型數(shù)據(jù)集。
MapReduce的工作過程分為兩個(gè)階段:map階段和reduce階段。每個(gè)階段都有鍵/值對(duì)作為輸入和輸出,并且它們的類型可由程序員選擇 。程序員還具體定義了兩個(gè)函數(shù):map函數(shù)和reduce函數(shù) 。
在一個(gè)mapreduce的作業(yè)中必定會(huì)涉及到如下一些組件:
1、客戶端,提交mapreduce作業(yè)
2、yarn資源管理器,負(fù)責(zé)集群上計(jì)算資源的協(xié)調(diào)
3、yarn節(jié)點(diǎn)管理器,負(fù)責(zé)啟動(dòng)和監(jiān)控集群中機(jī)器上的計(jì)算容器(container)
4、mapreduce的application master,負(fù)責(zé)協(xié)調(diào)運(yùn)行mapreduce的作業(yè)
5、hdfs,分布式文件系統(tǒng),負(fù)責(zé)與其他實(shí)體共享作業(yè)文件
二.MapReduce作業(yè)提交過程
MapReduce作業(yè)的提交過程包含如下幾個(gè)步驟
1.作業(yè)的提交
2.作業(yè)的初始化
3.作業(yè)任務(wù)的分配
4.作業(yè)任務(wù)的執(zhí)行
5.作業(yè)執(zhí)行狀態(tài)更新
6.作業(yè)完成

2.1 作業(yè)的提交
JobSubmmiter實(shí)現(xiàn)的大概過程如下:
1、向資源管理器resourcemanager提交申請,用于一個(gè)mapreduce作業(yè)ID,如圖步驟2所示
2、檢查作業(yè)的輸出配置,判斷目錄是否已經(jīng)存在等信息
3、計(jì)算作業(yè)的輸入分片的大小
4、將運(yùn)行作業(yè)的jar,配置文件,輸入分片的計(jì)算資源復(fù)制到一個(gè)以作業(yè)ID命名的hdfs臨時(shí)目錄下,作業(yè)jar的復(fù)本比較多,默認(rèn)為10個(gè)(通過參數(shù)mapreduce.client.submit.file.replication控制),
5、通過資源管理器的submitApplication方法提交作業(yè)
2.2 作業(yè)的初始化
1、當(dāng)資源管理器通過方法submitApplication方法被調(diào)用后,便將請求傳給了yarn的調(diào)度器,然后調(diào)度器在一個(gè)節(jié)點(diǎn)管理器上分配一個(gè)容器(container0)用來啟動(dòng)application master(主類是MRAppMaster)進(jìn)程。該進(jìn)程一旦啟動(dòng)就會(huì)向resourcemanager注冊并報(bào)告自己的信息,application master并且可以監(jiān)控map和reduce的運(yùn)行狀態(tài)。因此application master對(duì)作業(yè)的初始化是通過創(chuàng)建多個(gè)薄記對(duì)象以保持對(duì)作業(yè)進(jìn)度的跟蹤。
2、application master接收作業(yè)提交時(shí)的hdfs臨時(shí)共享目錄中的資源文件,jar,分片信息,配置信息等。并對(duì)每一個(gè)分片創(chuàng)建一個(gè)map對(duì)象,以及通過mapreduce.job.reduces參數(shù)(作業(yè)通過setNumReduceTasks()方法設(shè)定)確定reduce的數(shù)量。
3、application master會(huì)判斷是否使用uber(作業(yè)與application master在同一個(gè)jvm運(yùn)行,也就是maptask和reducetask運(yùn)行在同一個(gè)節(jié)點(diǎn)上)模式運(yùn)行作業(yè),uber模式運(yùn)行條件:map數(shù)量小于10個(gè),1個(gè)reduce,且輸入數(shù)據(jù)小于一個(gè)hdfs塊
可以通過參數(shù):
mapreduce.job.ubertask.enable #是否啟用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map數(shù)
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce數(shù)
mapreduce.job.ubertask.maxbytes #ubertask最大作業(yè)大小
4、application master調(diào)用setupJob方法設(shè)置OutputCommiter,FileOutputCommiter為默認(rèn)值,表示建立做的最終輸出目錄和任務(wù)輸出的臨時(shí)工作空間
2.3 作業(yè)任務(wù)分配
1、在application master判斷作業(yè)不符合uber模式的情況下,那么application master則會(huì)向資源管理器為map和reduce任務(wù)申請資源容器。
2、首先就是為map任務(wù)發(fā)出資源申請請求,直到有5%的map任務(wù)完成時(shí),才會(huì)為reduce任務(wù)所需資源申請發(fā)出請求。
3、在任務(wù)的分配過程中,reduce任務(wù)可以在任何的datanode節(jié)點(diǎn)運(yùn)行,但是map任務(wù)執(zhí)行的時(shí)候需要考慮到數(shù)據(jù)本地化的機(jī)制,在給任務(wù)指定資源的時(shí)候每個(gè)map和reduce默認(rèn)為1G內(nèi)存,可以通過如下參數(shù)配置:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
2.4 作業(yè)任務(wù)的執(zhí)行
application master提交申請后,資源管理器為其按需分配資源,這時(shí),application master就與節(jié)點(diǎn)管理器通信來啟動(dòng)容器。該任務(wù)由主類YarnChild的一個(gè)java應(yīng)用程序執(zhí)行。在運(yùn)行任務(wù)之前,首先將所需的資源進(jìn)行本地化,包括作業(yè)的配置,jar文件等。接下來就是運(yùn)行map和reduce任務(wù)。YarnChild在單獨(dú)的JVM中運(yùn)行。
2.5 作業(yè)任務(wù)的狀態(tài)更新
每個(gè)作業(yè)和它的每個(gè)任務(wù)都有一個(gè)狀態(tài):作業(yè)或者任務(wù)的狀態(tài)(運(yùn)行中,成功,失敗等),map和reduce的進(jìn)度,作業(yè)計(jì)數(shù)器的值,狀態(tài)消息或描述當(dāng)作業(yè)處于正在運(yùn)行中的時(shí)候,客戶端可以直接與application master通信,每秒(可以通過參數(shù)mapreduce.client.progressmonitor.pollinterval設(shè)置)輪詢作業(yè)的執(zhí)行狀態(tài),進(jìn)度等信息。
2.6 作業(yè)的完成
當(dāng)application master收到最后一個(gè)任務(wù)已完成的通知,便把作業(yè)的狀態(tài)設(shè)置為成功。
在job輪詢作業(yè)狀態(tài)時(shí),知道任務(wù)已經(jīng)完成,然后打印消息告知用戶,并從waitForCompletion()方法返回。
當(dāng)作業(yè)完成時(shí),application master和container會(huì)清理中間數(shù)據(jù)結(jié)果等臨時(shí)問題。OutputCommiter的commitJob()方法被調(diào)用,作業(yè)信息由作業(yè)歷史服務(wù)存檔,以便用戶日后查詢。
三.shuffle
mapreduce確保每個(gè)reduce的輸入都是按照鍵值排序的,系統(tǒng)執(zhí)行排序,將map的輸入作為reduce的輸入過程稱之為shuffle過程。shuffle也是我們優(yōu)化的重點(diǎn)部分。shuffle流程圖如下圖所示:

3.1 map端
在生成map之前,會(huì)計(jì)算文件分片的大小
然后會(huì)根據(jù)分片的大小計(jì)算map的個(gè)數(shù),對(duì)每一個(gè)分片都會(huì)產(chǎn)生一個(gè)map作業(yè),或者是一個(gè)文件(小于分片大小*1.1)生成一個(gè)map作業(yè),然后通過自定的map方法進(jìn)行自定義的邏輯計(jì)算,計(jì)算完畢后會(huì)寫到本地磁盤。
在這里不是直接寫入磁盤,為了保證IO效率,采用了先寫入內(nèi)存的環(huán)形緩沖區(qū),并做一次預(yù)排序(快速排序)。緩沖區(qū)的大小默認(rèn)為100MB(可通過修改配置項(xiàng)mpareduce.task.io.sort.mb進(jìn)行修改),當(dāng)寫入內(nèi)存緩沖區(qū)的大小到達(dá)一定比例時(shí),默認(rèn)為80%(可通過mapreduce.map.sort.spill.percent配置項(xiàng)修改),將啟動(dòng)一個(gè)溢寫線程將內(nèi)存緩沖區(qū)的內(nèi)容溢寫到磁盤(spill to disk),這個(gè)溢寫線程是獨(dú)立的,不影響map向緩沖區(qū)寫結(jié)果的線程,在溢寫到磁盤的過程中,map繼續(xù)輸入到緩沖中,如果期間緩沖區(qū)被填滿,則map寫會(huì)被阻塞到溢寫磁盤過程完成。溢寫是通過輪詢的方式將緩沖區(qū)中的內(nèi)存寫入到本地mapreduce.cluster.local.dir目錄下。在溢寫到磁盤之前,我們會(huì)知道reduce的數(shù)量,然后會(huì)根據(jù)reduce的數(shù)量劃分分區(qū),默認(rèn)根據(jù)hashpartition對(duì)溢寫的數(shù)據(jù)寫入到相對(duì)應(yīng)的分區(qū)。在每個(gè)分區(qū)中,后臺(tái)線程會(huì)根據(jù)key進(jìn)行排序,所以溢寫到磁盤的文件是分區(qū)且排序的。如果有combiner函數(shù),它在排序后的輸出運(yùn)行,使得map輸出更緊湊。減少寫到磁盤的數(shù)據(jù)和傳輸給reduce的數(shù)據(jù)。
每次環(huán)形換沖區(qū)的內(nèi)存達(dá)到閾值時(shí),就會(huì)溢寫到一個(gè)新的文件,因此當(dāng)一個(gè)map溢寫完之后,本地會(huì)存在多個(gè)分區(qū)切排序的文件。在map完成之前會(huì)把這些文件合并成一個(gè)分區(qū)且排序(歸并排序)的文件,可以通過參數(shù)mapreduce.task.io.sort.factor控制每次可以合并多少個(gè)文件。
在map溢寫磁盤的過程中,對(duì)數(shù)據(jù)進(jìn)行壓縮可以提交速度的傳輸,減少磁盤io,減少存儲(chǔ)。默認(rèn)情況下不壓縮,使用參數(shù)mapreduce.map.output.compress控制,壓縮算法使用mapreduce.map.output.compress.codec參數(shù)控制。
3.2 reduce端
map任務(wù)完成后,監(jiān)控作業(yè)狀態(tài)的application master便知道m(xù)ap的執(zhí)行情況,并啟動(dòng)reduce任務(wù),application master并且知道m(xù)ap輸出和主機(jī)之間的對(duì)應(yīng)映射關(guān)系,reduce輪詢application master便知道主機(jī)所要復(fù)制的數(shù)據(jù)。
一個(gè)Map任務(wù)的輸出,可能被多個(gè)Reduce任務(wù)抓取。每個(gè)Reduce任務(wù)可能需要多個(gè)Map任務(wù)的輸出作為其特殊的輸入文件,而每個(gè)Map任務(wù)的完成時(shí)間可能不同,當(dāng)有一個(gè)Map任務(wù)完成時(shí),Reduce任務(wù)就開始運(yùn)行。Reduce任務(wù)根據(jù)分區(qū)號(hào)在多個(gè)Map輸出中抓?。╢etch)對(duì)應(yīng)分區(qū)的數(shù)據(jù),這個(gè)過程也就是Shuffle的copy過程。。reduce有少量的復(fù)制線程,因此能夠并行的復(fù)制map的輸出,默認(rèn)為5個(gè)線程。可以通過參數(shù)mapreduce.reduce.shuffle.parallelcopies控制。
這個(gè)復(fù)制過程和map寫入磁盤過程類似,也有閥值和內(nèi)存大小,閥值一樣可以在配置文件里配置,而內(nèi)存大小是直接使用reduce的tasktracker的內(nèi)存大小,復(fù)制時(shí)候reduce還會(huì)進(jìn)行排序操作和合并文件操作。
如果map輸出很小,則會(huì)被復(fù)制到Reducer所在節(jié)點(diǎn)的內(nèi)存緩沖區(qū),緩沖區(qū)的大小可以通過mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在節(jié)點(diǎn)的內(nèi)存緩沖區(qū)達(dá)到閥值,或者緩沖區(qū)中的文件數(shù)達(dá)到閥值,則合并溢寫到磁盤。
如果map輸出較大,則直接被復(fù)制到Reducer所在節(jié)點(diǎn)的磁盤中。隨著Reducer所在節(jié)點(diǎn)的磁盤中溢寫文件增多,后臺(tái)線程會(huì)將它們合并為更大且有序的文件。當(dāng)完成復(fù)制map輸出,進(jìn)入sort階段。這個(gè)階段通過歸并排序逐步將多個(gè)map輸出小文件合并成大文件。最后幾個(gè)通過歸并合并成的大文件作為reduce的輸出
3.3 總結(jié)
當(dāng)Reducer的輸入文件確定后,整個(gè)Shuffle操作才最終結(jié)束。之后就是Reducer的執(zhí)行了,最后Reducer會(huì)把結(jié)果存到HDFS上。
在Hadoop集群環(huán)境中,大部分map 任務(wù)與reduce任務(wù)的執(zhí)行是在不同的節(jié)點(diǎn)上。當(dāng)然很多情況下Reduce執(zhí)行時(shí)需要跨節(jié)點(diǎn)去拉取其它節(jié)點(diǎn)上的map任務(wù)結(jié)果。如果集群正在運(yùn)行的job有很多,那么task的正常執(zhí)行對(duì)集群內(nèi)部的網(wǎng)絡(luò)資源消耗會(huì)很嚴(yán)重。這種網(wǎng)絡(luò)消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節(jié)點(diǎn)內(nèi),相比于內(nèi)存,磁盤IO對(duì)job完成時(shí)間的影響也是可觀的。從最基本的要求來說,我們對(duì)Shuffle過程的期望可以有:
1、完整地從map task端拉取數(shù)據(jù)到reduce 端。
2、在跨節(jié)點(diǎn)拉取數(shù)據(jù)時(shí),盡可能地減少對(duì)帶寬的不必要消耗。
3、減少磁盤IO對(duì)task執(zhí)行的影響。
在MapReduce計(jì)算框架中,主要用到兩種排序算法:快速排序和歸并排序。在Map任務(wù)發(fā)生了2次排序,Reduce任務(wù)發(fā)生一次排序:
1、第1次排序發(fā)生在Map輸出的內(nèi)存環(huán)形緩沖區(qū),使用快速排序。當(dāng)緩沖區(qū)達(dá)到閥值時(shí),在溢寫到磁盤之前,后臺(tái)線程會(huì)將緩沖區(qū)的數(shù)據(jù)劃分成相應(yīng)分區(qū),在每個(gè)分區(qū)中按照鍵值進(jìn)行內(nèi)排序。
2、第2次排序是在Map任務(wù)輸出的磁盤空間上將多個(gè)溢寫文件歸并成一個(gè)已分區(qū)且有序的輸出文件。由于溢寫文件已經(jīng)經(jīng)過一次排序,所以合并溢寫文件時(shí)只需一次歸并排序即可使輸出文件整體有序。
3、第3次排序發(fā)生在Shuffle階段,將多個(gè)復(fù)制過來的Map輸出文件進(jìn)行歸并,同樣經(jīng)過一次歸并排序即可得到有序文件。
四.作業(yè)失敗和容錯(cuò)
既然有作業(yè)的運(yùn)行,肯定會(huì)有作業(yè)的失敗,作業(yè)的失?。ú豢紤]硬件,平臺(tái)原因引起的失?。┛赡軙?huì)存在不同的問題
4.1 任務(wù)運(yùn)行失敗
用戶代碼拋出異常(代碼沒寫好):這種情況任務(wù)JVM會(huì)在退出之前向application master發(fā)送錯(cuò)誤報(bào)告,并記錄進(jìn)用戶日志,application master對(duì)該作業(yè)標(biāo)記為failed,并釋放掉占有的資源容器。
另一種就是JVM突然退出,這種情況節(jié)點(diǎn)管理器會(huì)注意到進(jìn)程已經(jīng)退出,并通知application master將此任務(wù)標(biāo)記為失敗,如果是因?yàn)橥茰y執(zhí)行而導(dǎo)致任務(wù)被終止,則不會(huì)被被標(biāo)記為失敗。而任務(wù)掛起又不同,一旦application master注意到有一段時(shí)間沒有收到進(jìn)度更新,便會(huì)把任務(wù)標(biāo)記為失敗,默認(rèn)為10分鐘,參數(shù)mapreduce.task.timeout控制application master被告知一個(gè)任務(wù)失敗,將會(huì)重新調(diào)度該任務(wù)執(zhí)行(會(huì)在與之前失敗的不同節(jié)點(diǎn)上運(yùn)行),默認(rèn)重試4次,如果四次都失敗,則作業(yè)判定為失敗,參數(shù)控制為:
mapreduce.map.maxattempts
mapreduce.reduce.maxattempts
4.2 application master運(yùn)行失敗
AM也可能由于各種原因(如網(wǎng)絡(luò)問題或者硬件故障)失效,Yarn同樣會(huì)嘗試重啟AM
可以為每個(gè)作業(yè)單獨(dú)配置AM的嘗試重啟次數(shù):mapreduce.am.max-attempts,默認(rèn)值為2
Yarn中的上限一起提高:yarn.resourcemanager.am.nax-attempts,默認(rèn)為2,單個(gè)應(yīng)用程序不可以超過這個(gè)限制,除非同時(shí)修改這兩個(gè)參數(shù)。
恢復(fù)過程:application master向資源管理器發(fā)送周期性的心跳。當(dāng)application master失敗時(shí),資源管理器會(huì)檢測到該失敗,并在一個(gè)新的容器中啟動(dòng)application master,并使用作業(yè)歷史來恢復(fù)失敗的應(yīng)用程序中的運(yùn)行任務(wù)狀態(tài),使其不必重新運(yùn)行,默認(rèn)情況下恢復(fù)功能是開啟的,yarn.app.mapreduce.am.job.recovery.enable控制客戶端向application master輪詢作業(yè)狀態(tài)時(shí),如果application master運(yùn)行失敗了,則客戶端會(huì)向資源管理器resourcemanager詢問和緩存application master地址。
4.3 節(jié)點(diǎn)管理器運(yùn)行失敗
如果節(jié)點(diǎn)管理器崩潰或者運(yùn)行非常緩慢,則就會(huì)停止向資源管理器發(fā)送心跳信息,如果10分鐘(可以通過參數(shù)yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms設(shè)置)資源管理器沒有收到一條心跳信息,則資源管理器將會(huì)通知停止發(fā)送心跳的節(jié)點(diǎn)管理器,并將其從自己的資源池中移除該節(jié)點(diǎn)管理器,在該節(jié)點(diǎn)上的application master和任務(wù)的失敗,都通過如上兩種恢復(fù)機(jī)制進(jìn)行恢復(fù)。
4.4 資源管理器運(yùn)行失敗
資源管理器失敗時(shí)一個(gè)很嚴(yán)重的問題,所有的任務(wù)將不能被分配資源,作業(yè)和容器都無法啟動(dòng),那么整個(gè)通過yarn控制資源的集群都處于癱瘓狀態(tài)。
容錯(cuò)機(jī)制:resourcemanager HA
參考
內(nèi)容來自 《Hadoop權(quán)威指南》及 https://www.cnblogs.com/zsql/p/11600136.html