每一個 map 都可能會產(chǎn)生大量的本地輸出,Combiner 的作用就是對 map 端的輸出先做一次 合并,以減少在 map 和 reduce 節(jié)點之間的數(shù)據(jù)傳輸量,以提高網(wǎng)絡(luò)IO 性能,是 MapReduce 的一種優(yōu)化手段之一
- combiner 是 MR 程序中 Mapper 和 Reducer 之外的一種組件
- combiner 組件的父類就是 Reducer
- combiner 和 reducer 的區(qū)別在于運行的位置
- Combiner 是在每一個 maptask 所在的節(jié)點運行
- Reducer 是接收全局所有 Mapper 的輸出結(jié)果
- ombiner 的意義就是對每一個 maptask 的輸出進行局部匯總,以減小網(wǎng)絡(luò)傳輸量
實現(xiàn)步驟
- 自定義一個 combiner 繼承 Reducer,重寫 reduce 方法
- 在 job 中設(shè)置 job.setCombinerClass(CustomCombiner.class)
combiner 能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯,而且,combiner 的輸出 kv 應(yīng)該跟 reducer 的輸入 kv 類型要對應(yīng)起來
combiner其實就是對Reducer的提前重寫,以我們數(shù)字統(tǒng)計的案例為例,我們需要新建一個MyCombiner類,在類中繼承Reducer類,進行重寫
MyCombiner
package cn.leon.combiner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyCombiner extends Reducer<Text, LongWritable, Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
//1.遍歷集合,將集合中的數(shù)字相加,得到V3
for(LongWritable value : values){
count+=value.get();
}
//2.將K3和V3寫入上下文
context.write(key,new LongWritable(count));
}
}
然后在主類中加上這個combiner類,即可
package cn.leon.combiner;
import cn.leon.mapReduce.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
//1 . 創(chuàng)建job對象
Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
//打包到集群上面運行時候,必須要添加以下配置,指定程序的main函數(shù)
job.setJarByClass(JobMain.class);
//2. 配置job任務(wù)(8個步驟)
//第一步:設(shè)置輸入類和輸入路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/wordcount"));
//第二步:設(shè)置mapper類
job.setMapperClass(WordCountMapper.class);
//設(shè)置我們map階段完成之后的輸出類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//第三步,第四步,第六步,省略
//第五步:設(shè)置combner
job.setCombinerClass(MyCombiner.class);
//第七步:設(shè)置我們的reduce類
job.setReducerClass(WordCountReducer.class);
//設(shè)置我們reduce階段完成之后的輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//第八步:設(shè)置輸出類以及輸出路徑
Path path = new Path("hdfs://node01:8020/output/wordcount_out");
//判斷目錄是否存在
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020/output/wordcount_cout"),new Configuration());
if (fileSystem.exists(path)){
//刪除目標目錄
fileSystem.delete(path,true);
}
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,path);
boolean b = job.waitForCompletion(true);
//3. 等待任務(wù)結(jié)束
return b?0:1;
}
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Tool tool = new JobMain();
//啟動Job任務(wù)
int run = ToolRunner.run(configuration, tool, args);
System.exit(run);
}
}
我們對比一下運行結(jié)果
首先看沒有加上Combiner,Reducer的執(zhí)行結(jié)果
Reduce input groups=19
Reduce shuffle bytes=16937
Reduce input records=1032
Reduce output records=19
再看加上了Combiner之后Reducer的執(zhí)行結(jié)果
Reduce input groups=19
Reduce shuffle bytes=528
Reduce input records=31
Reduce output records=19
對比上面的結(jié)果,我們可以看到,在運行了Combiner之后,Reducer的執(zhí)行效果大大增加,減少了很多的次數(shù)。所以還是推薦用Combiner的,這樣可以提高Reducer的運行效率和增加網(wǎng)絡(luò)傳輸速度。