一、前言
Shuffle翻譯為”洗牌“,在大數(shù)據(jù)MapReduce或Spark中都是比較經(jīng)典的數(shù)據(jù)處理機(jī)制。而今天大獅兄將和大家一起重溫MapReduce的Shuffle機(jī)制。對(duì)于MapReduce,從map( )方法的輸出到reduce方法( )的輸入,中間的過程被稱為shuffle過程。Shuffle 過程又分為Map端的Shuffle和Reduce端的Shuffle過程。經(jīng)典老圖鎮(zhèn)文:

二、Shuffle詳解
1. Map端的Shuffle
首先來看幾個(gè)重要的概念:環(huán)形緩沖區(qū)、分區(qū)、排序、合并
因?yàn)轭l繁的磁盤I/O操作會(huì)嚴(yán)重的降低效率,因此“中間結(jié)果”不會(huì)立馬寫入磁盤,而是優(yōu)先存儲(chǔ)到map節(jié)點(diǎn)的“環(huán)形內(nèi)存緩沖區(qū)”,在寫入的過程中進(jìn)行分區(qū)(partition),也就是對(duì)于每個(gè)鍵值對(duì)來說,都增加了一個(gè)partition屬性值,然后連同鍵值對(duì)一起序列化成字節(jié)數(shù)組寫入到緩沖區(qū)(緩沖區(qū)采用的就是字節(jié)數(shù)組,默認(rèn)大小為100M)。
當(dāng)寫入的數(shù)據(jù)量達(dá)到預(yù)先設(shè)置的閾值后便會(huì)啟動(dòng)溢寫出線程將緩沖區(qū)中的那部分?jǐn)?shù)據(jù)溢出寫(spill)到磁盤的臨時(shí)文件(這個(gè)文件沒有固定大?。┲?,并在寫入前根據(jù)key進(jìn)行排序(sort)和合并(combine,可選操作)。
溢出寫過程按輪詢方式將緩沖區(qū)中的內(nèi)容寫到mapreduce.cluster.local.dir屬性指定的本地目錄中。當(dāng)整個(gè)map任務(wù)完成溢出寫后,會(huì)對(duì)磁盤中這個(gè)map任務(wù)產(chǎn)生的所有臨時(shí)文件(spill文件)進(jìn)行歸并(merge)操作生成最終的正式輸出文件,此時(shí)的歸并是將所有spill文件中的相同partition合并到一起,并對(duì)各個(gè)partition中的數(shù)據(jù)再進(jìn)行一次排序(sort),生成key和對(duì)應(yīng)的value-list,文件歸并時(shí),如果溢寫文件數(shù)量超過參數(shù)min.num.spills.for.combine的值(默認(rèn)為3)時(shí),可以再次進(jìn)行合并。
至此map端的工作已經(jīng)全部結(jié)束,最終生成的文件也會(huì)存儲(chǔ)在TaskTracker能夠訪問的位置。每個(gè)reduce task不間斷的通過RPC從JobTracker那里獲取map task是否完成的信息,如果得到的信息是map task已經(jīng)完成,那么Shuffle的后半段ReduceShuffle開始啟動(dòng)。
2. Reduce端的Shuffle
Reduce端的shuffle主要包括三個(gè)階段,copy、merge和reduce。
Copy過程,簡(jiǎn)單地拉取數(shù)據(jù)。Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請(qǐng)求map task所在的TaskTracker獲取map task的輸出文件。因?yàn)閙ap task早已結(jié)束,這些文件就歸TaskTracker管理在本地磁盤中。
Merge階段。這里的merge如map端的merge動(dòng)作,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中,這里的緩沖區(qū)大小要比map端的更為靈活,它基于JVM的heap size設(shè)置,因?yàn)镾huffle階段Reducer不運(yùn)行,所以應(yīng)該把絕大部分的內(nèi)存都給Shuffle用。這里需要強(qiáng)調(diào)的是,merge有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤 3)磁盤到磁盤。默認(rèn)情況下第一種形式不啟用,讓人比較困惑,是吧。當(dāng)內(nèi)存中的數(shù)據(jù)量到達(dá)一定閾值,就啟動(dòng)內(nèi)存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個(gè)過程中如果你設(shè)置有Combiner,也是會(huì)啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束,然后啟動(dòng)第三種磁盤到磁盤的merge方式生成最終的那個(gè)文件。
Reducer的輸入文件。不斷地merge后,最后會(huì)生成一個(gè)“最終文件”。為什么加引號(hào)?因?yàn)檫@個(gè)文件可能存在于磁盤上,也可能存在于內(nèi)存中。對(duì)我們來說,當(dāng)然希望它存放于內(nèi)存中,直接作為Reducer的輸入,但默認(rèn)情況下,這個(gè)文件是存放于磁盤中的。至于怎樣才能讓這個(gè)文件出現(xiàn)在內(nèi)存中,之后的性能優(yōu)化篇我再說。當(dāng)Reducer的輸入文件已定,整個(gè)Shuffle才最終結(jié)束。然后就是Reducer執(zhí)行,把結(jié)果放到HDFS上。
為什么要排序?
key存在combine操作,排序之后相同的key放到一塊顯然方便做合并操作。
reduce task是按key去處理數(shù)據(jù)的。如果沒有排序那必須從所有數(shù)據(jù)中把當(dāng)前相同key的所有value數(shù)據(jù)拿出來,然后進(jìn)行reduce邏輯處理。顯然每個(gè)key到這個(gè)邏輯都需要做一次全量數(shù)據(jù)掃描,影響性能,有了排序很方便的得到一個(gè)key對(duì)于的value集合。
reduce task按key去處理數(shù)據(jù)時(shí),如果key按順序排序,那么reduce task就按key順序去讀取,顯然當(dāng)讀到的key是文件末尾的key那么就標(biāo)志數(shù)據(jù)處理完畢。如果沒有排序那還得有其他邏輯來記錄哪些key處理完了,哪些key沒有處理完。
為什么要合并?
因?yàn)閮?nèi)存放不下就會(huì)溢寫文件,就會(huì)發(fā)生多次溢寫,形成很多小文件,如果不合并,顯然會(huì)小文件泛濫,集群需要資源開銷去管理這些小文件數(shù)據(jù)。
任務(wù)去讀取文件的數(shù)增多,打開的文件句柄數(shù)也會(huì)增多。
mapreduce是全局有序。單個(gè)文件有序,不代表全局有序,只有把小文件合并一起排序才會(huì)全局有序。
補(bǔ)充:
Map數(shù)量:
對(duì)于大文件:由任務(wù)的 split 切片決定的,一個(gè) split 切片對(duì)應(yīng)一個(gè)map任務(wù)。先明確一點(diǎn) split 切片的大小可自己配置的,一般來說對(duì)于大文件會(huì)選擇split == block,如果split < block的情況下會(huì)增加 map 的數(shù)量,雖然這樣可以增加map執(zhí)行的并行度,但是會(huì)導(dǎo)致map任務(wù)在不同節(jié)點(diǎn)拉取數(shù)據(jù),浪費(fèi)了網(wǎng)絡(luò)資源等。ps:HDFS 中 block 是最小的存儲(chǔ)單元,默認(rèn)128M。
對(duì)于小文件:由參與任務(wù)的文件數(shù)量決定,默認(rèn)情況一個(gè)小文件啟動(dòng)一個(gè) map 任務(wù),小文件數(shù)量較多會(huì)導(dǎo)致啟動(dòng)較大數(shù)量的 map 任務(wù),增加資源消耗。此時(shí)建議將多個(gè)小文件通過 InputFormat 合并成一個(gè)大文件加入到一個(gè) split 中,并增加 split 的大小,這樣可以有效減少 map 的數(shù)量,降低并發(fā)度,減少資源消耗。
Reduce數(shù)量:
由分區(qū)(partiton)的數(shù)量決定的,我們可以在代碼中配置 job.setNumReduceTasks(*)來控制 reduce 的任務(wù)數(shù)量。
三、 總結(jié)
以上就是和大家要重溫MapReduce Shuffle的內(nèi)容,希望對(duì)大家的工作或者面試中有所幫助。