4.MapReduce框架原理2 - shuffle combiner

1.Shuffle機(jī)制

Map方法之后,Reduce方法之前的數(shù)據(jù)處理過(guò)程稱之為Shuffle。


image.png

2.Shuffle排序機(jī)制

  • hadoop排序是使用WritableComparator對(duì)象
  • 實(shí)現(xiàn)排序的方法:
  • 1.直接讓參與對(duì)比的對(duì)象實(shí)現(xiàn)WritableComparable 接口,并指定泛型,實(shí)現(xiàn)compareTo方法,實(shí)現(xiàn)比較規(guī)則
  • 2.自定義一個(gè)比較器對(duì)象,需要繼承WritableComparator類(lèi),重寫(xiě)compare的方法。注意在構(gòu)造器中調(diào)用父類(lèi)對(duì)當(dāng)前要參與比較的對(duì)象進(jìn)行實(shí)例化。當(dāng)前要參與比較的對(duì)象必須要實(shí)現(xiàn)WritableComparable接口,最后在driver類(lèi)中指定自定義的比較器對(duì)象
//自定義的比較器對(duì)象
public class FlowBeanWritableComparator extends WritableComparator {

    // 指定當(dāng)前自定義的比較器對(duì)象為誰(shuí)服務(wù)
   // 注意在構(gòu)造器中調(diào)用父類(lèi)對(duì)當(dāng)前要參與比較的對(duì)象進(jìn)行實(shí)例化。
    public FlowBeanWritableComparator() {
        super(FlowBean.class, true);
    }

    /**
     * 自定義比較規(guī)則
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean abean = (FlowBean) a;
        FlowBean bbean = (FlowBean) b;
        System.out.println("aa"+abean);
        System.out.println("bb"+bbean);
        return -abean.getSumFlow().compareTo(((FlowBean) b).getSumFlow());
    }
}


// 指定自定義的比較器對(duì)象
 job.setSortComparatorClass(FlowBeanWritableComparator.class);

3.Shuffle排序源碼分析

 // 為當(dāng)前Job中的對(duì)象獲取比較器對(duì)象
    comparator = job.getOutputKeyComparator();
    // 獲取比較器對(duì)象的核心邏輯
    public RawComparator getOutputKeyComparator() {
    // 在當(dāng)前Job中獲取比較器對(duì)象的class文件--> mapreduce.job.output.key.comparator.class
    Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
    // 如果通過(guò)JobContext.KEY_COMPARATOR 獲取到比較器對(duì)象
    if (theClass != null){
         return ReflectionUtils.newInstance(theClass, this);
    }
     // 如果通過(guò)JobContext.KEY_COMPARATOR 獲取不到比較器對(duì)象
     // Hadoop 會(huì)默認(rèn)獲取比較器對(duì)象 通過(guò)調(diào)用WritableComparator對(duì)象的get方法獲取,
     // 在獲取之前有個(gè)前提 判斷當(dāng)前job的MapOutputKeyClass 是否實(shí)現(xiàn)了WritableComparable接口
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
            }
            

4.hadoop如何給自身的數(shù)據(jù)類(lèi)型獲取比較器

1). 自身的數(shù)據(jù)類(lèi)型已經(jīng)實(shí)現(xiàn)WritableComparable接口
2). 自身的數(shù)據(jù)類(lèi)型對(duì)象中 已經(jīng)通過(guò)構(gòu)造函數(shù)創(chuàng)建比較器對(duì)象

 // 以Text為例
   public static class Comparator extends WritableComparator {
    public Comparator() {
    super(Text.class);
    }

3). 自身的數(shù)據(jù)類(lèi)型對(duì)象中 通過(guò)靜態(tài)代碼塊把 當(dāng)前對(duì)象的class 和 它的比較器對(duì)象
放入一個(gè)Map進(jìn)行了維護(hù)。

static {
    // register this comparator
    WritableComparator.define(Text.class, new Comparator());
}
                  
 public static void define(Class c, WritableComparator comparator) {
    comparators.put(c, comparator);
}

5.Shuffle的combiner流程使用和注意事項(xiàng)

概念:是Shuffle過(guò)程中的一個(gè)可選流程(優(yōu)化手段)
可以為Map階段計(jì)算完的數(shù)據(jù)進(jìn)行提前匯總,主要考慮到 減少 從Map階段到
Reduce階段的數(shù)據(jù)傳輸?shù)拇笮】刂埔约皽p少Reduce端的計(jì)算壓力。
使用場(chǎng)景:當(dāng)不考慮多個(gè)MapTask的整體數(shù)據(jù)關(guān)聯(lián)關(guān)系的時(shí)候才使用。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容