Hadoop Mapreduce shuffle 過程詳解

轉(zhuǎn)載自:https://my.oschina.net/u/2293326/blog/607540 歡迎閱讀原創(chuàng)。

image.png

Map 端導(dǎo)讀 :

1. map buffer

當(dāng)map task開始運算,并產(chǎn)生中間數(shù)據(jù)時,其產(chǎn)生的中間結(jié)果并非直接就簡單的寫入磁盤。

這中間的過程比較復(fù)雜,并且利用到了內(nèi)存buffer來進行已經(jīng)產(chǎn)生的部分結(jié)果的緩存,并在內(nèi)存buffer中進行一些預(yù)排序來優(yōu)化整個map的性能。

如上圖所示,每一個map都會對應(yīng)存在一個內(nèi)存buffer(MapOutputBuffer,即上圖的buffer in memory),map會將已經(jīng)產(chǎn)生的部分結(jié)果先寫入到該buffer中,這個buffer默認是100MB大小,但是這個大小是可以根據(jù)job提交時的參數(shù)設(shè)定來調(diào)整的.

該參數(shù)即為:mapreduce.task.io.sort.mb

當(dāng)map的產(chǎn)生數(shù)據(jù)非常大時,并且把mapreduce.task.io.sort.mb調(diào)大,那么map在整個計算過程中spill的次數(shù)就勢必會降低,map task對磁盤的操作就會變少.

2. map spill size

map在運行過程中,不停的向該buffer中寫入已有的計算結(jié)果,但是該buffer并不一定能將全部的map輸出緩存下來,當(dāng)map輸出超出一定閾值(比如100M),那么map就必須將該buffer中的數(shù)據(jù)寫入到磁盤中去,這個過程在mapreduce中叫做spill。

map并不是要等到將該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的情況。所以,map其實是當(dāng)buffer被寫滿到一定程度(比如80%)時,就開始進行spill。

這個閾值也是由一個job的配置參數(shù)來控制,即mapreduce.map.sort.spill.percent,默認為0.80或80%

這個參數(shù)同樣也是影響spill頻繁程度,進而影響map task運行周期對磁盤的讀寫頻率的。但非特殊情況下,通常不需要人為的調(diào)整。調(diào)整mapreduce.task.io.sort.mb對用戶來說更加方便。

3. map spill file merge

當(dāng) map task 的計算部分全部完成后,如果map有輸出,就會生成一個或者多個spill文件,這些文件就是map的輸出結(jié)果。

map在正常退出之前(cleanup),需要將這些spill合并(merge)成一個,所以map在結(jié)束之前還有一個merge的過程。

merge的過程中,有一個參數(shù)可以調(diào)整這個過程的行為,該參數(shù)為:mapreduce.task.io.sort.factor。該參數(shù)默認為10。

它表示當(dāng)merge spill文件時,最多能有多少并行的stream向merge文件中寫入。比如如果map產(chǎn)生的數(shù)據(jù)非常的大,產(chǎn)生的spill文件大于10,而mapreduce.task.io.sort.factor使用的是默認的10,那么當(dāng)map計算完成做merge時,就沒有辦法一次將所有的spill文件merge成一個,而是會分多次,每次最多10個stream。

這也就是說,當(dāng)map的中間結(jié)果非常大,調(diào)大mapreduce.task.io.sort.factor,有利于減少merge次數(shù),進而減少map對磁盤的讀寫頻率,有可能達到優(yōu)化作業(yè)的目的。

4. map combiner

當(dāng)job指定了combiner的時候,我們都知道m(xù)ap結(jié)束后會在map端根據(jù)combiner定義的函數(shù)將map結(jié)果進行合并。

運行combiner函數(shù)的時機有可能會是merge完成之前,或者之后,這個時機可以由一個參數(shù)控制,即mapreduce.map.combine.minspills(default 3)。

當(dāng)job中設(shè)定了combiner,并且spill數(shù)大于等于3的時候,那么combiner函數(shù)就會在merge產(chǎn)生結(jié)果文件之前運行。通過這樣的方式,就可以在spill非常多需要merge,并且很多數(shù)據(jù)需要做conbine的時候,減少寫入到磁盤文件的數(shù)據(jù)數(shù)量,同樣是為了減少對磁盤的讀寫頻率,有可能達到優(yōu)化作業(yè)的目的。

5. map output compress

減少中間結(jié)果讀寫進出磁盤的方法不止這些,還有就是壓縮。

也就是說map的中間,無論是spill的時候,還是最后merge產(chǎn)生的結(jié)果文件,都是可以壓縮的。壓縮的好處在于,通過壓縮減少寫入讀出磁盤的數(shù)據(jù)量。

對中間結(jié)果非常大,磁盤速度成為map執(zhí)行瓶頸的job,尤其有用。控制map中間結(jié)果是否使用壓縮的參數(shù)為:mapreduce.map.output.compress(true/false)。

將這個參數(shù)設(shè)置為true時,那么map在寫中間結(jié)果時,就會將數(shù)據(jù)壓縮后再寫入磁盤,讀結(jié)果時也會采用先解壓后讀取數(shù)據(jù)。

這樣做的后果就是:寫入磁盤的 中間結(jié)果數(shù)據(jù)量會變少,但是cpu會消耗一些用來壓縮和解壓。

所以這種方式通常適合job中間結(jié)果非常大,瓶頸不在cpu,而是在磁盤的讀寫的情況。說的 直白一些就是用cpu換IO。

根據(jù)觀察,通常大部分的作業(yè)cpu都不是瓶頸,除非運算邏輯異常復(fù)雜。所以對中間結(jié)果采用壓縮通常來說是有收益的。以下是一 個wordcount中間結(jié)果采用壓縮和不采用壓縮產(chǎn)生的map中間結(jié)果本地磁盤讀寫的數(shù)據(jù)量對比:

map中間結(jié)果不壓縮:

image.png

map中間結(jié)果壓縮:

image.png

可以看出,同樣的job,同樣的數(shù)據(jù),在采用壓縮的情況下,map中間結(jié)果能縮小將近10倍,如果map的瓶頸在磁盤,那么job的性能提升將會非??捎^。

當(dāng)采用map中間結(jié)果壓縮的情況下,用戶還可以選擇壓縮時采用哪種壓縮格式進行壓縮,現(xiàn)在hadoop支持的壓縮格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。

通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較適合。但也要取決于job的具體情況。

用戶若想要自行選擇中間結(jié)果的壓縮算法,可以設(shè)置配置參數(shù):mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用戶自行選擇的壓縮方式。

6.Map side相關(guān)參數(shù)調(diào)優(yōu)

選項 類型 默認值 描述
mapreduce.task.io.sort.mb int 100 緩存map中間結(jié)果的buffer大小(in MB)
io.sort.record.percent float 0.05 io.sort.mb中用來保存map output記錄邊界的百分比,其他緩存用來保存數(shù)據(jù)
io.sort.spill.percent mapreduce.map.sort.spill.percent float 0.80 map開始做spill操作的閾值
io.sort.factor mapreduce.task.io.sort.factor int 10 做merge操作時同時操作的stream數(shù)上限。
min.num.spill.for.combine int 3 combiner函數(shù)運行的最小spill數(shù)
mapred.compress.map.output boolean false map中間結(jié)果是否采用壓縮
mapred.map.output.compression.codec class name org.apache.hadoop.io.compress.DefaultCodec map中間結(jié)果的壓縮格式

Reduce 端導(dǎo)讀 :

reduce的運行是分成三個階段的。分別為 copy->sort->reduce。

由于job的每一個map都會根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個partition,所以map的中間結(jié)果中是有可能包含每一個reduce需要處理的部分數(shù)據(jù)的。

所以,為了優(yōu)化reduce的執(zhí)行時間,hadoop中是等job的第一個map結(jié)束后,所有的reduce就開始嘗試從完成的map中下載該reduce對應(yīng)的partition部分數(shù)據(jù)。

這個過程就是通常所說的shuffle,也就是copy過程。

1. reduce shuffle parallelcopies

Reduce task在做shuffle時,實際上就是從不同的已經(jīng)完成的map上去下載屬于自己這個reduce的部分數(shù)據(jù).

由于map通常有許多個,所以對一個reduce來說,下載也可以是并行的從多個map下載這個并行度是可以調(diào)整的,調(diào)整參數(shù)為:mapreduce.reduce.shuffle.parallelcopies(default 5)。

默認情況下,每個只會有5個并行的下載線程在從map下數(shù)據(jù),如果一個時間段內(nèi)job完成的map有100個或者更多,那么reduce也最多只能同時下載 5個map的數(shù)據(jù),所以這個參數(shù)比較適合map很多并且完成的比較快的job的情況下調(diào)大,有利于reduce更快的獲取屬于自己部分的數(shù)據(jù)。

2. reduce merge

Reduce將map結(jié)果下載到本地時,同樣也是需要進行merge的,所以mapreduce.task.io.sort.factor的配置選項同樣會影響reduce進行merge時的行為,該參數(shù)的詳細介紹上文已經(jīng)提到,當(dāng)發(fā)現(xiàn)reduce在shuffle階段iowait非常的高的時候,就有可能通過調(diào)大這個參數(shù)來加大一次merge時的并發(fā)吞吐,優(yōu)化reduce效率。

3. reduce spill

Reduce在shuffle階段對下載來的map數(shù)據(jù),并不是立刻就寫入磁盤的,而是會先緩存在內(nèi)存中,然后當(dāng)使用內(nèi)存達到一定量的時候才刷入磁盤。這個內(nèi)存大小的控制就不像map一樣可以通過 mapreduce.task.io.sort.mb 來設(shè)定了,而是通過另外一個參數(shù)來設(shè)置:mapreduce.reduce.shuffle.input.buffer.percent(default 0.7).

這個參數(shù)其實是一個百分比,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task。
也就是說,如果該reduce task的最大heap使用量(通常通過mapreduce.reduce.java.opts來設(shè)置,比如設(shè)置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)。
默認情況下,reduce會使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)。

如果reduce的heap由于業(yè)務(wù)原因調(diào)整的比較大,相應(yīng)的緩存大小也會變大,這也是為什么reduce用來做緩存的參數(shù)是一個百分比,而不是一個固定的值了。

假設(shè) mapreduce.reduce.shuffle.input.buffer.percent 為 0.7,reduce task的max heapsize為1G,那么用來做下載數(shù)據(jù)緩存的內(nèi)存就為大概700MB左右,這700M的內(nèi)存,跟map端一樣,也不是要等到全部寫滿才會往磁盤刷的,而是當(dāng)這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁盤刷。

這個限度閾值也是可以通過job參數(shù)來設(shè)定的,設(shè)定參數(shù)為:mapreduce.reduce.shuffle.merge.percent(default 0.66)。如果下載速度很快,很容易就把內(nèi)存緩存撐大,那么調(diào)整一下這個參數(shù)有可能會對reduce的性能有所幫助。

當(dāng)reduce將所有的map上對應(yīng)自己partition的數(shù)據(jù)下載完成后,就會開始真正的reduce計算階段(中間有個sort階段通常時間非常短,幾秒鐘就完成了,因為整個下載階段就已經(jīng)是邊下載邊sort,然后邊merge的)。

當(dāng)reduce task真正進入reduce函數(shù)的計算階段的時候,有一個參數(shù)也是可以調(diào)整reduce的計算行為。也就是:mapreduce.reduce.input.buffer.percent(default 0.0)。

由于reduce計算時肯定也是需要消耗內(nèi)存的,而在讀取reduce需要的數(shù)據(jù)時,同樣是需要內(nèi)存作為buffer,這個參數(shù)是控制,需要多少的內(nèi)存百 分比來作為reduce讀已經(jīng)sort好的數(shù)據(jù)的buffer百分比。默認情況下為0,也就是說,默認情況下,reduce是全部從磁盤開始讀處理數(shù)據(jù)。 如果這個參數(shù)大于0,那么就會有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當(dāng)reduce計算邏輯消耗內(nèi)存很小時,可以分一部分內(nèi)存用來緩存數(shù)據(jù), 反正reduce的內(nèi)存閑著也是閑著。

4. reduce 相關(guān)參數(shù)調(diào)優(yōu)

選項 類型 默認值 描述
mapred.reduce.parallel.copies int 5 每個reduce并行下載map結(jié)果的最大線程數(shù)
io.sort.factor int 10 同上
mapred.job.shuffle.input.buffer.percent float 0.7 用來緩存shuffle數(shù)據(jù)的reduce task heap百分比
mapred.job.shuffle.merge.percent float 0.66 緩存的內(nèi)存中多少百分比后開始做merge操作
mapred.job.reduce.input.buffer.percent float 0.0 sort完成后reduce計算階段用來緩存數(shù)據(jù)的百分比
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容