Writing the statistics to separate files by the province of each phone number (Partitioner)
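1. Analysis
(1) MapReduce groups the key-value pairs emitted by map by key and distributes them to the reduce tasks. By default, a pair goes to the partition given by the key's hash code % the number of reduce tasks (the sign bit of the hash code is masked off first, so the result is never negative).
(2) To distribute the data according to our own rules, we have to replace the component that does the distribution (partitioning): the Partitioner.
Define a CustomPartitioner that extends the abstract class Partitioner.
(3) In the job driver, register the custom partitioner: job.setPartitionerClass(CustomPartitioner.class)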
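The default rule in (1) can be sketched without any Hadoop dependency. This is a minimal illustration that mirrors what Hadoop's HashPartitioner computes; the class and method names here (DefaultPartitionSketch, defaultPartition) are made up for the example and are not part of Hadoop.

```java
// Sketch of the default hash partitioning rule, written in plain Java.
// DefaultPartitionSketch and defaultPartition are hypothetical names.
final class DefaultPartitionSketch {

    // Mask off the sign bit so a negative hash code cannot produce
    // a negative partition index, then take the remainder.
    static int defaultPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 5;
        // Made-up sample keys; the same key always lands in the same partition.
        String[] keys = {"13612345678", "13712345678", "15012345678"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + defaultPartition(key, numReduceTasks));
        }
    }
}
```

Because the partition depends only on the hash code, numbers from the same province are scattered across reduce tasks, which is why a custom Partitioner is needed here.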
2. Add a partitioner class to the traffic-summary program from case 1
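import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes each record to a partition by the first three digits of the phone number.
 * The type parameters (K2, V2) are the key/value types of the map output.
 * @author Administrator
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 1 Get the first three digits of the phone number
        //   (a key shorter than three characters is used as-is instead of throwing)
        String phone = key.toString();
        String preNum = phone.length() >= 3 ? phone.substring(0, 3) : phone;

        // Any prefix not listed below falls into partition 4
        int partition = 4;

        // 2 Decide which province the number belongs to
        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}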
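The same routing can also be expressed as a lookup table, which is easier to extend than an if/else chain when more prefixes are added. The following is only a sketch in plain Java so that it runs without Hadoop; PrefixTableSketch, partitionFor and OTHER are hypothetical names, and the phone numbers are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Table-driven variant of the prefix-to-partition rule.
// All names in this class are illustrative, not part of the original program.
final class PrefixTableSketch {

    // Partition used for every prefix that is not in the table.
    static final int OTHER = 4;

    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();
    static {
        PREFIX_TO_PARTITION.put("136", 0);
        PREFIX_TO_PARTITION.put("137", 1);
        PREFIX_TO_PARTITION.put("138", 2);
        PREFIX_TO_PARTITION.put("139", 3);
    }

    static int partitionFor(String phone) {
        // Guard against missing or too-short keys instead of letting substring throw.
        if (phone == null || phone.length() < 3) {
            return OTHER;
        }
        return PREFIX_TO_PARTITION.getOrDefault(phone.substring(0, 3), OTHER);
    }

    public static void main(String[] args) {
        String[] samples = {"13612345678", "13912345678", "15012345678", "13"};
        for (String phone : samples) {
            System.out.println(phone + " -> partition " + partitionFor(phone));
        }
    }
}
```

Inside a real Partitioner, getPartition could delegate to such a method with key.toString() as the argument.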
3. In the driver, set the custom partitioner and the number of reduce tasks
public static void main(String[] args) throws Exception {
    // 1 Get the configuration and create the job instance
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);

    // 6 Locate the jar that contains this program
    job.setJarByClass(FlowCount.class);

    // 8 Set the custom partitioner
    job.setPartitionerClass(ProvincePartitioner.class);

    // 9 Set a matching number of reduce tasks (partitions 0 to 4, so 5 tasks)
    job.setNumReduceTasks(5);

    // 2 Set the Mapper and Reducer classes used by this job
    job.setMapperClass(FlowCountMapper.class);
    job.setReducerClass(FlowCountReducer.class);

    // 3 Set the key/value types of the mapper output
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);

    // 4 Set the key/value types of the final output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // 5 Set the input directory of the raw files and the output directory
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 7 Submit the job configuration and the jar with the job's classes to YARN
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
}
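The number passed to setNumReduceTasks has to fit the partition indexes that the partitioner returns (0 to 4 here). The sketch below summarizes how Hadoop is generally understood to behave for different counts; it is a plain-Java illustration, not Hadoop code, and ReduceTaskCountSketch, Outcome and outcome are hypothetical names.

```java
// Summary of what to expect for a given reduce task count when the
// partitioner uses partition indexes 0 .. partitionsUsed - 1.
// This is an illustrative helper, not part of Hadoop.
final class ReduceTaskCountSketch {

    enum Outcome {
        MAP_ONLY,               // 0 reduce tasks: no shuffle, no partitioning
        SINGLE_FILE,            // 1 reduce task: the custom partitioner is bypassed
        ILLEGAL_PARTITION,      // too few tasks: the job fails once a record is
                                // routed to an index >= numReduceTasks
        ONE_FILE_PER_PARTITION, // the count matches the partitions used
        EXTRA_EMPTY_FILES       // more tasks than partitions: extra files stay empty
    }

    static Outcome outcome(int numReduceTasks, int partitionsUsed) {
        if (numReduceTasks == 0) return Outcome.MAP_ONLY;
        if (numReduceTasks == 1) return Outcome.SINGLE_FILE;
        if (numReduceTasks < partitionsUsed) return Outcome.ILLEGAL_PARTITION;
        if (numReduceTasks == partitionsUsed) return Outcome.ONE_FILE_PER_PARTITION;
        return Outcome.EXTRA_EMPTY_FILES;
    }

    public static void main(String[] args) {
        int partitionsUsed = 5; // ProvincePartitioner returns 0 to 4
        for (int n : new int[] {1, 3, 5, 6}) {
            System.out.println(n + " reduce task(s): " + outcome(n, partitionsUsed));
        }
    }
}
```

With five partitions in use, setNumReduceTasks(5) is the setting that yields exactly one output file per province group.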
4. Package the program as a jar and copy it to the Hadoop cluster.
flowcountPartitionser.jar
5. Start the Hadoop cluster
[victor@hadoop102 hadoop]$ sbin/start-all.sh
6. Run the flowcountPartitionser program
[victor@hadoop102 hadoop]$ hadoop jar flowcountPartitionser.jar \
com.victor.mr.partitioner.FlowCount \
/user/victor/flowcount/input /user/victor/flowcount/output
7. Check the results
[victor@hadoop102 software]$ hadoop fs -lsr /
/user/victor/flowcount/output/part-r-00000
/user/victor/flowcount/output/part-r-00001
/user/victor/flowcount/output/part-r-00002
/user/victor/flowcount/output/part-r-00003
/user/victor/flowcount/output/part-r-00004
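Each reduce task writes one file, and the five-digit suffix is the partition index. The sketch below reproduces that naming so the listing above can be mapped back to the phone prefixes; OutputFileNameSketch and reduceOutputFile are hypothetical names, and the format string is an assumption that matches the file names shown in the listing.

```java
import java.util.Locale;

// Maps a partition index to the reduce output file name seen in the listing.
// Illustrative only; Hadoop builds these names internally.
final class OutputFileNameSketch {

    static String reduceOutputFile(int partition) {
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "part-r-%05d", partition);
    }

    public static void main(String[] args) {
        String[] groups = {"136", "137", "138", "139", "other prefixes"};
        for (int partition = 0; partition < groups.length; partition++) {
            System.out.println(groups[partition] + " -> " + reduceOutputFile(partition));
        }
    }
}
```

So part-r-00000 holds the 136 numbers, part-r-00003 the 139 numbers, and part-r-00004 everything else.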