MapReduce分析

mapreduce是什么?

是一個(gè)編程模型, 分為map和reduce. map接受一條record, 將這條record進(jìn)行各種想要得到的變換輸出為中間結(jié)果, 而reduce把key相同的中間結(jié)果放在一起(key, iterable value list), 進(jìn)行聚合輸出0個(gè)或者1個(gè)結(jié)果.

mapreduce(mr)不是什么

mr不是一個(gè)新概念, mr來(lái)自函數(shù)式編程中已有的概念. Google對(duì)mr做出的貢獻(xiàn)不在于創(chuàng)造了這個(gè)編程模板, 而是把mr整合到分布式的存儲(chǔ)和任務(wù)管理中去, 實(shí)現(xiàn)分布式的計(jì)算. 所以就mr而言,重點(diǎn)并不在這個(gè)編程模板上, 而是如何通過(guò)分布式去實(shí)現(xiàn)mr的. 這是我接下來(lái)要關(guān)注的重點(diǎn).

一個(gè)mr過(guò)程的overview:

通過(guò)分割[1], 輸入數(shù)據(jù)變成一個(gè)有M個(gè)split的子集(每一個(gè)split從16M到64M不等[2]). map函數(shù)被分布到多臺(tái)服務(wù)器上去執(zhí)行map任務(wù). 使得輸入的split能夠在不同的機(jī)器上被并行處理.

map函數(shù)的輸出通過(guò)用split函數(shù)來(lái)分割中間key, 來(lái)形成R個(gè)partition(例如, hash(key) mod R), 然后reduce調(diào)用被分布到多態(tài)機(jī)器上去. partition的數(shù)據(jù)和分割函數(shù)由用戶來(lái)指定.

一個(gè)mr的完整過(guò)程:

1> mr的庫(kù)首先分割輸入文件成M個(gè)片, 然后再集群中開(kāi)始大量的copy程序

2> 這些copy中有一個(gè)特殊的: 是master. 其它的都是worker. 有M個(gè)map任務(wù)和R個(gè)reduce任務(wù)將被分配. mater會(huì)把一個(gè)map任務(wù)或者是一個(gè)reduce任務(wù)分配給idle worker(空閑機(jī)器).

3> 一個(gè)被分配了map任務(wù)的worker讀取相關(guān)輸入split的內(nèi)容. 它從輸入數(shù)據(jù)中分析出key/value pair, 然后把key/value對(duì)傳遞給用戶自定義的map函數(shù), 有map函數(shù)產(chǎn)生的中間key/value pair被緩存在內(nèi)存中

4> 緩存到內(nèi)存的中kv paoir會(huì)被周期性的寫入本地磁盤上. 怎么寫? 通過(guò)partitioning function把他們寫入R個(gè)分區(qū). 這些buffered pair在本地磁盤的位置會(huì)被傳回給master. master會(huì)在后面把這個(gè)位置轉(zhuǎn)發(fā)給reduce的worker.

5> 當(dāng)reduce的worker接收到master發(fā)來(lái)的位置信息后, 它通過(guò)遠(yuǎn)程訪問(wèn)來(lái)讀map worker溢寫到磁盤上的數(shù)據(jù). 當(dāng)reduce worker把所有的中間結(jié)果都讀完了以后, 它要根據(jù)中間結(jié)果的key做一個(gè)sort --> 這樣的話, key相同的record會(huì)被group到一起. 這個(gè)sort是必須的, 因?yàn)橥ǔO嗤膔educe task會(huì)收到很多不同的key(如果不做sort, 就沒(méi)法把key相同的record group在一起了). 如果中間結(jié)果太大超過(guò)了內(nèi)存容量, 需要做一個(gè)外部的sort.

6> reducer worker會(huì)對(duì)每一個(gè)unique key進(jìn)行一次遍歷, 把每一個(gè)unique key和它c(diǎn)orresponding的value list傳送給用戶定義的reduce function中去. reduce的輸出被append到這個(gè)reduce的partition的最終的輸出文件中去

7> 當(dāng)所有的map任務(wù)和reduce任務(wù)都完成后, master結(jié)點(diǎn)會(huì)喚醒user program. 這個(gè)時(shí)候, 在user program中的對(duì)mapreduce的call會(huì)返回到用戶的code中去.

最終, mr執(zhí)行的輸出會(huì)被分到R個(gè)輸出文件中去(每個(gè)reduce輸出一個(gè)partition, 共R個(gè).) 通常來(lái)講, 用戶不需要把這R個(gè)輸出文件合并成一個(gè), 因?yàn)樗麄兘?jīng)常會(huì)被作為下一個(gè)mapreduce程序的輸入. 或者是通過(guò)別的程序來(lái)調(diào)用他們, 這個(gè)程序必須可以handle有多個(gè)partition作為輸入的情況.

master的數(shù)據(jù)結(jié)構(gòu):
master維護(hù)的主要是metadata. 它為每一個(gè)map和reduce任務(wù)存儲(chǔ)他們的狀態(tài)(idle, in-progress,
or completed).
master就像一個(gè)管道,通過(guò)它,中間文件區(qū)域的位置從map任務(wù)傳遞到reduce任務(wù).因此,對(duì)于每個(gè)完成的map任務(wù),master存儲(chǔ)由map任務(wù)產(chǎn)生的R個(gè)中間文件區(qū)域的大小和位置.當(dāng)map任務(wù)完成的時(shí)候,位置和大小的更新信息被接受.這些信息被逐步增加的傳遞給那些正在工作的reduce任務(wù).

Fault Tolerance

錯(cuò)誤分為2中 worker的故障和master的故障.

worker故障:

master會(huì)周期性的ping每個(gè)worker. 如果在一個(gè)缺點(diǎn)的時(shí)間段內(nèi)沒(méi)有收到worker返回的信息, master會(huì)把這個(gè)worker標(biāo)記成失效. 失敗的任務(wù)是如何重做的呢? 每一個(gè)worker完成的map任務(wù)會(huì)被reset為idle的狀態(tài), 所以它可以被安排給其它的worker. 對(duì)于一個(gè)failed掉的worker上的map任務(wù)和reduce任務(wù), 也通同樣可以通過(guò)這種方式來(lái)處理.

master失敗:

master只有一個(gè), 它的失敗會(huì)造成single point failure. 就是說(shuō), 如果master失敗, 就會(huì)終止mr計(jì)算. 讓用戶來(lái)檢查這個(gè)狀態(tài), 根據(jù)需要重新執(zhí)行mr操作.

在錯(cuò)誤面前的處理機(jī)制(類似于exactly once?)

當(dāng)map當(dāng)user提供的map和reduce operator是關(guān)于輸入的確定性的操作, 我們提供的分布式implementation能夠提供相同的輸出. 什么相同的輸出呢? 和一個(gè)非容錯(cuò)的順序執(zhí)行的程序一樣的輸出. 是如何做到這一點(diǎn)的?

是依賴于map和reduce任務(wù)輸出的原子性提交來(lái)實(shí)現(xiàn)這個(gè)性質(zhì)的. 對(duì)所有的task而言, task會(huì)把輸出寫到private temporary files中去. 一個(gè)map任務(wù)會(huì)產(chǎn)生R個(gè)這樣的臨時(shí)文件, 一個(gè)reduce任務(wù)會(huì)產(chǎn)生1個(gè)這樣的臨時(shí)文件. 當(dāng)map任務(wù)完成的時(shí)候, worker會(huì)給master發(fā)一個(gè)信息, 這個(gè)信息包含了R個(gè)臨時(shí)文件的name. 如果master收到了一條已經(jīng)完成的map任務(wù)的新的完成信息, master會(huì)忽略這個(gè)信息.否則的話, master會(huì)紀(jì)錄這R個(gè)文件的名字到自己的data structure中去.

當(dāng)reduce任務(wù)完成了, reduce worker會(huì)自動(dòng)把自己輸出的臨時(shí)文件重命名為final output file. 如果相同的在多態(tài)機(jī)器上執(zhí)行, 那么在相同的final output file上都會(huì)執(zhí)行重命名. 通過(guò)這種方式來(lái)保證最終的輸出文件只包含被一個(gè)reduce task執(zhí)行過(guò)的數(shù)據(jù).

存儲(chǔ)位置

mr是如果利用網(wǎng)絡(luò)帶寬的?
論文中說(shuō), 利用把輸入數(shù)據(jù)(HDFS中)存儲(chǔ)在機(jī)器的本地磁盤來(lái)save網(wǎng)絡(luò)帶寬. HDFS把每個(gè)文件分成64MB的block. 然后每個(gè)block在別的機(jī)器上做replica(一般是3份). 做mr時(shí), master會(huì)考慮輸入文件的位置信息, 并努力在某個(gè)機(jī)器上安排一個(gè)map任務(wù).什么樣的機(jī)器? 包含了這個(gè)map任務(wù)的數(shù)據(jù)的replica的機(jī)器上. 如果失敗的話, 則嘗試就近安排(比如安排到一個(gè)worker machine上, 這個(gè)machine和包含input data的machine在同一個(gè)network switch上), 這樣的話, 想使得大部分輸入數(shù)據(jù)在本地讀取, 不消耗網(wǎng)絡(luò)帶寬.

任務(wù)粒度

把map的輸入拆成了M個(gè)partition, 把reduce的輸入拆分成R個(gè)partition. 因?yàn)镽通常是用戶指定的,所以我們?cè)O(shè)定M的值. 讓每一個(gè)partition都在16-64MB(對(duì)應(yīng)于HDFS的存儲(chǔ)策略, 每一個(gè)block是64MB) 另外, 經(jīng)常把R的值設(shè)置成worker數(shù)量的小的倍數(shù).

備用任務(wù)

straggler(落伍者): 一個(gè)mr的總的執(zhí)行時(shí)間總是由落伍者決定的. 導(dǎo)致一臺(tái)machine 慢的原因有很多:可能硬盤出了問(wèn)題, 可能是key的分配出了問(wèn)題等等. 這里通過(guò)一個(gè)通用的用的機(jī)制來(lái)處理這個(gè)情況:
當(dāng)一個(gè)MapReduce操作接近完成的時(shí)候,master調(diào)度備用(backup)任務(wù)進(jìn)程來(lái)執(zhí)行剩下的、處于處理中狀態(tài)(in-progress)的任務(wù)。無(wú)論是最初的執(zhí)行進(jìn)程、還是備用(backup)任務(wù)進(jìn)程完成了任務(wù),我們都把這個(gè)任務(wù)標(biāo)記成為已經(jīng)完成。我們調(diào)優(yōu)了這個(gè)機(jī)制,通常只會(huì)占用比正常操作多幾個(gè)百分點(diǎn)的計(jì)算資源。我們發(fā)現(xiàn)采用這樣的機(jī)制對(duì)于減少超大MapReduce操作的總處理時(shí)間效果顯著。

技巧

  1. partition 函數(shù)
    map的輸出會(huì)劃分到R個(gè)partition中去. 默認(rèn)的partition的方法是使用hash進(jìn)行分區(qū). 然而有時(shí)候, hash不能滿足我們的需求. 比如: 輸出的key的值是URLs, 我們希望每個(gè)主機(jī)的所有條目保持在同一個(gè)partition中, 那么我們就要自己寫一個(gè)分區(qū)函數(shù), 如hash(Hostname(urlkey) mod R)

  2. 順序保證
    我們確保在給定的partition中, 中間的kv pair的值增量順序處理的. 這樣的順序保證對(duì)每個(gè)partition生成一個(gè)有序的輸出文件.

  3. Combiner函數(shù)
    在某些情況下,Map函數(shù)產(chǎn)生的中間key值的重復(fù)數(shù)據(jù)會(huì)占很大的比重. 如果把這些重復(fù)的keybu'zu我們?cè)试S用戶指定一個(gè)可選的combiner函數(shù),combiner函數(shù)首先在本地將這些記錄進(jìn)行一次合并,然后將合并的結(jié)果再通過(guò)網(wǎng)絡(luò)發(fā)送出去。
    Combiner函數(shù)在每臺(tái)執(zhí)行Map任務(wù)的機(jī)器上都會(huì)被執(zhí)行一次。因此combiner是map側(cè)的一個(gè)reduce. 一般情況下,Combiner和Reduce函數(shù)是一樣的。Combiner函數(shù)和Reduce函數(shù)之間唯一的區(qū)別是MapReduce庫(kù)怎樣控制函數(shù)的輸出。Reduce函數(shù)的輸出被保存在最終的輸出文件里,而Combiner函數(shù)的輸出被寫到中間文件里,然后被發(fā)送給Reduce任務(wù)。

  4. 輸入輸出類型
    支持多種. 比如文本的話, key是offset, value是這一行的內(nèi)容. 每種輸入類型的豎線都必須能夠把輸入數(shù)據(jù)分割成split. 這個(gè)split能夠由單獨(dú)的map任務(wù)來(lái)進(jìn)行后續(xù)處理. 使用者可以通過(guò)提供一個(gè)reader接口的實(shí)現(xiàn)來(lái)支持新的輸入類型. 而且reader不一定需要從文件中讀取數(shù)據(jù).

  5. 跳過(guò)損耗的紀(jì)錄
    有時(shí)候, 用戶程序中的bug導(dǎo)致map或者reduce在處理某些record的時(shí)候crash掉. 我們提供一種忽略這些record的模式, mr會(huì)檢測(cè)檢測(cè)哪些記錄導(dǎo)致確定性的crash,并且跳過(guò)這些記錄不處理。
    具體做法是: 在執(zhí)行MR操作之前, MR庫(kù)會(huì)通過(guò)全局變量保存record的sequence number, 如果用戶程序出發(fā)了一個(gè)系統(tǒng)信號(hào), 消息處理函數(shù)將用"最后一口氣" 通過(guò)UDP包向master發(fā)送處理的最后一條紀(jì)錄的序號(hào). 當(dāng)master看到在處理某條特定的record不止失敗一次時(shí), 就對(duì)它進(jìn)行標(biāo)記需要被跳過(guò), 在下次重新執(zhí)行相關(guān)的mr任務(wù)的時(shí)候跳過(guò)這條紀(jì)錄.

在Google給的例子中, 有一點(diǎn)值得注意.
通過(guò)benchmark的測(cè)試, 能知道key的分區(qū)情況. 而通常對(duì)于需要排序的程序來(lái)說(shuō), 會(huì)增加一個(gè)預(yù)處理的mapreduce操作用于采樣key值的分布情況. 通過(guò)采樣的數(shù)據(jù)來(lái)計(jì)算對(duì)最終排序處理的分區(qū)點(diǎn).

當(dāng)時(shí)最成功的應(yīng)用: 重寫了Google網(wǎng)絡(luò)搜索服務(wù)所使用到的index系統(tǒng)

總結(jié): mr的牛逼之處在于:
1> MapReduce封裝了并行處理、容錯(cuò)處理、數(shù)據(jù)本地化優(yōu)化、負(fù)載均衡等等技術(shù)難點(diǎn)的細(xì)節(jié),這使得MapReduce庫(kù)易于使用。
2> 編程模板好. 大量不同類型的問(wèn)題都可以通過(guò)MapReduce簡(jiǎn)單的解決。

3> 部署方便.

總結(jié)的經(jīng)驗(yàn):

1> 約束編程模式使得并行和分布式計(jì)算非常容易,也易于構(gòu)造容錯(cuò)的計(jì)算環(huán)境(暫時(shí)不懂)
2> 網(wǎng)絡(luò)帶寬是稀有資源, 大量的系統(tǒng)優(yōu)化是針對(duì)減少網(wǎng)絡(luò)傳輸量為目的的: 本地優(yōu)化策略使得大量的數(shù)據(jù)從本地磁盤讀取, 中間文件寫入本地磁盤, 并且只寫一份中間文件.
3> 多次執(zhí)行相同的任務(wù)可以減少性能緩慢的機(jī)器帶來(lái)的負(fù)面影響,同時(shí)解決了由于機(jī)器失效導(dǎo)致的數(shù)據(jù)丟失問(wèn)題。

關(guān)于shuffle, combiner 和partition

shuffle: 從map寫出開(kāi)始到reduce執(zhí)行之前的過(guò)程可以統(tǒng)一稱為shuffle. 具體可以分為map端的shuffle和reduce端的shuffle.
combiner和partition: 都是在map端的.

具體過(guò)程:

  1. Collect階段
    1> 在map()端, 最后一步通過(guò)context.write(key,value)輸出map處理的中間結(jié)果. 然后調(diào)用partitioner.getPartiton(key, value, numPartitions)來(lái)取得這條record的分區(qū)號(hào). record 從kv pair(k,v) -->變?yōu)?(k,v,partition).

2> 將變換后的record暫時(shí)保存在內(nèi)存中的MapOutputBuffer內(nèi)部的環(huán)形數(shù)據(jù)緩沖區(qū)(默認(rèn)大小是100MB, 可以通過(guò)參數(shù)io.sort.mb調(diào)整, 設(shè)置這個(gè)緩存是為了排序速度提高, 減少IO開(kāi)銷). 當(dāng)緩沖區(qū)的數(shù)據(jù)使用率到達(dá)一定閾值后, 觸發(fā)一次spill操作. 將環(huán)形緩沖區(qū)的部分?jǐn)?shù)據(jù)寫到磁盤上, 生成一個(gè)臨時(shí)的linux本地?cái)?shù)據(jù)的spill文件, 在緩沖區(qū)的使用率再次達(dá)到閾值后, 再次生成一個(gè)spill文件. 直到數(shù)據(jù)處理完畢, 在磁盤上會(huì)生成很多臨時(shí)文件.
關(guān)于緩沖區(qū)的結(jié)構(gòu)先不討論

2.spill階段
當(dāng)緩沖區(qū)的使用率到達(dá)一定閾值后(默認(rèn)是80%, 為什么要設(shè)置比例, 因?yàn)橐寣懞妥x同時(shí)進(jìn)行), 出發(fā)一次"spill", 將一部分緩沖區(qū)的數(shù)據(jù)寫到本地磁盤(而不是HDFS).
特別注意: 在將數(shù)據(jù)寫入磁盤前, 會(huì)對(duì)這一部分?jǐn)?shù)據(jù)進(jìn)行sort. 默認(rèn)是使用QuickSort.先按(key,value,partition)中的partition分區(qū)號(hào)排序,然后再按key排序. 如果設(shè)置了對(duì)中間數(shù)據(jù)做壓縮的配置還會(huì)做壓縮操作.

注:當(dāng)達(dá)到溢出條件后,比如默認(rèn)的是0.8,則會(huì)讀出80M的數(shù)據(jù),根據(jù)之前的分區(qū)元數(shù)據(jù),按照分區(qū)號(hào)進(jìn)行排序,這樣就可實(shí)現(xiàn)同一分區(qū)的數(shù)據(jù)都在一起,然后再根據(jù)map輸出的key進(jìn)行排序。最后實(shí)現(xiàn)溢出的文件內(nèi)是分區(qū)的,且分區(qū)內(nèi)是有序的

3.Merge階段
map輸出數(shù)據(jù)比較多的時(shí)候,會(huì)生成多個(gè)溢出文件,任務(wù)完成的最后一件事情就是把這些文件合并為一個(gè)大文件。合并的過(guò)程中一定會(huì)做merge操作,可能會(huì)做combine操作。
merge與combine的對(duì)比:
在map側(cè)可能有2次combine. 在spill出去之前, 會(huì)combine一次(在user設(shè)置的前提下). 如果map的溢寫文件個(gè)數(shù)大于3時(shí)(可配置:min.num.spills.for.combine)在merge的過(guò)程中(多個(gè)spill文件合并為一個(gè)大文件)中還會(huì)執(zhí)行combine操作.

Combine: a:1,a:2 ---> a:3
Merge: a:1,a:2 ---> a,[1,2]

Reducer端: copy, sort, reduce
4.copy
copy的過(guò)程是指reduce嘗試從完成的map中copy該reduce對(duì)應(yīng)的partition的部分?jǐn)?shù)據(jù).
什么時(shí)候開(kāi)始做copy呢? 等job的第一個(gè)map結(jié)束后就開(kāi)始copy的過(guò)程了.因?yàn)閷?duì)每一個(gè)map,都根據(jù)你reduce的數(shù)將map的輸出結(jié)果分成R個(gè)partition. 所以map的中間結(jié)果中是有可能包含每一個(gè)reduce需要處理的部分?jǐn)?shù)據(jù)的. 由于每一個(gè)map產(chǎn)生的中間結(jié)果都有可能包含某個(gè)reduce所在的partition的數(shù)據(jù), 所以這個(gè)copy是從多個(gè)map并行copy的(默認(rèn)是5個(gè)).

注: 這里因?yàn)榫W(wǎng)絡(luò)問(wèn)題down失敗了怎么辦? 重試, 在一定時(shí)間后若仍然失敗, 那么下載現(xiàn)成就會(huì)放棄這次下載, 隨后嘗試從別的地方下載.

5.merge
Reduce將map結(jié)果下載到本地時(shí),同樣也是需要進(jìn)行merge的所以io.sort.factor的配置選項(xiàng)同樣會(huì)影響reduce進(jìn)行merge時(shí)的行為.
當(dāng)發(fā)現(xiàn)reduce在shuffle階段iowait非常的高的時(shí)候,就有可能通過(guò)調(diào)大這個(gè)參數(shù)來(lái)加大一次merge時(shí)的并發(fā)吞吐,優(yōu)化reduce效率。

(copy到哪兒, 先是內(nèi)存的buffer, 然后是disk) reduce在shuffle階段對(duì)下載下來(lái)的map數(shù)據(jù)也不是立刻寫入磁盤, 而是先用一個(gè)buffer存在內(nèi)存中. 然后當(dāng)使用內(nèi)存達(dá)到一定量的時(shí)候才spill到磁盤. 這個(gè)百分比是通過(guò)另一個(gè)參數(shù)來(lái)控制.

reduce端的merge不是等所有溢寫完成后再merge的. 而是一邊copy一邊sort一邊merge. 在執(zhí)行完merge sort后, reduce task會(huì)將數(shù)據(jù)交給reduce()方法進(jìn)行處理

參考:

  1. http://blog.51cto.com/xigan/1163820
  2. http://flyingdutchman.iteye.com/blog/1879642
  3. http://www.cnblogs.com/edisonchou/p/4298423.html
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容