MapReduce 基礎(chǔ) (五)規(guī)約

每一個 map 都可能會產(chǎn)生大量的本地輸出,Combiner 的作用就是對 map 端的輸出先做一次 合并,以減少在 map 和 reduce 節(jié)點之間的數(shù)據(jù)傳輸量,以提高網(wǎng)絡(luò)IO 性能,是 MapReduce 的一種優(yōu)化手段之一

  • combiner 是 MR 程序中 Mapper 和 Reducer 之外的一種組件
  • combiner 組件的父類就是 Reducer
  • combiner 和 reducer 的區(qū)別在于運行的位置
    • Combiner 是在每一個 maptask 所在的節(jié)點運行
    • Reducer 是接收全局所有 Mapper 的輸出結(jié)果
  • ombiner 的意義就是對每一個 maptask 的輸出進行局部匯總,以減小網(wǎng)絡(luò)傳輸量

實現(xiàn)步驟

  1. 自定義一個 combiner 繼承 Reducer,重寫 reduce 方法
  2. 在 job 中設(shè)置 job.setCombinerClass(CustomCombiner.class)

combiner 能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯,而且,combiner 的輸出 kv 應(yīng)該跟 reducer 的輸入 kv 類型要對應(yīng)起來

combiner其實就是對Reducer的提前重寫,以我們數(shù)字統(tǒng)計的案例為例,我們需要新建一個MyCombiner類,在類中繼承Reducer類,進行重寫

MyCombiner

package cn.leon.combiner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyCombiner extends Reducer<Text, LongWritable, Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        //1.遍歷集合,將集合中的數(shù)字相加,得到V3
        for(LongWritable value : values){
            count+=value.get();
        }
        //2.將K3和V3寫入上下文
        context.write(key,new LongWritable(count));
    }
}

然后在主類中加上這個combiner類,即可

package cn.leon.combiner;

import cn.leon.mapReduce.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        //1 . 創(chuàng)建job對象
        Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
        //打包到集群上面運行時候,必須要添加以下配置,指定程序的main函數(shù)
        job.setJarByClass(JobMain.class);
        //2. 配置job任務(wù)(8個步驟)
            //第一步:設(shè)置輸入類和輸入路徑
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/wordcount"));

            //第二步:設(shè)置mapper類
            job.setMapperClass(WordCountMapper.class);
            //設(shè)置我們map階段完成之后的輸出類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            //第三步,第四步,第六步,省略
            //第五步:設(shè)置combner
            job.setCombinerClass(MyCombiner.class);

            //第七步:設(shè)置我們的reduce類
            job.setReducerClass(WordCountReducer.class);
            //設(shè)置我們reduce階段完成之后的輸出類型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            //第八步:設(shè)置輸出類以及輸出路徑
            Path path = new Path("hdfs://node01:8020/output/wordcount_out");

            //判斷目錄是否存在
            FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020/output/wordcount_cout"),new Configuration());
            if (fileSystem.exists(path)){
                //刪除目標目錄
                fileSystem.delete(path,true);
            }

            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job,path);
            boolean b = job.waitForCompletion(true);

        //3. 等待任務(wù)結(jié)束
        return b?0:1;
    }

    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();

        Tool tool = new JobMain();
        //啟動Job任務(wù)
        int run = ToolRunner.run(configuration, tool, args);

        System.exit(run);
    }
}

我們對比一下運行結(jié)果

首先看沒有加上Combiner,Reducer的執(zhí)行結(jié)果

Reduce input groups=19
Reduce shuffle bytes=16937
Reduce input records=1032
Reduce output records=19

再看加上了Combiner之后Reducer的執(zhí)行結(jié)果

Reduce input groups=19
Reduce shuffle bytes=528
Reduce input records=31
Reduce output records=19

對比上面的結(jié)果,我們可以看到,在運行了Combiner之后,Reducer的執(zhí)行效果大大增加,減少了很多的次數(shù)。所以還是推薦用Combiner的,這樣可以提高Reducer的運行效率和增加網(wǎng)絡(luò)傳輸速度。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容