MapReduce 分區(qū)

MapReduce 分區(qū)

分區(qū)概述

在 MapReduce 中, 通過我們指定分區(qū), 會將同一個分區(qū)的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中進(jìn)行處理

例如: 為了數(shù)據(jù)的統(tǒng)計, 可以把一批類似的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中, 在同一個 Reduce 當(dāng)中統(tǒng)計相同類型的數(shù)據(jù), 就可以實(shí)現(xiàn)類似的數(shù)據(jù)分區(qū)和統(tǒng)計等

其實(shí)就是相同類型的數(shù)據(jù), 有共性的數(shù)據(jù), 送到一起去處理

Reduce 當(dāng)中默認(rèn)的分區(qū)只有一個

需求:將以下數(shù)據(jù)進(jìn)行分開處理

詳細(xì)數(shù)據(jù)參見partition.csv 這個文本文件,其中第五個字段表示開獎結(jié)果數(shù)值,現(xiàn)在需求將15以上的結(jié)果以及15以下的結(jié)果進(jìn)行分開成兩個文件進(jìn)行保存

wps1.jpg

分區(qū)步驟:

Step 1. 定義 Mapper

這個 Mapper 程序不做任何邏輯, 也不對 Key-Value 做任何改變, 只是接收數(shù)據(jù), 然后往下發(fā)送

public class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }
}
Step 2. 自定義 Partitioner

主要的邏輯就在這里, 這也是這個案例的意義, 通過 Partitioner 將數(shù)據(jù)分發(fā)給不同的 Reducer

/**
 * 這里的輸入類型與我們map階段的輸出類型相同
 */
public class MyPartitioner extends Partitioner<Text,NullWritable>{
    /**
     * 返回值表示我們的數(shù)據(jù)要去到哪個分區(qū)
     * 返回值只是一個分區(qū)的標(biāo)記,標(biāo)記所有相同的數(shù)據(jù)去到指定的分區(qū)
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String result = text.toString().split("\t")[5];
        if (Integer.parseInt(result) > 15){
            return 1;
        }else{
            return 0;
        }
    }
}
Step 3. 定義 Reducer 邏輯

這個 Reducer 也不做任何處理, 將數(shù)據(jù)原封不動的輸出即可

public class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}
Step 4. 主類中設(shè)置分區(qū)類和ReduceTask個數(shù)
public class PartitionMain  extends Configured implements Tool {
    public static void main(String[] args) throws  Exception{
        int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), PartitionMain.class.getSimpleName());
        job.setJarByClass(PartitionMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://192.168.52.250:8020/partitioner"));
        TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.52.250:8020/outpartition"));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(MyReducer.class);
        /**
         * 設(shè)置我們的分區(qū)類,以及我們的reducetask的個數(shù),注意reduceTask的個數(shù)一定要與我們的
         * 分區(qū)數(shù)保持一致
         */
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }
}

MapReduce 中的計數(shù)器

計數(shù)器是收集作業(yè)統(tǒng)計信息的有效手段之一,用于質(zhì)量控制或應(yīng)用級統(tǒng)計。計數(shù)器還可輔助診斷系統(tǒng)故障。如果需要將日志信息傳輸?shù)?map 或 reduce 任務(wù), 更好的方法通常是看能否用一個計數(shù)器值來記錄某一特定事件的發(fā)生。對于大型分布式作業(yè)而言,使用計數(shù)器更為方便。除了因?yàn)楂@取計數(shù)器值比輸出日志更方便,還有根據(jù)計數(shù)器值統(tǒng)計特定事件的發(fā)生次數(shù)要比分析一堆日志文件容易得多。

hadoop內(nèi)置計數(shù)器列表

MapReduce任務(wù)計數(shù)器 org.apache.hadoop.mapreduce.TaskCounter
文件系統(tǒng)計數(shù)器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat計數(shù)器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat計數(shù)器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作業(yè)計數(shù)器 org.apache.hadoop.mapreduce.JobCounter

每次mapreduce執(zhí)行完成之后,我們都會看到一些日志記錄出來,其中最重要的一些日志記錄如下截圖

1561707398432.png

所有的這些都是MapReduce的計數(shù)器的功能,既然MapReduce當(dāng)中有計數(shù)器的功能,我們?nèi)绾螌?shí)現(xiàn)自己的計數(shù)器???

需求:以以上分區(qū)代碼為案例,統(tǒng)計map接收到的數(shù)據(jù)記錄條數(shù)

第一種方式

第一種方式定義計數(shù)器,通過context上下文對象可以獲取我們的計數(shù)器,進(jìn)行記錄
通過context上下文對象,在map端使用計數(shù)器進(jìn)行統(tǒng)計

public class PartitionMapper  extends Mapper<LongWritable,Text,Text,NullWritable>{
    //map方法將K1和V1轉(zhuǎn)為K2和V2
    @Override
    protected void map(LongWritable key, Text value, Context context) throws Exception{
        Counter counter = context.getCounter("MR_COUNT", "MyRecordCounter");
        counter.increment(1L);
        context.write(value,NullWritable.get());
    }
}

運(yùn)行程序之后就可以看到我們自定義的計數(shù)器在map階段讀取了七條數(shù)據(jù)

1561707440117.png
第二種方式

通過enum枚舉類型來定義計數(shù)器
統(tǒng)計reduce端數(shù)據(jù)的輸入的key有多少個

public class PartitionerReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
   public static enum Counter{
       MY_REDUCE_INPUT_RECORDS,MY_REDUCE_INPUT_BYTES
   }
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
       context.getCounter(Counter.MY_REDUCE_INPUT_RECORDS).increment(1L);
       context.write(key, NullWritable.get());
    }
}
1561707423325.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容