一、MapReduce 的運(yùn)行及 WordCount 程序
1. MapReduce 運(yùn)行過(guò)程
MapReduce 充分借鑒了分而治之的思想來(lái)處理海量數(shù)據(jù),當(dāng)一臺(tái)機(jī)器對(duì)龐大的數(shù)據(jù)力有未逮時(shí),便可以通過(guò)搭建 MapReduce 集群來(lái)進(jìn)行數(shù)據(jù)的處理。在集群中,多臺(tái)機(jī)器并行計(jì)算,執(zhí)行同一邏輯任務(wù)。集群中的機(jī)器分為三類(lèi):Master、Mapper 和 Reducer。Master 是整個(gè)集群的主機(jī),負(fù)責(zé)任務(wù)的調(diào)度,為 Mapper 和 Reducer 分配對(duì)應(yīng)的 map 和 reduce 任務(wù)。map 任務(wù)被稱為映射,就是對(duì)數(shù)據(jù)的每一個(gè)元素進(jìn)行指定的操作。reduce 任務(wù)被稱為歸約,它會(huì)對(duì) map 任務(wù)的輸出進(jìn)行處理從而得出最終的結(jié)果。
簡(jiǎn)單地舉個(gè)例子,現(xiàn)在需要統(tǒng)計(jì)圖書(shū)館中的所有書(shū)。你數(shù)1號(hào)書(shū)架,我數(shù)2號(hào)書(shū)架。這就是“Map”。我們?nèi)嗽蕉?,?shù)書(shū)就更快。
現(xiàn)在我們到一起,把所有人的統(tǒng)計(jì)數(shù)加在一起。這就是“Reduce”。

在編程時(shí),MapReduce 計(jì)算框架將數(shù)據(jù)處理的過(guò)程簡(jiǎn)化為 Map 和 Reduce 兩步。假設(shè)我們有一個(gè)巨大的數(shù)據(jù)集,里面有海量規(guī)模的元素,元素的個(gè)數(shù)為 M,每個(gè)元素都需要進(jìn)行同一個(gè)函數(shù)處理。于是將 M 分成許多小份,然后每一份分給一個(gè) Mapper 來(lái)做,Mapper 執(zhí)行完函數(shù),將結(jié)果傳給 Reducer。Reducer 之后統(tǒng)計(jì)匯總各個(gè) Mapper 傳過(guò)來(lái)的結(jié)果,得到最后的任務(wù)的答案。用戶只需用 Map 和 Reduce 描述清楚需要處理的問(wèn)題,即可使用 MapReduce 編程框架編寫(xiě) map() 和 reduce() 函數(shù)實(shí)現(xiàn)分布式計(jì)算。
在 MapReduce 編程中,數(shù)據(jù)操作的最小單位是一個(gè)鍵值對(duì)。用戶在使用 MapReduce 編程模型時(shí),首先需要將數(shù)據(jù)抽象為鍵值對(duì)的形式來(lái)作為 map() 函數(shù)的輸入,map() 的結(jié)果將交給 MapReduce 框架進(jìn)行 shfffle 處理,shuffle 的結(jié)果將會(huì)作為 reduce() 函數(shù)的輸入,再得到最終的結(jié)果。
MapReduce 中數(shù)據(jù)的變化過(guò)程如下所示:
{Key1, Value1} -> {Key2, List<Value2>} -> {Key3, Value3}
對(duì)于所有的 MapReduce 程序來(lái)說(shuō),運(yùn)行的基本流程如下所示:
input -> map() -> shuffle -> reduce() ->output
以 WordCount 程序?yàn)槔?,輸入的文本文件?huì)被一行一行讀取為{key, value}格式,key 是當(dāng)前行在文件中的偏移量,value 是該行的內(nèi)容,這些鍵值對(duì)就是 map() 函數(shù)的輸入,如{0, hello world}代表文件剛開(kāi)始的一行為 "hello world"。
map() 函數(shù)需要對(duì)每一行的內(nèi)容按照空格分割,出現(xiàn)某個(gè)單詞時(shí)即記錄一次,得到多個(gè)鍵值對(duì)作為結(jié)果,如{"hello", 1}、{"world", 1}。
shuffle 是 MapReduce 框架的工作,它會(huì)對(duì) map() 的輸出結(jié)果進(jìn)行處理,將相同的 key 的 value 組合成一個(gè)列表,如{"hello", [1, 1, 1]}。
reduce() 函數(shù)只需要將 value 列表中的值相加就可以得到這個(gè)單詞在文本中出現(xiàn)的頻率,最終得到形如{"hello", 3}的結(jié)果。
2. WordCount 的 Mapper 及 Reducer
了解 MapReduce 框架下 WordCount 的流程后,可以得出 Mapper 和 Reducer 分別要處理的工作。
Mapper 需要讀取每行的文本,并以空格分割得到每個(gè)單詞,每得到一個(gè)單詞就記錄一次。定義 Mapper 類(lèi)時(shí),定義其輸入鍵值對(duì)的類(lèi)型為<LongWritable, Text>,LongWritable 是 Hadoop 自定義類(lèi)型,對(duì)應(yīng) Java 中的 Long,用于表示當(dāng)前行在文件中的偏移量;Text 對(duì)應(yīng) Java 中的 String,表示當(dāng)前行的內(nèi)容。Mapper 輸出的鍵值對(duì)類(lèi)型為<Text, IntWritable>,記錄某個(gè)單詞出現(xiàn)一次。
Map 類(lèi)及 map() 函數(shù)如下:
public static class WordMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String wordStr : words) {
word.set(wordStr);
context.write(word, one);
}
}
}
Mapper 的結(jié)果經(jīng)過(guò) shuffle 之后傳遞給 Reducer,Reducer 得到<Text, List<IntWritable>> 類(lèi)型的輸入,只需將 List 中的值相加,就會(huì)得到該單詞在文本中出現(xiàn)的總次數(shù)。需要注意的是,在定義 Reduce 類(lèi)時(shí),Reduce 類(lèi)的輸入類(lèi)型與 Map 的輸出類(lèi)型相同,都是<Text, IntWritable>。但是在生成 reduce() 函數(shù)中,輸入類(lèi)型會(huì)轉(zhuǎn)變?yōu)?lt;Text, Iterable<IntWritable>>以便用戶處理。
Reduce 類(lèi)及 reduce() 函數(shù)如下:
public static class WordReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outputValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : values) {
sum += intWritable.get();
}
outputValue.set(sum);
context.write(key, outputValue);
}
}
3. WordCount 完整程序
在 WordCount 程序中,WordMapper 類(lèi)和 WordReducer 都被定義為內(nèi)部靜態(tài)類(lèi),它們繼承自 Hadoop 中基礎(chǔ)的 Mapper 和 Reducer 類(lèi),在繼承時(shí)需要指定輸入輸出的鍵值對(duì)的類(lèi)型。
在主函數(shù)中,我們需要定義一個(gè) Job 并為其配置一系列參數(shù),再指定 MapReduce 程序的輸入輸出路徑,最后通過(guò) job.waitForCompletion(true) 即可運(yùn)行。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyWordCount {
public static class WordMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" "); // 按空格分割一行文本
for (String wordStr : words) {
word.set(wordStr);
context.write(word, one);
}
}
}
public static class WordReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outputValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : values) {
sum += intWritable.get(); // 計(jì)算 List<Intwritable> 列表中的值的和
}
outputValue.set(sum);
context.write(key, outputValue);
}
}
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "mywordcount");
job.setJarByClass(MyWordCount.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapperClass(WordMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(WordReducer.class);
FileInputFormat.addInputPath(job, new Path("/datas/test/mapred/example1/test_input"));
FileOutputFormat.setOutputPath(job, new Path("/datas/test/mapred/result4-16"));
boolean isSuccess = job.waitForCompletion(true);
System.exit(isSuccess ? 0 : 1);
}
}
二、shuffle 過(guò)程解析
MapReduce 計(jì)算模型主要由三部分組成:Map、Shuffle、Reduce,在編程時(shí),Map 和 Reduce 由用戶設(shè)計(jì),而將 Map 的輸出進(jìn)行進(jìn)一步處理再交給 Reduce 的過(guò)程就是 Shuffle。
MapReduce 整個(gè)流程大致如下:

Spill 的關(guān)鍵操作有三步:partition、sort、group,分別為分區(qū)、排序和分組,這三步操作都與 map 中輸出數(shù)據(jù)的 key 有關(guān)。
Map 操作結(jié)束后進(jìn)入 Shuffle,首先將 Map 端結(jié)果(如<"hello", 1>)寫(xiě)入內(nèi)存中,該內(nèi)存區(qū)默認(rèn)大小為100MB,是一個(gè)環(huán)形的緩沖區(qū)。當(dāng)內(nèi)存使用到80%時(shí),內(nèi)存中的數(shù)據(jù)會(huì)被 spill 到本地磁盤(pán)中。這里的本地磁盤(pán)指的是 map task 所運(yùn)行的 NodeManager 機(jī)器的本地磁盤(pán)。
partition 依據(jù) reduce task 的任務(wù)數(shù)進(jìn)行分區(qū),決定 map 輸出的數(shù)據(jù)將會(huì)被哪個(gè) reduce task 處理。sort 操作會(huì)對(duì)分區(qū)中數(shù)據(jù)進(jìn)行排序。最后分區(qū)中排序完的數(shù)據(jù)會(huì)被寫(xiě)到本地磁盤(pán)的一個(gè)文件中。
以上操作會(huì)反復(fù)進(jìn)行,當(dāng) map 處理數(shù)據(jù)結(jié)束之后,需要將本地磁盤(pán)的文件合并成一個(gè)文件,此時(shí)各個(gè)分區(qū)中的數(shù)據(jù)已經(jīng)進(jìn)行了排序。
Reduce 的輸入數(shù)據(jù)拷貝自 Map 的輸出,map task 處理數(shù)據(jù)結(jié)束之后會(huì)通知 Master,Master 將會(huì)告知 reduce task,Reducer 會(huì)自動(dòng)去讀取需要處理的數(shù)據(jù)。當(dāng) Reducer 獲得所有數(shù)據(jù)之后,對(duì)數(shù)據(jù)進(jìn)行 group,將相同的 key 的 value 合并,得到如<"hello", [1, 1, 1]>的數(shù)據(jù),這就是用戶編寫(xiě)的 reduce() 函數(shù)的輸入。
默認(rèn)情況下,reduce task 的個(gè)數(shù)為1,這是所有的 map 輸出都會(huì)被分配到同一個(gè) reduce task 中,此時(shí)所有的輸出都是有序的。如果需要多個(gè) reduce task,可以通過(guò)job.setNumReduceTasks()進(jìn)行設(shè)置。如果將 reduce task 的數(shù)目設(shè)置為0,則表示整個(gè) MapReduce Job 只有 map task,這時(shí)僅僅使用并行計(jì)算功能對(duì)數(shù)據(jù)進(jìn)行操作,不需要聚合。