自定義排序?qū)崿F(xiàn)
bean對象做為key傳輸,需要實現(xiàn)WritableComparable接口重寫compareTo方法,就可以實現(xiàn)排序。
WritableComparable 的底層實現(xiàn)
WritableComparable 本身繼承了 Writable 和 Comparable 接口。
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {}
自定義排序選需要同時實現(xiàn) 序列化、反序列化及比較器
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
public int compareTo(T o);
案例實操
數(shù)據(jù)準備
序號 電話號碼 IP地址 域名 上行流量 下行流量 狀態(tài)
1 13736230513 192.196.100.1 www.tenxun.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.tenxun.com 1527 2106 200
6 84188413 192.168.100.3 www.tenxun.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.tenxun.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
需求
求出總流量(上行流量+下行流量)并按照總流量進行排序,最后輸出結(jié)果包含手機號碼、上行流量、下行流量,總流量
輸出結(jié)果
電話號碼 上行下行 總流量
13768778790 120 120 240
13966251146 240 0 240
13729199489 240 0 240
13846544121 264 0 264
13470253144 180 180 360
13956435636 132 1512 1644
13590439668 1116 954 2070
13568436656 1116 954 2070
15959002129 1938 180 2118
18271575951 1527 2106 3633
13682846555 1938 2910 4848
84188413 4116 1432 5548
13560439638 918 4938 5856
15910133277 3156 2936 6092
13992314666 3008 3720 6728
15043685818 3659 3538 7197
13630577991 6960 690 7650
18390173782 9531 2412 11943
13568436656 2481 24681 27162
13736230513 2481 24681 27162
13975057813 11058 48243 59301
13509468723 7335 110349 117684
排序分類
- 全排序:
最終輸出結(jié)果只有一個文件,且文件內(nèi)部有序。實現(xiàn)方式是只設(shè)置一個ReduceTask。但該方法在處理大型文件時效率極低,因此一臺機器處理所有文件,完全喪失了MapReduce所提供的并行架構(gòu)。
- 部分排序(區(qū)內(nèi)排序):
MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內(nèi)部有序。
- 二次排序(多次排序):
在自定義排序過程中,如果compareTo的判斷條件為兩個即為二次排序。
全排序
創(chuàng)建實體類
實現(xiàn) WritableComparable 接口、實現(xiàn)compareTo、write、readFields方法
注意點:
1:readFields 讀取數(shù)據(jù)的順序必須與write寫入的順序一致。
2:必須保證有一個空構(gòu)造器。
3:必須實現(xiàn)toString()方法,否則輸出數(shù)據(jù)是對象的地址信息。
/**
* 流量數(shù)據(jù)bean
*/
public class PhoneComparableBean implements WritableComparable<PhoneComparableBean> {
/**
* 上行流量
*/
private long upFlow;
/**
* 下行流量
*/
private long downFlow;
/**
* 總流量
*/
private long sumFlow;
public PhoneComparableBean(){}
public PhoneComparableBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
/**
* 若多個排序條件就可以稱為二次排序或多次排序
*/
@Override
public int compareTo(PhoneComparableBean o) {
return Long.compare(this.sumFlow, o.sumFlow);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
// getter/setter
@Override
public String toString() {
return String.format("%s %s %s", upFlow, downFlow, sumFlow);
}
}
mapper
/**
* LongWritable:數(shù)據(jù)偏移量,可以理解為文本內(nèi)容所在的行號
* Text:默認采用 TextInputFromat,按照一行行內(nèi)容進行讀取
* PhoneComparableBean:將自定義的Bean作為key 進行返回
* Text:把手機號碼作為value返回
*/
public class PhoneComparableMapper extends Mapper<LongWritable, Text, PhoneComparableBean, Text> {
// 分割條件
private static Pattern pattern=Pattern.compile("[\\s+]");
private Text outKey = new Text();
/**
* 重寫 Mapper 中的Map方法
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 獲取一行內(nèi)容中的分割數(shù)據(jù)
List<String> list = pattern.splitAsStream(value.toString())
.filter(s -> s.length()>0).collect(Collectors.toList());
// 獲取手機號碼
String phone = list.get(1);
outKey.set(phone);
// 由于域名可能為空,為了保證數(shù)據(jù)不能為空,從尾進行獲取
// 上行流量
long upFlow = Long.parseLong(list.get(list.size() - 3));
// 下行流量
long downFlow = Long.parseLong(list.get(list.size() - 2));
// 將內(nèi)容包裝成對象,并作為key
PhoneComparableBean outValue = new PhoneComparableBean(upFlow,downFlow);
//返回 K,V
context.write(outValue,outKey);
}
}
reduce
/**
* PhoneComparableBean:Mapper的返回key,自定義的對象
* Text:Mapper的返回vlaue,手機號碼
* Text:Reducer返回的key,手機號碼
* PhoneComparableBean:Reducer返回的Value,自定義的對象
*/
public class PhoneComparableReduce extends Reducer<PhoneComparableBean, Text,Text,PhoneComparableBean> {
@Override
protected void reduce(PhoneComparableBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 進行讀取寫出
Iterator<Text> iterator = values.iterator();
while (iterator.hasNext()) {
context.write(iterator.next(), key);
}
}
}
main:主類(采用本地測試)
public class PhoneComparableDriver {
public static void main(String[] args) {
Configuration configuration = new Configuration();
try {
Job job = Job.getInstance(configuration);
// mapper 返回的key、value數(shù)據(jù)類型
job.setMapOutputKeyClass(PhoneComparableBean.class);
job.setMapOutputValueClass(Text.class);
// 調(diào)用的 mapper 及 reduce
job.setMapperClass(PhoneComparableMapper.class);
job.setReducerClass(PhoneComparableReduce.class);
// 實際返回(Reduce)的 key、value 數(shù)據(jù)類型
job.setOutputValueClass(PhoneComparableBean.class);
job.setOutputKeyClass(Text.class);
// 數(shù)據(jù)輸入路徑
FileInputFormat.setInputPaths(job, new Path("D:\\development tool\\document\\temp\\input\\phone_data .txt"));
// 數(shù)據(jù)輸出路徑
String str = UUID.randomUUID().toString().replaceAll("-", "");
FileOutputFormat.setOutputPath(job, new Path("D:\\development tool\\document\\temp\\output\\",str));
// 提交任務(wù),等待執(zhí)行結(jié)果
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
測試結(jié)果:
13768778790 120 120 240
13966251146 240 0 240
13729199489 240 0 240
13846544121 264 0 264
13470253144 180 180 360
13956435636 132 1512 1644
13590439668 1116 954 2070
13568436656 1116 954 2070
15959002129 1938 180 2118
18271575951 1527 2106 3633
13682846555 1938 2910 4848
84188413 4116 1432 5548
13560439638 918 4938 5856
15910133277 3156 2936 6092
13992314666 3008 3720 6728
15043685818 3659 3538 7197
13630577991 6960 690 7650
18390173782 9531 2412 11943
13568436656 2481 24681 27162
13736230513 2481 24681 27162
13975057813 11058 48243 59301
13509468723 7335 110349 117684
部分排序(區(qū)內(nèi)排序):
實現(xiàn)區(qū)內(nèi)排序的前提時需要進行先分區(qū),上面的數(shù)據(jù)不需要變動。
自定義分區(qū)
繼承 org.apache.hadoop.mapreduce.Partitioner 類,實現(xiàn) getPartition() 方法
/**
* 自定義分區(qū)
* 由于 分區(qū)階段 處理 Mapper -->分區(qū) --> 環(huán)形緩沖區(qū) 之間。
* 所以 Partitioner<KEY,VALUE> 是 Mapper 返回的數(shù)據(jù)類型(PhoneComparableBean和Text)
*/
public class PhoneComparablePartitioner extends Partitioner<PhoneComparableBean, Text> {
/**
* 按照手機號碼開頭進行分區(qū)
* 分別按照 134、135、137、139、其他號碼 分為5個區(qū)
* @param phoneComparableBean
* @param text
* @param numPartitions
* @return
*/
@Override
public int getPartition(PhoneComparableBean phoneComparableBean, Text text, int numPartitions) {
String str = text.toString();
if (str.startsWith("134")) {
return 0;
} else if (str.startsWith("135")) {
return 1;
} else if (str.startsWith("137")) {
return 2;
} else if (str.startsWith("139")) {
return 3;
}
return 4;
}
}
主類(main):
mapper 和 reduce 都不會有改動,這里只需要更改主類就行。
public class PhoneComparableDriver {
public static void main(String[] args) {
Configuration configuration = new Configuration();
try {
Job job = Job.getInstance(configuration);
// mapper 返回的key、value數(shù)據(jù)類型
job.setMapOutputKeyClass(PhoneComparableBean.class);
job.setMapOutputValueClass(Text.class);
// 調(diào)用的 mapper 及 reduce
job.setMapperClass(PhoneComparableMapper.class);
job.setReducerClass(PhoneComparableReduce.class);
// 添加自定義分區(qū)類
job.setPartitionerClass(PhoneComparablePartitioner.class);
// 設(shè)置Reducetask個數(shù),該數(shù)量應(yīng)該與分區(qū)數(shù)量保持一致
job.setNumReduceTasks(5);
// 實際返回(Reduce)的 key、value 數(shù)據(jù)類型
job.setOutputValueClass(PhoneComparableBean.class);
job.setOutputKeyClass(Text.class);
// 數(shù)據(jù)輸入路徑
FileInputFormat.setInputPaths(job, new Path("D:\\development tool\\document\\temp\\input\\phone_data .txt"));
// 數(shù)據(jù)輸出路徑
String str = UUID.randomUUID().toString().replaceAll("-", "");
FileOutputFormat.setOutputPath(job, new Path("D:\\development tool\\document\\temp\\output\\",str));
// 提交任務(wù),等待執(zhí)行結(jié)果
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
運行結(jié)果:

生成五個文件,就不一個個區(qū)查看了,在這里查看一下 135 開頭的數(shù)據(jù)分區(qū)號為1(part-r-00001的文件)。
13590439668 1116 954 2070
13568436656 1116 954 2070
13560439638 918 4938 5856
13568436656 2481 24681 27162
13509468723 7335 110349 117684