MapReduce Working Mechanism and Serialization

The MapReduce Execution Flow

<div class="mdContent">


image

The steps of a MapReduce job

1. The Map task

1.1 Read the input file from HDFS. Each line is parsed into a <k,v> pair, and the map function is called once for each pair. e.g. <0,hello you> <10,hello me>

1.2 Override map(): receive a <k,v> pair produced in 1.1, process it, and emit new <k,v> pairs. e.g. <hello,1> <you,1> <hello,1> <me,1>

1.3 Partition the <k,v> pairs emitted in 1.2. By default there is a single partition. See the Partitioner section.

1.4 Within each partition, sort the data (by key) and group it. Grouping means collecting the values that share a key into one collection. After sorting: <hello,1> <hello,1> <me,1> <you,1>; after grouping: <hello,{1,1}> <me,{1}> <you,{1}>

1.5 (Optional) Run a local reduction over the grouped data. See the Combiner section.

2. The Reduce task

2.1 The outputs of the map tasks are copied over the network to the reduce nodes, one partition per reducer. See the analysis of the shuffle process.

2.2 Merge and sort the outputs of the map tasks. Override reduce(): it receives the grouped data; implement your own business logic, e.g. <hello,2> <me,1> <you,1>, and emit new <k,v> pairs.

2.3 Write the <k,v> output of reduce to HDFS.
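The steps above can be simulated end to end in plain Java, without a Hadoop cluster. The sketch below (class and method names are invented for illustration) runs a map over the two sample lines, sorts and groups the intermediate pairs with a TreeMap, then reduces each group by summing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniMapReduce {
    // Steps 1.1/1.2: parse each line and emit <word,1> pairs
    static List<Map.Entry<String, Long>> map(String line) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            out.add(Map.entry(word, 1L));
        }
        return out;
    }

    // Steps 1.4 + 2.2: sort/group by key (TreeMap keeps keys ordered), then sum each group
    static Map<String, Long> run(String... lines) {
        TreeMap<String, List<Long>> groups = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Long> kv : map(line)) {
                groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        TreeMap<String, Long> result = new TreeMap<>();
        groups.forEach((k, vs) -> result.put(k, vs.stream().mapToLong(Long::longValue).sum()));
        return result;
    }

    public static void main(String[] args) {
        // The sample from the text: <0,hello you> and <10,hello me>
        System.out.println(run("hello you", "hello me")); // {hello=2, me=1, you=1}
    }
}
```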

Java implementation

Note: the Hadoop libraries must be on the classpath so that the org.apache.hadoop.* imports below resolve.

1. First create a file named hello and upload it to HDFS

image

2. Then write the code that counts the words in the file (the commented-out lines in the code are optional; they may be kept or omitted)

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: 'Courier New' !important; font-size: 12px !important;">package mapreduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountApp {
    static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
    static final String OUT_PATH = "hdfs://chaoren:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        Job job = new Job(conf, WordCountApp.class.getSimpleName());

        // 1.1 Where the input files are read from
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // How the input is formatted: each line is parsed into one key/value pair
        //job.setInputFormatClass(TextInputFormat.class);

        // 1.2 The custom map class
        job.setMapperClass(MyMapper.class);
        // The <k,v> types of the map output; can be omitted when <k3,v3> has the same types as <k2,v2>
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(LongWritable.class);

        // 1.3 Partitioning
        //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
        // Run a single reduce task
        //job.setNumReduceTasks(1);

        // 1.4 Sorting and grouping

        // 1.5 Combining

        // 2.2 The custom reduce class
        job.setReducerClass(MyReducer.class);
        // The output types of the reduce
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 Where the output is written to
        FileOutputFormat.setOutputPath(job, outPath);
        // The class that formats the output file
        //job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job to the JobTracker and wait for it to finish
        job.waitForCompletion(true);
    }

    /**
     * KEYIN    i.e. K1: the byte offset of the line
     * VALUEIN  i.e. V1: the text of the line
     * KEYOUT   i.e. K2: a word occurring in the line
     * VALUEOUT i.e. V2: the count of that occurrence, always 1
     */
    static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        protected void map(LongWritable k1, Text v1, Context context)
                throws java.io.IOException, InterruptedException {
            String[] splited = v1.toString().split("\t");
            for (String word : splited) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    /**
     * KEYIN    i.e. K2: a word occurring in the input
     * VALUEIN  i.e. V2: the occurrence counts of that word
     * KEYOUT   i.e. K3: a distinct word
     * VALUEOUT i.e. V3: the total number of occurrences of that word
     */
    static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
                Context ctx) throws java.io.IOException, InterruptedException {
            long times = 0L;
            for (LongWritable count : v2s) {
                times += count.get();
            }
            ctx.write(k2, new LongWritable(times));
        }
    }
}</pre>

3. After the job runs successfully, you can inspect the result from the Linux shell

image

</div>

Serialization in MapReduce

<div class="mdContent">
Characteristics of Hadoop's serialization format:

1. Compact: makes efficient use of storage space.
2. Fast: low overhead when reading and writing data.
3. Extensible: can transparently read data written in an older format.
4. Interoperable: supports interaction across multiple languages.

The main difference between Hadoop serialization and Java serialization: for complex object types, Hadoop serialization does not transmit the whole multi-level parent/child class hierarchy the way Java serialization does; only the field values that are actually needed are transmitted, which greatly reduces the network transfer overhead.
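The size difference is easy to see with plain JDK streams. The sketch below (class and field values are arbitrary stand-ins, not Hadoop API) serializes the same name/age pair once with Java's ObjectOutputStream and once with the writeUTF/writeInt calls a Writable would use; the Writable-style encoding is far smaller because it carries no stream header or class metadata:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class EncodingSizeDemo {
    // A plain Java-serializable stand-in for a name/age record
    static class Student implements Serializable {
        String name = "tom";
        int age = 20;
    }

    // Size of the object when written with standard Java serialization
    static int javaSerializedSize() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(new Student());
            }
            return bos.size(); // stream header + class descriptor + field data
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Size of the same fields written Writable-style: just the raw values
    static int writableStyleSize() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bos)) {
                out.writeUTF("tom"); // 2-byte length prefix + 3 bytes of text
                out.writeInt(20);    // 4 bytes
            }
            return bos.size();       // 9 bytes in total
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Java serialization: " + javaSerializedSize() + " bytes");
        System.out.println("Writable-style:     " + writableStyleSize() + " bytes");
    }
}
```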

The roles of Hadoop serialization:

1. In a distributed environment, serialization supports communication between processes: nodes exchange messages with each other over the network.

2. It carries data transfers between Hadoop nodes:

Node 1 (serialize to binary) -------> (binary stream message) -------> Node 2 (deserialize from binary)

In MR, keys must be objects that implement the WritableComparable interface (values need only Writable, which WritableComparable extends); such objects are the ones Hadoop knows how to serialize.

<pre>package com.feihao;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class StudentWritable implements WritableComparable<StudentWritable> {
    private String name;
    private int age;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.name);
        out.writeInt(this.age);
    }

    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.age = in.readInt();
    }

    public int compareTo(StudentWritable o) {
        return 0; // stub: a real key type must define its ordering here
    }
}</pre>
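The write/readFields contract above can be exercised without a Hadoop dependency, because DataOutput and DataInput are plain JDK interfaces. The sketch below uses a stand-in Student class (not Hadoop's API) with the same two methods and round-trips it through in-memory streams; note that readFields must read the fields in exactly the order write wrote them:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class WritableRoundTrip {
    static class Student {
        String name;
        int age;
        Student(String name, int age) { this.name = name; this.age = age; }

        // Same shape as StudentWritable.write
        void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeInt(age);
        }

        // Same shape as StudentWritable.readFields; reads in the same order as write
        void readFields(DataInput in) throws IOException {
            name = in.readUTF();
            age = in.readInt();
        }
    }

    // Serialize to a byte array and deserialize into a fresh object
    static Student roundTrip(Student s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            s.write(new DataOutputStream(bos));
            Student copy = new Student("", 0);
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Student copy = roundTrip(new Student("alice", 18));
        System.out.println(copy.name + " " + copy.age); // alice 18
    }
}
```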
</div>

Combiner

<div class='mdContent'>

I. Why the Combiner exists

1.1 The five steps of the Map phase, revisited

We have met MapReduce's eight steps; five of them belong to the Map phase, as shown below:

map section

Step 1.5 is optional: it is the map-side combine stage we look at today. Now revisit the first counter figure from the previous post on counters and custom counters:

image

It contains two counters, Combine output records and Combine input records, and both read 0, because our code performed no map-side combine.

1.2 Why a map-side combine is needed

As we know, Hadoop uses Mappers to turn the input into <key,value> pairs, shuffles them between the nodes of the cluster, and then uses Reducers to process them and produce the final output.

image

This process has at least two performance bottlenecks:

(1) If we have one billion records, the Mappers will generate one billion key/value pairs to ship across the network. But if we are merely computing the maximum, each Mapper clearly only needs to output the largest value it has seen. Doing so both relieves the network and substantially speeds up the program.

Summary: heavy network-bandwidth usage lowers program efficiency;

(2) Take the country field of the US patent dataset as an illustration of data skew. That data is far from uniformly or evenly distributed: since most patents belong to the US, most of the key/value pairs, in the Mappers and in the intermediate (shuffle) stage alike, converge on a single Reducer, overwhelming it and dragging down overall performance.

Summary: overloading a single node lowers program performance;

So, is there a mechanism that solves both problems?

II. A first look at the Combiner

2.1 Enter the Combiner

In the MapReduce programming model there is a very important component between the Mapper and the Reducer that addresses exactly these bottlenecks: the Combiner.

PS:

① Unlike the mapper and the reducer, the combiner has no default implementation; it takes effect only when set explicitly on the job configuration.

② Not every job can use a combiner: only operations that satisfy associativity may be combined. The combine step behaves like opt(opt(1, 2, 3), opt(4, 5, 6)): if opt is sum or max, this works; if opt is the median, it does not.
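The associativity requirement can be checked directly. In the sketch below (the data values are arbitrary), sum gives the same answer whether or not a per-group pre-aggregation runs first, while mean does not once the group sizes differ:

```java
import java.util.List;
import java.util.stream.Stream;

public class CombinerSafety {
    static long sum(List<Long> xs) {
        return xs.stream().mapToLong(Long::longValue).sum();
    }

    static double mean(List<Double> xs) {
        return xs.stream().mapToDouble(Double::doubleValue).average().orElse(0);
    }

    public static void main(String[] args) {
        List<Long> a = List.of(1L, 2L, 3L), b = List.of(4L, 5L);

        // sum is combiner-safe: sum(sum(a), sum(b)) == sum(a ++ b)
        long combined = sum(List.of(sum(a), sum(b)));                    // 6 + 9 = 15
        long direct = sum(Stream.concat(a.stream(), b.stream()).toList());
        System.out.println(combined + " == " + direct);                  // 15 == 15

        // mean is NOT combiner-safe for unequal group sizes
        double meanOfMeans = (mean(List.of(1.0, 2.0, 3.0)) + mean(List.of(4.0, 5.0))) / 2;
        double trueMean = mean(List.of(1.0, 2.0, 3.0, 4.0, 5.0));
        System.out.println(meanOfMeans + " != " + trueMean);             // 3.25 != 3.0
    }
}
```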

Each map may produce a large amount of local output. The Combiner's job is to merge the map-side output first, reducing the volume of data transferred between the map and reduce nodes and thereby improving network I/O; it is one of MapReduce's optimization techniques. Concretely:

(1) At minimum, a Combiner performs local aggregation by key: the map output is sorted by key and the values iterated, as in:

map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K2, V2)
  reduce: (K2, list(V2)) → list(K3, V3)

(2) A Combiner can also act as a local reduce (it essentially is one): in Hadoop's built-in wordcount example, or in a job that finds the maximum value, the combiner and the reducer are identical:

map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K3, V3)
  reduce: (K3, list(V3)) → list(K4, V4)

PS: Consider wordcount without a combiner: every result is produced by the reduce side, which is relatively inefficient. With a combiner, each map that finishes aggregates its output locally, which speeds things up. In the built-in wordcount, the value is just an accumulating number, so the summation can begin as soon as each map ends, instead of waiting for all maps to finish before the reduce side adds everything up.

2.2 MapReduce with a Combiner

image

The code in the previous posts skipped a step that can cut the bandwidth a MapReduce job uses: the Combiner, which runs after the Mapper and before the Reducer. The Combiner is optional; when it suits the job, a Combiner instance runs on every node that runs map tasks. It consumes the output of the Mapper instances on its node, and its output, rather than the Mappers', is what gets sent on to the Reducers. The Combiner is a "mini reduce" that only processes data generated on a single machine.

2.3 Using MyReducer as the Combiner

Adding one simple line to the WordCount code from the earlier post is enough to plug in a Combiner:

// Set the map-side combiner
job.setCombinerClass(MyReducer.class);

With the same input file as before, let's see how the counters change.

(1) Content of the uploaded test file

hello edison
hello kevin

(2) Counter log after a debug run

image

The previously zero Combine input records and Combine output records have changed. We can clearly see that the map output count matches the combine input count, and the combine output count matches the reduce input count. The combine step therefore ran, and it ran at the end of map, before reduce.
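The counter arithmetic can be reproduced in miniature. For the two-line input above, map emits 4 records; a combiner that groups and sums them turns those into 3 records (hello collapses to <hello,2>), and that 3 is exactly what reduce then receives. A sketch (all names invented) that counts the records:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombineCounters {
    // Group <word,count> pairs and sum each group, as a combiner (and a reducer) would
    static List<Map.Entry<String, Long>> combine(List<Map.Entry<String, Long>> records) {
        TreeMap<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, Long> r : records) {
            sums.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    public static void main(String[] args) {
        // Map output for the lines "hello edison" and "hello kevin"
        List<Map.Entry<String, Long>> mapOut = List.of(
                Map.entry("hello", 1L), Map.entry("edison", 1L),
                Map.entry("hello", 1L), Map.entry("kevin", 1L));

        List<Map.Entry<String, Long>> combineOut = combine(mapOut);
        System.out.println("Combine input records:  " + mapOut.size());     // 4
        System.out.println("Combine output records: " + combineOut.size()); // 3
        System.out.println(combineOut); // [edison=1, hello=2, kevin=1]
    }
}
```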

III. Defining a custom Combiner

To see more clearly how the Combiner works, let's define a Combiner class of our own instead of reusing MyReducer; the code follows step by step.

3.1 Rewriting the Mapper's map method

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String line = value.toString();
        String[] splited = line.split(" ");
        for (String word : splited) {
            context.write(new Text(word), new LongWritable(1L));
            // Printed only to make the Mapper's output pairs visible
            System.out.println("Mapper output <" + word + "," + 1 + ">");
        }
    }
}

3.2 Rewriting the Reducer's reduce method

public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        // Printed once per call: shows how many k2 groups the reduce function receives
        System.out.println("Reducer input group <" + key.toString() + ",N(N>=1)>");
        long count = 0L;
        for (LongWritable value : values) {
            count += value.get();
            // Printed once per incoming <k2,v2> pair
            System.out.println("Reducer input pair <" + key.toString() + "," + value.get() + ">");
        }
        context.write(key, new LongWritable(count));
    }
}

3.3 Adding a MyCombiner class and overriding its reduce method

public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        // Printed once per call: shows how many k2 groups the combine function receives
        System.out.println("Combiner input group <" + key.toString() + ",N(N>=1)>");
        long count = 0L;
        for (LongWritable value : values) {
            count += value.get();
            // Printed once per incoming <k2,v2> pair
            System.out.println("Combiner input pair <" + key.toString() + "," + value.get() + ">");
        }
        context.write(key, new LongWritable(count));
        // Printed once per outgoing <k2,v2> pair
        System.out.println("Combiner output pair <" + key.toString() + "," + count + ">");
    }
}

3.4 Setting the Combiner on the job

// Set the map-side combiner
job.setCombinerClass(MyCombiner.class);

3.5 Console output of a debug run

(1) Mapper

Mapper output <hello,1>
Mapper output <edison,1>
Mapper output <hello,1>
Mapper output <kevin,1>

(2) Combiner

Combiner input group <edison,N(N>=1)>
Combiner input pair <edison,1>
Combiner output pair <edison,1>
Combiner input group <hello,N(N>=1)>
Combiner input pair <hello,1>
Combiner input pair <hello,1>
Combiner output pair <hello,2>
Combiner input group <kevin,N(N>=1)>
Combiner input pair <kevin,1>
Combiner output pair <kevin,1>

As this shows, the Combiner performed a local reduce, easing the merge load on the remote Reduce node.

(3) Reducer

Reducer input group <edison,N(N>=1)>
Reducer input pair <edison,1>
Reducer input group <hello,N(N>=1)>
Reducer input pair <hello,2>
Reducer input group <kevin,N(N>=1)>
Reducer input pair <kevin,1>

Here the merging of hello completed in a single step.

Now compare the console output without the Combiner:

(1) Mapper

Mapper output <hello,1>
Mapper output <edison,1>
Mapper output <hello,1>
Mapper output <kevin,1>

(2) Reducer

Reducer input group <edison,N(N>=1)>
Reducer input pair <edison,1>
Reducer input group <hello,N(N>=1)>
Reducer input pair <hello,1>
Reducer input pair <hello,1>
Reducer input group <kevin,N(N>=1)>
Reducer input pair <kevin,1>

Without the Combiner, every hello is merged centrally by the Reducer node, which is why there are two hello input pairs here.

Summary: the console output shows that the combine step merely folds the two identical hello records together, so the reduce input becomes <hello,2>. On a real Hadoop cluster, MapReduce runs across many hosts; with a combine step, each host pre-aggregates its local data before the cluster-wide reduce, which saves a great deal of reduce time and speeds up the whole job.
</div>

Partition分區(qū)

<div class='mdContent'>
Partitioner in the old API

The Partitioner's job is to split the intermediate results produced by the Mappers so that data in the same group goes to the same Reducer; it directly affects the load balance of the Reduce phase. The class diagram of the old-API Partitioner is shown below. It extends JobConfigurable and can be initialized through the configure method. It declares a single method to implement, getPartition, which takes three parameters, all supplied by the framework: the first two are the key and the value, and the third, numPartitions, is the number of partitions each Mapper produces, i.e. the number of Reducers.


image

MapReduce ships two Partitioner implementations: HashPartitioner and TotalOrderPartitioner. HashPartitioner is the default; it partitions by hash value:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}</pre>
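The `& Integer.MAX_VALUE` in getPartition matters because a key's hashCode can be negative, and Java's `%` then yields a negative remainder, which would be an invalid partition number; masking off the sign bit keeps the result in [0, numReduceTasks). A standalone check (the hash values are arbitrary):

```java
public class PartitionSignDemo {
    // Same formula as HashPartitioner.getPartition, applied to a raw hash
    static int getPartition(int keyHash, int numReduceTasks) {
        return (keyHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int negativeHash = -5;
        // Plain modulo keeps the dividend's sign: not a valid partition index
        System.out.println(negativeHash % 3);              // -2
        // Masking the sign bit first always yields a partition in [0, 3)
        System.out.println(getPartition(negativeHash, 3)); // 0
        System.out.println(getPartition("hello".hashCode(), 3));
    }
}
```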

TotalOrderPartitioner provides range-based partitioning and is typically used for total ordering of data. In a MapReduce setting, the obvious total-sort scheme is merge sort: each Map Task sorts locally in the Map phase, and a single Reduce Task performs the global sort in the Reduce phase. Since the job can then have only one Reduce Task, that phase becomes the bottleneck. To make total sorting faster and more scalable, MapReduce provides TotalOrderPartitioner. It splits the data by value into a number of ranges (partitions) and guarantees that every record in one range is greater than every record in the previous range, so a total sort proceeds as follows:

Step 1: sampling. The client samples the input to obtain the split points. Hadoop ships several samplers, such as IntervalSampler, RandomSampler, and SplitSampler (see the InputSampler class in the org.apache.hadoop.mapred.lib package). An example:

Sampled data: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk

After sorting: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr

With 4 Reduce Tasks, the quartile points of the sample are abd, bcd, and mnk; these three strings become the split points.

Step 2: the Map phase. Two components are involved here, the Mapper and the Partitioner. The Mapper can be an IdentityMapper that passes its input straight through, but the Partitioner must be TotalOrderPartitioner: it stores the split points from step 1 in a trie so that the range containing any given record can be located quickly. Each Map Task thus produces R (the number of Reduce Tasks) ranges, and the ranges are ordered relative to each other. TotalOrderPartitioner looks up each record's Reduce Task number via the trie. As the figure shows, with the split points stored in a trie of depth 2, an input string "abg" maps to partition 1, i.e. the 2nd Reduce Task, and "mnz" maps to partition 3, i.e. the 4th Reduce Task.
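Both steps fit in a few lines of plain Java. In the sketch below, the step formula is just one simple way to pick evenly spaced samples (Hadoop's InputSampler differs in detail), and a binary search over the sorted split points stands in for the trie; it recovers abd, bcd, mnk from the sample above and routes abg and mnz as described:

```java
import java.util.Arrays;

public class TotalOrderSketch {
    // Step 1: pick R-1 split points from the sorted sample
    static String[] splitPoints(String[] sample, int reducers) {
        String[] sorted = sample.clone();
        Arrays.sort(sorted);
        int step = (sorted.length + reducers - 1) / reducers; // ceil(n/R)
        String[] splits = new String[reducers - 1];
        for (int i = 0; i < splits.length; i++) {
            splits[i] = sorted[(i + 1) * step - 1];
        }
        return splits;
    }

    // Step 2: route a key to a partition; binary search stands in for the trie.
    // Keys equal to a split point go to the lower range here (one common convention).
    static int partitionFor(String key, String[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        return pos >= 0 ? pos : -pos - 1; // insertion point = number of splits < key
    }

    public static void main(String[] args) {
        String[] sample = {"b", "abc", "abd", "bcd", "abcd", "efg", "hii", "afd", "rrr", "mnk"};
        String[] splits = splitPoints(sample, 4);
        System.out.println(Arrays.toString(splits));     // [abd, bcd, mnk]
        System.out.println(partitionFor("abg", splits)); // 1 (the 2nd reduce task)
        System.out.println(partitionFor("mnz", splits)); // 3 (the 4th reduce task)
    }
}
```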

image

Step 3: the Reduce phase. Each Reducer locally sorts the data in its assigned range, yielding fully sorted output overall. As these steps show, the efficiency of a TotalOrderPartitioner total sort depends directly on the key distribution and the sampling algorithm: the more uniform the keys and the more representative the sample, the more balanced the Reduce Task loads and the faster the sort. TotalOrderPartitioner has two classic applications: TeraSort and HBase bulk loading. TeraSort is an example application shipped with Hadoop; it once won first place in the terabyte-scale sort benchmark, and TotalOrderPartitioner was extracted from it. HBase is a NoSQL data store built on Hadoop. It divides its data into Regions; data within a Region is ordered by key, and the Regions themselves are ordered. Clearly, the R output files of a total-sort MapReduce job map exactly onto R HBase Regions.

Partitioner in the new API

The new-API Partitioner class diagram is shown below. It no longer implements the JobConfigurable interface. If a Partitioner needs to be initialized from a JobConf object, it can implement the Configurable interface itself, e.g.:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">public class TotalOrderPartitioner<K, V> extends Partitioner<K,V> implements Configurable</pre>

image

Where the Partition step sits

image

The Partition step's main job is to send each map result to the appropriate reduce. This places two requirements on a partitioner:

1) Load balance: distribute the work as evenly as possible across the reducers.

2) Efficiency: the assignment must be fast.

The Partitioners provided by MapReduce

image

Partitioner class structure

1. Partitioner<K,V> is the base class; a custom partitioner must also extend it. Source:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">package org.apache.hadoop.mapred;

/**
 * Partitions the key space.
 *
 * <p><code>Partitioner</code> controls the partitioning of the keys of the
 * intermediate map-outputs. The key (or a subset of the key) is used to derive
 * the partition, typically by a hash function. The total number of partitions
 * is the same as the number of reduce tasks for the job. Hence this controls
 * which of the <code>m</code> reduce tasks the intermediate key (and hence the
 * record) is sent for reduction.</p>
 *
 * @see Reducer
 * @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
 */
@Deprecated
public interface Partitioner<K2, V2> extends JobConfigurable {

  /**
   * Get the paritition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be paritioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  int getPartition(K2 key, V2 value, int numPartitions);
}</pre>

2. HashPartitioner<K,V> is MapReduce's default partitioner. Source:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}</pre>

3. BinaryPartitioner extends Partitioner<BinaryComparable,V> and is a specialization of Partitioner<K,V> for binary keys. It exposes leftOffset and rightOffset, and when computing the target reducer it hashes only the byte range of the key K delimited by those offsets:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">reducer = (hash & Integer.MAX_VALUE) % numReduceTasks</pre>

4. KeyFieldBasedPartitioner<K2,V2> is also a hash-based partitioner. Unlike BinaryPartitioner, it supports multiple ranges (key fields) over which the hash is computed. When zero ranges are configured, KeyFieldBasedPartitioner degenerates into HashPartitioner. Source:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">package org.apache.hadoop.mapred.lib;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;

/**
 * Defines a way to partition keys based on certain key fields (also see
 * {@link KeyFieldBasedComparator}).
 * The key specification supported is of the form -k pos1[,pos2], where,
 * pos is of the form f[.c][opts], where f is the number
 * of the key field to use, and c is the number of the first character from
 * the beginning of the field. Fields and character posns are numbered
 * starting with 1; a character position of zero in pos2 indicates the
 * field's last character. If '.c' is omitted from pos1, it defaults to 1
 * (the beginning of the field); if omitted from pos2, it defaults to 0
 * (the end of the field).
 */
public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {

  private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
  private int numOfPartitionFields;
  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();

  public void configure(JobConf job) {
    String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
    if (job.get("num.key.fields.for.partition") != null) {
      LOG.warn("Using deprecated num.key.fields.for.partition. " +
          "Use mapred.text.key.partitioner.options instead");
      this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
      keyFieldHelper.setKeyFieldSpec(1, numOfPartitionFields);
    } else {
      String option = job.getKeyFieldPartitionerOption();
      keyFieldHelper.parseOption(option);
    }
  }

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    byte[] keyBytes;

    List<KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
    if (allKeySpecs.size() == 0) {
      return getPartition(key.toString().hashCode(), numReduceTasks);
    }
    try {
      keyBytes = key.toString().getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("The current system does not " +
          "support UTF-8 encoding!", e);
    }
    // return 0 if the key is empty
    if (keyBytes.length == 0) {
      return 0;
    }
    int[] lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
        keyBytes.length);
    int currentHash = 0;
    for (KeyDescription keySpec : allKeySpecs) {
      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length,
          lengthIndicesFirst, keySpec);
      // no key found! continue
      if (startChar < 0) {
        continue;
      }
      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
          lengthIndicesFirst, keySpec);
      currentHash = hashCode(keyBytes, startChar, endChar, currentHash);
    }
    return getPartition(currentHash, numReduceTasks);
  }

  protected int hashCode(byte[] b, int start, int end, int currentHash) {
    for (int i = start; i <= end; i++) {
      currentHash = 31 * currentHash + b[i];
    }
    return currentHash;
  }

  protected int getPartition(int hash, int numReduceTasks) {
    return (hash & Integer.MAX_VALUE) % numReduceTasks;
  }
}</pre>
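The core of the class above is the rolling hash `currentHash = 31*currentHash + b[i]` computed only over the selected key fields. The stripped-down sketch below (field parsing is simplified to a split on the tab separator; all names are invented) shows the consequence: keys that agree on the partitioning field land in the same partition, whatever the rest of the key contains:

```java
public class KeyFieldHashDemo {
    // Hash only the chosen tab-separated field, with the same 31-based rolling hash
    static int getPartition(String key, int fieldIndex, int numReduceTasks) {
        byte[] field = key.split("\t")[fieldIndex]
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : field) {
            hash = 31 * hash + b;
        }
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Same first field => same partition, regardless of the remaining fields
        int p1 = getPartition("user1\tlogin", 0, 4);
        int p2 = getPartition("user1\tlogout", 0, 4);
        int p3 = getPartition("user2\tlogin", 0, 4);
        System.out.println(p1 + " " + p2 + " " + p3); // p1 == p2; p3 may differ
    }
}
```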

5. TotalOrderPartitioner supports totally ordered output. Unlike the three partitioners above, it is not hash-based. It is described in detail below.

The TotalOrderPartitioner class

Each reducer's own output is sorted by default, but with unsorted input, the outputs of different reducers are unordered relative to each other. To make the overall output totally ordered, use TotalOrderPartitioner.

TotalOrderPartitioner requires a partition file. The file must contain (number of reducers - 1) keys, the so-called split points, sorted in ascending order. Why such a file is needed, and its exact details, are covered below.

TotalOrderPartitioner handles key data types in two ways:

1) For keys that are not BinaryComparable, TotalOrderPartitioner uses binary search to find the index of the range containing the key K.

For example: with 5 reducers and the 4 split points [2, 4, 6, 8] in the partition file, a key/value pair <4,"good"> binary-searches to index = 1; index + 1 = 2, so the pair is sent to the second reducer. For a pair <4.5,"good">, the binary search returns -3; adding 1 and then negating likewise gives the reducer this pair goes to.

For numeric data, binary search runs in O(log(reducer count)), which is fast.
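The index arithmetic described above matches the contract of Java's Arrays.binarySearch, which returns -(insertionPoint) - 1 when the key is absent. A standalone check using the example's split points [2, 4, 6, 8]:

```java
import java.util.Arrays;

public class BinarySearchRouting {
    public static void main(String[] args) {
        double[] splits = {2, 4, 6, 8};

        // Key present: the index itself is returned
        System.out.println(Arrays.binarySearch(splits, 4));   // 1

        // Key absent: -(insertionPoint) - 1, here -(2) - 1 = -3 for key 4.5
        int pos = Arrays.binarySearch(splits, 4.5);
        System.out.println(pos);                              // -3

        // "add 1, then negate" recovers the target index: -(-3 + 1) = 2
        System.out.println(-(pos + 1));                       // 2
    }
}
```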

2) For BinaryComparable keys (think strings), lexicographic order is also a sort order, so split points can likewise route different string keys to different reducers, much as in the numeric case.

For example: with 5 reducers and the 4 split points ["abc", "bce", "eaa", "fhc"], the string "ab" goes to the first reducer, because it is smaller than the first split point "abc".

Unlike numeric data, however, strings are not searched and compared the way numbers are. MapReduce uses a trie (see the earlier post on trie trees) for the string lookup. A search takes O(m) time, where m is the depth of the tree, at a space cost on the order of O(255^m): a classic space-for-time trade.

Building the trie

Suppose the maximum tree depth is 3 and the split points are [aaad, aaaf, aaaeh, abbx].

image

The trie in MapReduce consists of two main kinds of nodes:

1) Inner trie nodes
An inner trie node in MapReduce holds a fairly long array of 255 child slots, one per possible byte value; the figure above shows only the 26 English letters.
2) Leaf nodes (unsplit, single-split, and leaf trie nodes)
An unsplit trie node is a leaf containing no split point.
A single-split trie node is a leaf containing exactly one split point.
A leaf trie node contains multiple split points. (This is rare: it only occurs when the maximum tree depth is reached, and is uncommon in practice.)

Searching the trie

Continuing the example above:
1) For a key/value pair such as <aad,10>, the search reaches the leaf node in the figure, and a binary search inside that leaf returns the index of aad in the split-point array; if absent, the index of the closest split point is returned.
2) If the search reaches a single-split node, a key equal to or smaller than that node's split point returns its index; a larger key returns index + 1.
3) If the search reaches an unsplit node, the preceding index is returned; e.g. <zaa,20> returns the index of abbx in the split-point array.

An open question about TotalOrderPartitioner

A partitioner, as noted above, has two requirements: speed and load balance. The trie makes the search fast, but how do we obtain a partition file whose split points actually balance the load?

InputSampler
The input-sampling class; it can sample the data under the input directory and offers three sampling strategies.

image

Sampler class structure

Comparison of the sampling strategies:

采樣方式對(duì)比表:

|

類名稱

|

采樣方式

|

構(gòu)造方法

|

效率

|

特點(diǎn)

|
|

SplitSampler<K,V>

|

對(duì)前n個(gè)記錄進(jìn)行采樣

|

采樣總數(shù),劃分?jǐn)?shù)

|

最高

| |
|

RandomSampler<K,V>

|

遍歷所有數(shù)據(jù),隨機(jī)采樣

|

采樣頻率,采樣總數(shù),劃分?jǐn)?shù)

|

最低

| |
|

IntervalSampler<K,V>

|

固定間隔采樣

|

采樣頻率,劃分?jǐn)?shù)

|

|

對(duì)有序的數(shù)據(jù)十分適用

|

The writePartitionFile method is the key piece: from the samples provided by a sampler it first sorts them, then selects (number of reducers - 1) of them and writes them to the partition file. Split points derived from the sample this way put roughly the same number of key/value pairs into each range, which is what achieves the load balancing.

The source of the SplitSampler class:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">  /**
   * Samples the first n records from s splits.
   * Inexpensive way to sample random data.
   */
  public static class SplitSampler<K,V> implements Sampler<K,V> {

    private final int numSamples;
    private final int maxSplitsSampled;

    /**
     * Create a SplitSampler sampling <em>all</em> splits.
     * Takes the first numSamples / numSplits records from each split.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     */
    public SplitSampler(int numSamples) {
      this(numSamples, Integer.MAX_VALUE);
    }

    /**
     * Create a new SplitSampler.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     * @param maxSplitsSampled The maximum number of splits to examine.
     */
    public SplitSampler(int numSamples, int maxSplitsSampled) {
      this.numSamples = numSamples;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * From each split sampled, take the first numSamples / numSplits records.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      int samplesPerSplit = numSamples / splitsToSample;
      long records = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          samples.add(key);
          key = reader.createKey();
          ++records;
          if ((i+1) * samplesPerSplit <= records) {
            break;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }</pre>

The source of the RandomSampler class:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">  /**
   * Sample from random points in the input.
   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
   * each split.
   */
  public static class RandomSampler<K,V> implements Sampler<K,V> {

    private double freq;
    private final int numSamples;
    private final int maxSplitsSampled;

    /**
     * Create a new RandomSampler sampling <em>all</em> splits.
     * This will read every split at the client, which is very expensive.
     * @param freq Probability with which a key will be chosen.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     */
    public RandomSampler(double freq, int numSamples) {
      this(freq, numSamples, Integer.MAX_VALUE);
    }

    /**
     * Create a new RandomSampler.
     * @param freq Probability with which a key will be chosen.
     * @param numSamples Total number of samples to obtain from all selected
     *                   splits.
     * @param maxSplitsSampled The maximum number of splits to examine.
     */
    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
      this.freq = freq;
      this.numSamples = numSamples;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * Randomize the split order, then take the specified number of keys from
     * each split sampled, where each key is selected with the specified
     * probability and possibly replaced by a subsequently selected key when
     * the quota of keys from that split is satisfied.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);

      Random r = new Random();
      long seed = r.nextLong();
      r.setSeed(seed);
      LOG.debug("seed: " + seed);
      // shuffle splits
      for (int i = 0; i < splits.length; ++i) {
        InputSplit tmp = splits[i];
        int j = r.nextInt(splits.length);
        splits[i] = splits[j];
        splits[j] = tmp;
      }
      // our target rate is in terms of the maximum number of sample splits,
      // but we accept the possibility of sampling additional splits to hit
      // the target sample keyset
      for (int i = 0; i < splitsToSample ||
                     (i < splits.length && samples.size() < numSamples); ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
            Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          if (r.nextDouble() <= freq) {
            if (samples.size() < numSamples) {
              samples.add(key);
            } else {
              // When exceeding the maximum number of samples, replace a
              // random element with this one, then adjust the frequency
              // to reflect the possibility of existing elements being
              // pushed out
              int ind = r.nextInt(numSamples);
              if (ind != numSamples) {
                samples.set(ind, key);
              }
              freq *= (numSamples - 1) / (double) numSamples;
            }
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }</pre>

The source of the IntervalSampler class:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">  /**
   * Sample from s splits at regular intervals.
   * Useful for sorted data.
   */
  public static class IntervalSampler<K,V> implements Sampler<K,V> {

    private final double freq;
    private final int maxSplitsSampled;

    /**
     * Create a new IntervalSampler sampling <em>all</em> splits.
     * @param freq The frequency with which records will be emitted.
     */
    public IntervalSampler(double freq) {
      this(freq, Integer.MAX_VALUE);
    }

    /**
     * Create a new IntervalSampler.
     * @param freq The frequency with which records will be emitted.
     * @param maxSplitsSampled The maximum number of splits to examine.
     * @see #getSample
     */
    public IntervalSampler(double freq, int maxSplitsSampled) {
      this.freq = freq;
      this.maxSplitsSampled = maxSplitsSampled;
    }

    /**
     * For each split sampled, emit when the ratio of the number of records
     * retained to the total record count is less than the specified
     * frequency.
     */
    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>();
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      long records = 0;
      long kept = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          ++records;
          if ((double) kept / records < freq) {
            ++kept;
            samples.add(key);
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
  }</pre>
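The heart of IntervalSampler is the condition `(double) kept / records < freq`: a record is kept only while the retained fraction is still below the target frequency, which yields evenly spaced samples in record order. A standalone run of that rule (the input here is just the integers 0..99):

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalRuleDemo {
    // Keep a record whenever the kept/seen ratio is still below freq
    static List<Integer> sample(int n, double freq) {
        List<Integer> kept = new ArrayList<>();
        long records = 0;
        for (int i = 0; i < n; i++) {
            ++records;
            if ((double) kept.size() / records < freq) {
                kept.add(i);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> s = sample(100, 0.1);
        System.out.println(s.size()); // 10
        System.out.println(s);        // [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
    }
}
```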

The complete source of the InputSampler class:

image

InputSampler

A TotalOrderPartitioner example

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: Consolas, "Courier New", 宋體, Courier, mono, serif; font-size: 12px !important; line-height: 1;">public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
    implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (conf == null) {
      return -1;
    }
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(conf, true);
    SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(conf,
        CompressionType.BLOCK);
    conf.setPartitionerClass(TotalOrderPartitioner.class);

    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

    Path input = FileInputFormat.getInputPaths(conf)[0];
    input = input.makeQualified(input.getFileSystem(conf));
    Path partitionFile = new Path(input, "_partitions");
    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    InputSampler.writePartitionFile(conf, sampler);

    // Add to DistributedCache
    URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
    DistributedCache.addCacheFile(partitionUri, conf);
    DistributedCache.createSymlink(conf);

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(
        new SortByTemperatureUsingTotalOrderPartitioner(), args);
    System.exit(exitCode);
  }
}</pre>

</div>
