MapReduce是一個(gè)分布式并行計(jì)算框架。在MapReduce中,一個(gè)存儲(chǔ)在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集會(huì)被切分成許多獨(dú)立的小數(shù)據(jù)塊,這些小數(shù)據(jù)塊可以被多個(gè)Map任務(wù)并行處理。MapReduce框架會(huì)為每個(gè)Map任務(wù)輸入一個(gè)數(shù)據(jù)子集,Map任務(wù)生成的結(jié)果會(huì)繼續(xù)作為Reduce任務(wù)的輸入,最終由Reduce任務(wù)輸出最后結(jié)果,并寫入分布式文件系統(tǒng)。
MapReduce模型的核心是Map函數(shù)和Reduce函數(shù)。函數(shù)都是以作為輸入,按一定的映射規(guī)則轉(zhuǎn)換成另一個(gè)或一批輸出。Map函數(shù)的輸入來自于分布式文件系統(tǒng)的文件塊,其格式是任意的,可以是文檔、也可以是二進(jìn)制格式。Reduce函數(shù)的輸入是一系列具有相同鍵、以某種方式組合的值,并也以鍵值對(duì)的形式輸出到文件。
MapReduce工作流程
MapReduce的核心思想可以用“分而治之”描述,也就是把一個(gè)大的數(shù)據(jù)集拆分成多個(gè)小數(shù)據(jù)塊在多臺(tái)機(jī)器上并行梳理。在MapReduce執(zhí)行過程中,Map任務(wù)的輸入文件、Reduce任務(wù)的處理結(jié)果都是保存在分布式文件系統(tǒng),而Map任務(wù)的中間結(jié)果則保存在本地存儲(chǔ)(比如磁盤),Spark則保存在內(nèi)存。
注:不同Map任務(wù)間不會(huì)進(jìn)行通信,不同Reduce任務(wù)間也不會(huì)發(fā)生信息交換,機(jī)器之間的任務(wù)交換由MapReduce框架實(shí)現(xiàn),不需要用戶定義實(shí)現(xiàn)。

shuffle詳解
shuffle是指對(duì)Map輸出結(jié)果進(jìn)行分區(qū)(partition)、排序(sort)、合并(combine)、歸并(merge)等處理并交給Reeduce的過程,此過程分為Map端的操作和Reduce端的操作,是MapReduce的核心環(huán)節(jié)。

Map端的shuffle過程
Map的輸出結(jié)果首先被寫入緩存,緩存滿時(shí)就啟動(dòng)溢寫操作,把緩存數(shù)據(jù)寫入磁盤文件,并清空緩存。啟動(dòng)溢寫操作時(shí),首先把緩存數(shù)據(jù)進(jìn)行分區(qū),然后對(duì)每個(gè)分區(qū)數(shù)據(jù)進(jìn)行排序、合并,在寫入磁盤。每次溢寫會(huì)生成一個(gè)新磁盤文件,隨著Map任務(wù)的執(zhí)行,磁盤會(huì)生成多個(gè)溢寫文件,所以在Map任務(wù)全部結(jié)束前,這些溢寫文件會(huì)歸并成一個(gè)大的磁盤文件,然后通知相應(yīng)的Reduce任務(wù)來領(lǐng)取屬于自己處理的數(shù)據(jù)。
4個(gè)步驟:
(1)輸入數(shù)據(jù)和執(zhí)行Map任務(wù)。
?? ??? ?輸入數(shù)據(jù)一般為保存在分布式文件系統(tǒng)的文件塊(任意格式,文件或二進(jìn)制),以<key,value>作為輸入,按一定一定映射規(guī)則轉(zhuǎn)換成一批<key,value>輸出。
(2)寫入緩存。
?? ??? ?每個(gè)Map任務(wù)分配一個(gè)緩存,減少多次磁盤尋址開銷,從而減少磁盤I/O。
(3)溢寫(分區(qū)、排序、合并)
緩存數(shù)據(jù)形式,緩存容量有限,所以會(huì)啟動(dòng)溢寫。溢寫到磁盤前,緩存數(shù)據(jù)會(huì)先被分區(qū),默認(rèn)分區(qū)方式:采用Hash函數(shù)對(duì)key進(jìn)行hash,再用Reduce任務(wù)的數(shù)量取模,表示為hash(key) mod R,R即是Reduce任務(wù)數(shù)量。這樣就把Map輸出結(jié)果均勻分配到R個(gè)Reduce任務(wù)并行處理。分區(qū)后,對(duì)每個(gè)分區(qū)數(shù)據(jù)根據(jù)key進(jìn)行排序,以及可選的合并操作。合并,是指將相同key的的value加起來,比如<"xmu",1>和<"xmu",2>經(jīng)過合并操作后得到<"xmu",3>,一般累加、最大值場景可以使用合并操作。
?? ??? ?每次溢寫操作后在磁盤新生成一個(gè)溢寫文件,并且所有鍵值對(duì)都是經(jīng)過分區(qū)、排序的。
(4)文件歸并 <k,v>—><k,v-list>
?? ??? ?歸并是指將相同key的鍵值歸并成一個(gè)新的鍵值對(duì)。<k1, v1>,<k1, v3>......<k1, vn>被歸并成<k1,<v1,v2......vn>>。
Reduce端的shuffle過程
Reduce任務(wù)從Map端的不同Map機(jī)器領(lǐng)回屬于自己處理的數(shù)據(jù),然后對(duì)數(shù)據(jù)進(jìn)行歸并后提交給Reduce處理。
3個(gè)步驟:
(1)領(lǐng)取數(shù)據(jù)
?? ??? ?Map端shuffle結(jié)束后,所有輸出結(jié)果保存在Map機(jī)器的本地磁盤,Reduce任務(wù)會(huì)把(不是這一步)這些數(shù)據(jù)領(lǐng)取(fetch)回來放到自己所在機(jī)器的本地磁盤。
(2)歸并數(shù)據(jù)
?? ??? ?領(lǐng)回的數(shù)據(jù)首先放在本機(jī)緩存,一樣會(huì)進(jìn)行溢寫操作,因?yàn)闀?huì)從多個(gè)Map機(jī)器領(lǐng)取數(shù)據(jù),因此會(huì)進(jìn)行歸并,再合并數(shù)據(jù)。因此當(dāng)所有Map端數(shù)據(jù)被領(lǐng)回,多個(gè)溢寫文件會(huì)被歸并成一個(gè)大文件(同樣會(huì)對(duì)鍵值對(duì)排序)。
(3)數(shù)據(jù)輸入給Reduce任務(wù)
?? ??? ?多輪歸并得到若干大文件,直接將這些文件輸入給Reduce任務(wù)。不再繼續(xù)歸并目的是減少磁盤讀寫開銷。
