mapreduce框架詳解

參考:
hadoop 學(xué)習(xí)筆記:mapreduce框架詳解

[toc]

總結(jié)

Mapreduce是一個(gè)計(jì)算框架,既然是做計(jì)算的框架,那么表現(xiàn)形式就是有個(gè)輸入(input),mapreduce操作這個(gè)輸入(input),通過(guò)本身定義好的計(jì)算模型,得到一個(gè)輸出(output),這個(gè)輸出就是我們所需要的結(jié)果。

我們要學(xué)習(xí)的就是這個(gè)計(jì)算模型的運(yùn)行規(guī)則。在運(yùn)行一個(gè)mapreduce計(jì)算任務(wù)時(shí)候,任務(wù)過(guò)程被分為兩個(gè)階段:map階段和reduce階段,每個(gè)階段都是用鍵值對(duì)(key/value)作為輸入(input)和輸出(output)。而我們要做的就是定義好這兩個(gè)階段的函數(shù):map函數(shù)和reduce函數(shù)。

mapreduce框架

combine、partition和shuffle的區(qū)別:

combine:作用是把同一個(gè)key的鍵值對(duì)合并在一起,combine函數(shù)把一個(gè)map函數(shù)產(chǎn)生的<key,value>對(duì)(多個(gè)key,value)合并成一個(gè)新的<key2,value2>.將新的<key2,value2>作為輸入到reduce函數(shù)中,這個(gè)value2亦可稱之為values,因?yàn)橛卸鄠€(gè)。這個(gè)合并的目的是為了減少網(wǎng)絡(luò)傳輸。

partition:partition的作用就是把這些數(shù)據(jù)歸類,主要在Shuffle過(guò)程中按照Key值將中間結(jié)果分成R份,其中每份都有一個(gè)Reduce去負(fù)責(zé),可以通過(guò)job.setPartitionerClass()方法進(jìn)行設(shè)置,默認(rèn)的使用hashPartitioner類。

shuffle:shuffle就是map和reduce之間的過(guò)程,包含了兩端的combine和partition。它比較難以理解,因?yàn)槲覀兠恢?,看不到它,它只是理論存在的,而且確實(shí)存在,它屬于mapreduce的框架。

詳細(xì)介紹

第一部分:Wordcount運(yùn)行流程

1 簡(jiǎn)介:

首先講講物理實(shí)體,參入mapreduce作業(yè)執(zhí)行涉及4個(gè)獨(dú)立的實(shí)體:

  • 客戶端(client):編寫(xiě)mapreduce程序,配置作業(yè),提交作業(yè),這就是程序員完成的工作;
  • JobTracker:初始化作業(yè),分配作業(yè),與TaskTracker通信,協(xié)調(diào)整個(gè)作業(yè)的執(zhí)行;
  • TaskTracker:保持與JobTracker的通信,在分配的數(shù)據(jù)片段上執(zhí)行Map或-
  • Reduce任務(wù),TaskTracker和JobTracker的不同有個(gè)很重要的方面,就是在執(zhí)行任務(wù)時(shí)候TaskTracker可以有n多個(gè),JobTracker則只會(huì)有一個(gè)(JobTracker只能有一個(gè)就和hdfs里namenode一樣存在單點(diǎn)故障,我會(huì)在后面的mapreduce的相關(guān)問(wèn)題里講到這個(gè)問(wèn)題的)
  • Hdfs:保存作業(yè)的數(shù)據(jù)、配置信息等等,最后的結(jié)果也是保存在hdfs上面
MapReduce作業(yè)提價(jià)圖

MapReduce作業(yè)提價(jià)圖

2 mapreduce提交作業(yè)的流程

首先是客戶端要編寫(xiě)好mapreduce程序,配置好mapreduce的作業(yè)也就是job,接下來(lái)就是提交job了;

提交job是提交到JobTracker上的,這個(gè)時(shí)候JobTracker就會(huì)構(gòu)建這個(gè)job,具體就是分配一個(gè)新的job任務(wù)的ID值;

接下來(lái)它會(huì)做檢查操作,這個(gè)檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運(yùn)行下去,JobTracker會(huì)拋出錯(cuò)誤給客戶端;

解釋:輸出目錄,不能提前在存在,如果輸出目錄存在reduce時(shí)就搞不清楚你到底是要追加還是覆蓋。

接下來(lái)還要檢查輸入目錄是否存在,如果不存在同樣拋出錯(cuò)誤,如果存在JobTracker會(huì)根據(jù)輸入計(jì)算輸入分片(Input Split),如果分片計(jì)算不出來(lái)也會(huì)拋出錯(cuò)誤,至于輸入分片我后面會(huì)做講解的,這些都做好了JobTracker就會(huì)配置Job需要的資源了。

分配好資源后,JobTracker就會(huì)初始化作業(yè),初始化主要做的是將Job放入一個(gè)內(nèi)部的隊(duì)列,讓配置好的作業(yè)調(diào)度器能調(diào)度到這個(gè)作業(yè),作業(yè)調(diào)度器會(huì)初始化這個(gè)job,初始化就是創(chuàng)建一個(gè)正在運(yùn)行的job對(duì)象(封裝任務(wù)和記錄信息),以便JobTracker跟蹤job的狀態(tài)和進(jìn)程;

初始化完畢后,作業(yè)調(diào)度器會(huì)獲取輸入分片信息(input split),每個(gè)分片創(chuàng)建一個(gè)map任務(wù)。

接下來(lái)就是任務(wù)分配了,這個(gè)時(shí)候tasktracker會(huì)運(yùn)行一個(gè)簡(jiǎn)單的循環(huán)機(jī)制定期發(fā)送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個(gè)時(shí)間,心跳就是jobtracker和tasktracker溝通的橋梁,通過(guò)心跳,jobtracker可以監(jiān)控tasktracker是否存活,也可以獲取tasktracker處理的狀態(tài)和問(wèn)題,同時(shí)tasktracker也可以通過(guò)心跳里的返回值獲取jobtracker給它的操作指令。任務(wù)分配好后就是執(zhí)行任務(wù)了;

在任務(wù)執(zhí)行時(shí)候jobtracker可以通過(guò)心跳機(jī)制監(jiān)控tasktracker的狀態(tài)和進(jìn)度,同時(shí)也能計(jì)算出整個(gè)job的狀態(tài)和進(jìn)度,而tasktracker也可以本地監(jiān)控自己的狀態(tài)和進(jìn)度。當(dāng)jobtracker獲得了最后一個(gè)完成指定任務(wù)的tasktracker操作成功的通知時(shí)候,jobtracker會(huì)把整個(gè)job狀態(tài)置為成功,然后當(dāng)客戶端查詢job運(yùn)行狀態(tài)時(shí)候(注意:這個(gè)是異步操作),客戶端會(huì)查到j(luò)ob完成的通知的。如果job中途失敗,mapreduce也會(huì)有相應(yīng)機(jī)制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯(cuò)誤處理機(jī)制都能保證提交的job能正常完成。

3 mapreduce計(jì)算流程

輸入分片 ---> map階段 ---> combiner階段(可選) ---> shuffle階段 ---> reduce階段

1. 輸入分片(input split):
在進(jìn)行map計(jì)算之前,mapreduce會(huì)根據(jù)輸入文件計(jì)算輸入分片(input split),每個(gè)輸入分片(input split)針對(duì)一個(gè)map任務(wù),輸入分片(input split)存儲(chǔ)的并非數(shù)據(jù)本身,而是一個(gè)分片長(zhǎng)度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組,輸入分片(input split)往往和hdfs的block(塊)關(guān)系很密切,假如我們?cè)O(shè)定hdfs的塊的大小是64mb,如果我們輸入有三個(gè)文件,大小分別是3mb、65mb和127mb,那么mapreduce會(huì)把3mb文件分為一個(gè)輸入分片(input split),65mb則是兩個(gè)輸入分片(input split)而127mb也是兩個(gè)輸入分片(input split),換句話說(shuō)我們?nèi)绻趍ap計(jì)算前做輸入分片調(diào)整,例如合并小文件,那么就會(huì)有5個(gè)map任務(wù)將執(zhí)行,而且每個(gè)map執(zhí)行的數(shù)據(jù)大小不均,這個(gè)也是mapreduce優(yōu)化計(jì)算的一個(gè)關(guān)鍵點(diǎn)。[分塊分片的參考:Hadoop分塊與分片介紹及分片和分塊大小相同的原因]

默認(rèn)分片大小與分塊大小是相同的原因
hadoop在存儲(chǔ)有輸入數(shù)據(jù)(HDFS中的數(shù)據(jù))的節(jié)點(diǎn)上運(yùn)行map任務(wù),可以獲得高性能,這就是所謂的數(shù)據(jù)本地化。所以最佳分片的大小應(yīng)該與HDFS上的塊大小一樣,因?yàn)槿绻制缭?個(gè)數(shù)據(jù)塊,對(duì)于任何一個(gè)HDFS節(jié)點(diǎn)(基本不肯能同時(shí)存儲(chǔ)這2個(gè)數(shù)據(jù)塊),分片中的另外一塊數(shù)據(jù)就需要通過(guò)網(wǎng)絡(luò)傳輸?shù)絤ap任務(wù)節(jié)點(diǎn),與使用本地?cái)?shù)據(jù)運(yùn)行map任務(wù)相比,效率則更低!

小結(jié):分塊優(yōu)化,減少網(wǎng)絡(luò)傳輸數(shù)據(jù),使用本地?cái)?shù)據(jù)運(yùn)行map任務(wù)。

  1. map階段:就是程序員編寫(xiě)好的map函數(shù)了,因此map函數(shù)效率相對(duì)好控制,而且一般map操作都是本地化操作也就是在數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn)上進(jìn)行;

  2. combiner階段:combiner階段是程序員可以選擇的,combiner其實(shí)也是一種reduce操作,因此我們看見(jiàn)WordCount類里是用reduce進(jìn)行加載的。Combiner是一個(gè)本地化的reduce操作,它是map運(yùn)算的后續(xù)操作,主要是在map計(jì)算出中間文件前做一個(gè)簡(jiǎn)單的合并重復(fù)key值的操作,例如我們對(duì)文件里的單詞頻率做統(tǒng)計(jì),map計(jì)算時(shí)候如果碰到一個(gè)hadoop的單詞就會(huì)記錄為1,但是這篇文章里hadoop可能會(huì)出現(xiàn)n多次,那么map輸出文件冗余就會(huì)很多,因此在reduce計(jì)算前對(duì)相同的key做一個(gè)合并操作,那么文件會(huì)變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計(jì)算力寬帶資源往往是計(jì)算的瓶頸也是最為寶貴的資源,但是combiner操作是有風(fēng)險(xiǎn)的,使用它的原則是combiner的輸入不會(huì)影響到reduce計(jì)算的最終輸入,例如:如果計(jì)算只是求總數(shù),最大值,最小值可以使用combiner,但是做平均值計(jì)算使用combiner的話,最終的reduce計(jì)算結(jié)果就會(huì)出錯(cuò)。

小結(jié):combine時(shí)一個(gè)本地化的reduce操作,對(duì)相同的key做一個(gè)合并操作,提高帶寬的利用率

  1. shuffle階段:將map的輸出作為reduce的輸入的過(guò)程就是shuffle了,這個(gè)是mapreduce優(yōu)化的重點(diǎn)地方。這里我不講怎么優(yōu)化shuffle階段,講講shuffle階段的原理,因?yàn)榇蟛糠值臅?shū)籍里都沒(méi)講清楚shuffle階段。Shuffle一開(kāi)始就是map階段做輸出操作,一般mapreduce計(jì)算的都是海量數(shù)據(jù),map輸出時(shí)候不可能把所有文件都放到內(nèi)存操作,因此map寫(xiě)入磁盤的過(guò)程十分的復(fù)雜,更何況map輸出時(shí)候要對(duì)結(jié)果進(jìn)行排序,內(nèi)存開(kāi)銷是很大的,map在做輸出時(shí)候會(huì)在內(nèi)存里開(kāi)啟一個(gè)環(huán)形內(nèi)存緩沖區(qū),這個(gè)緩沖區(qū)專門用來(lái)輸出的,默認(rèn)大小是100mb,并且在配置文件里為這個(gè)緩沖區(qū)設(shè)定了一個(gè)閥值,默認(rèn)是0.80(這個(gè)大小和閥值都是可以在配置文件里進(jìn)行配置的),同時(shí)map還會(huì)為輸出操作啟動(dòng)一個(gè)守護(hù)線程,如果緩沖區(qū)的內(nèi)存達(dá)到了閥值的80%時(shí)候,這個(gè)守護(hù)線程就會(huì)把內(nèi)容寫(xiě)到磁盤上,這個(gè)過(guò)程叫spill,另外的20%內(nèi)存可以繼續(xù)寫(xiě)入要寫(xiě)進(jìn)磁盤的數(shù)據(jù),寫(xiě)入磁盤和寫(xiě)入內(nèi)存操作是互不干擾的,如果緩存區(qū)被撐滿了,那么map就會(huì)阻塞寫(xiě)入內(nèi)存的操作,讓寫(xiě)入磁盤操作完成后再繼續(xù)執(zhí)行寫(xiě)入內(nèi)存操作,前面我講到寫(xiě)入磁盤前會(huì)有個(gè)排序操作,這個(gè)是在寫(xiě)入磁盤操作時(shí)候進(jìn)行,不是在寫(xiě)入內(nèi)存時(shí)候進(jìn)行的,如果我們定義了combiner函數(shù),那么排序前還會(huì)執(zhí)行combiner操作。每次spill操作也就是寫(xiě)入磁盤操作時(shí)候就會(huì)寫(xiě)一個(gè)溢出文件,也就是說(shuō)在做map輸出有幾次spill就會(huì)產(chǎn)生多少個(gè)溢出文件,等map輸出全部做完后,map會(huì)合并這些輸出文件。這個(gè)過(guò)程里還會(huì)有一個(gè)Partitioner操作,對(duì)于這個(gè)操作很多人都很迷糊,其實(shí)Partitioner操作和map階段的輸入分片(Input split)很像,一個(gè)Partitioner對(duì)應(yīng)一個(gè)reduce作業(yè),如果我們mapreduce操作只有一個(gè)reduce操作,那么Partitioner就只有一個(gè),如果我們有多個(gè)reduce操作,那么Partitioner對(duì)應(yīng)的就會(huì)有多個(gè),Partitioner因此就是reduce的輸入分片,這個(gè)程序員可以編程控制,主要是根據(jù)實(shí)際key和value的值,根據(jù)實(shí)際業(yè)務(wù)類型或者為了更好的reduce負(fù)載均衡要求進(jìn)行,這是提高reduce效率的一個(gè)關(guān)鍵所在。到了reduce階段就是合并map輸出文件了,Partitioner會(huì)找到對(duì)應(yīng)的map輸出文件,然后進(jìn)行復(fù)制操作,復(fù)制操作時(shí)reduce會(huì)開(kāi)啟幾個(gè)復(fù)制線程,這些線程默認(rèn)個(gè)數(shù)是5個(gè),程序員也可以在配置文件更改復(fù)制線程的個(gè)數(shù),這個(gè)復(fù)制過(guò)程和map寫(xiě)入磁盤過(guò)程類似,也有閥值和內(nèi)存大小,閥值一樣可以在配置文件里配置,而內(nèi)存大小是直接使用reduce的tasktracker的內(nèi)存大小,復(fù)制時(shí)候reduce還會(huì)進(jìn)行排序操作和合并文件操作,這些操作完了就會(huì)進(jìn)行reduce計(jì)算了。

小結(jié):

  1. shuffle是mapreduce優(yōu)化的重點(diǎn)地方;
  2. 環(huán)形內(nèi)存緩沖區(qū) :因此map寫(xiě)入磁盤的過(guò)程十分的復(fù)雜,更何況map輸出時(shí)候要對(duì)結(jié)果進(jìn)行排序,內(nèi)存開(kāi)銷是很大的,所以開(kāi)啟環(huán)形內(nèi)存緩沖區(qū)專門用于輸出;默認(rèn)是100MB,閾值是0.8;
  3. spill(溢寫(xiě)):緩沖區(qū)>80%,寫(xiě)入磁盤;溢寫(xiě)前先排序,后合并,寫(xiě)入磁盤;
  4. Partition:Partitioner操作和map階段的輸入分片(Input split)很像,Partitioner會(huì)找到對(duì)應(yīng)的map輸出文件,然后進(jìn)行復(fù)制操作,作為reduce的輸入;
  1. reduce階段:和map函數(shù)一樣也是程序員編寫(xiě)的,最終結(jié)果是存儲(chǔ)在hdfs上的。

4 問(wèn)題:

  1. jobtracker的單點(diǎn)故障:
    jobtracker和hdfs的namenode一樣也存在單點(diǎn)故障,單點(diǎn)故障一直是hadoop被人詬病的大問(wèn)題,為什么hadoop的做的文件系統(tǒng)和mapreduce計(jì)算框架都是高容錯(cuò)的,但是最重要的管理節(jié)點(diǎn)的故障機(jī)制卻如此不好,我認(rèn)為主要是namenode和jobtracker在實(shí)際運(yùn)行中都是在內(nèi)存操作,而做到內(nèi)存的容錯(cuò)就比較復(fù)雜了,只有當(dāng)內(nèi)存數(shù)據(jù)被持久化后容錯(cuò)才好做,namenode和jobtracker都可以備份自己持久化的文件,但是這個(gè)持久化都會(huì)有延遲,因此真的出故障,任然不能整體恢復(fù),另外hadoop框架里包含zookeeper框架,zookeeper可以結(jié)合jobtracker,用幾臺(tái)機(jī)器同時(shí)部署jobtracker,保證一臺(tái)出故障,有一臺(tái)馬上能補(bǔ)充上,不過(guò)這種方式也沒(méi)法恢復(fù)正在跑的mapreduce任務(wù)。

  2. 輸出文件目錄為什么不能提前存在?
    做mapreduce計(jì)算時(shí)候,輸出一般是一個(gè)文件夾,而且該文件夾是不能存在,我在出面試題時(shí)候提到了這個(gè)問(wèn)題,而且這個(gè)檢查做的很早,當(dāng)我們提交job時(shí)候就會(huì)進(jìn)行,mapreduce之所以這么設(shè)計(jì)是保證數(shù)據(jù)可靠性,如果輸出目錄存在reduce就搞不清楚你到底是要追加還是覆蓋,不管是追加和覆蓋操作都會(huì)有可能導(dǎo)致最終結(jié)果出問(wèn)題,mapreduce是做海量數(shù)據(jù)計(jì)算,一個(gè)生產(chǎn)計(jì)算的成本很高,例如一個(gè)job完全執(zhí)行完可能要幾個(gè)小時(shí),因此一切影響錯(cuò)誤的情況mapreduce是零容忍的。

  3. InputFormat和OutputFormat的格式?
    Mapreduce還有一個(gè)InputFormat和OutputFormat,我們?cè)诰帉?xiě)map函數(shù)時(shí)候發(fā)現(xiàn)map方法的參數(shù)是直接操作行數(shù)據(jù),沒(méi)有牽涉到InputFormat,這些事情在我們new Path時(shí)候mapreduce計(jì)算框架幫我們做好了,而OutputFormat也是reduce幫我們做好了,我們使用什么樣的輸入文件,就要調(diào)用什么樣的InputFormat,InputFormat是和我們輸入的文件類型相關(guān)的,mapreduce里常用的InputFormat有FileInputFormat普通文本文件,SequenceFileInputFormat是指hadoop的序列化文件,另外還有KeyValueTextInputFormat。OutputFormat就是我們想最終存儲(chǔ)到hdfs系統(tǒng)上的文件格式了,這個(gè)根據(jù)你需要定義了,hadoop有支持很多文件格式,這里不一一列舉,想知道百度下就看到了。

FileInputFormat.addInputPath(job, in);// 輸入路徑
FileOutputFormat.setOutputPath(job, out);// 輸出路徑

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容