1.Shuffle機(jī)制
Map方法之后,Reduce方法之前的數(shù)據(jù)處理過(guò)程稱之為Shuffle。

image.png
2.Shuffle排序機(jī)制
- hadoop排序是使用WritableComparator對(duì)象
- 實(shí)現(xiàn)排序的方法:
- 1.直接讓參與對(duì)比的對(duì)象實(shí)現(xiàn)WritableComparable 接口,并指定泛型,實(shí)現(xiàn)compareTo方法,實(shí)現(xiàn)比較規(guī)則
- 2.自定義一個(gè)比較器對(duì)象,需要繼承WritableComparator類(lèi),重寫(xiě)compare的方法。注意在構(gòu)造器中調(diào)用父類(lèi)對(duì)當(dāng)前要參與比較的對(duì)象進(jìn)行實(shí)例化。當(dāng)前要參與比較的對(duì)象必須要實(shí)現(xiàn)WritableComparable接口,最后在driver類(lèi)中指定自定義的比較器對(duì)象
//自定義的比較器對(duì)象
public class FlowBeanWritableComparator extends WritableComparator {
// 指定當(dāng)前自定義的比較器對(duì)象為誰(shuí)服務(wù)
// 注意在構(gòu)造器中調(diào)用父類(lèi)對(duì)當(dāng)前要參與比較的對(duì)象進(jìn)行實(shí)例化。
public FlowBeanWritableComparator() {
super(FlowBean.class, true);
}
/**
* 自定義比較規(guī)則
* @param a
* @param b
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
FlowBean abean = (FlowBean) a;
FlowBean bbean = (FlowBean) b;
System.out.println("aa"+abean);
System.out.println("bb"+bbean);
return -abean.getSumFlow().compareTo(((FlowBean) b).getSumFlow());
}
}
// 指定自定義的比較器對(duì)象
job.setSortComparatorClass(FlowBeanWritableComparator.class);
3.Shuffle排序源碼分析
// 為當(dāng)前Job中的對(duì)象獲取比較器對(duì)象
comparator = job.getOutputKeyComparator();
// 獲取比較器對(duì)象的核心邏輯
public RawComparator getOutputKeyComparator() {
// 在當(dāng)前Job中獲取比較器對(duì)象的class文件--> mapreduce.job.output.key.comparator.class
Class<? extends RawComparator> theClass = getClass(
JobContext.KEY_COMPARATOR, null, RawComparator.class);
// 如果通過(guò)JobContext.KEY_COMPARATOR 獲取到比較器對(duì)象
if (theClass != null){
return ReflectionUtils.newInstance(theClass, this);
}
// 如果通過(guò)JobContext.KEY_COMPARATOR 獲取不到比較器對(duì)象
// Hadoop 會(huì)默認(rèn)獲取比較器對(duì)象 通過(guò)調(diào)用WritableComparator對(duì)象的get方法獲取,
// 在獲取之前有個(gè)前提 判斷當(dāng)前job的MapOutputKeyClass 是否實(shí)現(xiàn)了WritableComparable接口
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
4.hadoop如何給自身的數(shù)據(jù)類(lèi)型獲取比較器
1). 自身的數(shù)據(jù)類(lèi)型已經(jīng)實(shí)現(xiàn)WritableComparable接口
2). 自身的數(shù)據(jù)類(lèi)型對(duì)象中 已經(jīng)通過(guò)構(gòu)造函數(shù)創(chuàng)建比較器對(duì)象
// 以Text為例
public static class Comparator extends WritableComparator {
public Comparator() {
super(Text.class);
}
3). 自身的數(shù)據(jù)類(lèi)型對(duì)象中 通過(guò)靜態(tài)代碼塊把 當(dāng)前對(duì)象的class 和 它的比較器對(duì)象
放入一個(gè)Map進(jìn)行了維護(hù)。
static {
// register this comparator
WritableComparator.define(Text.class, new Comparator());
}
public static void define(Class c, WritableComparator comparator) {
comparators.put(c, comparator);
}
5.Shuffle的combiner流程使用和注意事項(xiàng)
概念:是Shuffle過(guò)程中的一個(gè)可選流程(優(yōu)化手段)
可以為Map階段計(jì)算完的數(shù)據(jù)進(jìn)行提前匯總,主要考慮到 減少 從Map階段到
Reduce階段的數(shù)據(jù)傳輸?shù)拇笮】刂埔约皽p少Reduce端的計(jì)算壓力。
使用場(chǎng)景:當(dāng)不考慮多個(gè)MapTask的整體數(shù)據(jù)關(guān)聯(lián)關(guān)系的時(shí)候才使用。