I. MapReduce Overview
1️⃣ MapReduce Definition
2️⃣ Strengths and Weaknesses of MapReduce
- Strengths
- Weaknesses
3️⃣ Core Ideas of MapReduce
1) A distributed computation usually needs at least two stages.
2) The MapTask instances of the first stage run fully in parallel and are independent of one another.
3) The ReduceTask instances of the second stage are also independent of one another, but their input depends on the output of all MapTask instances of the previous stage.
4) The MapReduce programming model allows only one Map stage and one Reduce stage; if the business logic is more complex, the only option is to chain several MapReduce jobs and run them serially (see the sketch after this list).
Summary: tracing the WordCount data flow is a good way to internalize these core ideas.
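As a rough illustration of point 4, one driver can chain two jobs by running them serially and feeding the output directory of the first job to the second. This is only a hedged sketch under assumed names: ChainDriver, FirstMapper/FirstReducer, and SecondMapper/SecondReducer are hypothetical placeholders, not classes from the WordCount case below.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Stage 1: the first MapReduce job reads the raw input
        Job job1 = Job.getInstance(conf, "stage-1");
        job1.setJarByClass(ChainDriver.class);
        job1.setMapperClass(FirstMapper.class);    // hypothetical Mapper
        job1.setReducerClass(FirstReducer.class);  // hypothetical Reducer
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        Path intermediate = new Path(args[1] + "/stage1");
        FileOutputFormat.setOutputPath(job1, intermediate);
        if (!job1.waitForCompletion(true)) {       // block until stage 1 finishes
            System.exit(1);
        }

        // Stage 2: the second MapReduce job reads the output of stage 1
        Job job2 = Job.getInstance(conf, "stage-2");
        job2.setJarByClass(ChainDriver.class);
        job2.setMapperClass(SecondMapper.class);   // hypothetical Mapper
        job2.setReducerClass(SecondReducer.class); // hypothetical Reducer
        FileInputFormat.setInputPaths(job2, intermediate);
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/stage2"));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}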
4️⃣ MapReduce Processes
A complete MapReduce program runs as three kinds of processes: the MrAppMaster, which coordinates the whole job, plus the MapTask and ReduceTask processes that execute the Map and Reduce stages.
5️⃣ Official WordCount Source Code
Decompiling the official example jar shows that the WordCount case consists of a Map class, a Reduce class, and a driver class, and that the data types are serialization types provided by Hadoop itself.
6️⃣ Commonly Used Data Serialization Types
The usual Java types map onto Hadoop Writable types as follows: boolean → BooleanWritable, byte → ByteWritable, int → IntWritable, long → LongWritable, float → FloatWritable, double → DoubleWritable, String → Text, map → MapWritable, array → ArrayWritable.
7️⃣ MapReduce Programming Conventions
A user-written program is divided into three parts: the Mapper, the Reducer, and the Driver.
8️⃣ WordCount Hands-On Example
- Requirement: count the total number of occurrences of each word in a given text file.
  (1) Input data:
  ffadsfsda fasdfsad fsadfa Fsadfa fsadfa Ffadsfsda
  (2) Expected output:
  Ffadsfsda 1
  Fsadfa 1
  fasdfsad 1
  ffadsfsda 1
  fsadfa 2
- Requirement analysis: following the MapReduce programming conventions, write a Mapper, a Reducer, and a Driver.
- Writing the program
(1) Create a Maven project.
(2) Add the following dependencies to pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
(3) In the project's src/main/resources directory, create a new file named "log4j.properties" and put the following into it:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(4) Write the Mapper class:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();
        // 2 Split it into words
        String[] words = line.split(" ");
        // 3 Emit each word with a count of 1
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
(5) Write the Reducer class:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1 Accumulate the counts for this word
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // 2 Emit the total
        v.set(sum);
        context.write(key, v);
    }
}
(6) Write the Driver class:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the configuration and create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 Set the jar that contains the driver class
        job.setJarByClass(WordcountDriver.class);
        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7 Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
9️⃣ Testing on the Cluster
(1) To build the jar with Maven, add the following packaging plugins (note: the mainClass value, shown here as com.xxx.mr.WordcountDriver, must be replaced with your own project's main class):
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.xxx.mr.WordcountDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Note: if the project shows a red error marker, right-click the project -> Maven -> Update Project.
(2) Package the program into a jar and copy it to the Hadoop cluster.
Steps: right-click the project -> Run As -> Maven install. When the build finishes, the jar appears in the project's target folder (if it does not show up, right-click the project -> Refresh). Rename the jar built without dependencies to wc.jar and copy it to the Hadoop cluster.
(3) Start the Hadoop cluster.
(4) Run the WordCount program:
hadoop jar wc.jar com.xxx.wordcount.WordcountDriver /user/xxx/input /user/xxx/output
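To check the result, the output directory can be listed and read back with the HDFS shell. Assuming the default single reducer, the word counts end up in part-r-00000; the paths below simply reuse the placeholder paths from the command above:
hadoop fs -ls /user/xxx/output
hadoop fs -cat /user/xxx/output/part-r-00000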
II. Hadoop Serialization
1️⃣ Serialization Overview
2️⃣ Implementing the Serialization Interface (Writable) on a Custom Bean
In real-world development, the common built-in serialization types often cannot cover every need; for example, passing a bean object around inside the Hadoop framework requires that object to implement the serialization interface. The concrete steps for making a bean serializable are the following seven:
(1) The class must implement the Writable interface.
(2) Deserialization instantiates the class reflectively through its no-argument constructor, so a no-argument constructor is mandatory:
public FlowBean() {
    super();
}
(3) Override the serialization method:
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}
(4) Override the deserialization method:
@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}
(5) Note that the deserialization order must match the serialization order exactly.
(6) To make the result readable in the output file, override toString(); separating the fields with "\t" makes the output convenient to process later.
(7) If the custom bean is transmitted as a key, it must also implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires keys to be sortable (see the sorting example later):
@Override
public int compareTo(FlowBean o) {
    // Descending order, from largest to smallest
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
3️⃣ Serialization Hands-On Example
1. Requirement: for each phone number, compute the total upstream traffic, downstream traffic, and overall traffic.
(1) Input data (columns: id, phone number, network IP, upstream traffic, downstream traffic, HTTP status code):
1   1363157985066  120.196.100.82  2481   24681   200
2   1363157995052  120.197.40.4    264    0       200
3   1363157991076  120.196.100.99  132    1512    200
4   1363154400022  120.197.40.4    240    0       200
5   1363157993044  120.196.100.99  1527   2106    200
6   1363157995074  120.197.40.4    4116   1432    200
7   1363157993055  120.196.100.99  1116   954     200
8   1363157995033  120.197.40.4    3156   2936    200
9   1363157983019  120.196.100.82  240    0       200
10  1363157984041  120.197.40.4    6960   690     200
11  1363157973098  120.197.40.4    3659   3538    200
12  1363157986029  120.196.100.99  1938   180     200
13  1363157992093  120.196.100.99  918    4938    200
14  1363157986041  120.197.40.4    180    180     200
15  1363157984040  120.197.40.4    1938   2910    200
16  1363157995093  120.196.100.82  3008   3720    200
17  1363157982040  120.196.100.99  7335   110349  200
18  1363157986072  120.196.100.99  9531   2412    200
19  1363157990043  120.196.100.55  11058  48243   200
20  1363157988072  120.196.100.82  120    120     200
21  1363157985066  120.196.100.82  2481   24681   200
22  1363157993055  120.196.100.99  1116   954     200
(2) Expected output format (phone number, upstream traffic, downstream traffic, total traffic), for example:
13560436666  1116  954  2070
2. Requirement analysis
3. Writing the MapReduce program
(1) Write the bean class for the traffic statistics:
package com.xxx.mapreduce.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// 1 Implement the Writable interface
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 2 Deserialization calls the no-argument constructor reflectively, so it must exist
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Convenience setter so the Mapper below can reuse a single bean instance
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // 3 Serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 4 Deserialization method
    // 5 The read order must match the write order exactly
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6 toString, so the result prints nicely to the output file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
}
(2) Write the Mapper class:
package com.xxx.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();
        // 2 Split the fields
        String[] fields = line.split("\t");
        // 3 Build the output key and value
        // Extract the phone number
        String phoneNum = fields[1];
        // Extract the upstream and downstream traffic
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        k.set(phoneNum);
        v.set(upFlow, downFlow);
        // 4 Write out
        context.write(k, v);
    }
}
(3) Write the Reducer class:
package com.xxx.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 Iterate over all beans and accumulate the upstream and downstream traffic separately
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 Wrap the totals in a result bean
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        // 3 Write out
        context.write(key, resultBean);
    }
}
(4) Write the Driver class:
package com.xxx.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // The input and output paths must be adjusted to the actual paths on your machine
        args = new String[] { "e:/input/inputflow", "e:/output1" };

        // 1 Get the configuration and create the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 Set the jar that contains this program
        job.setJarByClass(FlowsumDriver.class);

        // 2 Set the Mapper and Reducer classes used by this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 Set the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 Set the directory of the job's raw input files and the output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job, with its configuration and the jar containing its classes, to YARN
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
III. How the MapReduce Framework Works
1️⃣ InputFormat Data Input
1.1 Splits and the Mechanism That Determines MapTask Parallelism
The degree of MapTask parallelism determines the concurrency of the Map stage and therefore affects the processing speed of the whole job.
Think about it: for 1 GB of data, launching 8 MapTasks improves the cluster's concurrency. But for 1 KB of data, would launching 8 MapTasks also improve performance? Is more MapTask parallelism always better? Which factors determine MapTask parallelism?
1. The mechanism that determines MapTask parallelism
Data block: a block is how HDFS physically divides data into chunks.
Data split: a split is only a logical partitioning of the input; the data is not physically cut into pieces on disk.
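To make the block/split distinction concrete, here is a hedged sketch of how the default split size is derived and how a file is carved into splits. The computeSplitSize formula below, Math.max(minSize, Math.min(maxSize, blockSize)), mirrors FileInputFormat's default behavior, but this standalone class is only an approximation: the real slicing code also lets the last split grow up to 1.1 times the split size, which is ignored here.

// Sketch: how the split size is derived and how a 300 MB file becomes 3 splits.
public class SplitSizeSketch {

    // Mirrors FileInputFormat.computeSplitSize(blockSize, minSize, maxSize)
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // 128 MB HDFS block
        long minSize = 1L;                     // default minimum split size
        long maxSize = Long.MAX_VALUE;         // default maximum split size
        long fileLength = 300L * 1024 * 1024;  // a 300 MB input file

        long splitSize = computeSplitSize(blockSize, minSize, maxSize); // = 128 MB by default

        // Carve the file into splits of splitSize; the remainder becomes the last split.
        long remaining = fileLength;
        int splits = 0;
        while (remaining > 0) {
            long length = Math.min(splitSize, remaining);
            System.out.println("split " + splits + ": " + (length / (1024 * 1024)) + " MB");
            remaining -= length;
            splits++;
        }
        // Prints 128 MB, 128 MB, 44 MB -> 3 splits, i.e. 3 MapTasks for this file.
    }
}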
2️⃣ Job Submission Flow and Split Source Code Walk-Through
1. Job submission flow (source trace):
waitForCompletion()
  submit();
    // 1 Establish the connection
    connect();
      // 1) Create the proxy that submits the job
      new Cluster(getConfiguration());
        // (1) Decide whether the job runs locally or on a remote YARN cluster
        initialize(jobTrackAddr, conf);
    // 2 Submit the job
    submitter.submitJobInternal(Job.this, cluster)
      // 1) Create the staging path used to hand data to the cluster
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
      // 2) Get the job id and create the job path
      JobID jobId = submitClient.getNewJobID();
      // 3) Copy the jar to the cluster
      copyAndConfigureFiles(job, submitJobDir);
        rUploader.uploadFiles(job, jobSubmitDir);
      // 4) Compute the splits and generate the split plan file
      writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
          input.getSplits(job);
      // 5) Write the XML configuration file to the staging path
      writeConf(conf, submitJobFile);
        conf.writeXml(out);
      // 6) Submit the job and return its status
      status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2. FileInputFormat split source walk-through (input.getSplits(job))
3️⃣ FileInputFormat Split Mechanism
4️⃣ CombineTextInputFormat Split Mechanism
The framework's default TextInputFormat plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With a large number of small files this produces a large number of MapTasks and is extremely inefficient.
1. Use case:
CombineTextInputFormat is designed for scenarios with many small files: it logically packs several small files into one split so that a single MapTask can process them together.
2. Setting the maximum virtual-storage split size:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB
Note: the maximum virtual-storage split size should be chosen based on the actual sizes of the small files.
3. Split mechanism:
Split generation consists of two phases: the virtual-storage phase and the slicing phase.
(1) Virtual-storage phase:
Each file under the input directory is compared, in order, against the configured setMaxInputSplitSize value. If the file is not larger than the maximum, it logically becomes one chunk. If the file is larger than the maximum and more than twice as large, a chunk of the maximum size is cut off; when the remaining data is larger than the maximum but not more than twice the maximum, the remainder is divided into two equal virtual-storage chunks (this prevents overly small splits).
For example, with setMaxInputSplitSize set to 4 MB and an 8.02 MB input file, a 4 MB chunk is logically cut first. Cutting another 4 MB from the remaining 4.02 MB would leave a tiny 0.02 MB virtual-storage chunk, so the remaining 4.02 MB is instead divided into two chunks of 2.01 MB each.
(2) Slicing phase:
(a) If a virtual-storage chunk is greater than or equal to the setMaxInputSplitSize value, it forms a split on its own.
(b) If it is smaller, it is merged with the next virtual-storage chunk, and together they form one split.
(c) Worked example: four small files of 1.7 MB, 5.1 MB, 3.4 MB, and 6.8 MB produce six virtual-storage chunks:
1.7 MB, (2.55 MB, 2.55 MB), 3.4 MB, and (3.4 MB, 3.4 MB)
which finally form three splits of:
(1.7 + 2.55) MB, (2.55 + 3.4) MB, and (3.4 + 3.4) MB.
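The same arithmetic can be reproduced with a small standalone sketch that imitates the two phases described above. This is not CombineTextInputFormat's actual source code, only an approximation of the rule it applies, with the 4 MB maximum and the four file sizes from the worked example hard-coded.

import java.util.ArrayList;
import java.util.List;

// Sketch: simulate CombineTextInputFormat's virtual-storage phase and slicing phase.
public class CombineSplitSketch {

    public static void main(String[] args) {
        double maxSize = 4.0;                   // MB, the setMaxInputSplitSize value
        double[] files = {1.7, 5.1, 3.4, 6.8};  // MB, the four small files

        // Phase 1: virtual storage - cut each file into chunks no larger than maxSize.
        List<Double> chunks = new ArrayList<>();
        for (double remaining : files) {
            while (remaining > 2 * maxSize) {   // more than twice the max: cut a full maxSize chunk
                chunks.add(maxSize);
                remaining -= maxSize;
            }
            if (remaining > maxSize) {          // between 1x and 2x the max: halve the remainder
                chunks.add(remaining / 2);
                chunks.add(remaining / 2);
            } else {                            // not larger than the max: keep as one chunk
                chunks.add(remaining);
            }
        }
        System.out.println("virtual-storage chunks (MB): " + chunks);
        // -> [1.7, 2.55, 2.55, 3.4, 3.4, 3.4]

        // Phase 2: slicing - accumulate chunks until the running total reaches maxSize.
        List<Double> splits = new ArrayList<>();
        double current = 0;
        for (double chunk : chunks) {
            current += chunk;
            if (current >= maxSize) {           // chunk (or merged chunks) large enough: close the split
                splits.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            splits.add(current);                // whatever is left forms the last split
        }
        System.out.println("splits (MB): " + splits);
        // -> [4.25, 5.95, 6.8], i.e. (1.7+2.55), (2.55+3.4), (3.4+3.4)
    }
}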
5️⃣ CombineTextInputFormat Hands-On Example
1. Requirement: combine a large number of small input files into a single split and process them together.
(1) Input data: prepare 4 small files.
(2) Expectation: one split processes all 4 files.
2. Implementation
(1) Run the WordCount program from section 1.6 without any changes and observe that the number of splits is 4.
(2) Add the following code to WordcountDriver, run the program, and observe the number of splits.
(a) Code added in the driver class:
// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// Set the maximum virtual-storage split size to 4 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(b) The run now produces 3 splits.
(3) Change the code in WordcountDriver as follows, run the program, and observe the number of splits.
(a) Code added in the driver class:
// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// Set the maximum virtual-storage split size to 20 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
(b) The run now produces 1 split.
6️⃣ FileInputFormat Implementations
7️⃣ KeyValueTextInputFormat Usage Example
1. Requirement: for each distinct first word, count how many lines in the input file start with that word.
(1) Input data:
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
(2) Expected output:
banzhang 2
xihuan 2
2. Requirement analysis
3. Implementation
(1) Write the Mapper class:
package com.xxx.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {

    // 1 The output value: every line counts as 1
    LongWritable v = new LongWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        // For a line like "banzhang ni hao", the key is already the first word "banzhang"
        // 2 Write out
        context.write(key, v);
    }
}
(2) Write the Reducer class:
package com.xxx.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        long sum = 0L;

        // 1 Sum up the counts
        for (LongWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2 Write out
        context.write(key, v);
    }
}
(3) Write the Driver class:
package com.xxx.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        // Set the key/value separator to a space
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

        // 1 Get the job object
        Job job = Job.getInstance(conf);

        // 2 Set the jar location and wire up the mapper and reducer
        job.setJarByClass(KVTextDriver.class);
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        // 3 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 4 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 5 Set the input data path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Set the input format
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // 6 Set the output data path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job
        job.waitForCompletion(true);
    }
}
8️⃣ NLineInputFormat Usage Example
1. Requirement: count the occurrences of each word, and let the number of lines in each input file determine how many splits are produced. In this case, every three lines go into one split.
(1) Input data:
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
(2) Expected output: the input has 12 lines and 12 / 3 = 4, so the job log shows
Number of splits:4
2. Requirement analysis
3. Implementation
(1) Write the Mapper class:
package com.xxx.mapreduce.nline;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text k = new Text();
    private LongWritable v = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();
        // 2 Split it into words
        String[] splited = line.split(" ");
        // 3 Write out each word with a count of 1
        for (int i = 0; i < splited.length; i++) {
            k.set(splited[i]);
            context.write(k, v);
        }
    }
}
(2) Write the Reducer class:
package com.xxx.mapreduce.nline;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        long sum = 0L;

        // 1 Sum up the counts
        for (LongWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2 Write out
        context.write(key, v);
    }
}
(3) Write the Driver class:
package com.xxx.mapreduce.nline;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineDriver {

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // The input and output paths must be adjusted to the actual paths on your machine
        args = new String[] { "e:/input/inputword", "e:/output1" };

        // 1 Get the job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 7 Put three records into each InputSplit
        NLineInputFormat.setNumLinesPerSplit(job, 3);

        // 8 Use NLineInputFormat to group the records
        job.setInputFormatClass(NLineInputFormat.class);

        // 2 Set the jar location and wire up the mapper and reducer
        job.setJarByClass(NLineDriver.class);
        job.setMapperClass(NLineMapper.class);
        job.setReducerClass(NLineReducer.class);

        // 3 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 4 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 5 Set the input and output data paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 Submit the job
        job.waitForCompletion(true);
    }
}
9️⃣ Custom InputFormat
Custom InputFormat hands-on example:
Both HDFS and MapReduce are very inefficient at handling small files, yet processing large numbers of small files is often unavoidable, so a solution is needed. One option is a custom InputFormat that merges the small files.
1. Requirement: merge multiple small files into one SequenceFile (a SequenceFile is Hadoop's file format for storing binary key-value pairs). The SequenceFile stores several files, using the file path plus file name as the key and the file content as the value.
(1) Input data:
// one.txt
yongpeng weidong weinan sanfeng luozong xiaoming
// two.txt
longlong fanfan mazong kailun yuhang yixin longlong fanfan mazong kailun yuhang yixin
// three.txt
shuaige changmo zhenqiang dongli lingu xuanxuan
(2) Expected output file format: one SequenceFile containing the three files.
2. Requirement analysis
3. Implementation
(1) Write the custom InputFormat:
package com.xxx.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Define a class that extends FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Each small file is read whole, so it must never be split
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }
}
(2) Write the custom RecordReader class:
package com.xxx.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration configuration;
    private FileSplit split;

    private boolean isProgress = true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (isProgress) {
            // 1 Allocate a buffer large enough for the whole file
            byte[] contents = new byte[(int) split.getLength()];

            FileSystem fs = null;
            FSDataInputStream fis = null;

            try {
                // 2 Get the file system
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);

                // 3 Open the file
                fis = fs.open(path);

                // 4 Read the entire file content
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5 Set the file content as the value
                value.set(contents, 0, contents.length);

                // 6 Get the file path and name
                String name = split.getPath().toString();

                // 7 Set it as the output key
                k.set(name);
            } catch (Exception e) {
            } finally {
                IOUtils.closeStream(fis);
            }

            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
(3) Write the SequenceFileMapper class:
package com.xxx.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Pass the (file name, file content) pair straight through
        context.write(key, value);
    }
}
(4) Write the SequenceFileReducer class:
package com.xxx.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each file name appears once, so the first value is the file's content
        context.write(key, values.iterator().next());
    }
}
(5) Write the SequenceFileDriver class:
package com.xxx.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // The input and output paths must be adjusted to the actual paths on your machine
        args = new String[] { "e:/input/inputinputformat", "e:/output1" };

        // 1 Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar location and wire up the custom mapper and reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 7 Set the input format
        job.setInputFormatClass(WholeFileInputformat.class);

        // 8 Set the output format
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 3 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 4 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
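Once the job has finished, the resulting SequenceFile can be inspected from the command line: hadoop fs -text decodes SequenceFiles into readable key-value pairs, whereas -cat would dump raw binary. The path below is only an assumption, supposing the driver was pointed at an HDFS output directory named /output1:
hadoop fs -text /output1/part-r-00000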
IV. The MapReduce Workflow
Detailed MapReduce workflow, part 1
Detailed MapReduce workflow, part 2


The flow above is the complete MapReduce workflow, but the Shuffle process only covers steps 7 through 16. In detail, Shuffle works as follows:
1) The MapTask collects the key-value pairs emitted by our map() method and puts them into an in-memory buffer.
2) The buffer repeatedly spills to local disk files; it may spill several files.
3) The multiple spill files are merged into one large spill file.
4) Both while spilling and while merging, the Partitioner is invoked to partition the data, and the data is sorted by key.
5) Each ReduceTask fetches, from every MapTask machine, the result data belonging to its own partition number.
6) The ReduceTask gathers the files of the same partition from the different MapTasks and merges them again (merge sort).
7) Once they are merged into one large file, the Shuffle process is finished, and the ReduceTask's logic begins: key-value pairs are taken from the file one group at a time and the user-defined reduce() method is called for each group.
Note: the size of the Shuffle buffer affects the efficiency of a MapReduce program. In principle, the larger the buffer, the fewer disk I/O operations and the faster the execution. The buffer size can be tuned with a parameter: io.sort.mb, which defaults to 100 MB.
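For example, on Hadoop 2.x and later the same buffer is controlled by the property mapreduce.task.io.sort.mb (io.sort.mb is the deprecated older name). A hedged sketch of raising it to 200 MB from the driver, assuming the MapTask heap is large enough to hold the bigger buffer:

// Raise the map-side sort buffer from the default 100 MB to 200 MB
Configuration conf = new Configuration();
conf.set("mapreduce.task.io.sort.mb", "200");
Job job = Job.getInstance(conf);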
Source trace of the collect/spill flow:
context.write(k, NullWritable.get());
  output.write(key, value);
    collector.collect(key, value, partitioner.getPartition(key, value, partitions));
      HashPartitioner();
    collect()
  close()
    collect.flush()
      sortAndSpill()
        sort()   QuickSort
      mergeParts();
    collector.close();