MapReduce實(shí)現(xiàn)二次排序

二次排序的需求說明

在mapreduce操作時(shí),shuffle階段會(huì)多次根據(jù)key值排序。但是在shuffle分組后,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時(shí)value值也是排序好的,這種需求就是二次排序。


sort1.png

測(cè)試的文件數(shù)據(jù)

a 1
a 5
a 7
a 9
b 3
b 8
b 10

未經(jīng)過二次排序的輸出結(jié)果

a   9
a   7
a   5
a   1
b   10
b   8
b   3

第一種實(shí)現(xiàn)思路

直接在reduce端對(duì)分組后的values進(jìn)行排序。

  • reduce關(guān)鍵代碼
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            
             List<Integer> valuesList = new ArrayList<Integer>();

             // 取出value
             for(IntWritable value : values) {
                 valuesList.add(value.get());
             }
             // 進(jìn)行排序
             Collections.sort(valuesList);
            
             for(Integer value : valuesList) {
                context.write(key, new IntWritable(value));
             }
            
        }
  • 輸出結(jié)果
a   1
a   5
a   7
a   9
b   3
b   8
b   10

很容易發(fā)現(xiàn),這樣把排序工作都放到reduce端完成,當(dāng)values序列長度非常大的時(shí)候,會(huì)對(duì)CPU和內(nèi)存造成極大的負(fù)載。

  • 注意的地方(容易被“坑”)
    在reduce端對(duì)values進(jìn)行迭代的時(shí)候,不要直接直接存儲(chǔ)value值或者key值,因?yàn)閞educe方法會(huì)反復(fù)執(zhí)行多次,但key和value相關(guān)的對(duì)象只有兩個(gè),reduce會(huì)反復(fù)重用這兩個(gè)對(duì)象。需要用相應(yīng)的數(shù)據(jù)類型.get()取出后再存儲(chǔ)。

第二種實(shí)現(xiàn)思路

將map端輸出的<key,value>中的key和value組合成一個(gè)新的key(稱為newKey),value值不變。這里就變成<(key,value),value>,在針對(duì)newKey排序的時(shí)候,如果key相同,就再對(duì)value進(jìn)行排序。

  • 需要自定義的地方
  1. 自定義數(shù)據(jù)類型實(shí)現(xiàn)組合key
    實(shí)現(xiàn)方式:繼承WritableComparable
  2. 自定義partioner,形成newKey后保持分區(qū)規(guī)則任然按照key進(jìn)行。保證不打亂原來的分區(qū)。
    實(shí)現(xiàn)方式:繼承partitioner
  3. 自動(dòng)以分組,保持分組規(guī)則任然按照key進(jìn)行。不打亂原來的分組
    實(shí)現(xiàn)方式:繼承RawComparator
  • 自定義數(shù)據(jù)類型關(guān)鍵代碼
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

 public class PairWritable implements WritableComparable<PairWritable> {
    // 組合key
      private String first;
      private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    /**
     * 方便設(shè)置字段
     */
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }
    
    /**
     * 反序列化
     */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.first = arg0.readUTF();
        this.second = arg0.readInt();
    }
    /**
     * 序列化
     */
    @Override
    public void write(DataOutput arg0) throws IOException {
        arg0.writeUTF(first);
        arg0.writeInt(second);
    }

    /*
     * 重寫比較器
     */
    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.first);
        
        if(comp != 0) {
            return comp;
        } else { // 若第一個(gè)字段相等,則比較第二個(gè)字段
            return Integer.valueOf(this.second).compareTo(
                    Integer.valueOf(o.getSecond()));
        }
    }
    
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public String getFirst() {
        return first;
    }
    public void setFirst(String first) {
        this.first = first;
    }
  • 自定義分區(qū)規(guī)則
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        /* 
         * 默認(rèn)的實(shí)現(xiàn) (key.hashCode() & Integer.MAX_VALUE) % numPartitions
         * 讓key中first字段作為分區(qū)依據(jù)
         */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 
    }
}

  • 自定義分組比較器
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class SecondGroupComparator implements RawComparator<PairWritable> {

    /*
     * 對(duì)象比較
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    /*
     * 字節(jié)比較
     * arg0,arg3為要比較的兩個(gè)字節(jié)數(shù)組
     * arg1,arg2表示第一個(gè)字節(jié)數(shù)組要進(jìn)行比較的收尾位置,arg4,arg5表示第二個(gè)
     * 從第一個(gè)字節(jié)比到組合key中second的前一個(gè)字節(jié),因?yàn)閟econd為int型,所以長度為4
     */
    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
        return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
    }
}
  • map關(guān)鍵代碼
        private PairWritable mapOutKey = new PairWritable();
        private IntWritable mapOutValue = new IntWritable();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            
            //設(shè)置組合key和value ==> <(key,value),value>
            mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutValue.set(Integer.valueOf(strs[1]));
            
            context.write(mapOutKey, mapOutValue);
        }
  • reduce關(guān)鍵代碼
        private Text outPutKey = new Text(); 
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            //迭代輸出
            for(IntWritable value : values) {
                outPutKey.set(key.getFirst());
                context.write(outPutKey, value);
            }
            
        }
  • 輸出結(jié)果
a   1
a   5
a   7
a   9
b   3
b   8
b   10
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938閱讀 691評(píng)論 0 1
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法,內(nèi)部類的語法,繼承相關(guān)的語法,異常的語法,線程的語...
    子非魚_t_閱讀 34,626評(píng)論 18 399
  • 嘻嘻嘻,荷花可以說是我最喜歡畫的花了! 因?yàn)楹唵斡趾每础?以下的各種姿態(tài)的荷花希望能帶給初學(xué)者一些幫助吧!
    吾棲夢(mèng)閱讀 1,105評(píng)論 0 6
  • 去年公司改制,公司被北京的一家投資公司被收購。但是因?yàn)橘I賣之間沒有商量好。雙方都認(rèn)為沒有達(dá)到預(yù)期的效果。所以從去年...
    天龍?zhí)枪?/span>閱讀 274評(píng)論 2 1

友情鏈接更多精彩內(nèi)容