Hadoop文檔(2.9.2) - MapReduce指南

預(yù)覽

Hadoop MapReduce是一個軟件框架,用于編寫并行處理海量數(shù)據(jù)的應(yīng)用程序,應(yīng)用程序運(yùn)行在一個通用硬件組成的,可靠的,容錯的大型集群之上。

MapReduce作業(yè)通常將輸入數(shù)據(jù)集分割成獨(dú)立的chunk,這些chunk以完全并行的方式由map任務(wù)處理??蚣軐ap任務(wù)的輸出進(jìn)行排序,然后發(fā)送給reduce任務(wù)。通常作業(yè)的輸入和輸出都存儲在文件系統(tǒng)中??蚣茇?fù)責(zé)調(diào)度任務(wù),監(jiān)控狀態(tài)并在任務(wù)失敗時重新執(zhí)行任務(wù)。

通常計算節(jié)點(diǎn)和存儲節(jié)點(diǎn)是相同的,即MapReduce框架和HDFS運(yùn)行在相同的節(jié)點(diǎn)上。這種配置允許框架在數(shù)據(jù)已存在的節(jié)點(diǎn)上調(diào)度任務(wù),從而在集群上獲得非常高的聚合帶寬。

MapReduce框架由一個主ResourceManager,每個節(jié)點(diǎn)一個的從NodeManager,和每個應(yīng)用程序一個的MRAppMaster組成。

最簡單的例子,應(yīng)用程序指定輸入/輸出地址,在其上應(yīng)用實(shí)現(xiàn)了特定接口的mapreduce函數(shù),之后是其他參數(shù),這些統(tǒng)稱作業(yè)配置。

Hadoop作業(yè)客戶端將作業(yè)(jar/可執(zhí)行文件)以及配置提交給ResourceManager,它負(fù)責(zé)將程序/配置分發(fā)到從節(jié)點(diǎn),調(diào)度任務(wù)并監(jiān)控任務(wù),將任務(wù)狀態(tài)和診斷信息返回給客戶端。

Hadoop框架是使用Java實(shí)現(xiàn)的,但是MapReduce應(yīng)用程序可以不使用Java。

輸入和輸出

MapReduce框架只處理<key, value>序?qū)Γ纯蚣軐⒆鳂I(yè)的輸入視為一組<key, value>序?qū)?,并生成一組<key, value>序?qū)ψ鳛檩敵觥?/p>

keyvalue類必須能夠被框架序列化,因此必須實(shí)現(xiàn)Writable接口。此外,key類還必須實(shí)現(xiàn)WritableComparable接口以實(shí)現(xiàn)排序。

MapReduce作業(yè)基本流程:
(input)<k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3>(output)

示例:WordCount v1.0

WordCount是一個簡單的應(yīng)用程序,它統(tǒng)計給定輸入中每個單詞出現(xiàn)的次數(shù)。

源碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreElements()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

用法

假設(shè)環(huán)境變量已經(jīng)設(shè)置好了:

export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

編譯WordCount.java并打包:

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class

假設(shè)有如下目錄:

  • /user/joe/wordcount/input:HDFS中的輸入目錄
  • /user/joe/wordcount/output:HDFS中的輸出目錄

作為輸入的樣本文件:

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

運(yùn)行應(yīng)用程序:

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

輸出的內(nèi)容為:

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

應(yīng)用程序可以使用-files選項指定當(dāng)前工作目錄下的路徑。-libjars選項可以將jar包添加到應(yīng)用程序的類路徑中。-archives選項允許傳遞壓縮文件。

使用-files,-libjars-archives運(yùn)行wordcount示例:

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

這里,myarchive.zip會被加壓到一個名為“myarchive.zip”的目錄中。

用戶可以使用#號為文件指定不同的符號名:

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

代碼說明

WordCount應(yīng)用程序很簡單明了:

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreElements()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}

Mapper實(shí)現(xiàn)使用map方法每次處理一行,數(shù)據(jù)來自TextInputFormat指定的路徑。然后使用StringTokenizer將每行分割成單詞,生成序?qū)?code><<word>, 1>。

在示例中,第一個map會生成:

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二個map生成:

< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

WordCount也指定了combiner

job.setCombinerClass(IntSumReducer.class);

這樣,在按key排序之后,每個map的輸出傳遞給本地的combiner做本地聚合。

第一個聚合的輸出:

< Bye, 1>
< Hello, 1>
< World, 2>

第二個聚合的輸出:

< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
}

Reducer實(shí)現(xiàn)使用reducer方法對結(jié)果求和,最終作業(yè)的輸出為:

< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

main方法指定了作業(yè)的各種配置,例如輸入/輸出路徑,key/value類型,輸入/輸出格式等。然后job.waitForCompletion提交作業(yè)。

MapReduce - 用戶接口

負(fù)載

應(yīng)用程序通常實(shí)現(xiàn)MapperReducer接口來提供mapreducer功能。這些是作業(yè)的核心。

Mapper

Mapper將輸入的key/value序?qū)τ成涑梢唤M中間key/value序?qū)Α?/p>

Map都是獨(dú)立的任務(wù),將輸入記錄轉(zhuǎn)換成中間記錄。中間記錄和輸入記錄的類型不必相同。給定輸入序?qū)赡苡成涑闪銈€或多個輸出序?qū)Α?/p>

MapReduce框架為InputFormat生成的每個InputSplit都創(chuàng)建一個map任務(wù)。

總體來說,Mapper實(shí)現(xiàn)通過Job.setMapperClass(Class)方法傳遞給作業(yè)。之后框架為每個key/value序?qū)φ{(diào)用map(WritableComparable, Writable, Context)方法。應(yīng)用程序可以覆蓋cleanup(Context)方法來執(zhí)行必要的清理工作。

輸出序?qū)Φ念愋涂梢圆煌谳斎胄驅(qū)?。給定輸入序?qū)梢杂成涑闪銈€或多個輸出序?qū)?。輸出序?qū)νㄟ^調(diào)用context.write(WritableComparable, Writable)方法收集起來。

應(yīng)用程序可以使用Counter報告統(tǒng)計結(jié)果。

所有關(guān)聯(lián)到給定輸出key的中間結(jié)果隨后由框架分組,然后傳遞給Reducer(s)。用戶可以通過Job.setGroupingComparatorClass(Class)指定Comparator控制分組。

Mapper的輸出先排序,然后按照Reducer的數(shù)量分區(qū)。分區(qū)數(shù)就是作業(yè)的reduce任務(wù)數(shù)。用戶可以實(shí)現(xiàn)自定義Partitioner來控制分區(qū)情況。

用戶可以使用Job.setCombinerClass(Class)來指定一個可選的combiner,它可以用來執(zhí)行中間結(jié)果的本地聚合,有助于減少MapperReducer之間的數(shù)據(jù)傳輸。

排好序的中間輸出總是以(key-len, key, value-len, value)格式存儲。應(yīng)用程序可以控制是否壓縮中間輸出。

多少個map?
map任務(wù)的數(shù)量通常由輸入的規(guī)模決定,即輸入文件的block總量。
正常map任務(wù)的并行級別是每個節(jié)點(diǎn)10-100個map任務(wù),任務(wù)設(shè)置需要一點(diǎn)時間,所以最好將map任務(wù)執(zhí)行控制在一分鐘之內(nèi)。
這樣,如果輸入數(shù)據(jù)有10TB,blocksize為128MB,那么一共需要82000個map任務(wù)。除非使用Configuration.set(MRJobConfig.NUM_MAPS)設(shè)置。

Reducer

Reducer將中間結(jié)果歸約成一個更小的集合。

Reducer任務(wù)的數(shù)量可以通過Job.setNumReducerTask(int)方法設(shè)置。

Reducer實(shí)現(xiàn)通過Job.setReducerClass(Class)傳遞給作業(yè)。之后應(yīng)用程序調(diào)用reducer(WritableComparable, Iterable<Writable>, Context)。應(yīng)用程序也可以覆蓋cleanup(Context)方法。

Reducer任務(wù)有三個階段:shuffle,sort和reduce。

Shuffle

Reducer任務(wù)的輸入是Mapper任務(wù)的排好序的輸出,在這個階段,框架將map任務(wù)輸出的相關(guān)分區(qū)通過HTTP組織到一起。

Sort

框架按照key值為Reducer的輸入分組(不同的map任務(wù)可能輸出相同的key值)。

shuffle和sort兩個階段同時執(zhí)行。

Secondary Sort

如果有特殊的排序需求,可以使用Job.setSortComparatorClass(Class)指定一個Comparator來控制中間結(jié)果的key值如何分組??梢杂脕砟M二次排序。

Reduce

這個階段會在每個分好組的輸入(<key, (list of values)>)上調(diào)用reduce(WritableComparable, Iterable<Writable>, Context)方法。

reduce任務(wù)的輸出通常通過context.write(WritableComparable, Writable)寫入文件系統(tǒng)。應(yīng)用程序可以使用Counter報告統(tǒng)計信息。

多少個Reduce?

正常的reduce任務(wù)數(shù)量應(yīng)該是0.951.75乘以(<no. of nodes> * <no. of maximum containers per node>)

使用0.95系數(shù)可以讓所有reduce任務(wù)在map任務(wù)結(jié)束后立即開始執(zhí)行。使用1.75系數(shù)可以讓速度快的節(jié)點(diǎn)執(zhí)行完第一輪reduce任務(wù)后,為了負(fù)載平衡再執(zhí)行第二輪reduce任務(wù)。

增加reduce任務(wù)的數(shù)量會增加框架的開銷,但會增加負(fù)載平衡并降低故障成本。

縮放因子要略小于整數(shù),以便在框架中為失敗任務(wù)保留一些位置。

Reduce NONE

如果沒有reduce階段,可以將reduce任務(wù)設(shè)為0。

這種情況下,map任務(wù)的輸出直接存儲到FileSystem,存儲路徑由FileOutputFormat.setOutputPath(Job, Path)設(shè)置。

Partitioner

Partitioner按key值分區(qū)。

Partitioner控制map輸出的key值分區(qū)情況。Key值通常根據(jù)哈希函數(shù)分區(qū)。分區(qū)數(shù)等于reduce任務(wù)數(shù)。HashPartitioner是默認(rèn)的partitioner。

Counter

Counter是應(yīng)用程序用來報告統(tǒng)計結(jié)果的工具。

作業(yè)配置

Job類表示MapReduce作業(yè)的配置。

Job是用戶描述MapReduce作業(yè)的主要接口??蚣軙凑?code>Job的描述執(zhí)行作業(yè),然而:

  • 有些配置參數(shù)會被標(biāo)記為final,從而無法更改。
  • 有些配置參數(shù)可以直接設(shè)置,有些則稍顯復(fù)雜。

Job通常需要指定Mapper,Combiner(如有必要),Partitioner,Reducer,InputFormat,OutputFormat的具體實(shí)現(xiàn)。FileInputFormat表示輸入文件的集合。輸出文件應(yīng)當(dāng)寫入到(FileOutputFormat.setOutputPath(Path))。

Job還可以設(shè)置一些可選的組件,比如Combiner,是否壓縮中間結(jié)果等。

用戶可以使用Configuration.set(String, String)/Configuration.get(String)設(shè)置/獲取任意參數(shù)。不過大量只讀數(shù)據(jù)推薦使用DistributedCache。

任務(wù)執(zhí)行和環(huán)境

MRAppMaster會在獨(dú)立的JVM中以子進(jìn)程的形式執(zhí)行Mapper/Reducer任務(wù)。

子任務(wù)繼承了MRAppMaster的環(huán)境,用戶可以使用mapreduce.(map|reduce).java.opts指定額外的屬性和配置參數(shù)。如果mapreduce.(map|reduce).java.opts參數(shù)包含@taskid@這樣的符號,它會把任務(wù)的taskid插入到配置中。

下面是一個多參數(shù)示例:

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
  -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
  -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

內(nèi)存管理

用戶/管理員可以使用mapreduce.(map|reduce).memory.mb指定可以使用的最大虛擬內(nèi)存,這個值以MB為單位,按進(jìn)程分配。這個值必須大于等于傳遞給JVM的-Xmx參數(shù)的值,否則JVM可能無法啟動。

框架某些部分的內(nèi)存也是可配置的。在map和reduce任務(wù)中,調(diào)整并行操作參數(shù)和磁盤寫入頻率可能會影響性能。監(jiān)控作業(yè)的文件系統(tǒng)計數(shù)器對性能調(diào)優(yōu)是很有幫助的。

Map參數(shù)

Map任務(wù)發(fā)出的記錄首先會被序列化進(jìn)buffer,它的元數(shù)據(jù)存儲在accounting buffer中。只要序列化buffer或者元數(shù)據(jù)buffer達(dá)到閾值,buffer中的內(nèi)容就會在排序后寫入磁盤,這一切都在后臺執(zhí)行。如果執(zhí)行過程中buffer被填滿,map線程會被阻塞。map任務(wù)執(zhí)行完后,所有記錄會寫入磁盤并合并成一個文件。盡量減少數(shù)據(jù)溢出次數(shù)可以減少map任務(wù)執(zhí)行時間,較大的buffer也會減少map任務(wù)可用的內(nèi)存。

Name Type Description
mapreduce.task.io.sort.mb int 序列化buffer和accounting buffer大小的總和
mapreduce.map.sort.spill.percent float 序列化buffer的使用限制,達(dá)到這個值后,線程會將數(shù)據(jù)寫入磁盤

Shuffle/Reduce參數(shù)

如前所述,每個reduce任務(wù)獲取分配給它們的分區(qū)數(shù)據(jù),并周期性的將輸出合并,然后存儲到磁盤上。如果啟用了中間數(shù)據(jù)壓縮功能,還需要對數(shù)據(jù)解壓縮。

Name Type Description
mapreduce.task.io.soft.factor int 指定可以同時合并的分段數(shù)
mapreduce.reduce.merge.inmem.thresholds int 在合并到磁盤之前,可以一次讀取到磁盤的map輸出數(shù)據(jù)的數(shù)量
mapreduce.reduce.shuffle.merge.percent float 在內(nèi)存合并之前讀取map輸出數(shù)據(jù)的內(nèi)存閾值
mapreduce.reduce.shuffle.input.buffer.percent float 相對于最大堆內(nèi)存的百分比
mapreduce.reduce.input.buffer.percent float 相對于最大堆內(nèi)存的百分比

可配置參數(shù)

Name Type Description
mapreduce.job.id stirng 作業(yè)ID
mapreduce.job.jar string jar包地址
mapreduce.job.local.dir string 作業(yè)共享空間
mapreduce.task.id string 任務(wù)ID
mapreduce.task.attempt.id string 任務(wù)嘗試ID
mapreduce.task.is.map boolean 是否是map任務(wù)
mapreduce.task.partition int 任務(wù)分區(qū)數(shù)
mapreduce.map.input.file string map任務(wù)處理數(shù)據(jù)的文件名
mapreduce.map.input.start long 輸入數(shù)據(jù)的偏移量
mapreduce.map.input.length long 輸入數(shù)據(jù)的字節(jié)數(shù)
mapreduce.task.output.dir string 任務(wù)的臨時輸出目錄

任務(wù)日志

日志默認(rèn)輸出到${HADOOP_LOG_DIR}/userlogs。

分布式類庫

(略)

作業(yè)提交和監(jiān)控

作業(yè)是用戶與ResourceManager交互的主接口。

Job可以提交作業(yè),追蹤進(jìn)程狀態(tài),訪問任務(wù)日志,讀取集群節(jié)點(diǎn)狀態(tài)信息。

作業(yè)提交包括以下步驟:

  1. 檢查作業(yè)的輸入和輸出
  2. 計算作業(yè)的InputSplit
  3. 如有必要,設(shè)置必要的accounting信息
  4. 拷貝jar包和配置信息到系統(tǒng)目錄
  5. 提交作業(yè)到ResourceManager

作業(yè)歷史文件輸出目錄可以使用mapreduce.jobhistory.intermediate-done-dirmapreduce.jobhistory.done-dir指定。

用戶可以使用$ mapred job -history output.jhist命令查看歷史日志簡報。

作業(yè)控制

用戶可能需要將作業(yè)鏈接起來以完成無法使用單個作業(yè)完成的任務(wù)。

作業(yè)輸入

InputFormat說明了輸入數(shù)據(jù)的格式。

MapReduce框架使用InputFormat來:

  1. 校驗作業(yè)的輸入數(shù)據(jù)
  2. 將輸入文件分割成本地InputSplit實(shí)例,每個實(shí)例分配給一個獨(dú)立的Mapper
  3. 使用RecordReader的具體實(shí)現(xiàn)從輸入中讀取記錄

基于文件的InputFormat的實(shí)現(xiàn)(比如FileInputFormat的子類)的默認(rèn)行為是將輸入按照大小分割成邏輯上的InputSplit實(shí)例。輸入文件的塊大小指定過了文件分割的上限,mapreduce.input.fileinputformat.split.minsize參數(shù)可以指定文件分割的下限。

顯然,基于輸入大小的邏輯分割對于很多記錄邊界不甚明朗的應(yīng)用來說是不夠的。這是,應(yīng)用應(yīng)當(dāng)實(shí)現(xiàn)一個RecordReader

TextInputFormat是默認(rèn)的InputFormat。

InputSplit

InputSplit表示被單個Mapper處理的數(shù)據(jù)單元。

通常InputSplit表示面向字節(jié)的視圖,而RecordReader負(fù)責(zé)處理和呈現(xiàn)面向記錄的視圖。

FileSplit是默認(rèn)InputSplit。

RecordReader

RecordReader從InputSplit中讀取<key, value>序?qū)Α?/p>

通常RecordReader將面向字節(jié)的視圖轉(zhuǎn)換成面向記錄的視圖供map任務(wù)處理。

作業(yè)輸出

OutputFormat表示作業(yè)的輸出格式。

MapReduce框架需要OutputFormat

  1. 校驗作業(yè)的輸出格式
  2. 提供RecordWriter實(shí)現(xiàn)寫入輸出文件

TextOutputFormat是默認(rèn)的OutputFormat。

OutputCommitter

OutputCommitter表示任務(wù)輸出的提交過程。

MapReduce框架將OutputCommitter用于:

  1. 在初始化階段設(shè)置作業(yè)。例如創(chuàng)建臨時輸出目錄。作業(yè)設(shè)置階段是在作業(yè)狀態(tài)為PREP時使用一個獨(dú)立的任務(wù)完成的。一旦設(shè)置完成,作業(yè)編程RUNNING狀態(tài)。
  2. 作業(yè)執(zhí)行完畢后清理作業(yè)。例如刪除臨時輸出目錄。
  3. 設(shè)置任務(wù)臨時輸出。
  4. 檢查任務(wù)是否需要提交。
  5. 提交任務(wù)輸出。一旦任務(wù)執(zhí)行完畢,如有必要任務(wù)會提交它的輸出。
  6. 廢棄任務(wù)提交。

FileOutputCommitter是默認(rèn)的OutputCommitter。

任務(wù)副作用文件

某些應(yīng)用的任務(wù)除了輸出文件,還需要創(chuàng)建一種副文件。

這種情況下,如果有多個相同的MapperReducer實(shí)例并行操作同一個文件就可能出問題。因此應(yīng)用寫入的時候必須能確定唯一的attempt任務(wù)(使用attemptid)。

使用FileOutputFormat時,為了避免這個問題,框架會為attempt任務(wù)維護(hù)一個${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}子目錄結(jié)構(gòu)。

RecordWriter

RecordWriter將數(shù)據(jù)以<key, value>格式寫入輸出文件。

其他特性

將作業(yè)提交到隊列

隊列是作業(yè)的集合,允許系統(tǒng)提供特定的功能。例如控制權(quán)限。

Hadoop自帶有一個默認(rèn)的“default”隊列。

作業(yè)可以使用mapreduce.job.queuename配置隊列名字。

計數(shù)器

Counter表示全局計數(shù)器。

DistributedCache

DistributedCache可以有效的分發(fā)只讀文件。

示例:WordCount v2.0

下面是一個更復(fù)雜的WordCount示例。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;

public class WordCount2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = parser.getRemainingArgs();
        if (remainingArgs.length != 2 && remainingArgs.length != 4) {
            System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        List<String> otherArgs = new ArrayList<>();
        for (int i = 0; i < remainingArgs.length; ++i) {
            if ("-skip".equals(remainingArgs[i])) {
                job.addCacheFile(new Path(remainingArgs[++i]).toUri());
                job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
            } else {
                otherArgs.add(remainingArgs[i]);
            }
        }
        FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        enum CountersEnum {INPUT_WORDS}

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<>();
        private Configuration conf;
        private BufferedReader fis;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
            if (conf.getBoolean("wordcount.skip.patterns", false)) {
                URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                for (URI patternsURI : patternsURIs) {
                    Path patternsPath = new Path(patternsURI.getPath());
                    String fileName = patternsPath.getName().toString();
                    parseSkipFile(fileName);
                }
            }
        }

        private void parseSkipFile(String fileName) {
            try {
                fis = new BufferedReader(new FileReader(fileName));
                String pattern = null;
                while ((pattern = fis.readLine()) != null) {
                    patternsToSkip.add(pattern);
                }
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = caseSensitive ? value.toString() : value.toString().toLowerCase();
            for (String pattern : patternsToSkip) {
                line = line.replaceAll(pattern, "");
            }
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreElements()) {
                word.set(itr.nextToken());
                context.write(word, one);
                Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString());
                counter.increment(1);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容