黑猴子的家:MapReduce流量匯總程序案例二

將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(Partitioner)

1、分析

(1)Mapreduce中會將map輸出的kv對,按照相同key分組,然后分發(fā)給不同的reducetask。默認(rèn)的分發(fā)規(guī)則為:根據(jù)key的hashcode%reducetask數(shù)來分發(fā)

(2)如果要按照我們自己的需求進(jìn)行分組,則需要改寫數(shù)據(jù)分發(fā)(分組)組件Partitioner
自定義一個CustomPartitioner繼承抽象類:Partitioner

(3)在job驅(qū)動中,設(shè)置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)

2、在流量匯總程序案例一的基礎(chǔ)上,增加一個分區(qū)類

import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * K2 V2 對應(yīng)的是map輸出kv類型
* @author Administrator
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 獲取電話號碼的前三位
        String preNum = key.toString().substring(0, 3);
        
        int partition = 4;
        
        // 2 判斷是哪個省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}

3、在驅(qū)動函數(shù)中增加自定義數(shù)據(jù)分區(qū)設(shè)置和reduce task設(shè)置

public static void main(String[] args) throws Exception {
        // 1 獲取配置信息,或者job對象實(shí)例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路徑
        job.setJarByClass(FlowCount.class);

        // 8 指定自定義數(shù)據(jù)分區(qū)
        job.setPartitionerClass(ProvincePartitioner.class);
        
        // 9 同時指定相應(yīng)數(shù)量的reduce task
        job.setNumReduceTasks(5); 
        
        // 2 指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper輸出數(shù)據(jù)的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最終輸出的數(shù)據(jù)的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包, 提交給yarn去運(yùn)行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

4、將程序打成jar包,然后拷貝到hadoop集群中。

flowcountPartitionser.jar 

5、啟動hadoop集群

[victor@hadoop102 hadoop]$ sbin/start-all.sh 

6、執(zhí)行flowcountPartitionser程序

[victor@hadoop102 hadoop]$ hadoop jar flowcountPartitionser.jar \
com.victor.mr.partitioner.FlowCount \
/user/victor/flowcount/input /user/victor/flowcount/output

7、查看結(jié)果

[victor@hadoop102 software]$ hadoop fs -lsr /
/user/victor/flowcount/output/part-r-00000
/user/victor/flowcount/output/part-r-00001
/user/victor/flowcount/output/part-r-00002
/user/victor/flowcount/output/part-r-00003
/user/victor/flowcount/output/part-r-00004

8、Code -> GitHub

https://github.com/liufengji/hadoop_mapreduce.git

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容