一、概述
MapReduce是一種分布式計算模型,由兩個階段組成:Map和Reduce,用戶只需要實現map()和reduce()兩個函數,即可實現分布式計算。
這兩個函數的形參是key、value對,表示函數的輸入信息。
其中map(),輸入鍵值對<k1,v1>,輸出鍵值對<k2,v2>;
reduce(),輸入鍵值對<k2,{v2}>,輸出鍵值對<k3,v3>。
二、MapReduce原理

三、Map任務處理
1、讀取輸入文件內容,解析成key、value對。對輸入文件的每一行,解析成 key、value對。每一個鍵值對調用一次map函數。
2 、寫自己的邏輯,處理輸入的key、value,轉換成新的key、value輸出。
3 、對輸出的key、value進行分區(qū)。
4 、對不同分區(qū)的數據,按照key進行排序、分組。相同key的value放到一個集合中。
5? 、 (可選)分組后的數據進行歸約。
四、Reduce任務處理
1、對多個map任務的輸出,按照不同的分區(qū),通過網絡copy到不同的reduce節(jié)點。
2 、對多個map任務的輸出進行合并、排序。寫reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
3 、把reduce的輸出保存到HDFS文件中。
五、Map任務和Reduce任務舉例
假如我們要做一個詞頻統計的任務,在HDFS中的words文件內容
YSL 17
紀梵希 303
香奈兒 46
YSL 12
YSL 14
Map輸入這一步由MapReduce框架自動完成,

Reduce輸入這一步由MapReduce框架自動完成,

六、MapReduce流程分析
1、過程各個角色的作用
jobClient:提交作業(yè)
JobTracker:初始化作業(yè),分配作業(yè),TaskTracker與其進行通信,協調監(jiān)控整個作業(yè)
TaskTracker:定期與JobTracker通信,執(zhí)行Map和Reduce任務
HDFS:保存作業(yè)的數據、配置、jar包、結果

2、作業(yè)提交流程
(1)提交作業(yè)準備:編寫自己的MR程序; 配置作業(yè),包括輸入輸出路徑等等。
(2)提交作業(yè):配置完成后,通過JobClient提交。
(3)具體功能:與JobTracker通信得到一個jar的存儲路徑和JobId,輸入輸出路徑檢查、將jobj ar拷貝到的HDFS。
3、作業(yè)初始化
客戶端提交作業(yè)后,JobTracker會將作業(yè)加入到隊列,然后進行調度,默認是FIFO方式。
具體功能:作業(yè)初始化主要是指JobInProgress中完成的。
讀取分片信息。創(chuàng)建task包括Map和Reduce任創(chuàng)建task包括Map和Reduce任務。
創(chuàng)建TaskInProgress執(zhí)行task,包括map任務和reduce任務。
4、任務分配
TaskTracker與JobTracker之間的通信和任務分配是通過心跳機制實現的
TaskTracker會主動定期向JobTracker發(fā)送報告,詢問是否有任務要做,如果有,就會申請到任務。
5、任務執(zhí)行
如果TaskTracker拿到任務,會將所有信息拷貝到本地,包括代碼、配置、分片信息等。
TaskTracker中的localizeJob()方法會被調用進行本地化,拷貝job.jar,jobconf,job.xml到本地。
TaskTracker調用launchTaskForJob()方法加載啟動任務。
MapTaskRunner和ReduceTaskRunner分別啟動java child進程來執(zhí)行相應的任務。
6、錯誤處理
a、JobTracker失敗
存在單點故障,hadoop2.0解決了這個問題
b、TraskTracker失敗
TraskTracker崩潰了會停止向JobTracker發(fā)送心跳信息。
JobTracker會將TraskTracker從等待的任務池中移除,并將該任務轉移到其他的地方執(zhí)行
JobTracker將TaskTracker加入到黑名單中
c、Task失敗
任務失敗,會向TraskTracker拋出異常,最后任務掛起