Hadoop之MapReduce中的shuffle詳解

目錄

正文

概述

1、MapReduce 中,mapper 階段處理的數(shù)據(jù)如何傳遞給 reducer 階段,是 MapReduce 框架中 最關(guān)鍵的一個(gè)流程,這個(gè)流程就叫 Shuffle

2、Shuffle: 數(shù)據(jù)混洗 ——(核心機(jī)制:數(shù)據(jù)分區(qū),排序,局部聚合,緩存,拉取,再合并 排序)

3、具體來說:就是將 MapTask 輸出的處理結(jié)果數(shù)據(jù),按照 Partitioner 組件制定的規(guī)則分發(fā) 給 ReduceTask,并在分發(fā)的過程中,對(duì)數(shù)據(jù)按 key 進(jìn)行了分區(qū)和排序

MapReduce的Shuffle過程介紹

Shuffle的本義是洗牌、混洗,把一組有一定規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組無規(guī)則的數(shù)據(jù),越隨機(jī)越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組具有一定規(guī)則的數(shù)據(jù)。

為什么MapReduce計(jì)算模型需要Shuffle過程?我們都知道MapReduce計(jì)算模型一般包括兩個(gè)重要的階段:Map是映射,負(fù)責(zé)數(shù)據(jù)的過濾分發(fā);Reduce是規(guī)約,負(fù)責(zé)數(shù)據(jù)的計(jì)算歸并。Reduce的數(shù)據(jù)來源于Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來獲取數(shù)據(jù)。

從Map輸出到Reduce輸入的整個(gè)過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:

image

Spill過程

Spill過程包括輸出、排序、溢寫、合并等步驟,如圖所示:

image

Collect

每個(gè)Map任務(wù)不斷地以對(duì)的形式把數(shù)據(jù)輸出到在內(nèi)存中構(gòu)造的一個(gè)環(huán)形數(shù)據(jù)結(jié)構(gòu)中。使用環(huán)形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間,在內(nèi)存中放置盡可能多的數(shù)據(jù)。

這個(gè)數(shù)據(jù)結(jié)構(gòu)其實(shí)就是個(gè)字節(jié)數(shù)組,叫Kvbuffer,名如其義,但是這里面不光放置了數(shù)據(jù),還放置了一些索引數(shù)據(jù),給放置索引數(shù)據(jù)的區(qū)域起了一個(gè)Kvmeta的別名,在Kvbuffer的一塊區(qū)域上穿了一個(gè)IntBuffer(字節(jié)序采用的是平臺(tái)自身的字節(jié)序)的馬甲。數(shù)據(jù)區(qū)域和索引數(shù)據(jù)區(qū)域在Kvbuffer中是相鄰不重疊的兩個(gè)區(qū)域,用一個(gè)分界點(diǎn)來劃分兩者,分界點(diǎn)不是亙古不變的,而是每次Spill之后都會(huì)更新一次。初始的分界點(diǎn)是0,數(shù)據(jù)的存儲(chǔ)方向是向上增長(zhǎng),索引數(shù)據(jù)的存儲(chǔ)方向是向下增長(zhǎng),如圖所示:

image

Kvbuffer的存放指針bufindex是一直悶著頭地向上增長(zhǎng),比如bufindex初始值為0,一個(gè)Int型的key寫完之后,bufindex增長(zhǎng)為4,一個(gè)Int型的value寫完之后,bufindex增長(zhǎng)為8。

索引是對(duì)在kvbuffer中的索引,是個(gè)四元組,包括:value的起始位置、key的起始位置、partition值、value的長(zhǎng)度,占用四個(gè)Int長(zhǎng)度,Kvmeta的存放指針Kvindex每次都是向下跳四個(gè)“格子”,然后再向上一個(gè)格子一個(gè)格子地填充四元組的數(shù)據(jù)。比如Kvindex初始位置是-4,當(dāng)?shù)谝粋€(gè)寫完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長(zhǎng)度,然后Kvindex跳到-8位置,等第二個(gè)和索引寫完之后,Kvindex跳到-32位置。

Kvbuffer的大小雖然可以通過參數(shù)設(shè)置,但是總共就那么大,和索引不斷地增加,加著加著,Kvbuffer總有不夠用的那天,那怎么辦?把數(shù)據(jù)從內(nèi)存刷到磁盤上再接著往內(nèi)存寫數(shù)據(jù),把Kvbuffer中的數(shù)據(jù)刷到磁盤上的過程就叫Spill,多么明了的叫法,內(nèi)存中的數(shù)據(jù)滿了就自動(dòng)地spill到具有更大空間的磁盤。

關(guān)于Spill觸發(fā)的條件,也就是Kvbuffer用到什么程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點(diǎn)縫都不剩的時(shí)候再開始Spill,那Map任務(wù)就需要等Spill完成騰出空間之后才能繼續(xù)寫數(shù)據(jù);如果Kvbuffer只是滿到一定程度,比如80%的時(shí)候就開始Spill,那在Spill的同時(shí),Map任務(wù)還能繼續(xù)寫數(shù)據(jù),如果Spill夠快,Map可能都不需要為空閑空間而發(fā)愁。兩利相衡取其大,一般選擇后者。

Spill這個(gè)重要的過程是由Spill線程承擔(dān),Spill線程從Map任務(wù)接到“命令”之后就開始正式干活,干的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個(gè)頗具爭(zhēng)議性的Sort。

Sort

先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個(gè)關(guān)鍵字升序排序,移動(dòng)的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起,同一partition內(nèi)的按照key有序。

Spill

Spill線程為這次Spill過程創(chuàng)建一個(gè)磁盤文件:從所有的本地目錄中輪訓(xùn)查找能存儲(chǔ)這么大空間的目錄,找到之后在其中創(chuàng)建一個(gè)類似于“spill12.out”的文件。Spill線程根據(jù)排過序的Kvmeta挨個(gè)partition的把數(shù)據(jù)吐到這個(gè)文件中,一個(gè)partition對(duì)應(yīng)的數(shù)據(jù)吐完之后順序地吐下個(gè)partition,直到把所有的partition遍歷完。一個(gè)partition在文件中對(duì)應(yīng)的數(shù)據(jù)也叫段(segment)。

所有的partition對(duì)應(yīng)的數(shù)據(jù)都放在這個(gè)文件里,雖然是順序存放的,但是怎么直接知道某個(gè)partition在這個(gè)文件中存放的起始位置呢?強(qiáng)大的索引又出場(chǎng)了。有一個(gè)三元組記錄某個(gè)partition對(duì)應(yīng)的數(shù)據(jù)在這個(gè)文件中的索引:起始位置、原始數(shù)據(jù)長(zhǎng)度、壓縮之后的數(shù)據(jù)長(zhǎng)度,一個(gè)partition對(duì)應(yīng)一個(gè)三元組。然后把這些索引信息存放在內(nèi)存中,如果內(nèi)存中放不下了,后續(xù)的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪訓(xùn)查找能存儲(chǔ)這么大空間的目錄,找到之后在其中創(chuàng)建一個(gè)類似于“spill12.out.index”的文件,文件中不光存儲(chǔ)了索引數(shù)據(jù),還存儲(chǔ)了crc32的校驗(yàn)數(shù)據(jù)。(spill12.out.index不一定在磁盤上創(chuàng)建,如果內(nèi)存(默認(rèn)1M空間)中能放得下就放在內(nèi)存中,即使在磁盤上創(chuàng)建了,和spill12.out文件也不一定在同一個(gè)目錄下。)

每一次Spill過程就會(huì)最少生成一個(gè)out文件,有時(shí)還會(huì)生成index文件,Spill的次數(shù)也烙印在文件名中。索引文件和數(shù)據(jù)文件的對(duì)應(yīng)關(guān)系如下圖所示:

image

在Spill線程如火如荼的進(jìn)行SortAndSpill工作的同時(shí),Map任務(wù)不會(huì)因此而停歇,而是一無既往地進(jìn)行著數(shù)據(jù)輸出。Map還是把數(shù)據(jù)寫到kvbuffer中,那問題就來了:只顧著悶頭按照bufindex指針向上增長(zhǎng),kvmeta只顧著按照Kvindex向下增長(zhǎng),是保持指針起始位置不變繼續(xù)跑呢,還是另謀它路?如果保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之后再重新開始或者移動(dòng)內(nèi)存都比較麻煩,不可取。Map取kvbuffer中剩余空間的中間位置,用這個(gè)位置設(shè)置為新的分界點(diǎn),bufindex指針移動(dòng)到這個(gè)分界點(diǎn),Kvindex移動(dòng)到這個(gè)分界點(diǎn)的-16位置,然后兩者就可以和諧地按照自己既定的軌跡放置數(shù)據(jù)了,當(dāng)Spill完成,空間騰出之后,不需要做任何改動(dòng)繼續(xù)前進(jìn)。分界點(diǎn)的轉(zhuǎn)換如下圖所示:

image

Map任務(wù)總要把輸出的數(shù)據(jù)寫到磁盤上,即使輸出數(shù)據(jù)量很小在內(nèi)存中全部能裝得下,在最后也會(huì)把數(shù)據(jù)刷到磁盤上。

Merge

Map任務(wù)如果輸出數(shù)據(jù)量很大,可能會(huì)進(jìn)行好幾次Spill,out文件和Index文件會(huì)產(chǎn)生很多,分布在不同的磁盤上。最后把這些文件進(jìn)行合并的merge過程閃亮登場(chǎng)。

Merge過程怎么知道產(chǎn)生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產(chǎn)生的Spill文件,然后把路徑存儲(chǔ)在一個(gè)數(shù)組里。Merge過程又怎么知道Spill的索引信息呢?沒錯(cuò),也是從所有的本地目錄上掃描得到Index文件,然后把索引信息存儲(chǔ)在一個(gè)列表里。到這里,又遇到了一個(gè)值得納悶的地方。在之前Spill過程中的時(shí)候?yàn)槭裁床恢苯影堰@些信息存儲(chǔ)在內(nèi)存中呢,何必又多了這步掃描的操作?特別是Spill的索引數(shù)據(jù),之前當(dāng)內(nèi)存超限之后就把數(shù)據(jù)寫到磁盤,現(xiàn)在又要從磁盤把這些數(shù)據(jù)讀出來,還是需要裝到更多的內(nèi)存中。之所以多此一舉,是因?yàn)檫@時(shí)kvbuffer這個(gè)內(nèi)存大戶已經(jīng)不再使用可以回收,有內(nèi)存空間來裝這些數(shù)據(jù)了。(對(duì)于內(nèi)存空間較大的土豪來說,用內(nèi)存來省卻這兩個(gè)io步驟還是值得考慮的。)

然后為merge過程創(chuàng)建一個(gè)叫file.out的文件和一個(gè)叫file.out.Index的文件用來存儲(chǔ)最終的輸出和索引。

一個(gè)partition一個(gè)partition的進(jìn)行合并輸出。對(duì)于某個(gè)partition來說,從索引列表中查詢這個(gè)partition對(duì)應(yīng)的所有索引信息,每個(gè)對(duì)應(yīng)一個(gè)段插入到段列表中。也就是這個(gè)partition對(duì)應(yīng)一個(gè)段列表,記錄所有的Spill文件中對(duì)應(yīng)的這個(gè)partition那段數(shù)據(jù)的文件名、起始位置、長(zhǎng)度等等。

然后對(duì)這個(gè)partition對(duì)應(yīng)的所有的segment進(jìn)行合并,目標(biāo)是合并成一個(gè)segment。當(dāng)這個(gè)partition對(duì)應(yīng)很多個(gè)segment時(shí),會(huì)分批地進(jìn)行合并:先從segment列表中把第一批取出來,以key為關(guān)鍵字放置成最小堆,然后從最小堆中每次取出最小的輸出到一個(gè)臨時(shí)文件中,這樣就把這一批段合并成一個(gè)臨時(shí)的段,把它加回到segment列表中;再從segment列表中把第二批取出來合并輸出到一個(gè)臨時(shí)segment,把其加入到列表中;這樣往復(fù)執(zhí)行,直到剩下的段是一批,輸出到最終的文件中。

最終的索引數(shù)據(jù)仍然輸出到Index文件中。

image

Map端的Shuffle過程到此結(jié)束。

Copy

Reduce任務(wù)通過HTTP向各個(gè)Map任務(wù)拖取它所需要的數(shù)據(jù)。每個(gè)節(jié)點(diǎn)都會(huì)啟動(dòng)一個(gè)常駐的HTTP server,其中一項(xiàng)服務(wù)就是響應(yīng)Reduce拖取Map數(shù)據(jù)。當(dāng)有MapOutput的HTTP請(qǐng)求過來的時(shí)候,HTTP server就讀取相應(yīng)的Map輸出文件中對(duì)應(yīng)這個(gè)Reduce部分的數(shù)據(jù)通過網(wǎng)絡(luò)流輸出給Reduce。

Reduce任務(wù)拖取某個(gè)Map對(duì)應(yīng)的數(shù)據(jù),如果在內(nèi)存中能放得下這次數(shù)據(jù)的話就直接把數(shù)據(jù)寫到內(nèi)存中。Reduce要向每個(gè)Map去拖取數(shù)據(jù),在內(nèi)存中每個(gè)Map對(duì)應(yīng)一塊數(shù)據(jù),當(dāng)內(nèi)存中存儲(chǔ)的Map數(shù)據(jù)占用空間達(dá)到一定程度的時(shí)候,開始啟動(dòng)內(nèi)存中merge,把內(nèi)存中的數(shù)據(jù)merge輸出到磁盤上一個(gè)文件中。

如果在內(nèi)存中不能放得下這個(gè)Map的數(shù)據(jù)的話,直接把Map數(shù)據(jù)寫到磁盤上,在本地目錄創(chuàng)建一個(gè)文件,從HTTP流中讀取數(shù)據(jù)然后寫到磁盤,使用的緩存區(qū)大小是64K。拖一個(gè)Map數(shù)據(jù)過來就會(huì)創(chuàng)建一個(gè)文件,當(dāng)文件數(shù)量達(dá)到一定閾值時(shí),開始啟動(dòng)磁盤文件merge,把這些文件合并輸出到一個(gè)文件。

有些Map的數(shù)據(jù)較小是可以放在內(nèi)存中的,有些Map的數(shù)據(jù)較大需要放在磁盤上,這樣最后Reduce任務(wù)拖過來的數(shù)據(jù)有些放在內(nèi)存中了有些放在磁盤上,最后會(huì)對(duì)這些來一個(gè)全局合并。

Merge Sort

這里使用的Merge和Map端使用的Merge過程一樣。Map的輸出數(shù)據(jù)已經(jīng)是有序的,Merge進(jìn)行一次合并排序,所謂Reduce端的sort過程就是這個(gè)合并的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個(gè)階段是重疊而不是完全分開的。

Reduce端的Shuffle過程至此結(jié)束。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容