6.824 Note 1: MapReduce (2004)

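1. Background

Many computing tasks involve processing huge amounts of data. To finish such a task in an acceptable amount of time, it has to be distributed across hundreds or thousands of machines.

Distributing the data, scheduling the tasks, and handling failures all require a large amount of code.

So implementing a distributed job means writing the code for the task itself plus a large amount of extra code just to make it distributed.

MapReduce was designed to solve this problem.

MapReduce is a programming model that hides the details of parallelization, fault tolerance, data distribution, and load balancing.

That is, the user only expresses the simple computation they want to perform, and MapReduce takes care of parallelizing it and running it across the cluster automatically.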

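2. Programming Model

A MapReduce user expresses the computation as two functions, Map() and Reduce().

  • Map() takes an input key/value pair (for example, a document name and its contents) and outputs a set of intermediate key/value pairs;
  • The MapReduce library groups together all intermediate values that share the same key and passes each key, with its values, to Reduce();
  • Reduce() takes a key and the set of values for that key, merges the values, and outputs the result (typically one key/value pair per key);

Example: counting the number of occurrences of each word: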

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
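
The pseudocode above can be made runnable. Below is a minimal Python sketch. It assumes that a "word" is a maximal run of ASCII letters, because the paper does not say how words are split. The names `map_fn`, `reduce_fn`, and `run_sequential` are hypothetical. `run_sequential` plays the library's role of grouping values by key, but does it in one process rather than across a cluster.

```python
import re
from collections import defaultdict


def map_fn(key, value):
    """key: document name (unused here); value: document contents."""
    # EmitIntermediate(w, "1") for every word w in the document.
    return [(w, "1") for w in re.findall(r"[A-Za-z]+", value)]


def reduce_fn(key, values):
    """key: a word; values: the list of counts emitted for that word."""
    result = 0
    for v in values:
        result += int(v)
    return str(result)  # Emit(AsString(result))


def run_sequential(documents):
    """Stand-in for the library: run every Map, group values by key, then run Reduce per key."""
    intermediate = defaultdict(list)
    for name, contents in documents.items():
        for k, v in map_fn(name, contents):
            intermediate[k].append(v)
    return {k: reduce_fn(k, vs) for k, vs in sorted(intermediate.items())}


if __name__ == "__main__":
    print(run_sequential({"a.txt": "the cat sat", "b.txt": "the cat ran"}))
    # prints {'cat': '2', 'ran': '1', 'sat': '1', 'the': '2'}
```

Note that Map and Reduce only ever see one record or one key at a time. That restriction is what lets the library run many copies of them in parallel.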

3. Implementation

3.1 Execution Overview
  1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

  2. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

    Tasks: M + R in total, usually far more than the number of workers.

  3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

    Map phase: read the input split, call map(), and write the results to intermediate files;

  4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

    When a Map task succeeds, it reports the locations of its intermediate files to the master;

  5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

    Reduce phase: fetch the contents of all intermediate files for this Reduce task's key region, sort them to group the values by key, call reduce(), and write the output file;

  6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

  7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.
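
The steps above can be simulated in a single process. The point is to show how the M splits, the R partitions, and the per-partition sort fit together. This is a sketch, not the paper's implementation, and it makes several simplifications:

- Everything stays in memory instead of on local disks and GFS.
- The input is split by lines instead of into 16 to 64 MB pieces.
- The partitioning function is assumed to be hash(key) mod R, with `zlib.crc32` as the hash.

All function names are hypothetical.

```python
import re
import zlib
from itertools import groupby
from operator import itemgetter


def map_fn(key, value):
    return [(w, "1") for w in re.findall(r"[A-Za-z]+", value)]


def reduce_fn(key, values):
    return str(sum(int(v) for v in values))


def partition(key, R):
    # hash(key) mod R with a deterministic hash. Python's built-in hash() of a str is
    # salted per process, so separate worker processes could disagree on a key's region.
    return zlib.crc32(key.encode("utf-8")) % R


def split_input(text, M):
    # Step 1 (simplified): cut the input into at most M pieces at line boundaries.
    lines = text.splitlines()
    size = max(1, -(-len(lines) // M))  # ceiling division, never zero
    return [lines[i:i + size] for i in range(0, len(lines), size)]


def run_map_task(task_id, lines, R):
    # Steps 3-4: apply Map to every record of the split and bucket the output into R regions.
    regions = [[] for _ in range(R)]
    for line in lines:
        for k, v in map_fn(f"split-{task_id}", line):
            regions[partition(k, R)].append((k, v))
    return regions


def run_reduce_task(r, map_outputs):
    # Step 5: gather region r from every Map task, then sort so equal keys are adjacent.
    pairs = sorted((kv for regions in map_outputs for kv in regions[r]), key=itemgetter(0))
    # Step 6: call Reduce once per unique key, in sorted key order.
    return [(k, reduce_fn(k, [v for _, v in group]))
            for k, group in groupby(pairs, key=itemgetter(0))]


def run_job(text, M, R):
    splits = split_input(text, M)
    map_outputs = [run_map_task(i, s, R) for i, s in enumerate(splits)]  # every Map finishes first
    return [run_reduce_task(r, map_outputs) for r in range(R)]  # one output per reduce partition
```

Each Reduce task reads one region from every Map task's output. For that reason, all Maps must finish before any Reduce call can run.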

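3.2 Master Data Structures

For every Map task and Reduce task, the master stores its state (idle, in-progress, or completed) and, for non-idle tasks, the identity of the worker machine running it;
The master also stores the locations of the intermediate files: as each Map task completes, the master records the locations of the R intermediate file regions it produced and forwards them to the Reduce tasks;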
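
These structures might look roughly like this in Python. The class and field names are hypothetical, and intermediate file locations are treated as opaque strings. `map_completed` ignores a second completion report for the same Map task, which is one way to make sure Reduce workers hear about only one copy when a task is run twice.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple


class TaskState(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"


@dataclass
class Task:
    task_id: int
    state: TaskState = TaskState.IDLE
    worker: Optional[str] = None  # identity of the worker machine; None while idle


@dataclass
class Master:
    M: int  # number of Map tasks
    R: int  # number of Reduce tasks
    map_tasks: List[Task] = field(init=False)
    reduce_tasks: List[Task] = field(init=False)
    # For each reduce partition r: (map_task_id, location) of every region r reported so far.
    intermediate: Dict[int, List[Tuple[int, str]]] = field(init=False)

    def __post_init__(self):
        self.map_tasks = [Task(i) for i in range(self.M)]
        self.reduce_tasks = [Task(r) for r in range(self.R)]
        self.intermediate = {r: [] for r in range(self.R)}

    def assign(self, task: Task, worker: str) -> None:
        task.state, task.worker = TaskState.IN_PROGRESS, worker

    def map_completed(self, map_id: int, region_locations: List[str]) -> None:
        """Record the R region locations reported by a finished Map task."""
        task = self.map_tasks[map_id]
        if task.state is TaskState.COMPLETED:
            return  # duplicate completion (the task ran twice): keep only the first report
        task.state = TaskState.COMPLETED
        for r, loc in enumerate(region_locations):
            self.intermediate[r].append((map_id, loc))
        # A real master would now push the new locations to in-progress Reduce workers.
```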

3.3 Fault Tolerance
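  • Worker Failure
    The master pings every worker periodically; if a worker does not respond within a timeout, the master marks it as failed.
    Any Map or Reduce task in progress on that worker is reset to idle and becomes eligible for rescheduling;
    Every Map task that this worker has already completed is also reset to idle and rescheduled;
    Workers running Reduce tasks are notified of the re-execution; any Reduce task that has not yet read a re-executed Map task's data from the failed worker reads it from the new worker instead;

    Completed Map tasks must be re-executed because their output is stored on the failed machine's local disk, which is no longer accessible;
    Completed Reduce tasks do not need to be re-executed because their output is stored in the global file system (GFS), which remains accessible after the node fails;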

  • Master Failure
    One simple approach is to have the master periodically write the data structures described above to disk as a checkpoint.
    If the master task dies, a new master process can be started from the last checkpoint.
    However, since there is only a single master, its failure is unlikely, so the paper's implementation simply aborts the MapReduce computation if the master fails. Clients can detect this condition and retry the MapReduce operation if they wish.
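
The worker-failure rules can be sketched as two small functions. The first finds workers whose ping replies have timed out. The second resets their tasks. The task representation, the timeout value, and the function names are all hypothetical. The returned Map task IDs stand for the re-executions that Reduce workers would be told about.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

IDLE, IN_PROGRESS, COMPLETED = "idle", "in-progress", "completed"


@dataclass
class Task:
    kind: str  # "map" or "reduce"
    task_id: int
    state: str = IDLE
    worker: Optional[str] = None


def find_failed_workers(last_reply: Dict[str, float], now: float, timeout: float) -> List[str]:
    """Workers whose most recent ping reply is older than `timeout` seconds."""
    return [w for w, t in last_reply.items() if now - t > timeout]


def handle_worker_failure(tasks: List[Task], failed: str) -> List[int]:
    """Reset the failed worker's tasks; return the IDs of Map tasks that must be re-executed."""
    rerun_maps = []
    for t in tasks:
        if t.worker != failed:
            continue
        in_progress = t.state == IN_PROGRESS
        # Completed Map output lived on the failed machine's local disk, so it is lost.
        lost_map_output = t.kind == "map" and t.state == COMPLETED
        if in_progress or lost_map_output:
            if t.kind == "map":
                rerun_maps.append(t.task_id)
            t.state, t.worker = IDLE, None
        # Completed Reduce tasks are left alone: their output is already in GFS.
    # The caller would notify Reduce workers that these Map tasks are being re-executed.
    return rerun_maps
```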

3.4 Other Details
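  • Locality: the input data is managed by GFS, which keeps 3 replicas of each block. When scheduling Map tasks, the master takes the locations of the input files into account and tries to run each Map task on, or near, a machine that holds a replica of its input;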

  • Backup Tasks: a common cause of a long total running time is a "straggler", a machine that takes unusually long to finish one of the last few tasks. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. A task is marked as completed as soon as either the primary or the backup execution finishes.

  • Does Reduce run only after all Maps have finished? Yes: no Reduce calls until all Maps are finished, since any Map task may emit values for any key;

  • Load balance: many more tasks than workers, so fast workers do more. Because tasks far outnumber workers, faster machines end up running more tasks and slower machines fewer, which balances the load across the cluster dynamically.

  • What if the master gives two workers the same Map() task?
    Perhaps the master incorrectly thinks one worker died.
    It will tell Reduce workers about only one of them.

  • What if the master gives two workers the same Reduce() task?
    They will both try to write the same output file on GFS!
    Atomic GFS rename prevents mixing; one complete file will be visible.

  • What if a worker computes incorrect output, due to broken hardware or software?
    Too bad! MR assumes "fail-stop" CPUs and software.
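
The locality rule can be sketched as a preference order for when an idle worker asks for a Map task:

1. A task whose input split has a replica on that worker.
2. Failing that, a task with a replica on the same rack (the paper's example of "near" is a machine on the same network switch).
3. Otherwise, any idle task.

This policy is an illustrative assumption. `replicas` and `rack_of` are hypothetical inputs that a real master would have to obtain from GFS and from the cluster configuration.

```python
from typing import Dict, List, Optional, Set


def pick_map_task(worker: str, idle_tasks: List[int],
                  replicas: Dict[int, Set[str]], rack_of: Dict[str, str]) -> Optional[int]:
    """Pick an idle Map task for `worker`, preferring tasks whose input is close by.

    replicas[t]: machines holding a replica of task t's input split (assumed given).
    rack_of[m]: the rack / network switch of machine m (assumed given).
    """
    if not idle_tasks:
        return None
    # 1) Input replica on this very machine: the split is read from local disk.
    for t in idle_tasks:
        if worker in replicas[t]:
            return t
    # 2) Input replica behind the same switch: the read stays within the rack.
    worker_rack = rack_of.get(worker)
    if worker_rack is not None:
        for t in idle_tasks:
            if any(rack_of.get(m) == worker_rack for m in replicas[t]):
                return t
    # 3) Otherwise take any idle task; its input has to cross the network.
    return idle_tasks[0]
```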
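
The answer about duplicate Reduce tasks relies on an atomic rename. The same pattern can be imitated on a local POSIX file system, where `os.replace` is atomic: each execution writes to its own temporary file and renames it to the final name only once the file is complete. This is a sketch under that assumption; GFS itself is not involved. `commit_reduce_output` is a hypothetical helper, and the output file name in the usage below is made up.

```python
import os
import tempfile


def commit_reduce_output(lines, final_path):
    """Publish a reduce partition's output so final_path only ever holds a complete file."""
    out_dir = os.path.dirname(os.path.abspath(final_path))
    # Each execution gets its own uniquely named temp file in the same directory,
    # because a rename is only atomic within a single file system.
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, prefix=".mr-tmp-")
    try:
        with os.fdopen(fd, "w") as f:
            for line in lines:
                f.write(line + "\n")
        # Atomic on POSIX: readers see the old file or the new one, never a mix.
        # If a duplicate execution also commits, the later rename simply wins.
        os.replace(tmp_path, final_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

For example, calling `commit_reduce_output(["cat 2", "the 2"], "mr-out-0")` twice, as two executions of the same Reduce task would, leaves exactly one complete `mr-out-0` and no temporary files behind.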

4. Summary

MapReduce single-handedly made big cluster computation popular.

  • Not the most efficient or flexible.
  • Scales well.
  • Easy to program -- failures and data movement are hidden.
    These were good trade-offs in practice.

[2017.9 夢工廠]
