WordCount程序是hadoop自帶的案例,我們可以在 hadoop 解壓目錄下找到包含這個程序的 jar 文件(hadoop-mapreduce-examples-2.7.1.jar),該文件所在路徑為 hadoop/share/hadoop/mapreduce。
最簡單的MapReduce應(yīng)用程序至少包含 3 個部分:一個 Map 函數(shù)、一個 Reduce 函數(shù)和一個 main 函數(shù)。在運(yùn)行一個mapreduce計(jì)算任務(wù)時候,任務(wù)過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。main 函數(shù)將作業(yè)控制和文件輸入/輸出結(jié)合起來。
Map過程:并行讀取文本,對讀取的單詞進(jìn)行map操作,每個詞都以<key,value>形式生成。
舉例:
一個有三行文本的文件進(jìn)行MapReduce操作。
1、讀取第一行Hello World Bye World ,分割單詞形成Map:
<Hello,1> <World,1> <Bye,1> <World,1>
2、讀取第二行Hello Hadoop Bye Hadoop ,分割單詞形成Map:
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
3、讀取第三行Bye Hadoop Hello Hadoop,分割單詞形成Map:
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>
Reduce過程:是對map的結(jié)果進(jìn)行排序,合并,最后得出詞頻。
舉例:
1、經(jīng)過進(jìn)一步處理(combiner),將形成的Map根據(jù)相同的key組合成value數(shù)組:
<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>
2、循環(huán)執(zhí)行Reduce(K,V[]),分別統(tǒng)計(jì)每個單詞出現(xiàn)的次數(shù):
<Bye,3> <Hadoop,4> <Hello,3> <World,2>
WordCount源碼如下:
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
/**
* TokenizerMapper繼承Mapper類
* Mapper<KEYIN(輸入key類型), VALUEIN(輸入value類型), KEYOUT(輸出key類型), VALUEOUT(輸出value類型)>
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
// 因?yàn)槿裘總€單詞出現(xiàn)后,就置為 1,并將其作為一個<key,value>對,因此可以聲明為常量,值為 1
private final static IntWritable one = new IntWritable(1);//VALUEOUT
private Text word = new Text();//KEYOUT
/**
*重寫map方法,讀取初試劃分的每一個鍵值對,即行偏移量和一行字符串,key為偏移量,value為該行字符串
*/
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
/**
* 因?yàn)槊恳恍芯褪且粋€spilt,并會為之生成一個mapper,所以我們的參數(shù),key就是偏移量,value就是一行字符串
* value是一行的字符串,這里將其切割成多個單詞,將每行的單詞進(jìn)行分割,按照" \t\n\r\f"(空格、制表符、換行符、回車符、換頁)進(jìn)行分割
*/
StringTokenizer itr = new StringTokenizer(value.toString());
//遍歷
while (itr.hasMoreTokens()) {
//獲取每個值并設(shè)置map輸出的key值
word.set(itr.nextToken());
//one代表1,最開始每個單詞都是1次,context直接將<word,1>寫到本地磁盤上
//write函數(shù)直接將兩個參數(shù)封裝成<key,value>
context.write(word, one);
}
}
}
/**
* IntSumReducer繼承Reducer類
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:Map的輸出類型,就是Reduce的輸入類型
*/
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
//輸出結(jié)果,總次數(shù)
private IntWritable result = new IntWritable();
/**
*重寫reduce函數(shù),key為單詞,values是reducer從多個mapper中得到數(shù)據(jù)后進(jìn)行排序并將相同key組
*合成<key.list<V>>中的list<V>,也就是說明排序這些工作都是mapper和reducer自己去做的,
*我們只需要專注與在map和reduce函數(shù)中處理排序處理后的結(jié)果
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
/**
*因?yàn)樵谕粋€spilt對應(yīng)的mapper中,會將其進(jìn)行combine,使得其中單詞(key)不重復(fù),然后將這些鍵值對按照
*hash函數(shù)分配給對應(yīng)的reducer,reducer進(jìn)行排序,和組合成list,然后再調(diào)用的用戶自定義的函數(shù)
*/
int sum = 0;//累加器,累加每個單詞出現(xiàn)的次數(shù)
//遍歷values
for (IntWritable val : values) {
sum += val.get();//累加
}
result.set(sum);//設(shè)置輸出value
context.write(key, result);//context輸出reduce結(jié)果
}
}
public static void main(String[] args) throws Exception {
//獲取配置信息
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");//創(chuàng)建一個job,設(shè)置名稱
job.setJarByClass(WordCount.class);//1、設(shè)置job運(yùn)行的類
//2、設(shè)置mapper類、Combiner類和Reducer類
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//3、設(shè)置輸出結(jié)果key和value的類
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//4、為job設(shè)置輸入陸軍
}
FileOutputFormat.setOutputPath(job,//5、為job設(shè)置輸出路徑
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);//6、結(jié)束程序
}
}