前言
相對來說,MapReduce是一個款比較 “古老” 的大數(shù)據(jù)離線計算框架,但該框架對批量數(shù)據(jù)離線計算的思想仍值得借鑒!
在處理過程中需要把mapper階段的數(shù)據(jù)傳遞給reducer階段,這個過程可以廣義地稱為Shuffle,是 MapReduce 框架中最關(guān)鍵的一個流程。
采用圖解的方式進(jìn)行表達(dá)可以降低理解難度
Shuffle
使用自頂向下的方式進(jìn)行理解Shuffle流程。
過程總覽
Shuffle流程橫跨了mapper階段和reducer階段,在mapper階段包括Spill過程,在reducer階段包括Copy過程和Sort過程,如圖所示:

mapper階段的Spill
這個過程包括輸出(collect)、排序(sort)、溢寫(spill)、合并(merge)
collect
Map任務(wù)不斷地以<k,v>對的形式把數(shù)據(jù)輸出到一個存在于內(nèi)存中的環(huán)形數(shù)據(jù)結(jié)構(gòu)中。使用環(huán)形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間。這個數(shù)據(jù)結(jié)構(gòu)其實就是個字節(jié)數(shù)組,叫kvbuffer。這里不僅用來存放數(shù)據(jù),還有了一些索引數(shù)據(jù),放置索引數(shù)據(jù)的區(qū)域叫kvmeta。數(shù)據(jù)區(qū)域和索引數(shù)據(jù)區(qū)域在kvbuffer中是相鄰不重疊的兩個區(qū)域,用一個分界點來劃分兩者,分界點是會動態(tài)變化的,每次溢寫(spill)之后都會變化一次。初始的分界點是0,數(shù)據(jù)的存儲方向是向上增長,索引數(shù)據(jù)的存儲方向是向下增長。

sort
把kvbuffer中的數(shù)據(jù)按照partition值和key兩個關(guān)鍵字升序排序,移動的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。
spill
Spill線程為這次spill過程創(chuàng)建一個磁盤文件: 創(chuàng)建一個類似于“spill13.out”的文件。Spill線程根據(jù)排過序的kvmeta逐個把partition中的數(shù)據(jù)刷寫到這個文件中,一個partition對應(yīng)的數(shù)據(jù)刷寫完之后順序地刷寫下個partition,直到把所有的partition遍歷完。一個partition在文件中對應(yīng)的數(shù)據(jù)也叫段(segment)。
但問題來了
所有的partition對應(yīng)的數(shù)據(jù)都放在這個文件里,雖然是順序存放的,但是怎么直接知道某個partition的數(shù)據(jù)在這個文件中存放的起始位置呢?
答案:利用索引
有一個三元組記錄某個partition對應(yīng)的數(shù)據(jù)在這個文件中的索引:起始位置、原始數(shù)據(jù)長度、壓縮之后的數(shù)據(jù)長度。一個partition對應(yīng)一個三元組。然后把這些索引信息存放在內(nèi)存中,如果內(nèi)存中放不下了,后續(xù)的索引信息就需要寫到磁盤文件中了:創(chuàng)建一個類似于“spill13.index”的文件,存儲了索引數(shù)據(jù),(不一定在磁盤上創(chuàng)建,如果內(nèi)存(默認(rèn)1M空間)中能放得下就放在內(nèi)存中,即使在磁盤上創(chuàng)建了,和spill13.out文件也不一定在同一個目錄下。)
每一次Spill過程就會最少生成一個 *.out文件,有時還會生成 *.index文件。
索引文件和數(shù)據(jù)文件的對應(yīng)關(guān)系如下圖所示:

在Spill線程進(jìn)行SortAndSpill工作的同時,Map任務(wù)會繼續(xù)進(jìn)行數(shù)據(jù)的輸出。Map還是把數(shù)據(jù)寫到kvbuffer中,在兩個指針即將重合時,在kvbuffer中剩余空間的中間位置,用這個位置設(shè)置為新的分界點,bufindex指針移動到這個分界點,kvindex移動到這個分界點的-16位置,然后兩者就可以和諧地按照自己既定的軌跡放置數(shù)據(jù)了,當(dāng)溢寫完成后,空間騰出之后,不需要做任何改動繼續(xù)前進(jìn)。分界點的轉(zhuǎn)換如下圖所示:

變換方向,繼續(xù)~

merge
Map任務(wù)如果輸出數(shù)據(jù)量很大,可能會進(jìn)行好幾次溢寫,out文件和Index文件會產(chǎn)生很多,分布在不同的磁盤上。最后是merge過程把這些文件合并。merge過程創(chuàng)建一個叫file.out的文件和一個叫file.out.Index的文件用來存儲最終的輸出和索引。
逐個partition進(jìn)行合并輸出。對于某個partition來說,從spillXX.index索引列表中查詢這個partition對應(yīng)的所有索引信息,每個對應(yīng)一個段插入到段列表中。也就是這個partition對應(yīng)一個段列表,記錄所有的Spill文件中對應(yīng)的這個partition那段數(shù)據(jù)的文件名、起始位置、長度等等。
然后對這個partition對應(yīng)的所有的段進(jìn)行合并,目標(biāo)是合并成一個segment列表。當(dāng)這個partition對應(yīng)很多個segment時,會分批地進(jìn)行合并:先從segment列表中把第一批取出來,以key為關(guān)鍵字放置成最小堆,然后從最小堆中每次取出最小的輸出到一個臨時文件中,這樣就把這一批段合并成一個臨時的段,把它加回segment列表中;再從segment列表中把第二批取出來合并輸出到一個臨時segment,把其加入到列表中;這樣往復(fù)執(zhí)行,直到剩下的段是一批,輸出到最終的文件中。
最終的索引數(shù)據(jù)仍然輸出到Index文件中。

Map端的Shuffle過程到此結(jié)束。
reducer階段的Copy和Sort
copy
Reduce任務(wù)拖取某個Map對應(yīng)的數(shù)據(jù),如果在內(nèi)存中能放得下這次數(shù)據(jù)的話就直接把數(shù)據(jù)寫到內(nèi)存中。Reduce要向每個Map去拖取數(shù)據(jù),在內(nèi)存中每個Map對應(yīng)一塊數(shù)據(jù),當(dāng)內(nèi)存中存儲的Map數(shù)據(jù)占用空間達(dá)到一定程度的時候,開始啟動內(nèi)存中merge,把內(nèi)存中的數(shù)據(jù)merge輸出到磁盤上一個文件中。
有些Map的數(shù)據(jù)較小是可以放在內(nèi)存中的,有些Map的數(shù)據(jù)較大需要放在磁盤上,這樣最后Reduce任務(wù)拖過來的數(shù)據(jù)有些放在內(nèi)存中了有些放在磁盤上,最后會對這些來一個全局合并。
merge sort
這里使用的Merge和Map端使用的Merge過程一樣。Map的輸出數(shù)據(jù)已經(jīng)是有序的,Merge進(jìn)行一次合并排序,所謂Reduce端的sort過程就是這個合并的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的,迭代進(jìn)行的。
Reduce端的Shuffle過程至此結(jié)束。