預(yù)覽
Hadoop MapReduce是一個軟件框架,用于編寫并行處理海量數(shù)據(jù)的應(yīng)用程序,應(yīng)用程序運(yùn)行在一個通用硬件組成的,可靠的,容錯的大型集群之上。
MapReduce作業(yè)通常將輸入數(shù)據(jù)集分割成獨(dú)立的chunk,這些chunk以完全并行的方式由map任務(wù)處理??蚣軐ap任務(wù)的輸出進(jìn)行排序,然后發(fā)送給reduce任務(wù)。通常作業(yè)的輸入和輸出都存儲在文件系統(tǒng)中??蚣茇?fù)責(zé)調(diào)度任務(wù),監(jiān)控狀態(tài)并在任務(wù)失敗時重新執(zhí)行任務(wù)。
通常計算節(jié)點(diǎn)和存儲節(jié)點(diǎn)是相同的,即MapReduce框架和HDFS運(yùn)行在相同的節(jié)點(diǎn)上。這種配置允許框架在數(shù)據(jù)已存在的節(jié)點(diǎn)上調(diào)度任務(wù),從而在集群上獲得非常高的聚合帶寬。
MapReduce框架由一個主ResourceManager,每個節(jié)點(diǎn)一個的從NodeManager,和每個應(yīng)用程序一個的MRAppMaster組成。
最簡單的例子,應(yīng)用程序指定輸入/輸出地址,在其上應(yīng)用實(shí)現(xiàn)了特定接口的map和reduce函數(shù),之后是其他參數(shù),這些統(tǒng)稱作業(yè)配置。
Hadoop作業(yè)客戶端將作業(yè)(jar/可執(zhí)行文件)以及配置提交給ResourceManager,它負(fù)責(zé)將程序/配置分發(fā)到從節(jié)點(diǎn),調(diào)度任務(wù)并監(jiān)控任務(wù),將任務(wù)狀態(tài)和診斷信息返回給客戶端。
Hadoop框架是使用Java實(shí)現(xiàn)的,但是MapReduce應(yīng)用程序可以不使用Java。
輸入和輸出
MapReduce框架只處理<key, value>序?qū)Γ纯蚣軐⒆鳂I(yè)的輸入視為一組<key, value>序?qū)?,并生成一組<key, value>序?qū)ψ鳛檩敵觥?/p>
key和value類必須能夠被框架序列化,因此必須實(shí)現(xiàn)Writable接口。此外,key類還必須實(shí)現(xiàn)WritableComparable接口以實(shí)現(xiàn)排序。
MapReduce作業(yè)基本流程:
(input)<k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3>(output)
示例:WordCount v1.0
WordCount是一個簡單的應(yīng)用程序,它統(tǒng)計給定輸入中每個單詞出現(xiàn)的次數(shù)。
源碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreElements()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
用法
假設(shè)環(huán)境變量已經(jīng)設(shè)置好了:
export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
編譯WordCount.java并打包:
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
假設(shè)有如下目錄:
-
/user/joe/wordcount/input:HDFS中的輸入目錄 -
/user/joe/wordcount/output:HDFS中的輸出目錄
作為輸入的樣本文件:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
運(yùn)行應(yīng)用程序:
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
輸出的內(nèi)容為:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
應(yīng)用程序可以使用-files選項指定當(dāng)前工作目錄下的路徑。-libjars選項可以將jar包添加到應(yīng)用程序的類路徑中。-archives選項允許傳遞壓縮文件。
使用-files,-libjars和-archives運(yùn)行wordcount示例:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
這里,myarchive.zip會被加壓到一個名為“myarchive.zip”的目錄中。
用戶可以使用#號為文件指定不同的符號名:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
代碼說明
WordCount應(yīng)用程序很簡單明了:
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreElements()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Mapper實(shí)現(xiàn)使用map方法每次處理一行,數(shù)據(jù)來自TextInputFormat指定的路徑。然后使用StringTokenizer將每行分割成單詞,生成序?qū)?code><<word>, 1>。
在示例中,第一個map會生成:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二個map生成:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
WordCount也指定了combiner:
job.setCombinerClass(IntSumReducer.class);
這樣,在按key排序之后,每個map的輸出傳遞給本地的combiner做本地聚合。
第一個聚合的輸出:
< Bye, 1>
< Hello, 1>
< World, 2>
第二個聚合的輸出:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
Reducer實(shí)現(xiàn)使用reducer方法對結(jié)果求和,最終作業(yè)的輸出為:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
main方法指定了作業(yè)的各種配置,例如輸入/輸出路徑,key/value類型,輸入/輸出格式等。然后job.waitForCompletion提交作業(yè)。
MapReduce - 用戶接口
負(fù)載
應(yīng)用程序通常實(shí)現(xiàn)Mapper和Reducer接口來提供map和reducer功能。這些是作業(yè)的核心。
Mapper
Mapper將輸入的key/value序?qū)τ成涑梢唤M中間key/value序?qū)Α?/p>
Map都是獨(dú)立的任務(wù),將輸入記錄轉(zhuǎn)換成中間記錄。中間記錄和輸入記錄的類型不必相同。給定輸入序?qū)赡苡成涑闪銈€或多個輸出序?qū)Α?/p>
MapReduce框架為InputFormat生成的每個InputSplit都創(chuàng)建一個map任務(wù)。
總體來說,Mapper實(shí)現(xiàn)通過Job.setMapperClass(Class)方法傳遞給作業(yè)。之后框架為每個key/value序?qū)φ{(diào)用map(WritableComparable, Writable, Context)方法。應(yīng)用程序可以覆蓋cleanup(Context)方法來執(zhí)行必要的清理工作。
輸出序?qū)Φ念愋涂梢圆煌谳斎胄驅(qū)?。給定輸入序?qū)梢杂成涑闪銈€或多個輸出序?qū)?。輸出序?qū)νㄟ^調(diào)用context.write(WritableComparable, Writable)方法收集起來。
應(yīng)用程序可以使用Counter報告統(tǒng)計結(jié)果。
所有關(guān)聯(lián)到給定輸出key的中間結(jié)果隨后由框架分組,然后傳遞給Reducer(s)。用戶可以通過Job.setGroupingComparatorClass(Class)指定Comparator控制分組。
Mapper的輸出先排序,然后按照Reducer的數(shù)量分區(qū)。分區(qū)數(shù)就是作業(yè)的reduce任務(wù)數(shù)。用戶可以實(shí)現(xiàn)自定義Partitioner來控制分區(qū)情況。
用戶可以使用Job.setCombinerClass(Class)來指定一個可選的combiner,它可以用來執(zhí)行中間結(jié)果的本地聚合,有助于減少Mapper到Reducer之間的數(shù)據(jù)傳輸。
排好序的中間輸出總是以(key-len, key, value-len, value)格式存儲。應(yīng)用程序可以控制是否壓縮中間輸出。
多少個map?
map任務(wù)的數(shù)量通常由輸入的規(guī)模決定,即輸入文件的block總量。
正常map任務(wù)的并行級別是每個節(jié)點(diǎn)10-100個map任務(wù),任務(wù)設(shè)置需要一點(diǎn)時間,所以最好將map任務(wù)執(zhí)行控制在一分鐘之內(nèi)。
這樣,如果輸入數(shù)據(jù)有10TB,blocksize為128MB,那么一共需要82000個map任務(wù)。除非使用Configuration.set(MRJobConfig.NUM_MAPS)設(shè)置。
Reducer
Reducer將中間結(jié)果歸約成一個更小的集合。
Reducer任務(wù)的數(shù)量可以通過Job.setNumReducerTask(int)方法設(shè)置。
Reducer實(shí)現(xiàn)通過Job.setReducerClass(Class)傳遞給作業(yè)。之后應(yīng)用程序調(diào)用reducer(WritableComparable, Iterable<Writable>, Context)。應(yīng)用程序也可以覆蓋cleanup(Context)方法。
Reducer任務(wù)有三個階段:shuffle,sort和reduce。
Shuffle
Reducer任務(wù)的輸入是Mapper任務(wù)的排好序的輸出,在這個階段,框架將map任務(wù)輸出的相關(guān)分區(qū)通過HTTP組織到一起。
Sort
框架按照key值為Reducer的輸入分組(不同的map任務(wù)可能輸出相同的key值)。
shuffle和sort兩個階段同時執(zhí)行。
Secondary Sort
如果有特殊的排序需求,可以使用Job.setSortComparatorClass(Class)指定一個Comparator來控制中間結(jié)果的key值如何分組??梢杂脕砟M二次排序。
Reduce
這個階段會在每個分好組的輸入(<key, (list of values)>)上調(diào)用reduce(WritableComparable, Iterable<Writable>, Context)方法。
reduce任務(wù)的輸出通常通過context.write(WritableComparable, Writable)寫入文件系統(tǒng)。應(yīng)用程序可以使用Counter報告統(tǒng)計信息。
多少個Reduce?
正常的reduce任務(wù)數(shù)量應(yīng)該是0.95或1.75乘以(<no. of nodes> * <no. of maximum containers per node>)。
使用0.95系數(shù)可以讓所有reduce任務(wù)在map任務(wù)結(jié)束后立即開始執(zhí)行。使用1.75系數(shù)可以讓速度快的節(jié)點(diǎn)執(zhí)行完第一輪reduce任務(wù)后,為了負(fù)載平衡再執(zhí)行第二輪reduce任務(wù)。
增加reduce任務(wù)的數(shù)量會增加框架的開銷,但會增加負(fù)載平衡并降低故障成本。
縮放因子要略小于整數(shù),以便在框架中為失敗任務(wù)保留一些位置。
Reduce NONE
如果沒有reduce階段,可以將reduce任務(wù)設(shè)為0。
這種情況下,map任務(wù)的輸出直接存儲到FileSystem,存儲路徑由FileOutputFormat.setOutputPath(Job, Path)設(shè)置。
Partitioner
Partitioner按key值分區(qū)。
Partitioner控制map輸出的key值分區(qū)情況。Key值通常根據(jù)哈希函數(shù)分區(qū)。分區(qū)數(shù)等于reduce任務(wù)數(shù)。HashPartitioner是默認(rèn)的partitioner。
Counter
Counter是應(yīng)用程序用來報告統(tǒng)計結(jié)果的工具。
作業(yè)配置
Job類表示MapReduce作業(yè)的配置。
Job是用戶描述MapReduce作業(yè)的主要接口??蚣軙凑?code>Job的描述執(zhí)行作業(yè),然而:
- 有些配置參數(shù)會被標(biāo)記為
final,從而無法更改。 - 有些配置參數(shù)可以直接設(shè)置,有些則稍顯復(fù)雜。
Job通常需要指定Mapper,Combiner(如有必要),Partitioner,Reducer,InputFormat,OutputFormat的具體實(shí)現(xiàn)。FileInputFormat表示輸入文件的集合。輸出文件應(yīng)當(dāng)寫入到(FileOutputFormat.setOutputPath(Path))。
Job還可以設(shè)置一些可選的組件,比如Combiner,是否壓縮中間結(jié)果等。
用戶可以使用Configuration.set(String, String)/Configuration.get(String)設(shè)置/獲取任意參數(shù)。不過大量只讀數(shù)據(jù)推薦使用DistributedCache。
任務(wù)執(zhí)行和環(huán)境
MRAppMaster會在獨(dú)立的JVM中以子進(jìn)程的形式執(zhí)行Mapper/Reducer任務(wù)。
子任務(wù)繼承了MRAppMaster的環(huán)境,用戶可以使用mapreduce.(map|reduce).java.opts指定額外的屬性和配置參數(shù)。如果mapreduce.(map|reduce).java.opts參數(shù)包含@taskid@這樣的符號,它會把任務(wù)的taskid插入到配置中。
下面是一個多參數(shù)示例:
<property>
<name>mapreduce.map.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>
-Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
內(nèi)存管理
用戶/管理員可以使用mapreduce.(map|reduce).memory.mb指定可以使用的最大虛擬內(nèi)存,這個值以MB為單位,按進(jìn)程分配。這個值必須大于等于傳遞給JVM的-Xmx參數(shù)的值,否則JVM可能無法啟動。
框架某些部分的內(nèi)存也是可配置的。在map和reduce任務(wù)中,調(diào)整并行操作參數(shù)和磁盤寫入頻率可能會影響性能。監(jiān)控作業(yè)的文件系統(tǒng)計數(shù)器對性能調(diào)優(yōu)是很有幫助的。
Map參數(shù)
Map任務(wù)發(fā)出的記錄首先會被序列化進(jìn)buffer,它的元數(shù)據(jù)存儲在accounting buffer中。只要序列化buffer或者元數(shù)據(jù)buffer達(dá)到閾值,buffer中的內(nèi)容就會在排序后寫入磁盤,這一切都在后臺執(zhí)行。如果執(zhí)行過程中buffer被填滿,map線程會被阻塞。map任務(wù)執(zhí)行完后,所有記錄會寫入磁盤并合并成一個文件。盡量減少數(shù)據(jù)溢出次數(shù)可以減少map任務(wù)執(zhí)行時間,較大的buffer也會減少map任務(wù)可用的內(nèi)存。
| Name | Type | Description |
|---|---|---|
mapreduce.task.io.sort.mb |
int | 序列化buffer和accounting buffer大小的總和 |
mapreduce.map.sort.spill.percent |
float | 序列化buffer的使用限制,達(dá)到這個值后,線程會將數(shù)據(jù)寫入磁盤 |
Shuffle/Reduce參數(shù)
如前所述,每個reduce任務(wù)獲取分配給它們的分區(qū)數(shù)據(jù),并周期性的將輸出合并,然后存儲到磁盤上。如果啟用了中間數(shù)據(jù)壓縮功能,還需要對數(shù)據(jù)解壓縮。
| Name | Type | Description |
|---|---|---|
mapreduce.task.io.soft.factor |
int | 指定可以同時合并的分段數(shù) |
mapreduce.reduce.merge.inmem.thresholds |
int | 在合并到磁盤之前,可以一次讀取到磁盤的map輸出數(shù)據(jù)的數(shù)量 |
mapreduce.reduce.shuffle.merge.percent |
float | 在內(nèi)存合并之前讀取map輸出數(shù)據(jù)的內(nèi)存閾值 |
mapreduce.reduce.shuffle.input.buffer.percent |
float | 相對于最大堆內(nèi)存的百分比 |
mapreduce.reduce.input.buffer.percent |
float | 相對于最大堆內(nèi)存的百分比 |
可配置參數(shù)
| Name | Type | Description |
|---|---|---|
mapreduce.job.id |
stirng | 作業(yè)ID |
mapreduce.job.jar |
string | jar包地址 |
mapreduce.job.local.dir |
string | 作業(yè)共享空間 |
mapreduce.task.id |
string | 任務(wù)ID |
mapreduce.task.attempt.id |
string | 任務(wù)嘗試ID |
mapreduce.task.is.map |
boolean | 是否是map任務(wù) |
mapreduce.task.partition |
int | 任務(wù)分區(qū)數(shù) |
mapreduce.map.input.file |
string | map任務(wù)處理數(shù)據(jù)的文件名 |
mapreduce.map.input.start |
long | 輸入數(shù)據(jù)的偏移量 |
mapreduce.map.input.length |
long | 輸入數(shù)據(jù)的字節(jié)數(shù) |
mapreduce.task.output.dir |
string | 任務(wù)的臨時輸出目錄 |
任務(wù)日志
日志默認(rèn)輸出到${HADOOP_LOG_DIR}/userlogs。
分布式類庫
(略)
作業(yè)提交和監(jiān)控
作業(yè)是用戶與ResourceManager交互的主接口。
Job可以提交作業(yè),追蹤進(jìn)程狀態(tài),訪問任務(wù)日志,讀取集群節(jié)點(diǎn)狀態(tài)信息。
作業(yè)提交包括以下步驟:
- 檢查作業(yè)的輸入和輸出
- 計算作業(yè)的
InputSplit值 - 如有必要,設(shè)置必要的accounting信息
- 拷貝jar包和配置信息到系統(tǒng)目錄
- 提交作業(yè)到
ResourceManager
作業(yè)歷史文件輸出目錄可以使用mapreduce.jobhistory.intermediate-done-dir和mapreduce.jobhistory.done-dir指定。
用戶可以使用$ mapred job -history output.jhist命令查看歷史日志簡報。
作業(yè)控制
用戶可能需要將作業(yè)鏈接起來以完成無法使用單個作業(yè)完成的任務(wù)。
作業(yè)輸入
InputFormat說明了輸入數(shù)據(jù)的格式。
MapReduce框架使用InputFormat來:
- 校驗作業(yè)的輸入數(shù)據(jù)
- 將輸入文件分割成本地
InputSplit實(shí)例,每個實(shí)例分配給一個獨(dú)立的Mapper - 使用
RecordReader的具體實(shí)現(xiàn)從輸入中讀取記錄
基于文件的InputFormat的實(shí)現(xiàn)(比如FileInputFormat的子類)的默認(rèn)行為是將輸入按照大小分割成邏輯上的InputSplit實(shí)例。輸入文件的塊大小指定過了文件分割的上限,mapreduce.input.fileinputformat.split.minsize參數(shù)可以指定文件分割的下限。
顯然,基于輸入大小的邏輯分割對于很多記錄邊界不甚明朗的應(yīng)用來說是不夠的。這是,應(yīng)用應(yīng)當(dāng)實(shí)現(xiàn)一個RecordReader。
TextInputFormat是默認(rèn)的InputFormat。
InputSplit
InputSplit表示被單個Mapper處理的數(shù)據(jù)單元。
通常InputSplit表示面向字節(jié)的視圖,而RecordReader負(fù)責(zé)處理和呈現(xiàn)面向記錄的視圖。
FileSplit是默認(rèn)InputSplit。
RecordReader
RecordReader從InputSplit中讀取<key, value>序?qū)Α?/p>
通常RecordReader將面向字節(jié)的視圖轉(zhuǎn)換成面向記錄的視圖供map任務(wù)處理。
作業(yè)輸出
OutputFormat表示作業(yè)的輸出格式。
MapReduce框架需要OutputFormat:
- 校驗作業(yè)的輸出格式
- 提供
RecordWriter實(shí)現(xiàn)寫入輸出文件
TextOutputFormat是默認(rèn)的OutputFormat。
OutputCommitter
OutputCommitter表示任務(wù)輸出的提交過程。
MapReduce框架將OutputCommitter用于:
- 在初始化階段設(shè)置作業(yè)。例如創(chuàng)建臨時輸出目錄。作業(yè)設(shè)置階段是在作業(yè)狀態(tài)為PREP時使用一個獨(dú)立的任務(wù)完成的。一旦設(shè)置完成,作業(yè)編程RUNNING狀態(tài)。
- 作業(yè)執(zhí)行完畢后清理作業(yè)。例如刪除臨時輸出目錄。
- 設(shè)置任務(wù)臨時輸出。
- 檢查任務(wù)是否需要提交。
- 提交任務(wù)輸出。一旦任務(wù)執(zhí)行完畢,如有必要任務(wù)會提交它的輸出。
- 廢棄任務(wù)提交。
FileOutputCommitter是默認(rèn)的OutputCommitter。
任務(wù)副作用文件
某些應(yīng)用的任務(wù)除了輸出文件,還需要創(chuàng)建一種副文件。
這種情況下,如果有多個相同的Mapper或Reducer實(shí)例并行操作同一個文件就可能出問題。因此應(yīng)用寫入的時候必須能確定唯一的attempt任務(wù)(使用attemptid)。
使用FileOutputFormat時,為了避免這個問題,框架會為attempt任務(wù)維護(hù)一個${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}子目錄結(jié)構(gòu)。
RecordWriter
RecordWriter將數(shù)據(jù)以<key, value>格式寫入輸出文件。
其他特性
將作業(yè)提交到隊列
隊列是作業(yè)的集合,允許系統(tǒng)提供特定的功能。例如控制權(quán)限。
Hadoop自帶有一個默認(rèn)的“default”隊列。
作業(yè)可以使用mapreduce.job.queuename配置隊列名字。
計數(shù)器
Counter表示全局計數(shù)器。
DistributedCache
DistributedCache可以有效的分發(fā)只讀文件。
示例:WordCount v2.0
下面是一個更復(fù)雜的WordCount示例。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;
public class WordCount2 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] remainingArgs = parser.getRemainingArgs();
if (remainingArgs.length != 2 && remainingArgs.length != 4) {
System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
List<String> otherArgs = new ArrayList<>();
for (int i = 0; i < remainingArgs.length; ++i) {
if ("-skip".equals(remainingArgs[i])) {
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
} else {
otherArgs.add(remainingArgs[i]);
}
}
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
enum CountersEnum {INPUT_WORDS}
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive;
private Set<String> patternsToSkip = new HashSet<>();
private Configuration conf;
private BufferedReader fis;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
if (conf.getBoolean("wordcount.skip.patterns", false)) {
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
for (URI patternsURI : patternsURIs) {
Path patternsPath = new Path(patternsURI.getPath());
String fileName = patternsPath.getName().toString();
parseSkipFile(fileName);
}
}
}
private void parseSkipFile(String fileName) {
try {
fis = new BufferedReader(new FileReader(fileName));
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = caseSensitive ? value.toString() : value.toString().toLowerCase();
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreElements()) {
word.set(itr.nextToken());
context.write(word, one);
Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString());
counter.increment(1);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}