二次排序的需求說明
在mapreduce操作時(shí),shuffle階段會(huì)多次根據(jù)key值排序。但是在shuffle分組后,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時(shí)value值也是排序好的,這種需求就是二次排序。

sort1.png
測(cè)試的文件數(shù)據(jù)
a 1
a 5
a 7
a 9
b 3
b 8
b 10
未經(jīng)過二次排序的輸出結(jié)果
a 9
a 7
a 5
a 1
b 10
b 8
b 3
第一種實(shí)現(xiàn)思路
直接在reduce端對(duì)分組后的values進(jìn)行排序。
- reduce關(guān)鍵代碼
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Integer> valuesList = new ArrayList<Integer>();
// 取出value
for(IntWritable value : values) {
valuesList.add(value.get());
}
// 進(jìn)行排序
Collections.sort(valuesList);
for(Integer value : valuesList) {
context.write(key, new IntWritable(value));
}
}
- 輸出結(jié)果
a 1
a 5
a 7
a 9
b 3
b 8
b 10
很容易發(fā)現(xiàn),這樣把排序工作都放到reduce端完成,當(dāng)values序列長度非常大的時(shí)候,會(huì)對(duì)CPU和內(nèi)存造成極大的負(fù)載。
- 注意的地方(容易被“坑”)
在reduce端對(duì)values進(jìn)行迭代的時(shí)候,不要直接直接存儲(chǔ)value值或者key值,因?yàn)閞educe方法會(huì)反復(fù)執(zhí)行多次,但key和value相關(guān)的對(duì)象只有兩個(gè),reduce會(huì)反復(fù)重用這兩個(gè)對(duì)象。需要用相應(yīng)的數(shù)據(jù)類型.get()取出后再存儲(chǔ)。
第二種實(shí)現(xiàn)思路
將map端輸出的<key,value>中的key和value組合成一個(gè)新的key(稱為newKey),value值不變。這里就變成<(key,value),value>,在針對(duì)newKey排序的時(shí)候,如果key相同,就再對(duì)value進(jìn)行排序。
- 需要自定義的地方
- 自定義數(shù)據(jù)類型實(shí)現(xiàn)組合key
實(shí)現(xiàn)方式:繼承WritableComparable - 自定義partioner,形成newKey后保持分區(qū)規(guī)則任然按照key進(jìn)行。保證不打亂原來的分區(qū)。
實(shí)現(xiàn)方式:繼承partitioner - 自動(dòng)以分組,保持分組規(guī)則任然按照key進(jìn)行。不打亂原來的分組
實(shí)現(xiàn)方式:繼承RawComparator
- 自定義數(shù)據(jù)類型關(guān)鍵代碼
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PairWritable implements WritableComparable<PairWritable> {
// 組合key
private String first;
private int second;
public PairWritable() {
}
public PairWritable(String first, int second) {
this.set(first, second);
}
/**
* 方便設(shè)置字段
*/
public void set(String first, int second) {
this.first = first;
this.second = second;
}
/**
* 反序列化
*/
@Override
public void readFields(DataInput arg0) throws IOException {
this.first = arg0.readUTF();
this.second = arg0.readInt();
}
/**
* 序列化
*/
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(first);
arg0.writeInt(second);
}
/*
* 重寫比較器
*/
public int compareTo(PairWritable o) {
int comp = this.first.compareTo(o.first);
if(comp != 0) {
return comp;
} else { // 若第一個(gè)字段相等,則比較第二個(gè)字段
return Integer.valueOf(this.second).compareTo(
Integer.valueOf(o.getSecond()));
}
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
- 自定義分區(qū)規(guī)則
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {
@Override
public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
/*
* 默認(rèn)的實(shí)現(xiàn) (key.hashCode() & Integer.MAX_VALUE) % numPartitions
* 讓key中first字段作為分區(qū)依據(jù)
*/
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
- 自定義分組比較器
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
public class SecondGroupComparator implements RawComparator<PairWritable> {
/*
* 對(duì)象比較
*/
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
/*
* 字節(jié)比較
* arg0,arg3為要比較的兩個(gè)字節(jié)數(shù)組
* arg1,arg2表示第一個(gè)字節(jié)數(shù)組要進(jìn)行比較的收尾位置,arg4,arg5表示第二個(gè)
* 從第一個(gè)字節(jié)比到組合key中second的前一個(gè)字節(jié),因?yàn)閟econd為int型,所以長度為4
*/
public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
}
}
- map關(guān)鍵代碼
private PairWritable mapOutKey = new PairWritable();
private IntWritable mapOutValue = new IntWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineValue = value.toString();
String[] strs = lineValue.split("\t");
//設(shè)置組合key和value ==> <(key,value),value>
mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
mapOutValue.set(Integer.valueOf(strs[1]));
context.write(mapOutKey, mapOutValue);
}
- reduce關(guān)鍵代碼
private Text outPutKey = new Text();
public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//迭代輸出
for(IntWritable value : values) {
outPutKey.set(key.getFirst());
context.write(outPutKey, value);
}
}
- 輸出結(jié)果
a 1
a 5
a 7
a 9
b 3
b 8
b 10