歡迎關(guān)注我的CSDN: https://blog.csdn.net/bingque6535
Partitioner分區(qū)部分轉(zhuǎn)載的是: https://www.cnblogs.com/qingyunzong/p/8584379.html
一. Partitioner分區(qū)
1. Partitioner的作用:
進(jìn)行MapReduce計(jì)算時(shí),有時(shí)候需要把最終的輸出數(shù)據(jù)分到不同的文件中,我們知道最終的輸出數(shù)據(jù)是來自于Reducer任務(wù)。那么,如果要得到多個(gè)文件,意味著有同樣數(shù)量的Reducer任務(wù)在運(yùn)行。Reducer任務(wù)的數(shù)據(jù)來自于Mapper任務(wù),也就說Mapper任務(wù)要?jiǎng)澐謹(jǐn)?shù)據(jù),對(duì)于不同的數(shù)據(jù)分配給不同的Reducer任務(wù)運(yùn)行。Mapper任務(wù)劃分?jǐn)?shù)據(jù)的過程就稱作Partition。負(fù)責(zé)實(shí)現(xiàn)劃分?jǐn)?shù)據(jù)的類稱作Partitioner。
2. Partitioner源碼
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
//默認(rèn)使用key的hash值與上int的最大值,避免出現(xiàn)數(shù)據(jù)溢出 的情況
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
3. getPartition()三個(gè)參數(shù)分別是什么?
HashPartitioner是處理Mapper任務(wù)輸出的,getPartition()方法有三個(gè)形參,源碼中key、value分別指的是Mapper任務(wù)的輸出 <font color="red"> [標(biāo)注: Partitioner的key, value類型和map端輸出的key, value類型一致]</font>,numReduceTasks指的是設(shè)置的Reducer任務(wù)數(shù)量,默認(rèn)值是1。那么任何整數(shù)與1相除的余數(shù)肯定是0。也就是說getPartition(…)方法的返回值總是0。也就是Mapper任務(wù)的輸出總是送給一個(gè)Reducer任務(wù),最終只能輸出到一個(gè)文件中。
據(jù)此分析,如果想要最終輸出到多個(gè)文件中,在Mapper任務(wù)中對(duì)數(shù)據(jù)應(yīng)該劃分到多個(gè)區(qū)中。那么,我們只需要按照一定的規(guī)則讓getPartition(…)方法的返回值是0,1,2,3…即可。
4. 自定義Partitioner
大部分情況下,我們都會(huì)使用默認(rèn)的分區(qū)函數(shù),但有時(shí)我們又有一些,特殊的需求,而需要定制Partition來完成我們的業(yè)務(wù)
-
代碼
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; // Partitioner<IntWritable, IntWritable> 的key, value 類型和map的輸出key, value類型一致 public class FivePartitioner extends Partitioner<IntWritable, IntWritable>{ /** * 我們的需求: 按照能否被5除盡去分區(qū) * * 1、如果除以5的余數(shù)是0, 放在0號(hào)分區(qū) * 2、如果除以5的余數(shù)不是0, 放在1分區(qū) */ @Override public int getPartition(IntWritable key, IntWritable value, int numPartitions) { int intValue = key.get(); if(intValue % 5 == 0){ return 0; }else{ return 1; } } } -
在運(yùn)行Mapreduce程序時(shí),只需在主函數(shù)里加入如下兩行代碼即可
// 指定Partitioner類 job.setPartitionerClass(FivePartitioner.class); // 設(shè)置分區(qū)數(shù), 和Partitioner類中設(shè)置的分區(qū)數(shù)保持一致 // Partition分區(qū)數(shù)和ReduceTaskes數(shù)目一致 job.setNumReduceTasks(2);
二. Combiner分區(qū)
Combiner分區(qū)部分轉(zhuǎn)載的是:
https://www.cnblogs.com/qingyunzong/p/8584509.html
1. 對(duì)combiner的理解
- combiner其實(shí)屬于優(yōu)化方案,由于帶寬限制,應(yīng)該盡量map和reduce之間的數(shù)據(jù)傳輸數(shù)量。它在Map端把同一個(gè)key的鍵值對(duì)合并在一起并計(jì)算,計(jì)算規(guī)則與reduce一致,所以combiner也可以看作特殊的Reducer。
- 執(zhí)行combiner操作要求開發(fā)者必須在程序中設(shè)置了combiner(程序中通過job.setCombinerClass(myCombine.class)自定義combiner操作)。
- Combiner組件是用來做局部匯總的,就在mapTask中進(jìn)行匯總;Reducer組件是用來做全局匯總的,最終的,最后一次匯總。
2. 哪里使用combiner?
- map輸出數(shù)據(jù)根據(jù)分區(qū)排序完成后,在寫入文件之前會(huì)執(zhí)行一次combine操作(前提是作業(yè)中設(shè)置了這個(gè)操作);
- 如果map輸出比較大,溢出文件個(gè)數(shù)大于3(此值可以通過屬性min.num.spills.for.combine配置)時(shí),在merge的過程(多個(gè)spill文件合并為一個(gè)大文件)中前還會(huì)執(zhí)行combiner操作;
3. 注意事項(xiàng)
不是每種作業(yè)都可以做combiner操作的,只有滿足以下條件才可以:
- Combiner 只能對(duì) 一個(gè)mapTask的中間結(jié)果進(jìn)行匯總
- 如果想使用Reducer直接充當(dāng)Combiner,那么必須滿足: Reducer的輸入和輸出key-value類型是一致的。
處于兩個(gè)不同節(jié)點(diǎn)的mapTask的結(jié)果不能combiner到一起
處于同一個(gè)節(jié)點(diǎn)的兩個(gè)MapTask的結(jié)果不能否combiner到一起
-
求最大值、求最小值、求和、去重時(shí)可直接使用Reducer充當(dāng)Combiner,但是求平均值時(shí)不能直接使用Reducer充當(dāng)Combiner。
原因:對(duì)2組值求平均值
2 3 4 5 6 == 20 / 5 == 4
4 5 6 == 15 / 3 == 5
結(jié)果: 4.5
20+15 / 5+3 = 35 / 8
結(jié)果: 4.375
歡迎關(guān)注我的CSDN: https://blog.csdn.net/bingque6535