Hadoop開發(fā)--MapReduce分析(五)

一、Mapreduce簡(jiǎn)介

??Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架。
??Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)hadoop集群上。

二、MapReduce并行處理的基本過(guò)程

Hadoop2.0之前和Hadoop2.0之后的區(qū)別:
2.0之前只有MapReduce的運(yùn)行框架,那么它里面有只有兩種節(jié)點(diǎn),一個(gè)是master,一個(gè)是worker。
master既做資源調(diào)度又做程序調(diào)度,worker只是用來(lái)參與計(jì)算的。
在2.0之后加入了YARN集群,Yarn集群的主節(jié)點(diǎn)承擔(dān)了資源調(diào)度,Yarn集群的從節(jié)點(diǎn)中會(huì)選出一個(gè)節(jié)點(diǎn)(這個(gè)由redourcemanager決定)。
用作類似于2.0之前的master的工作,來(lái)進(jìn)行應(yīng)用程序的調(diào)度。
資源調(diào)度: 處理程序所需要的cpu、內(nèi)存資源,以及存儲(chǔ)數(shù)據(jù)所需要的硬盤資源都是resourcemanager去分配的。

MapReduce并行處理的基本過(guò)程

??一切都是從最上方的user program開始的,user program鏈接了MapReduce庫(kù),實(shí)現(xiàn)了最基本的Map函數(shù)和Reduce函數(shù)。
  圖中執(zhí)行的順序都用數(shù)字標(biāo)記了。
  1)MapReduce庫(kù)先把user program的輸入文件劃分為M份(M為用戶定義),如圖左方所示分成了split0~4;然后使用fork將用戶進(jìn)程拷貝到集群內(nèi)其它機(jī)器上。
  2)user program的副本中有一個(gè)稱為master,其余稱為worker,master是負(fù)責(zé)調(diào)度的,為空閑worker分配作業(yè)(Map作業(yè)或者Reduce作業(yè)),worker的數(shù)量也是可以由用戶指定的。
  3)被分配了Map作業(yè)的worker,開始讀取對(duì)應(yīng)分片的輸入數(shù)據(jù),Map作業(yè)數(shù)量是由M決定的,和split一一對(duì)應(yīng);Map作業(yè)從輸入數(shù)據(jù)中抽取出鍵值對(duì),每一個(gè)鍵值對(duì)都作為參數(shù)傳遞給map函數(shù),map函數(shù)產(chǎn)生的中間鍵值對(duì)被緩存在內(nèi)存中。
  4)緩存的中間鍵值對(duì)會(huì)被定期寫入本地磁盤,而且被分為R個(gè)區(qū),R的大小是由用戶定義的,將來(lái)每個(gè)區(qū)會(huì)對(duì)應(yīng)一個(gè)Reduce作業(yè);這些中間鍵值對(duì)的位置會(huì)被通報(bào)給master,master負(fù)責(zé)將信息轉(zhuǎn)發(fā)給Reduce worker。
  5)master通知分配了Reduce作業(yè)的worker它負(fù)責(zé)的分區(qū)在什么位置(肯定不止一個(gè)地方,每個(gè)Map作業(yè)產(chǎn)生的中間鍵值對(duì)都可能映射到所有R個(gè)不同分區(qū)),當(dāng)Reduce worker把所有它負(fù)責(zé)的中間鍵值對(duì)都讀過(guò)來(lái)后,先對(duì)它們進(jìn)行排序,使得相同鍵的鍵值對(duì)聚集在一起。因?yàn)椴煌逆I可能會(huì)映射到同一個(gè)分區(qū)也就是同一個(gè)Reduce作業(yè),所以排序是必須的。
  6)reduce worker遍歷排序后的中間鍵值對(duì),對(duì)于每個(gè)唯一的鍵,都將鍵與關(guān)聯(lián)的值傳遞給reduce函數(shù),reduce函數(shù)產(chǎn)生的輸出會(huì)添加到這個(gè)分區(qū)的輸出文件中。
  7)當(dāng)所有的Map和Reduce作業(yè)都完成了,master喚醒正版的user program,MapReduce函數(shù)調(diào)用返回user program的代碼。
  8)所有執(zhí)行完畢后,MapReduce輸出放在了R個(gè)分區(qū)的輸出文件中(分別對(duì)應(yīng)一個(gè)Reduce作業(yè))。用戶通常并不需要合并這R個(gè)文件,而是將其作為輸入交給另一個(gè)MapReduce程序處理。整個(gè)過(guò)程中,輸入數(shù)據(jù)是來(lái)自底層分布式文件系統(tǒng)(GFS)的,中間數(shù)據(jù)是放在本地文件系統(tǒng)的,最終輸出數(shù)據(jù)是寫入底層分布式文件系統(tǒng)(GFFS)的。而且我們要注意Map/Reduce作業(yè)map/reduce函數(shù)的區(qū)別:Map作業(yè)處理一個(gè)輸入數(shù)據(jù)的分片,可能需要調(diào)用多次map函數(shù)來(lái)處理每個(gè)輸入鍵值對(duì);Reduce作業(yè)處理一個(gè)分區(qū)的中間鍵值對(duì),期間要對(duì)每個(gè)不同的鍵調(diào)用一次reduce函數(shù),Reduce作業(yè)最終也對(duì)應(yīng)一個(gè)輸出文件。

MapReduce并行處理的基本過(guò)程

三、MapRrduce輸入與輸出問(wèn)題

??Map/Reduce框架運(yùn)轉(zhuǎn)在<key, value>鍵值對(duì)上,也就是說(shuō),框架把作業(yè)的輸入看為是一組<key, value>鍵值對(duì),同樣也產(chǎn)出一組 <key, value>鍵值對(duì)做為作業(yè)的輸出,這兩組鍵值對(duì)的類型可能不同。
??框架需要對(duì)key和value的類(classes)進(jìn)行序列化操作,因此,這些類需要實(shí)現(xiàn)Writable接口。另外,為了方便框架執(zhí)行排序操作,key類必須實(shí)現(xiàn) WritableComparable接口。
??注意:不管是哪里的序列化,最主要的作用就是持久化存儲(chǔ)或者是用于網(wǎng)絡(luò)傳輸
??一個(gè)Map/Reduce作業(yè)的輸入和輸出類型如下所示:
??(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。
??其實(shí)在前面講解Hadoop IO的時(shí)候已經(jīng)知道了解了Writale接口:  
??Writable接口是一個(gè)實(shí)現(xiàn)了序列化協(xié)議的序列化對(duì)象。
??在Hadoop中定義一個(gè)結(jié)構(gòu)化對(duì)象都要實(shí)現(xiàn)Writable接口,使得該結(jié)構(gòu)化對(duì)象可以序列化為字節(jié)流,字節(jié)流也可以反序列化為結(jié)構(gòu)化對(duì)象。

四、MapReduce實(shí)際處理流程

??mapreduce 其實(shí)是分治算法的一種現(xiàn),所謂分治算法就是“就是分而治之 ,將大的問(wèn)題分解為相同類型的子問(wèn)題(最好具有相同的規(guī)模),對(duì)子問(wèn)題進(jìn)行求解,然后合并成大問(wèn)題的解。
  mapreduce就是分治法的一種,將輸入進(jìn)行分片,然后交給不同的task進(jìn)行處理,然后合并成最終的解。
  mapreduce實(shí)際的處理過(guò)程可以理解為:Input->Map->Sort->Combine->Partition->Reduce->Output。
  1)Input階段
    數(shù)據(jù)以一定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以設(shè)置,也可以自定義分片函數(shù)。
  2)map階段
    對(duì)輸入的(key,value)進(jìn)行處理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass進(jìn)行設(shè)置。
  3)Sort階段
    對(duì)于Mapper的輸出進(jìn)行排序,使用Job.setOutputKeyComparatorClass進(jìn)行設(shè)置,然后定義排序規(guī)則。
  4)Combine階段
    這個(gè)階段對(duì)于Sort之后又相同key的結(jié)果進(jìn)行合并,使用Job.setCombinerClass進(jìn)行設(shè)置,也可以自定義Combine Class類。
  5)Partition階段
    將Mapper的中間結(jié)果按照key的范圍劃分為R份(Reduce作業(yè)的個(gè)數(shù)),默認(rèn)使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定義劃分的函數(shù)。
    使用Job.setPartitionClass設(shè)置。
  6)Reduce階段
    對(duì)于Mapper階段的結(jié)果進(jìn)行進(jìn)一步處理,Job.setReducerClass進(jìn)行設(shè)置自定義的Reduce類。
  7)Output階段
    Reducer輸出數(shù)據(jù)的格式。

五、一個(gè)job的運(yùn)行流程

??一個(gè)mapreduce作業(yè)的執(zhí)行流程是:作業(yè)提交->作業(yè)初始化->任務(wù)分配->任務(wù)執(zhí)行->更新任務(wù)執(zhí)行進(jìn)度和狀態(tài)->作業(yè)完成。


一個(gè)job的運(yùn)行流程

一個(gè)完整的mapreduce作業(yè)流程,包括4個(gè)獨(dú)立的實(shí)體:
  客戶端:client,編寫mapreduce程序,配置作業(yè),提交作業(yè)。
  JobTracker:協(xié)調(diào)這個(gè)作業(yè)的運(yùn)行,分配作業(yè),初始化作業(yè),與TaskTracker進(jìn)行通信。
  TaskTracker:負(fù)責(zé)運(yùn)行作業(yè),保持與JobTracker進(jìn)行通信。
  HDFS:分布式文件系統(tǒng),保持作業(yè)的數(shù)據(jù)和結(jié)果。

  1. 提交作業(yè)
      JobClient使用runjob方法創(chuàng)建一個(gè)JobClient實(shí)例,然后調(diào)用submitJob()方法進(jìn)行作業(yè)的提交,提交作業(yè)的具體過(guò)程如下:
        1)通過(guò)調(diào)用JobTracker對(duì)象的getNewJobId()方法從JobTracker處獲得一個(gè)作業(yè)ID。
        2)檢查作業(yè)的相關(guān)路徑。如果輸出路徑存在,作業(yè)將不會(huì)被提交(保護(hù)上一個(gè)作業(yè)運(yùn)行結(jié)果)。
        3)計(jì)算作業(yè)的輸入分片,如果無(wú)法計(jì)算,例如輸入路徑不存在,作業(yè)將不被提交,錯(cuò)誤返回給mapreduce程序。
        4)將運(yùn)行作業(yè)所需資源(作業(yè)jar文件,配置文件和計(jì)算得到的分片)復(fù)制到HDFS上。
        5)告知JobTracker作業(yè)準(zhǔn)備執(zhí)行(使用JobTracker對(duì)象的submitJob()方法來(lái)真正提交作業(yè))。
  2. 作業(yè)初始化
      當(dāng)JobTracker收到Job提交的請(qǐng)求后,將Job保存在一個(gè)內(nèi)部隊(duì)列,并讓Job Scheduler(作業(yè)調(diào)度器)處理并初始化。初始化涉及到創(chuàng)建一個(gè)封裝了其tasks的job對(duì)象,并保持對(duì)task的狀態(tài)和進(jìn)度的跟蹤(step 5)。當(dāng)創(chuàng)建要運(yùn)行的一系列task對(duì)象后,Job Scheduler首先開始從文件系統(tǒng)中獲取由JobClient計(jì)算的input splits(step 6),然后再為每個(gè)split創(chuàng)建map task。
  3. 任務(wù)的分配
    ??TaskTracker和JobTracker之間的通信和任務(wù)分配是通過(guò)心跳機(jī)制完成的。TaskTracker作為一個(gè)單獨(dú)的JVM,它執(zhí)行一個(gè)簡(jiǎn)單的循環(huán),主要實(shí)現(xiàn)每隔一段時(shí)間向JobTracker
    ??發(fā)送心跳,告訴JobTracker此TaskTracker是否存活,是否準(zhǔn)備執(zhí)行新的任務(wù)。如果有待分配的任務(wù),它就會(huì)為TaskTracker分配一個(gè)任務(wù)。
  4. 任務(wù)的執(zhí)行
    ??TTaskTracker申請(qǐng)到新的任務(wù)之后,就要在本地運(yùn)行了。首先,是將任務(wù)本地化(包括運(yùn)行任務(wù)所需的數(shù)據(jù)、配置信息、代碼等),即從HDFS復(fù)制到本地。調(diào)用localizeJob()完成的。
    ??T對(duì)于使用Streaming和Pipes創(chuàng)建Map或者Reduce程序的任務(wù),Java會(huì)把key/value傳遞給外部進(jìn)程,然后通過(guò)用戶自定義的Map或者Reduce進(jìn)行處理,然后把key/value傳回到j(luò)ava中。
    ??T其中就好像是TaskTracker的子進(jìn)程在處理Map和Reduce代碼一樣。
  5. 更新任務(wù)的執(zhí)行進(jìn)度和狀態(tài)
    ??進(jìn)度和狀態(tài)是通過(guò)heartbeat(心跳機(jī)制)來(lái)更新和維護(hù)的。對(duì)于Map Task,進(jìn)度就是已處理數(shù)據(jù)和所有輸入數(shù)據(jù)的比例。對(duì)于Reduce Task,情況就有點(diǎn)復(fù)雜,包括3部分,拷貝中間結(jié)果文件、排序、reduce調(diào)用,每部分占1/3。
  6. 任務(wù)完成
    ??當(dāng)Job完成后,JobTracker會(huì)收一個(gè)Job Complete的通知,并將當(dāng)前的Job狀態(tài)更新為successful,同時(shí)JobClient也會(huì)輪循獲知提交的Job已經(jīng)完成,將信息顯示給用戶。
    ??最后,JobTracker會(huì)清理和回收該Job的相關(guān)資源,并通知TaskTracker進(jìn)行相同的操作(比如刪除中間結(jié)果文件)

六、MapTask并行度決定機(jī)制

maptask的并行度決定map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)job的處理速度。
那么,mapTask并行實(shí)例是否越多越好呢?其并行度又是如何決定呢?

  1. mapTask并行度的決定機(jī)制
    一個(gè)job的map階段并行度由客戶端在提交job時(shí)決定而客戶端對(duì)map階段并行度的規(guī)劃的基本邏輯為:
    將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split),然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理。
    默認(rèn)128M一塊。

2.FileInputFormat切片機(jī)制
1)FileInputFormat切片機(jī)制切片定義在InputFormat類中的getSplit()方法
  2)FileInputFormat中默認(rèn)的切片機(jī)制:
    簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片
    切片大小,默認(rèn)等于block大小
    切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片
  比如待處理數(shù)據(jù)有兩個(gè)文件:
    file1.txt 320M
    file2.txt 10M
  經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:
    file1.txt.split1-- 0~128
    file1.txt.split2-- 128~256
    file1.txt.split3-- 256~320
    file2.txt.split1-- 0~10M
  3)FileInputFormat中切片的大小的參數(shù)配置
    通過(guò)分析源碼,在FileInputFormat中,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個(gè)值來(lái)運(yùn)算決定
    minsize:默認(rèn)值:1
    配置參數(shù): mapreduce.input.fileinputformat.split.minsize
    maxsize:默認(rèn)值:Long.MAXValue
    配置參數(shù):mapreduce.input.fileinputformat.split.maxsize
    blocksize
  因此,默認(rèn)情況下,切片大小=blocksize
  maxsize(切片最大值):
  參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小,而且就等于配置的這個(gè)參數(shù)的值
  minsize (切片最小值):
  參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大
  選擇并發(fā)數(shù)的影響因素:
    運(yùn)算節(jié)點(diǎn)的硬件配置
    運(yùn)算任務(wù)的類型:CPU密集型還是IO密集型
    運(yùn)算任務(wù)的數(shù)據(jù)量

  1. ReduceTask并行度的決定
    reducetask的并行度同樣影響整個(gè)job的執(zhí)行并發(fā)度和執(zhí)行效率,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同,Reducetask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
  //默認(rèn)值是1,手動(dòng)設(shè)置為4 
  job.setNumReduceTasks(4);

如果數(shù)據(jù)分布不均勻,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜
  注意: reducetask數(shù)量并不是任意設(shè)置,還要考慮業(yè)務(wù)邏輯需求,有些情況下,需要計(jì)算全局匯總結(jié)果,就只能有1個(gè)reducetask
  盡量不要運(yùn)行太多的reduce task。對(duì)大多數(shù)job來(lái)說(shuō),最好rduce的個(gè)數(shù)最多和集群中的reduce持平,或者比集群的 reduce slots小。這個(gè)對(duì)于小集群而言,尤其重要。

  1. mapreduce的shuffle機(jī)制
    mapreduce中,map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是mapreduce框架中最關(guān)鍵的一個(gè)流程,這個(gè)流程就叫shuffle。
      shuffle: 洗牌、發(fā)牌——(核心機(jī)制:數(shù)據(jù)分區(qū),排序,緩存)。
      具體來(lái)說(shuō):就是將maptask輸出的處理結(jié)果數(shù)據(jù),分發(fā)給reducetask,并在分發(fā)的過(guò)程中,對(duì)數(shù)據(jù)按key進(jìn)行了分區(qū)和排序。
    分區(qū)partition(確定哪個(gè)數(shù)據(jù)進(jìn)入哪個(gè)reduce)
    Sort根據(jù)key排序

詳細(xì)流程  
    1、 maptask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中
    2、 從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件
    3、 多個(gè)溢出文件會(huì)被合并成大的溢出文件
    4、 在溢出過(guò)程中,及合并的過(guò)程中,都要調(diào)用partitoner進(jìn)行分組和針對(duì)key進(jìn)行排序
    5、 reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
    6、 reducetask會(huì)取到同一個(gè)分區(qū)的來(lái)自不同maptask的結(jié)果文件,reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)
    7、 合并成大文件后,shuffle的過(guò)程也就結(jié)束了,后面進(jìn)入reducetask的邏輯運(yùn)算過(guò)程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)group,調(diào)用用戶自定義的reduce()方法)
  Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率,原則上說(shuō),緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快
  緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整, 參數(shù):io.sort.mb 默認(rèn)100MCombiner進(jìn)行局部value的合并。

七、MapReduce與YARN

Yarn是一個(gè)資源調(diào)度平臺(tái),負(fù)責(zé)為運(yùn)算程序提供服務(wù)器運(yùn)算資源,相當(dāng)于一個(gè)分布式的操作系統(tǒng)平臺(tái),而mapreduce等運(yùn)算程序則相當(dāng)于運(yùn)行于操作系統(tǒng)之上的應(yīng)用程序。

YARN中的重要概念
  1) yarn并不清楚用戶提交的程序的運(yùn)行機(jī)制
  2) yarn只提供運(yùn)算資源的調(diào)度(用戶程序向yarn申請(qǐng)資源,yarn就負(fù)責(zé)分配資源)
  3) yarn中的主管角色叫ResourceManager
  4) yarn中具體提供運(yùn)算資源的角色叫NodeManager
  5) 這樣一來(lái),yarn其實(shí)就與運(yùn)行的用戶程序完全解耦,就意味著yarn上可以運(yùn)行各種類型的分布式運(yùn)算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
  6) 所以,spark、storm等運(yùn)算框架都可以整合在yarn上運(yùn)行,只要他們各自的框架中有符合yarn規(guī)范的資源請(qǐng)求機(jī)制即可
  7) Yarn就成為一個(gè)通用的資源調(diào)度平臺(tái),從此,企業(yè)中以前存在的各種運(yùn)算集群都可以整合在一個(gè)物理集群上,提高資源利用率,方便數(shù)據(jù)共享。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • MapReduce框架結(jié)構(gòu)## MapReduce是一個(gè)用于大規(guī)模數(shù)據(jù)處理的分布式計(jì)算模型MapReduce模型主...
    Bloo_m閱讀 3,947評(píng)論 0 4
  • 一、MapReduce應(yīng)用場(chǎng)景 Hadoop的Mapreduce是一個(gè)使用簡(jiǎn)單的框架,基于它寫出來(lái)的程序可以運(yùn)行在...
    老實(shí)李閱讀 1,506評(píng)論 0 9
  • MapReduce工作流程 流程圖如下 解釋上面的流程是整個(gè)mapreduce最全工作流程,但是shuffle過(guò)程...
    ZFH__ZJ閱讀 617評(píng)論 0 3
  • 數(shù)據(jù)切片和MapTask并行度決定機(jī)制 1)一個(gè)Job的Map階段并行度由客戶端在提交Job時(shí)的切片數(shù)決定 2)每...
    bullion閱讀 834評(píng)論 0 1
  • 盼望著長(zhǎng)大成熟的我,在某一刻想要回到二十歲,就好像我還有幻想的機(jī)會(huì)一樣 看芝加哥打字機(jī)劉亞仁有感
    singler碎碎念閱讀 164評(píng)論 0 0

友情鏈接更多精彩內(nèi)容