MapReduce:超大機(jī)群上的簡單數(shù)據(jù)處理
摘要
MapReduce是一個編程模型,和處理,產(chǎn)生大數(shù)據(jù)集的相關(guān)實(shí)現(xiàn).用戶指定一個map函數(shù)處理一個key/value對,從而產(chǎn)生中間的key/value對集.然后再指定一個reduce函數(shù)合并所有的具有相同中間key的中間value.下面將列舉許多可以用這個模型來表示的現(xiàn)實(shí)世界的工作.
以這種方式寫的程序能自動的在大規(guī)模的普通機(jī)器上實(shí)現(xiàn)并行化.這個運(yùn)行時系統(tǒng)關(guān)心這些細(xì)節(jié):分割輸入數(shù)據(jù),在機(jī)群上的調(diào)度,機(jī)器的錯誤處理,管理機(jī)器之間必要的通信.這樣就可以讓那些沒有并行分布式處理系統(tǒng)經(jīng)驗(yàn)的程序員利用大量分布式系統(tǒng)的資源.
我們的MapReduce實(shí)現(xiàn)運(yùn)行在規(guī)??梢造`活調(diào)整的由普通機(jī)器組成的機(jī)群上,一個典型的MapReduce計(jì)算處理幾千臺機(jī)器上的以TB計(jì)算的數(shù)據(jù).程序員發(fā)現(xiàn)這個系統(tǒng)非常好用:已經(jīng)實(shí)現(xiàn)了數(shù)以百計(jì)的MapReduce程序,每天在Google的機(jī)群上都有1000多個MapReduce程序在執(zhí)行.
1.介紹
在過去的5年里,作者和Google的許多人已經(jīng)實(shí)現(xiàn)了數(shù)以百計(jì)的為專門目的而寫的計(jì)算來處理大量的原始數(shù)據(jù),比如,爬行的文檔,Web請求日志,等等.為了計(jì)算各種類型的派生數(shù)據(jù),比如,倒排索引,Web文檔的圖結(jié)構(gòu)的各種表示,每個主機(jī)上爬行的頁面數(shù)量的概要,每天被請求數(shù)量最多的集合,等等.很多這樣的計(jì)算在概念上很容易理解.然而,輸入的數(shù)據(jù)量很大,并且只有計(jì)算被分布在成百上千的機(jī)器上才能在可以接受的時間內(nèi)完成.怎樣并行計(jì)算,分發(fā)數(shù)據(jù),處理錯誤,所有這些問題綜合在一起,使得原本很簡介的計(jì)算,因?yàn)橐罅康膹?fù)雜代碼來處理這些問題,而變得讓人難以處理.
作為對這個復(fù)雜性的回應(yīng),我們設(shè)計(jì)一個新的抽象模型,它讓我們表示我們將要執(zhí)行的簡單計(jì)算,而隱藏并行化,容錯,數(shù)據(jù)分布,負(fù)載均衡的那些雜亂的細(xì)節(jié),在一個庫里.我們的抽象模型的靈感來自Lisp和許多其他函數(shù)語言的map和reduce的原始表示.我們認(rèn)識到我們的許多計(jì)算都包含這樣的操作:在我們輸入數(shù)據(jù)的邏輯記錄上應(yīng)用map操作,來計(jì)算出一個中間key/value對集,在所有具有相同key的value上應(yīng)用reduce操作,來適當(dāng)?shù)暮喜⑴缮臄?shù)據(jù).功能模型的使用,再結(jié)合用戶指定的map和reduce操作,讓我們可以非常容易的實(shí)現(xiàn)大規(guī)模并行化計(jì)算,和使用再次執(zhí)行作為初級機(jī)制來實(shí)現(xiàn)容錯.
這個工作的主要貢獻(xiàn)是通過簡單有力的接口來實(shí)現(xiàn)自動的并行化和大規(guī)模分布式計(jì)算,結(jié)合這個接口的實(shí)現(xiàn)來在大量普通的PC機(jī)上實(shí)現(xiàn)高性能計(jì)算.
第二部分描述基本的編程模型,并且給一些例子.第三部分描述符合我們的基于集群的計(jì)算環(huán)境的MapReduce的接口的實(shí)現(xiàn).第四部分描述我們覺得編程模型中一些有用的技巧.第五部分對于各種不同的任務(wù),測量我們實(shí)現(xiàn)的性能.第六部分探究在Google內(nèi)部使用MapReduce作為基礎(chǔ)來重寫我們的索引系統(tǒng)產(chǎn)品.第七部分討論相關(guān)的,和未來的工作.
2.編程模型
計(jì)算利用一個輸入key/value對集,來產(chǎn)生一個輸出key/value對集.MapReduce庫的用戶用兩個函數(shù)表達(dá)這個計(jì)算:map和reduce.
用戶自定義的map函數(shù),接受一個輸入對,然后產(chǎn)生一個中間key/value對集.MapReduce庫把所有具有相同中間key I的中間value聚合在一起,然后把它們傳遞給reduce函數(shù).
用戶自定義的reduce函數(shù),接受一個中間key I和相關(guān)的一個value集.它合并這些value,形成一個比較小的value集.一般的,每次reduce調(diào)用只產(chǎn)生0或1個輸出value.通過一個迭代器把中間value提供給用戶自定義的reduce函數(shù).這樣可以使我們根據(jù)內(nèi)存來控制value列表的大小.
2.1 實(shí)例
考慮這個問題:計(jì)算在一個大的文檔集合中每個詞出現(xiàn)的次數(shù).用戶將寫和下面類似的偽代碼:
map(String key,String value):
//key:文檔的名字
//value:文檔的內(nèi)容
for each word w in value:
EmitIntermediate(w,"1");
reduce(String key,Iterator values):
//key:一個詞
//values:一個計(jì)數(shù)列表
int result=0;
for each v in values:
result+=ParseInt(v);
Emit(AsString(resut));
map函數(shù)產(chǎn)生每個詞和這個詞的出現(xiàn)次數(shù)(在這個簡單的例子里就是1).reduce函數(shù)把產(chǎn)生的每一個特定的詞的計(jì)數(shù)加在一起.
另外,用戶用輸入輸出文件的名字和可選的調(diào)節(jié)參數(shù)來填充一個mapreduce規(guī)范對象.用戶然后調(diào)用MapReduce函數(shù),并把規(guī)范對象傳遞給它.用戶的代碼和MapReduce庫鏈接在一起(用C++實(shí)現(xiàn)).附錄A包含這個實(shí)例的全部文本.
2.2類型
即使前面的偽代碼寫成了字符串輸入和輸出的term格式,但是概念上用戶寫的map和reduce函數(shù)有關(guān)聯(lián)的類型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
例如,輸入的key,value和輸出的key,value的域不同.此外,中間key,value和輸出key,values的域相同.
我們的C++實(shí)現(xiàn)傳遞字符串來和用戶自定義的函數(shù)交互,并把它留給用戶的代碼,來在字符串和適當(dāng)?shù)念愋烷g進(jìn)行轉(zhuǎn)換.
2.3更多實(shí)例
這里有一些讓人感興趣的簡單程序,可以容易的用MapReduce計(jì)算來表示.
分布式的Grep(UNIX工具程序, 可做文件內(nèi)的字符串查找):如果輸入行匹配給定的樣式,map函數(shù)就輸出這一行.reduce函數(shù)就是把中間數(shù)據(jù)復(fù)制到輸出.
計(jì)算URL訪問頻率:map函數(shù)處理web頁面請求的記錄,輸出(URL,1).reduce函數(shù)把相同URL的value都加起來,產(chǎn)生一個(URL,記錄總數(shù))的對.
倒轉(zhuǎn)網(wǎng)絡(luò)鏈接圖:map函數(shù)為每個鏈接輸出(目標(biāo),源)對,一個URL叫做目標(biāo),包含這個URL的頁面叫做源.reduce函數(shù)根據(jù)給定的相關(guān)目標(biāo)URLs連接所有的源URLs形成一個列表,產(chǎn)生(目標(biāo),源列表)對.
每個主機(jī)的術(shù)語向量:一個術(shù)語向量用一個(詞,頻率)列表來概述出現(xiàn)在一個文檔或一個文檔集中的最重要的一些詞.map函數(shù)為每一個輸入文檔產(chǎn)生一個(主機(jī)名,術(shù)語向量)對(主機(jī)名來自文檔的URL).reduce函數(shù)接收給定主機(jī)的所有文檔的術(shù)語向量.它把這些術(shù)語向量加在一起,丟棄低頻的術(shù)語,然后產(chǎn)生一個最終的(主機(jī)名,術(shù)語向量)對.
倒排索引:map函數(shù)分析每個文檔,然后產(chǎn)生一個(詞,文檔號)對的序列.reduce函數(shù)接受一個給定詞的所有對,排序相應(yīng)的文檔IDs,并且產(chǎn)生一個(詞,文檔ID列表)對.所有的輸出對集形成一個簡單的倒排索引.它可以簡單的增加跟蹤詞位置的計(jì)算.
分布式排序:map函數(shù)從每個記錄提取key,并且產(chǎn)生一個(key,record)對.reduce函數(shù)不改變?nèi)魏蔚膶?這個計(jì)算依賴分割工具(在4.1描述)和排序?qū)傩?在4.2描述).
3實(shí)現(xiàn)
MapReduce接口可能有許多不同的實(shí)現(xiàn).根據(jù)環(huán)境進(jìn)行正確的選擇.例如,一個實(shí)現(xiàn)對一個共享內(nèi)存較小的機(jī)器是合適的,另外的適合一個大NUMA的多處理器的機(jī)器,而有的適合一個更大的網(wǎng)絡(luò)機(jī)器的集合.
這部分描述一個在Google廣泛使用的計(jì)算環(huán)境的實(shí)現(xiàn):用交換機(jī)連接的普通PC機(jī)的大機(jī)群.我們的環(huán)境是:
1.Linux操作系統(tǒng),雙處理器,2-4GB內(nèi)存的機(jī)器.
2.普通的網(wǎng)絡(luò)硬件,每個機(jī)器的帶寬或者是百兆或者千兆,但是平均小于全部帶寬的一半.
3.因?yàn)橐粋€機(jī)群包含成百上千的機(jī)器,所有機(jī)器會經(jīng)常出現(xiàn)問題.
4.存儲用直接連到每個機(jī)器上的廉價IDE硬盤.一個從內(nèi)部文件系統(tǒng)發(fā)展起來的分布式文件系統(tǒng)被用來管理存儲在這些磁盤上的數(shù)據(jù).文件系統(tǒng)用復(fù)制的方式在不可靠的硬件上來保證可靠性和有效性.
5.用戶提交工作給調(diào)度系統(tǒng).每個工作包含一個任務(wù)集,每個工作被調(diào)度者映射到機(jī)群中一個可用的機(jī)器集上.
3.1執(zhí)行預(yù)覽
通過自動分割輸入數(shù)據(jù)成一個有M個split的集,map調(diào)用被分布到多臺機(jī)器上.輸入的split能夠在不同的機(jī)器上被并行處理.通過用分割函數(shù)分割中間key,來形成R個片(例如,hash(key) mod R),reduce調(diào)用被分布到多臺機(jī)器上.分割數(shù)量(R)和分割函數(shù)由用戶來指定.
圖1顯示了我們實(shí)現(xiàn)的MapReduce操作的全部流程.當(dāng)用戶的程序調(diào)用MapReduce的函數(shù)的時候,將發(fā)生下面的一系列動作(下面的數(shù)字和圖1中的數(shù)字標(biāo)簽相對應(yīng)):
1.在用戶程序里的MapReduce庫首先分割輸入文件成M個片,每個片的大小一般從 16到64MB(用戶可以通過可選的參數(shù)來控制).然后在機(jī)群中開始大量的拷貝程序.
2.這些程序拷貝中的一個是master,其他的都是由master分配任務(wù)的worker.有M 個map任務(wù)和R個reduce任務(wù)將被分配.管理者分配一個map任務(wù)或reduce任務(wù)給一個空閑的worker.
3.一個被分配了map任務(wù)的worker讀取相關(guān)輸入split的內(nèi)容.它從輸入數(shù)據(jù)中分析出key/value對,然后把key/value對傳遞給用戶自定義的map函數(shù).由map函數(shù)產(chǎn)生的中間key/value對被緩存在內(nèi)存中.
4.緩存在內(nèi)存中的key/value對被周期性的寫入到本地磁盤上,通過分割函數(shù)把它們寫入R個區(qū)域.在本地磁盤上的緩存對的位置被傳送給master,master負(fù)責(zé)把這些位置傳送給reduce worker.
5.當(dāng)一個reduce worker得到master的位置通知的時候,它使用遠(yuǎn)程過程調(diào)用來從map worker的磁盤上讀取緩存的數(shù)據(jù).當(dāng)reduce worker讀取了所有的中間數(shù)據(jù)后,它通過排序使具有相同key的內(nèi)容聚合在一起.因?yàn)樵S多不同的key映射到相同的reduce任務(wù),所以排序是必須的.如果中間數(shù)據(jù)比內(nèi)存還大,那么還需要一個外部排序.
6.reduce worker迭代排過序的中間數(shù)據(jù),對于遇到的每一個唯一的中間key,它把key和相關(guān)的中間value集傳遞給用戶自定義的reduce函數(shù).reduce函數(shù)的輸出被添加到這個reduce分割的最終的輸出文件中.
7.當(dāng)所有的map和reduce任務(wù)都完成了,管理者喚醒用戶程序.在這個時候,在用戶程序里的MapReduce調(diào)用返回到用戶代碼.
在成功完成之后,mapreduce執(zhí)行的輸出存放在R個輸出文件中(每一個reduce任務(wù)產(chǎn)生一個由用戶指定名字的文件).一般,用戶不需要合并這R個輸出文件成一個文件--他們經(jīng)常把這些文件當(dāng)作一個輸入傳遞給其他的MapReduce調(diào)用,或者在可以處理多個分割文件的分布式應(yīng)用中使用他們.
3.2master數(shù)據(jù)結(jié)構(gòu)
master保持一些數(shù)據(jù)結(jié)構(gòu).它為每一個map和reduce任務(wù)存儲它們的狀態(tài)(空閑,工作中,完成),和worker機(jī)器(非空閑任務(wù)的機(jī)器)的標(biāo)識.
master就像一個管道,通過它,中間文件區(qū)域的位置從map任務(wù)傳遞到reduce任務(wù).因此,對于每個完成的map任務(wù),master存儲由map任務(wù)產(chǎn)生的R個中間文件區(qū)域的大小和位置.當(dāng)map任務(wù)完成的時候,位置和大小的更新信息被接受.這些信息被逐步增加的傳遞給那些正在工作的reduce任務(wù).
3.3容錯
因?yàn)镸apReduce庫被設(shè)計(jì)用來使用成百上千的機(jī)器來幫助處理非常大規(guī)模的數(shù)據(jù),所以這個庫必須要能很好的處理機(jī)器故障.
worker故障
master周期性的ping每個worker.如果master在一個確定的時間段內(nèi)沒有收到worker返回的信息,那么它將把這個worker標(biāo)記成失效.因?yàn)槊恳粋€由這個失效的worker完成的map任務(wù)被重新設(shè)置成它初始的空閑狀態(tài),所以它可以被安排給其他的worker.同樣的,每一個在失敗的worker上正在運(yùn)行的map或reduce任務(wù),也被重新設(shè)置成空閑狀態(tài),并且將被重新調(diào)度.
在一個失敗機(jī)器上已經(jīng)完成的map任務(wù)將被再次執(zhí)行,因?yàn)樗妮敵龃鎯υ谒拇疟P上,所以不可訪問.已經(jīng)完成的reduce任務(wù)將不會再次執(zhí)行,因?yàn)樗妮敵龃鎯υ谌治募到y(tǒng)中.
當(dāng)一個map任務(wù)首先被worker A執(zhí)行之后,又被B執(zhí)行了(因?yàn)锳失效了),重新執(zhí)行這個情況被通知給所有執(zhí)行reduce任務(wù)的worker.任何還沒有從A讀數(shù)據(jù)的reduce任務(wù)將從worker B讀取數(shù)據(jù).
MapReduce可以處理大規(guī)模worker失敗的情況.例如,在一個MapReduce操作期間,在正在運(yùn)行的機(jī)群上進(jìn)行網(wǎng)絡(luò)維護(hù)引起80臺機(jī)器在幾分鐘內(nèi)不可訪問了,MapReduce master只是簡單的再次執(zhí)行已經(jīng)被不可訪問的worker完成的工作,繼續(xù)執(zhí)行,最終完成這個MapReduce操作.
master失敗
可以很容易的讓管理者周期的寫入上面描述的數(shù)據(jù)結(jié)構(gòu)的checkpoints.如果這個master任務(wù)失效了,可以從上次最后一個checkpoint開始啟動另一個master進(jìn)程.然而,因?yàn)橹挥幸粋€master,所以它的失敗是比較麻煩的,因此我們現(xiàn)在的實(shí)現(xiàn)是,如果master失敗,就中止MapReduce計(jì)算.客戶可以檢查這個狀態(tài),并且可以根據(jù)需要重新執(zhí)行MapReduce操作.
在錯誤面前的處理機(jī)制
當(dāng)用戶提供的map和reduce操作對它的輸出值是確定的函數(shù)時,我們的分布式實(shí)現(xiàn)產(chǎn)生,和全部程序沒有錯誤的順序執(zhí)行一樣,相同的輸出.
我們依賴對map和reduce任務(wù)的輸出進(jìn)行原子提交來完成這個性質(zhì).每個工作中的任務(wù)把它的輸出寫到私有臨時文件中.一個reduce任務(wù)產(chǎn)生一個這樣的文件,而一個map任務(wù)產(chǎn)生R個這樣的文件(一個reduce任務(wù)對應(yīng)一個文件).當(dāng)一個map任務(wù)完成的時候,worker發(fā)送一個消息給master,在這個消息中包含這R個臨時文件的名字.如果master從一個已經(jīng)完成的map任務(wù)再次收到一個完成的消息,它將忽略這個消息.否則,它在master的數(shù)據(jù)結(jié)構(gòu)里記錄這R個文件的名字.
當(dāng)一個reduce任務(wù)完成的時候,這個reduce worker原子的把臨時文件重命名成最終的輸出文件.如果相同的reduce任務(wù)在多個機(jī)器上執(zhí)行,多個重命名調(diào)用將被執(zhí)行,并產(chǎn)生相同的輸出文件.我們依賴由底層文件系統(tǒng)提供的原子重命名操作來保證,最終的文件系統(tǒng)狀態(tài)僅僅包含一個reduce任務(wù)產(chǎn)生的數(shù)據(jù).
我們的map和reduce操作大部分都是確定的,并且我們的處理機(jī)制等價于一個順序的執(zhí)行的這個事實(shí),使得程序員可以很容易的理解程序的行為.當(dāng)map或/和reduce操作是不確定的時候,我們提供雖然比較弱但是合理的處理機(jī)制.當(dāng)在一個非確定操作的前面,一個reduce任務(wù)R1的輸出等價于一個非確定順序程序執(zhí)行產(chǎn)生的輸出.然而,一個不同的reduce任務(wù)R2的輸出也許符合一個不同的非確定順序程序執(zhí)行產(chǎn)生的輸出.
考慮map任務(wù)M和reduce任務(wù)R1,R2的情況.我們設(shè)定e(Ri)為已經(jīng)提交的Ri的執(zhí)行(有且僅有一個這樣的執(zhí)行).這個比較弱的語義出現(xiàn),因?yàn)閑(R1)也許已經(jīng)讀取了由M的執(zhí)行產(chǎn)生的輸出,而e(R2)也許已經(jīng)讀取了由M的不同執(zhí)行產(chǎn)生的輸出.
3.4存儲位置
在我們的計(jì)算機(jī)環(huán)境里,網(wǎng)絡(luò)帶寬是一個相當(dāng)缺乏的資源.我們利用把輸入數(shù)據(jù)(由GFS管理)存儲在機(jī)器的本地磁盤上來保存網(wǎng)絡(luò)帶寬.GFS把每個文件分成64MB的一些塊,然后每個塊的幾個拷貝存儲在不同的機(jī)器上(一般是3個拷貝).MapReduce的master考慮輸入文件的位置信息,并且努力在一個包含相關(guān)輸入數(shù)據(jù)的機(jī)器上安排一個map任務(wù).如果這樣做失敗了,它嘗試在那個任務(wù)的輸入數(shù)據(jù)的附近安排一個map任務(wù)(例如,分配到一個和包含輸入數(shù)據(jù)塊在一個switch里的worker機(jī)器上執(zhí)行).當(dāng)運(yùn)行巨大的MapReduce操作在一個機(jī)群中的一部分機(jī)器上的時候,大部分輸入數(shù)據(jù)在本地被讀取,從而不消耗網(wǎng)絡(luò)帶寬.
3.5任務(wù)粒度
象上面描述的那樣,我們細(xì)分map階段成M個片,reduce階段成R個片.M和R應(yīng)當(dāng)比worker機(jī)器的數(shù)量大許多.每個worker執(zhí)行許多不同的工作來提高動態(tài)負(fù)載均衡,也可以加速從一個worker失效中的恢復(fù),這個機(jī)器上的許多已經(jīng)完成的map任務(wù)可以被分配到所有其他的worker機(jī)器上.
在我們的實(shí)現(xiàn)里,M和R的范圍是有大小限制的,因?yàn)閙aster必須做O(M+R)次調(diào)度,并且保存O(MR)個狀態(tài)在內(nèi)存中.(這個因素使用的內(nèi)存是很少的,在O(MR)個狀態(tài)片里,大約每個map任務(wù)/reduce任務(wù)對使用一個字節(jié)的數(shù)據(jù)).
此外,R經(jīng)常被用戶限制,因?yàn)槊恳粋€reduce任務(wù)最終都是一個獨(dú)立的輸出文件.實(shí)際上,我們傾向于選擇M,以便每一個單獨(dú)的任務(wù)大概都是16到64MB的輸入數(shù)據(jù)(以便上面描述的位置優(yōu)化是最有效的),我們把R設(shè)置成我們希望使用的worker機(jī)器數(shù)量的小倍數(shù).我們經(jīng)常執(zhí)行MapReduce計(jì)算,在M=200000,R=5000,使用2000臺工作者機(jī)器的情況下.
3.6備用任務(wù)
一個落后者是延長MapReduce操作時間的原因之一:一個機(jī)器花費(fèi)一個異乎尋常地的長時間來完成最后的一些map或reduce任務(wù)中的一個.有很多原因可能產(chǎn)生落后者.例如,一個有壞磁盤的機(jī)器經(jīng)常發(fā)生可以糾正的錯誤,這樣就使讀性能從30MB/s降低到3MB/s.機(jī)群調(diào)度系統(tǒng)也許已經(jīng)安排其他的任務(wù)在這個機(jī)器上,由于計(jì)算要使用CPU,內(nèi)存,本地磁盤,網(wǎng)絡(luò)帶寬的原因,引起它執(zhí)行MapReduce代碼很慢.我們最近遇到的一個問題是,一個在機(jī)器初始化時的Bug引起處理器緩存的失效:在一個被影響的機(jī)器上的計(jì)算性能有上百倍的影響.
我們有一個一般的機(jī)制來減輕這個落后者的問題.當(dāng)一個MapReduce操作將要完成的時候,master調(diào)度備用進(jìn)程來執(zhí)行那些剩下的還在執(zhí)行的任務(wù).無論是原來的還是備用的執(zhí)行完成了,工作都被標(biāo)記成完成.我們已經(jīng)調(diào)整了這個機(jī)制,通常只會占用多幾個百分點(diǎn)的機(jī)器資源.我們發(fā)現(xiàn)這可以顯著的減少完成大規(guī)模MapReduce操作的時間.作為一個例子,將要在5.3描述的排序程序,在關(guān)閉掉備用任務(wù)的情況下,要比有備用任務(wù)的情況下多花44%的時間.
4技巧
盡管簡單的map和reduce函數(shù)的功能對于大多數(shù)需求是足夠的了,但是我們開發(fā)了一些有用的擴(kuò)充.這些將在這個部分描述.
4.1分割函數(shù)
MapReduce用戶指定reduce任務(wù)和reduce任務(wù)需要的輸出文件的數(shù)量.在中間key上使用分割函數(shù),使數(shù)據(jù)分割后通過這些任務(wù).一個缺省的分割函數(shù)使用hash方法(例如,hash(key) mod R).這個導(dǎo)致非常平衡的分割.然后,有的時候,使用其他的key分割函數(shù)來分割數(shù)據(jù)有非常有用的.例如,有時候,輸出的key是URLs,并且我們希望每個主機(jī)的所有條目保持在同一個輸出文件中.為了支持像這樣的情況,MapReduce庫的用戶可以提供專門的分割函數(shù).例如,使用"hash(Hostname(urlkey)) mod R"作為分割函數(shù),使所有來自同一個主機(jī)的URLs保存在同一個輸出文件中.
4.2順序保證
我們保證在一個給定的分割里面,中間key/value對以key遞增的順序處理.這個順序保證可以使每個分割產(chǎn)出一個有序的輸出文件,當(dāng)輸出文件的格式需要支持有效率的隨機(jī)訪問key的時候,或者對輸出數(shù)據(jù)集再作排序的時候,就很容易.
4.3combiner函數(shù)
在某些情況下,允許中間結(jié)果key重復(fù)會占據(jù)相當(dāng)?shù)谋戎?并且用戶定義的reduce函數(shù)
滿足結(jié)合律和交換律.一個很好的例子就是在2.1部分的詞統(tǒng)計(jì)程序.因?yàn)樵~頻率傾向于一個zipf分布(齊夫分布),每個map任務(wù)將產(chǎn)生成百上千個這樣的記錄<the,1>.所有的這些計(jì)數(shù)將通過網(wǎng)絡(luò)被傳輸?shù)揭粋€單獨(dú)的reduce任務(wù),然后由reduce函數(shù)加在一起產(chǎn)生一個數(shù)字.我們允許用戶指定一個可選的combiner函數(shù),先在本地進(jìn)行合并一下,然后再通過網(wǎng)絡(luò)發(fā)送.
在每一個執(zhí)行map任務(wù)的機(jī)器上combiner函數(shù)被執(zhí)行.一般的,相同的代碼被用在combiner和reduce函數(shù).在combiner和reduce函數(shù)之間唯一的區(qū)別是MapReduce庫怎樣控制函數(shù)的輸出.reduce函數(shù)的輸出被保存最終輸出文件里.combiner函數(shù)的輸出被寫到中間文件里,然后被發(fā)送給reduce任務(wù).
部分使用combiner可以顯著的提高一些MapReduce操作的速度.附錄A包含一個使用combiner函數(shù)的例子.
4.4輸入輸出類型
MapReduce庫支持以幾種不同的格式讀取輸入數(shù)據(jù).例如,文本模式輸入把每一行看作是一個key/value對.key是文件的偏移量,value是那一行的內(nèi)容.其他普通的支持格式以key的順序存儲key/value對序列.每一個輸入類型的實(shí)現(xiàn)知道怎樣把輸入分割成對每個單獨(dú)的map任務(wù)來說是有意義的(例如,文本模式的范圍分割確保僅僅在每行的邊界進(jìn)行范圍分割).雖然許多用戶僅僅使用很少的預(yù)定意輸入類型的一個,但是用戶可以通過提供一個簡單的reader接口來支持一個新的輸入類型.
一個reader不必要從文件里讀數(shù)據(jù).例如,我們可以很容易的定義它從數(shù)據(jù)庫里讀記錄,或從內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)讀取.
4.5副作用
有的時候,MapReduce的用戶發(fā)現(xiàn)在map操作或/和reduce操作時產(chǎn)生輔助文件作為一個附加的輸出是很方便的.我們依靠應(yīng)用程序?qū)憗硎惯@個副作用成為原子的.一般的,應(yīng)用程序?qū)懸粋€臨時文件,然后一旦這個文件全部產(chǎn)生完,就自動的被重命名.
對于單個任務(wù)產(chǎn)生的多個輸出文件來說,我們沒有提供其上的兩階段提交的原子操作支持.因此,一個產(chǎn)生需要交叉文件連接的多個輸出文件的任務(wù),應(yīng)該使確定性的任務(wù).不過這個限制在實(shí)際的工作中并不是一個問題.
4.6跳過錯誤記錄
有的時候因?yàn)橛脩舻拇a里有bug,導(dǎo)致在某一個記錄上map或reduce函數(shù)突然crash掉.這樣的bug使得MapReduce操作不能完成.雖然一般是修復(fù)這個bug,但是有時候這是不現(xiàn)實(shí)的;也許這個bug是在源代碼不可得到的第三方庫里.有的時候也可以忽略一些記錄,例如,當(dāng)在一個大的數(shù)據(jù)集上進(jìn)行統(tǒng)計(jì)分析.我們提供一個可選的執(zhí)行模式,在這個模式下,MapReduce庫檢測那些記錄引起的crash,然后跳過那些記錄,來繼續(xù)執(zhí)行程序.
每個worker程序安裝一個信號處理器來獲取內(nèi)存段異常和總線錯誤.在調(diào)用一個用戶自定義的map或reduce操作之前,MapReduce庫把記錄的序列號存儲在一個全局變量里.如果用戶代碼產(chǎn)生一個信號,那個信號處理器就會發(fā)送一個包含序號的"last gasp"UDP包給MapReduce的master.當(dāng)master不止一次看到同一個記錄的時候,它就會指出,當(dāng)相關(guān)的map或reduce任務(wù)再次執(zhí)行的時候,這個記錄應(yīng)當(dāng)被跳過.
4.7本地執(zhí)行
調(diào)試在map或reduce函數(shù)中問題是很困難的,因?yàn)閷?shí)際的計(jì)算發(fā)生在一個分布式的系統(tǒng)中,經(jīng)常是有一個master動態(tài)的分配工作給幾千臺機(jī)器.為了簡化調(diào)試和測試,我們開發(fā)了一個可替換的實(shí)現(xiàn),這個實(shí)現(xiàn)在本地執(zhí)行所有的MapReduce操作.用戶可以控制執(zhí)行,這樣計(jì)算可以限制到特定的map任務(wù)上.用戶以一個標(biāo)志調(diào)用他們的程序,然后可以容易的使用他們認(rèn)為好用的任何調(diào)試和測試工具(例如,gdb).
4.8狀態(tài)信息
master運(yùn)行一個HTTP服務(wù)器,并且可以輸出一組狀況頁來供人們使用.狀態(tài)頁顯示計(jì)算進(jìn)度,象多少個任務(wù)已經(jīng)完成,多少個還在運(yùn)行,輸入的字節(jié)數(shù),中間數(shù)據(jù)字節(jié)數(shù),輸出字節(jié)數(shù),處理百分比,等等.這個頁也包含到標(biāo)準(zhǔn)錯誤的鏈接,和由每個任務(wù)產(chǎn)生的標(biāo)準(zhǔn)輸出的鏈接.用戶可以根據(jù)這些數(shù)據(jù)預(yù)測計(jì)算需要花費(fèi)的時間,和是否需要更多的資源.當(dāng)計(jì)算比預(yù)期的要慢很多的時候,這些頁面也可以被用來判斷是不是這樣.
此外,最上面的狀態(tài)頁顯示已經(jīng)有多少個工作者失敗了,和當(dāng)它們失敗的時候,那個map和reduce任務(wù)正在運(yùn)行.當(dāng)試圖診斷在用戶代碼里的bug時,這個信息也是有用的.
4.9計(jì)數(shù)器
MapReduce庫提供一個計(jì)數(shù)器工具,來計(jì)算各種事件的發(fā)生次數(shù).例如,用戶代碼想要計(jì)算所有處理的詞的個數(shù),或者被索引的德文文檔的數(shù)量.
為了使用這個工具,用戶代碼創(chuàng)建一個命名的計(jì)數(shù)器對象,然后在map或/和reduce函數(shù)里適當(dāng)?shù)脑黾佑?jì)數(shù)器.例如:
Counter * uppercase;
uppercase=GetCounter("uppercase");
map(String name,String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,"1");
來自不同worker機(jī)器上的計(jì)數(shù)器值被周期性的傳送給master(在ping回應(yīng)里).master把來自成功的map和reduce任務(wù)的計(jì)數(shù)器值加起來,在MapReduce操作完成的時候,把它返回給用戶代碼.當(dāng)前計(jì)數(shù)器的值也被顯示在master狀態(tài)頁里,以便人們可以查看實(shí)際的計(jì)算進(jìn)度.當(dāng)計(jì)算計(jì)數(shù)器值的時候消除重復(fù)執(zhí)行的影響,避免數(shù)據(jù)的累加.(在備用任務(wù)的使用,和由于出錯的重新執(zhí)行,可以產(chǎn)生重復(fù)執(zhí)行)
有些計(jì)數(shù)器值被MapReduce庫自動的維護(hù),比如,被處理的輸入key/value對的數(shù)量,和被產(chǎn)生的輸出key/value對的數(shù)量.
用戶發(fā)現(xiàn)計(jì)數(shù)器工具對于檢查MapReduce操作的完整性很有用.例如,在一些MapReduce操作中,用戶代碼也許想要確保輸出對的數(shù)量完全等于輸入對的數(shù)量,或者處理過的德文文檔的數(shù)量是在全部被處理的文檔數(shù)量中屬于合理的范圍.
5性能
在本節(jié),我們用在一個大型集群上運(yùn)行的兩個計(jì)算來衡量MapReduce的性能.一個計(jì)算用來在一個大概1TB的數(shù)據(jù)中查找特定的匹配串.另一個計(jì)算排序大概1TB的數(shù)據(jù).
這兩個程序代表了MapReduce的用戶實(shí)現(xiàn)的真實(shí)的程序的一個大子集.一類是,把數(shù)據(jù)從一種表示轉(zhuǎn)化到另一種表示.另一類是,從一個大的數(shù)據(jù)集中提取少量的關(guān)心的數(shù)據(jù).
5.1機(jī)群配置
所有的程序在包含大概1800臺機(jī)器的機(jī)群上執(zhí)行.機(jī)器的配置是:2個2G的Intel Xeon超線程處理器,4GB內(nèi)存,兩個160GB IDE磁盤,一個千兆網(wǎng)卡.這些機(jī)器部署在一個由兩層的,樹形交換網(wǎng)絡(luò)中,在根節(jié)點(diǎn)上大概有100到2000G的帶寬.所有這些機(jī)器都有相同的部署(對等部署),因此任意兩點(diǎn)之間的來回時間小于1毫秒.
在4GB的內(nèi)存里,大概有1-1.5GB被用來運(yùn)行在機(jī)群中其他的任務(wù).這個程序是在周末的下午開始執(zhí)行的,這個時候CPU,磁盤,網(wǎng)絡(luò)基本上是空閑的.
5.2Grep
這個Grep程序掃描大概10^10個,每個100字節(jié)的記錄,查找比較少的3字符的查找串(這個查找串出現(xiàn)在92337個記錄中).輸入數(shù)據(jù)被分割成大概64MB的片(M=15000),全部 的輸出存放在一個文件中(R=1).
圖2顯示計(jì)算過程隨時間變化的情況.Y軸表示輸入數(shù)據(jù)被掃描的速度.隨著更多的機(jī)群被分配給這個MapReduce計(jì)算,速度在逐步的提高,當(dāng)有1764個worker的時候這個速度達(dá)到最高的30GB/s.當(dāng)map任務(wù)完成的時候,速度開始下降,在計(jì)算開始后80秒,輸入的速度降到0.這個計(jì)算持續(xù)的時間大概是150秒.這包括了前面大概一分鐘的啟動時間.啟動時間用來把程序傳播到所有的機(jī)器上,等待GFS打開1000個輸入文件,得到必要的位置優(yōu)化信息.
5.3排序
這個sort程序排序10^10個記錄,每個記錄100個字節(jié)(大概1TB的數(shù)據(jù)).這個程序是模仿TeraSort的.
這個排序程序只包含不到50行的用戶代碼.其中有3行map函數(shù)用來從文本行提取10字節(jié)的排序key,并且產(chǎn)生一個由這個key和原始文本行組成的中間key/value對.我們使用一個內(nèi)置的Identity函數(shù)作為reduce操作.這個函數(shù)直接把中間key/value對作為輸出的key/value對.最終的排序輸出寫到一個2路復(fù)制的GFS文件中(也就是,程序的輸出會寫2TB的數(shù)據(jù)).
象以前一樣,輸入數(shù)據(jù)被分割成64MB的片(M=15000).我們把排序后的輸出寫到4000個文件中(R=4000).分區(qū)函數(shù)使用key的原始字節(jié)來把數(shù)據(jù)分區(qū)到R個小片中.
我們以這個基準(zhǔn)的分割函數(shù),知道key的分布情況.在一般的排序程序中,我們會增加一個預(yù)處理的MapReduce操作,這個操作用于采樣key的情況,并且用這個采樣的key的分布情況來計(jì)算對最終排序處理的分割點(diǎn)。
圖3(a)顯示這個排序程序的正常執(zhí)行情況.左上圖顯示輸入數(shù)據(jù)的讀取速度.這個速度最高到達(dá)13GB/s,并且在不到200秒所有map任務(wù)完成之后迅速滑落到0.注意到這個輸入速度小于Grep.這是因?yàn)檫@個排序map任務(wù)花費(fèi)大概一半的時間和帶寬,來把中間數(shù)據(jù)寫到本地硬盤中.而Grep相關(guān)的中間數(shù)據(jù)可以忽略不計(jì).
左中圖顯示數(shù)據(jù)通過網(wǎng)絡(luò)從map任務(wù)傳輸給reduce任務(wù)的速度.當(dāng)?shù)谝粋€map任務(wù)完成后,這個排序過程就開始了.圖示上的第一個高峰是啟動了第一批大概1700個reduce任務(wù)(整個MapReduce任務(wù)被分配到1700臺機(jī)器上,每個機(jī)器一次只執(zhí)行一個reduce任務(wù)).大概開始計(jì)算后的300秒,第一批reduce任務(wù)中的一些完成了,我們開始執(zhí)行剩下的reduce任務(wù).全部的排序過程持續(xù)了大概600秒的時間.
左下圖顯示排序后的數(shù)據(jù)被reduce任務(wù)寫入最終文件的速度.因?yàn)闄C(jī)器忙于排序中間數(shù)據(jù),所以在第一個排序階段的結(jié)束和寫階段的開始有一個延遲.寫的速度大概是2-4GB/s.大概開始計(jì)算后的850秒寫過程結(jié)束.包括前面的啟動過程,全部的計(jì)算任務(wù)持續(xù)的891秒.這個和TeraSort benchmark的最高紀(jì)錄1057秒差不多.
需要注意的事情是:因此位置優(yōu)化的原因,很多數(shù)據(jù)都是從本地磁盤讀取的而沒有通過我們有限帶寬的網(wǎng)絡(luò),所以輸入速度比排序速度和輸出速度都要快.排序速度比輸出速度快的原因是輸出階段寫兩個排序后數(shù)據(jù)的拷貝(我們寫兩個副本的原因是為了可靠性和可用性).我們寫兩份的原因是因?yàn)榈讓游募到y(tǒng)的可靠性和可用性的要求.如果底層文件系統(tǒng)用類似容錯編碼(erasure coding)的方式,而不采用復(fù)制寫的方式,在寫盤階段可以降低網(wǎng)絡(luò)帶寬的要求。
5.4備用任務(wù)的影響
在圖3(b)中,顯示我們不用備用任務(wù)的排序程序的執(zhí)行情況.除了它有一個很長的幾乎沒有寫動作發(fā)生的尾巴外,執(zhí)行流程和圖3(a)相似.在960秒后,只有5個reduce任務(wù)沒有完成.然而,就是這最后幾個落后者知道300秒后才完成.全部的計(jì)算任務(wù)執(zhí)行了1283秒,多花了44%的時間.
5.5機(jī)器失效
在圖3(c)中,顯示我們有意的在排序程序計(jì)算過程中停止1746臺worker中的200臺機(jī)器上的程序的情況.底層機(jī)群調(diào)度者在這些機(jī)器上馬上重新開始新的worker程序(因?yàn)閮H僅程序被停止,而機(jī)器仍然在正常運(yùn)行).
因?yàn)橐呀?jīng)完成的map工作丟失了(由于相關(guān)的map worker被殺掉了),需要重新再作,所以worker死掉會導(dǎo)致一個負(fù)數(shù)的輸入速率.相關(guān)map任務(wù)的重新執(zhí)行很快就重新執(zhí)行了.整個計(jì)算過程在933秒內(nèi)完成,包括了前邊的啟動時間(只比正常執(zhí)行時間多了5%的時間).
6經(jīng)驗(yàn)
我們在2003年的2月寫了MapReduce庫的第一個版本,并且在2003年的8月做了顯著的增強(qiáng),包括位置優(yōu)化,worker機(jī)器間任務(wù)執(zhí)行的動態(tài)負(fù)載均衡,等等.從那個時候起,我們驚奇的發(fā)現(xiàn)MapReduce函數(shù)庫廣泛用于我們?nèi)粘L幚淼膯栴}.它現(xiàn)在在Google內(nèi)部各個領(lǐng)域內(nèi)廣泛應(yīng)用,包括:
大規(guī)模機(jī)器學(xué)習(xí)問題
Google News和Froogle產(chǎn)品的機(jī)器問題.
提取數(shù)據(jù)產(chǎn)生一個流行查詢的報(bào)告(例如,Google Zeitgeist).
為新的試驗(yàn)和產(chǎn)品提取網(wǎng)頁的屬性(例如,從一個web頁的大集合中提取位置信息 用在位置查詢).
大規(guī)模的圖計(jì)算.
圖4顯示了我們主要的源代碼管理系統(tǒng)中,隨著時間推移,MapReduce程序的顯著增加,從2003年早先時候的0個增長到2004年9月份的差不多900個不同的程序.MapReduce之所以這樣的成功,是因?yàn)樗軌蛟诓坏桨胄r時間內(nèi)寫出一個簡單的能夠應(yīng)用于上千臺機(jī)器的大規(guī)模并發(fā)程序,并且極大的提高了開發(fā)和原形設(shè)計(jì)的周期效率.并且,他可以讓一個完全沒有分布式和/或并行系統(tǒng)經(jīng)驗(yàn)的程序員,能夠很容易的利用大量的資源.
在每一個任務(wù)結(jié)束的時候,MapReduce函數(shù)庫記錄使用的計(jì)算資源的統(tǒng)計(jì)信息.在圖1里,我們列出了2004年8月份在Google運(yùn)行的一些MapReduce的工作的統(tǒng)計(jì)信息.
6.1大規(guī)模索引
到目前為止,最成功的MapReduce的應(yīng)用就是重寫了Google web 搜索服務(wù)所使用到的index系統(tǒng).索引系統(tǒng)處理爬蟲系統(tǒng)抓回來的超大量的文檔集,這些文檔集保存在GFS文件里.這些文檔的原始內(nèi)容的大小,超過了20TB.索引程序是通過一系列的,大概5到10次MapReduce操作來建立索引.通過利用MapReduce(替換掉上一個版本的特別設(shè)計(jì)的分布處理的索引程序版本)有這樣一些好處:
索引的代碼簡單,量少,容易理解,因?yàn)槿蒎e,分布式,并行處理都隱藏在MapReduce庫中了.例如,當(dāng)使用MapReduce函數(shù)庫的時候,計(jì)算的代碼行數(shù)從原來的3800行C++代碼一下減少到大概700行代碼.
MapReduce的函數(shù)庫的性能已經(jīng)非常好,所以我們可以把概念上不相關(guān)的計(jì)算步驟分開處理,而不是混在一起以期減少在數(shù)據(jù)上的處理.這使得改變索引過程很容易.例如,我們對老索引系統(tǒng)的一個小更改可能要好幾個月的時間,但是在新系統(tǒng)內(nèi),只需要花幾天時間就可以了.
索引系統(tǒng)的操作更容易了,這是因?yàn)闄C(jī)器的失效,速度慢的機(jī)器,以及網(wǎng)絡(luò)失效都已經(jīng)由MapReduce自己解決了,而不需要操作人員的交互.另外,我們可以簡單的通過對索引系統(tǒng)增加機(jī)器的方式提高處理性能.
7相關(guān)工作
很多系統(tǒng)都提供了嚴(yán)格的設(shè)計(jì)模式,并且通過對編程的嚴(yán)格限制來實(shí)現(xiàn)自動的并行計(jì)算.例如,一個結(jié)合函數(shù)可以通過N個元素的數(shù)組的前綴在N個處理器上使用并行前綴計(jì)算在log N的時間內(nèi)計(jì)算完.MapReduce是基于我們的大型現(xiàn)實(shí)計(jì)算的經(jīng)驗(yàn),對這些模型的一個簡化和精煉.并且,我們還提供了基于上千臺處理器的容錯實(shí)現(xiàn).而大部分并發(fā)處理系統(tǒng)都只在小規(guī)模的尺度上實(shí)現(xiàn),并且機(jī)器的容錯還是程序員來控制的.
Bulk Synchronous Programming以及一些MPI primitives提供了更高級別的抽象,可以更容易寫出并行處理的程序.這些系統(tǒng)和MapReduce系統(tǒng)的不同之處在,MapReduce利用嚴(yán)格的編程模式自動實(shí)現(xiàn)用戶程序的并發(fā)處理,并且提供了透明的容錯處理.
我們本地的優(yōu)化策略是受active disks等技術(shù)的啟發(fā),在active disks中,計(jì)算任務(wù)是盡量推送到靠近本地磁盤的處理單元上,這樣就減少了通過I/O子系統(tǒng)或網(wǎng)絡(luò)的數(shù)據(jù)量.我們在少量磁盤直接連接到普通處理機(jī)運(yùn)行,來代替直接連接到磁盤控制器的處理機(jī)上,但是一般的步驟是相似的.
我們的備用任務(wù)的機(jī)制和在Charlotte系統(tǒng)上的積極調(diào)度機(jī)制相似.這個簡單的積極調(diào)度的一個缺陷是,如果一個任務(wù)引起了一個重復(fù)性的失敗,那個整個計(jì)算將無法完成.我們通過在故障情況下跳過故障記錄的機(jī)制,在某種程度上解決了這個問題.
MapReduce實(shí)現(xiàn)依賴一個內(nèi)置的機(jī)群管理系統(tǒng)來在一個大規(guī)模共享機(jī)器組上分布和運(yùn)行用戶任務(wù).雖然這個不是本論文的重點(diǎn),但是集群管理系統(tǒng)在理念上和Condor等其他系統(tǒng)是一樣的.
在MapReduce庫中的排序工具在操作上和NOW-Sort相似.源機(jī)器(map worker)分割將要被排序的數(shù)據(jù),然后把它發(fā)送到R個reduce worker中的一個上.每個reduce worker來本地排序它的數(shù)據(jù)(如果可能,就在內(nèi)存中).當(dāng)然,NOW-Sort沒有用戶自定義的map和reduce函數(shù),使得我們的庫可以廣泛的應(yīng)用.
River提供一個編程模型,在這個模型下,處理進(jìn)程可以靠在分布式的隊(duì)列上發(fā)送數(shù)據(jù)進(jìn)行彼此通訊.和MapReduce一樣,River系統(tǒng)嘗試提供對不同應(yīng)用有近似平均的性能,即使在不對等的硬件環(huán)境下或者在系統(tǒng)顛簸的情況下也能提供近似平均的性.River是通過精心調(diào)度硬盤和網(wǎng)絡(luò)的通訊,來平衡任務(wù)的完成時間.MapReduce不和它不同.利用嚴(yán)格編程模型,MapReduce構(gòu)架來把問題分割成大量的任務(wù).這些任務(wù)被自動的在可用的worker上調(diào)度,以便速度快的worker可以處理更多的任務(wù).這個嚴(yán)格編程模型也讓我們可以在工作快要結(jié)束的時候安排冗余的執(zhí)行,來在非一致處理的情況減少完成時間(比如,在有慢機(jī)或者阻塞的worker的時候).
BAD-FS是一個很MapReduce完全不同的編程模型,它的目標(biāo)是在一個廣闊的網(wǎng)絡(luò)上執(zhí)行工作.然而,它們有兩個基本原理是相同的.(1)這兩個系統(tǒng)使用冗余的執(zhí)行來從由失效引起的數(shù)據(jù)丟失中恢復(fù).(2)這兩個系統(tǒng)使用本地化調(diào)度策略,來減少通過擁擠的網(wǎng)絡(luò)連接發(fā)送的數(shù)據(jù)數(shù)量.
TACC是一個被設(shè)計(jì)用來簡化高有效性網(wǎng)絡(luò)服務(wù)結(jié)構(gòu)的系統(tǒng).和MapReduce一樣,它通過再次執(zhí)行來實(shí)現(xiàn)容錯.
8結(jié)束語
MapReduce編程模型已經(jīng)在Google成功的用在不同的目的.我們把這個成功歸于以下幾個原因:第一,這個模型使用簡單,甚至對沒有并行和分布式經(jīng)驗(yàn)的程序員也是如此,因?yàn)樗[藏了并行化,容錯,位置優(yōu)化和負(fù)載均衡的細(xì)節(jié).第二,大量不同的問題可以用MapReduce計(jì)算來表達(dá).例如,MapReduce被用來,為Google的產(chǎn)品web搜索服務(wù),排序,數(shù)據(jù)挖掘,機(jī)器學(xué)習(xí),和其他許多系統(tǒng),產(chǎn)生數(shù)據(jù).第三,我們已經(jīng)在一個好幾千臺計(jì)算機(jī)的大型集群上開發(fā)實(shí)現(xiàn)了這個MapReduce.這個實(shí)現(xiàn)使得對于這些機(jī)器資源的利用非常簡單,因此也適用于解決Google遇到的其他很多需要大量計(jì)算的問題.
從這個工作中我們也學(xué)習(xí)到了一些東西.首先,嚴(yán)格的編程模型使得并行化和分布式計(jì)算簡單,并且也易于構(gòu)造這樣的容錯計(jì)算環(huán)境.第二,網(wǎng)絡(luò)帶寬是系統(tǒng)的瓶頸.因此在我們的系統(tǒng)中大量的優(yōu)化目標(biāo)是減少通過網(wǎng)絡(luò)發(fā)送的數(shù)據(jù)量,本地優(yōu)化使用我們從本地磁盤讀取數(shù)據(jù),并且把中間數(shù)據(jù)寫到本地磁盤,以保留網(wǎng)絡(luò)帶寬.第三,冗余的執(zhí)行可以用來減少速度慢的機(jī)器的影響,和控制機(jī)器失效和數(shù)據(jù)丟失.
感謝
Josh Levenberg校定和擴(kuò)展了用戶級別的MapReduce API,并且結(jié)合他的適用經(jīng)驗(yàn)和其他人的改進(jìn)建議,增加了很多新的功能.MapReduce從GFS中讀取和寫入數(shù)據(jù).我們要感謝Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他們在開發(fā)GFS中的工作.我們還感謝Percy Liang Olcan Sercinoglu 在開發(fā)用于MapReduce的集群管理系統(tǒng)得工作.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach為本論文提出了寶貴的意見.OSDI的無名審閱者,以及我們的審核者Eric Brewer,在論文應(yīng)當(dāng)如何改進(jìn)方面給出了有益的意見.最后,我們感謝Google的工程部的所有MapReduce的用戶,感謝他們提供了有用的反饋,建議,以及錯誤報(bào)告等等.
A單詞頻率統(tǒng)計(jì)
本節(jié)包含了一個完整的程序,用于統(tǒng)計(jì)在一組命令行指定的輸入文件中,每一個不同的單詞出現(xiàn)頻率.
#include "mapreduce/mapreduce.h"
//用戶map函數(shù)
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
//跳過前導(dǎo)空格
while ((i < n) && isspace(text[i]))
i++;
// 查找單詞的結(jié)束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//用戶的reduce函數(shù)
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//迭代具有相同key的所有條目,并且累加它們的value
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//提交這個輸入key的綜合
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 把輸入文件列表存入"spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定輸出文件:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可選操作:在map任務(wù)中做部分累加工作,以便節(jié)省帶寬
out->set_combiner_class("Adder");
// 調(diào)整參數(shù): 使用2000臺機(jī)器,每個任務(wù)100MB內(nèi)存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 運(yùn)行它
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成: 'result'結(jié)構(gòu)包含計(jì)數(shù),花費(fèi)時間,和使用機(jī)器的信息
return 0;
}