Hadoop源碼分析之WordCount

WordCount程序是hadoop自帶的案例,我們可以在 hadoop 解壓目錄下找到包含這個程序的 jar 文件(hadoop-mapreduce-examples-2.7.1.jar),該文件所在路徑為 hadoop/share/hadoop/mapreduce。

最簡單的MapReduce應(yīng)用程序至少包含 3 個部分:一個 Map 函數(shù)、一個 Reduce 函數(shù)和一個 main 函數(shù)。在運(yùn)行一個mapreduce計(jì)算任務(wù)時候,任務(wù)過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。main 函數(shù)將作業(yè)控制和文件輸入/輸出結(jié)合起來。

Map過程:并行讀取文本,對讀取的單詞進(jìn)行map操作,每個詞都以<key,value>形式生成。
舉例:
一個有三行文本的文件進(jìn)行MapReduce操作。
1、讀取第一行Hello World Bye World ,分割單詞形成Map:
<Hello,1> <World,1> <Bye,1> <World,1>

2、讀取第二行Hello Hadoop Bye Hadoop ,分割單詞形成Map:
<Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>

3、讀取第三行Bye Hadoop Hello Hadoop,分割單詞形成Map:
<Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1>

Reduce過程:是對map的結(jié)果進(jìn)行排序,合并,最后得出詞頻。
舉例:
1、經(jīng)過進(jìn)一步處理(combiner),將形成的Map根據(jù)相同的key組合成value數(shù)組:
<Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1>

2、循環(huán)執(zhí)行Reduce(K,V[]),分別統(tǒng)計(jì)每個單詞出現(xiàn)的次數(shù):
<Bye,3> <Hadoop,4> <Hello,3> <World,2>

WordCount源碼如下:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
  /**
   * TokenizerMapper繼承Mapper類
   * Mapper<KEYIN(輸入key類型), VALUEIN(輸入value類型), KEYOUT(輸出key類型), VALUEOUT(輸出value類型)>
   */
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    // 因?yàn)槿裘總€單詞出現(xiàn)后,就置為 1,并將其作為一個<key,value>對,因此可以聲明為常量,值為 1
    private final static IntWritable one = new IntWritable(1);//VALUEOUT
    private Text word = new Text();//KEYOUT

    /**
     *重寫map方法,讀取初試劃分的每一個鍵值對,即行偏移量和一行字符串,key為偏移量,value為該行字符串
     */
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      /**
       * 因?yàn)槊恳恍芯褪且粋€spilt,并會為之生成一個mapper,所以我們的參數(shù),key就是偏移量,value就是一行字符串
       * value是一行的字符串,這里將其切割成多個單詞,將每行的單詞進(jìn)行分割,按照"  \t\n\r\f"(空格、制表符、換行符、回車符、換頁)進(jìn)行分割
       */
      StringTokenizer itr = new StringTokenizer(value.toString());

      //遍歷
      while (itr.hasMoreTokens()) {
        //獲取每個值并設(shè)置map輸出的key值
        word.set(itr.nextToken());
        //one代表1,最開始每個單詞都是1次,context直接將<word,1>寫到本地磁盤上
        //write函數(shù)直接將兩個參數(shù)封裝成<key,value>
        context.write(word, one);
      }
    }
  }

  /**
   * IntSumReducer繼承Reducer類
   * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:Map的輸出類型,就是Reduce的輸入類型
   */
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    //輸出結(jié)果,總次數(shù)
    private IntWritable result = new IntWritable();

    /**
     *重寫reduce函數(shù),key為單詞,values是reducer從多個mapper中得到數(shù)據(jù)后進(jìn)行排序并將相同key組
     *合成<key.list<V>>中的list<V>,也就是說明排序這些工作都是mapper和reducer自己去做的,
     *我們只需要專注與在map和reduce函數(shù)中處理排序處理后的結(jié)果
     */
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {

      /**
       *因?yàn)樵谕粋€spilt對應(yīng)的mapper中,會將其進(jìn)行combine,使得其中單詞(key)不重復(fù),然后將這些鍵值對按照
       *hash函數(shù)分配給對應(yīng)的reducer,reducer進(jìn)行排序,和組合成list,然后再調(diào)用的用戶自定義的函數(shù)
       */
      int sum = 0;//累加器,累加每個單詞出現(xiàn)的次數(shù)
      //遍歷values
      for (IntWritable val : values) {
        sum += val.get();//累加
      }
      result.set(sum);//設(shè)置輸出value
      context.write(key, result);//context輸出reduce結(jié)果
    }
  }

  public static void main(String[] args) throws Exception {
    //獲取配置信息
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");//創(chuàng)建一個job,設(shè)置名稱
    job.setJarByClass(WordCount.class);//1、設(shè)置job運(yùn)行的類
    //2、設(shè)置mapper類、Combiner類和Reducer類
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    //3、設(shè)置輸出結(jié)果key和value的類
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//4、為job設(shè)置輸入陸軍
    }
    FileOutputFormat.setOutputPath(job,//5、為job設(shè)置輸出路徑
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);//6、結(jié)束程序
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容