A comprehensive summary of how the MapReduce framework works, collected for later review. The material comes from the Atguigu (尚硅谷) course. Course link
I. InputFormat: Data Input
1. Splits and How MapTask Parallelism Is Determined
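A data split only divides the input logically; the input is not cut into separately stored pieces on disk. On disk, the data is still stored block by block, as HDFS stores it.
The MapTask parallelism determines how many tasks run concurrently in the Map phase, which in turn affects the processing speed of the whole Job.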

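- When the split size differs from the block size (for example 100M splits over 128M blocks), a split can span blocks stored on different nodes, so data must be transferred between nodes; this costs a lot of I/O and is inefficient;
- Splitting is done file by file: each file is split on its own, regardless of the other files in the input;
- The number of MapTasks is determined by the number of splits (one MapTask per split), and by default the split size equals the block size.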
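The per-file rule can be checked with a small plain-Java model (no Hadoop dependency; the class and method names are hypothetical). It assumes a 128M split size and copies one detail of FileInputFormat that is not mentioned above: a new split is cut only while the remaining bytes exceed 1.1 times the split size (the SPLIT_SLOP constant), so a tail of up to 10% extra stays in the last split. The sketch also assumes non-empty files.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of FileInputFormat's split planning: every file is split on its own.
final class SplitCountSketch {
    static final long MB = 1024L * 1024L;
    // Mirrors FileInputFormat's SPLIT_SLOP: keep cutting only while remaining/splitSize > 1.1
    static final double SPLIT_SLOP = 1.1;

    // Number of splits for one non-empty file.
    static int splitsForFile(long fileLength, long splitSize) {
        if (fileLength <= 0) {
            throw new IllegalArgumentException("sketch assumes a non-empty file");
        }
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // the tail (at most 1.1 x splitSize) becomes the last split
        }
        return splits;
    }

    // Total MapTasks = sum of per-file splits; files are never combined.
    static int mapTasks(Map<String, Long> files, long splitSize) {
        int total = 0;
        for (long length : files.values()) {
            total += splitsForFile(length, splitSize);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("a.txt", 300 * MB); // hypothetical file sizes
        files.put("b.txt", 140 * MB);
        files.put("c.txt", 1 * MB);
        long splitSize = 128 * MB;
        for (Map.Entry<String, Long> e : files.entrySet()) {
            System.out.println(e.getKey() + " -> " + splitsForFile(e.getValue(), splitSize) + " split(s)");
        }
        System.out.println("MapTasks: " + mapTasks(files, splitSize));
    }
}
```

Running main reports 3, 1 and 1 split(s) and 5 MapTasks in total: the 140M file stays a single split because 140/128 ≈ 1.09 does not exceed 1.1, and the 1M file still gets its own split and its own MapTask.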
2. Job Submission Flow: Source Code Walkthrough

waitForCompletion();
    // 1 Establish a connection
    connect();
        // 1.1 Create the proxy used to submit the Job
        new Cluster(getConfiguration());
            // 1.2 Decide whether to run locally (LocalJobRunner) or on a remote YARN cluster
            initialize(jobTrackAddr, conf);
    // 2 Submit the Job
    submitter.submitJobInternal(Job.this, cluster)
        // 2.1 Create the staging directory used to submit data to the cluster
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        // 2.2 Get a job ID and create the job directory
        JobID jobId = submitClient.getNewJobID();
        // 2.3 Copy the job jar to the cluster
        copyAndConfigureFiles(job, submitJobDir);
            rUploader.uploadFiles(job, jobSubmitDir);
        // 2.4 Compute the splits and write the split plan file
        writeSplits(job, submitJobDir);
            maps = writeNewSplits(job, jobSubmitDir);
                input.getSplits(job);
        // 2.5 Write the XML configuration file (job.xml) to the staging directory
        writeConf(conf, submitJobFile);
            conf.writeXml(out);
        // 2.6 Submit the Job and return its status
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
3. FileInputFormat Splitting Mechanism

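- By the formula splitSize = Math.max(minSize, Math.min(maxSize, blockSize)), the split size defaults to the block size; in local mode the block size, and therefore the split size, is 32M;
- In the formula, the defaults are minSize = 1 and maxSize = Long.MAX_VALUE;
- Tuning the split size: setting maxSize smaller than blockSize makes splits smaller; setting minSize larger than blockSize makes splits larger;
- MRAppMaster starts one MapTask per split, so the number of splits determines the number of MapTasks;
- To get the split information, cast the split to the split type of the input format:
FileSplit inputSplit = (FileSplit) context.getInputSplit();
- To get the file name of the split:
String name = inputSplit.getPath().getName();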
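The split-size formula, splitSize = max(minSize, min(maxSize, blockSize)), can be tried out in plain Java. The class below is a hypothetical stand-in that only mirrors the arithmetic of FileInputFormat.computeSplitSize; in a real job the two bounds are set with FileInputFormat.setMinInputSplitSize(job, size) and FileInputFormat.setMaxInputSplitSize(job, size) rather than computed by hand.

```java
// Plain-Java copy of the split-size arithmetic used by FileInputFormat.computeSplitSize.
final class SplitSizeSketch {
    static final long MB = 1024L * 1024L;

    // splitSize = max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long block = 128 * MB;
        // Defaults (minSize = 1, maxSize = Long.MAX_VALUE): split size equals block size
        System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE) / MB);      // 128
        // maxSize below the block size makes splits smaller
        System.out.println(computeSplitSize(block, 1L, 64 * MB) / MB);             // 64
        // minSize above the block size makes splits larger
        System.out.println(computeSplitSize(block, 256 * MB, Long.MAX_VALUE) / MB); // 256
    }
}
```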
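4. CombineTextInputFormat Splitting Mechanism
The framework's default TextInputFormat plans splits file by file: however small a file is, it gets its own split and its own MapTask. With a large number of small files this produces a large number of MapTasks, and processing becomes extremely inefficient.
CombineTextInputFormat is designed for scenarios with too many small files: it logically groups several small files into one split, so that a single MapTask processes them.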

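- The maximum virtual storage split size can be set to any value, but it should be chosen according to the actual sizes of the small files:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
- Split planning has two phases: virtual storage and splitting.
- Virtual storage: each file is compared with the setMaxInputSplitSize value (4M here). A file no larger than 4M becomes one virtual block; a file larger than 4M but no larger than 2*4M is divided evenly into two virtual blocks, which avoids producing very small slices; a file larger than 2*4M first has a 4M block cut off, and the remainder is checked again.
- Splitting: a virtual block that is at least 4M forms a split on its own; a smaller one is merged with the next virtual block, and together they form one split.
- In practice, when many small files must be processed, add the following to the Driver class:
// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// Set the maximum virtual storage split size (4M)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);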
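The two phases (virtual storage, then splitting) can be sketched as a plain-Java model. This is a simplification under stated assumptions, not the Hadoop implementation: sizes are in MB, blocks are visited in file order, and during splitting blocks are accumulated until the running total reaches the maximum. The real CombineFileInputFormat additionally groups blocks by node and rack. The class name and the file sizes in main are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of CombineTextInputFormat's two phases; sizes are in MB.
final class CombineSplitSketch {

    // Phase 1: virtual storage.
    static List<Double> virtualBlocks(double[] fileSizes, double max) {
        List<Double> blocks = new ArrayList<>();
        for (double size : fileSizes) {
            double remaining = size;
            while (remaining > 2 * max) { // more than 2*max: cut off one max-sized block
                blocks.add(max);
                remaining -= max;
            }
            if (remaining > max) {        // between max and 2*max: two equal halves
                blocks.add(remaining / 2);
                blocks.add(remaining / 2);
            } else if (remaining > 0) {   // at most max: a single block
                blocks.add(remaining);
            }
        }
        return blocks;
    }

    // Phase 2: splitting. Blocks are accumulated in order; once the running total
    // reaches max it is emitted as one split (so a block >= max alone is its own split).
    static List<Double> splits(List<Double> blocks, double max) {
        List<Double> result = new ArrayList<>();
        double current = 0;
        for (double block : blocks) {
            current += block;
            if (current >= max) {
                result.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            result.add(current); // leftover blocks form a final, smaller split
        }
        return result;
    }

    public static void main(String[] args) {
        double[] files = {1.7, 5.1, 3.4, 6.8}; // hypothetical small files, in MB
        List<Double> blocks = virtualBlocks(files, 4.0);
        System.out.println("virtual blocks: " + blocks);
        System.out.println("splits: " + splits(blocks, 4.0));
    }
}
```

With a 4M maximum, these four files become six virtual blocks (1.7, 2.55, 2.55, 3.4, 3.4, 3.4) and three splits of (1.7+2.55), (2.55+3.4) and (3.4+3.4) MB; the printed doubles may show small rounding noise.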
5. FileInputFormat Implementations
When a MapReduce program runs, the input files can come in many formats; how does MapReduce read each of them?
Common subclasses of FileInputFormat include:
- TextInputFormat
- KeyValueTextInputFormat
- NLineInputFormat
- CombineTextInputFormat
- custom InputFormat classes
5.1 TextInputFormat
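TextInputFormat is the default FileInputFormat implementation. It reads records line by line. The key is the byte offset at which the line starts within the whole file, of type LongWritable. The value is the content of the line, excluding any line terminators (newline, carriage return), of type Text.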
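To see what the offsets look like, here is a plain-Java sketch (no Hadoop classes; the class name and sample text are hypothetical) that produces the (key, value) pairs for a small text, assuming the whole text lies in one split. It only recognizes "\n" and "\r\n" terminators.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of TextInputFormat records for one split covering the whole text:
// key = byte offset where the line starts, value = line without "\n" or "\r\n".
final class TextInputSketch {

    static List<String> records(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        List<String> out = new ArrayList<>();
        int lineStart = 0;
        for (int i = 0; i <= bytes.length; i++) {
            boolean atEnd = (i == bytes.length);
            if (atEnd || bytes[i] == '\n') {
                if (atEnd && i == lineStart) {
                    break; // nothing after the final newline, so no extra record
                }
                int lineEnd = i;
                if (lineEnd > lineStart && bytes[lineEnd - 1] == '\r') {
                    lineEnd--; // strip the carriage return of a "\r\n" terminator
                }
                String line = new String(bytes, lineStart, lineEnd - lineStart, StandardCharsets.UTF_8);
                out.add("(" + lineStart + ", " + line + ")"); // (key, value)
                lineStart = i + 1; // the next line starts right after the '\n'
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String text = "Rich learning form\n"
                + "Intelligent learning engine\n"
                + "Learning more convenient\n";
        records(text).forEach(System.out::println);
        // (0, Rich learning form)
        // (19, Intelligent learning engine)
        // (47, Learning more convenient)
    }
}
```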
5.2 KeyValueTextInputFormat
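Each line is one record, divided by a separator into key and value; the default separator is "\t". The separator can be set in the driver class, on the Configuration before Job.getInstance(conf) is called:
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
job.setInputFormatClass(KeyValueTextInputFormat.class);
Here the key is the Text sequence before the first separator on each line, and the value is the rest of the line.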
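The splitting rule can be illustrated in plain Java. The helper below is hypothetical, not the Hadoop record reader itself; it models splitting at the first occurrence of the separator and treating a line with no separator as a key with an empty value.

```java
// Plain-Java model of how a KeyValueTextInputFormat line becomes (key, value).
final class KeyValueSketch {

    // Returns {key, value}: split at the FIRST separator; no separator -> whole line is the key.
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        String[] kv = split("banzhang ni hao", ' ');
        System.out.println("key=" + kv[0] + ", value=" + kv[1]); // key=banzhang, value=ni hao
    }
}
```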
5.3 NLineInputFormat
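With NLineInputFormat, the InputSplit processed by each MapTask is no longer cut by blocks; it is cut by the number of lines N specified for NLineInputFormat.
Number of splits = total lines in the input file / N; if the division is not exact, number of splits = quotient + 1.
The key-value pairs are the same as those produced by TextInputFormat. N can be set in the driver class:
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 3);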
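The split-count rule for a single file, written as a hypothetical plain-Java helper:

```java
// Plain-Java model of NLineInputFormat's split count for one file:
// splits = lines / N, plus one more split when N does not divide the line count.
final class NLineSketch {

    static int splitCount(int totalLines, int linesPerSplit) {
        int quotient = totalLines / linesPerSplit;
        return (totalLines % linesPerSplit == 0) ? quotient : quotient + 1;
    }

    public static void main(String[] args) {
        // N = 3, as in NLineInputFormat.setNumLinesPerSplit(job, 3)
        System.out.println(splitCount(11, 3)); // 4
        System.out.println(splitCount(9, 3));  // 3
    }
}
```

For N = 3, an 11-line file yields 3 full splits plus one 2-line split, so 4 MapTasks; a 9-line file yields exactly 3.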
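5.4 Custom InputFormat
Steps:
1) Define a class that extends FileInputFormat;
2) Write a custom RecordReader that reads the data in your own way and wraps it as key-value pairs;
3) On output, use SequenceFileOutputFormat to write out the merged file.
A SequenceFile is the file format Hadoop uses to store binary key-value pairs. In this example, the file path plus file name is the key and the file content is the value.
Custom InputFormat example: merge multiple small files into a single SequenceFile.

Implementation: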
WholeFileInputFormat.java
package Inputformat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
// Reads each input file as a single record: key = file path, value = file contents
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
// Never split a file, because the RecordReader reads the whole file as one value
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}
WholeRecordReader.java
package Inputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
FileSplit split;
Configuration configuration;
Text k = new Text();
BytesWritable v = new BytesWritable();
boolean isProgress = true;
// Initialization: keep the split and the configuration for later use
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit) split;
configuration = context.getConfiguration();
}
// Core logic: emit exactly one key-value pair holding the whole file
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
// Assumes the file fits in one byte array (< 2 GB), which holds for small files
byte[] buf = new byte[(int) split.getLength()];
// 1 Get the FileSystem object
Path path = split.getPath();
FileSystem fs = path.getFileSystem(configuration);
FSDataInputStream fis = null;
try {
// 2 Open the input stream
fis = fs.open(path);
// 3 Copy the file contents into the buffer
IOUtils.readFully(fis, buf, 0, buf.length);
// 4 Set the value
v.set(buf, 0, buf.length);
// 5 Set the key (full path of the file)
k.set(path.toString());
} finally {
// 6 Close the stream, even if reading failed
IOUtils.closeStream(fis);
}
isProgress = false;
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return v;
}
// 0 before the single record is read, 1 afterwards
@Override
public float getProgress() throws IOException, InterruptedException {
return isProgress ? 0 : 1;
}
@Override
public void close() throws IOException {
}
}
Write the corresponding Mapper and Reducer classes, then add these two settings to the Driver:
// 4 Set the InputFormat used for input
job.setInputFormatClass(WholeFileInputFormat.class);
// 5 Set the OutputFormat used for output
job.setOutputFormatClass(SequenceFileOutputFormat.class);
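What WholeFileInputFormat and WholeRecordReader produce can be previewed without a cluster. The sketch below is a local-filesystem analogue built on java.nio only (the class, directory and file names are hypothetical): every regular file in a directory becomes exactly one record whose key is the path and whose value is all of its bytes, which is the kind of record the job then hands to SequenceFileOutputFormat.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Local analogue of "one file = one record": key = file path, value = whole file contents.
final class WholeFileSketch {

    static Map<String, byte[]> readWholeFiles(Path dir) throws IOException {
        Map<String, byte[]> records = new TreeMap<>(); // TreeMap gives a deterministic, path-sorted order
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    records.put(file.toString(), Files.readAllBytes(file));
                }
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("small-files");
        Files.write(dir.resolve("one.txt"), "yongpeng weidong weinan".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("two.txt"), "longlong fanfan".getBytes(StandardCharsets.UTF_8));
        for (Map.Entry<String, byte[]> e : readWholeFiles(dir).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().length + " bytes");
        }
    }
}
```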
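II. Shuffle Mechanism
The data processing that takes place after the map method and before the reduce method is called Shuffle.
1. Partition
Partitioning writes the results to different files according to some condition, for example writing phone numbers to different files according to the province each number belongs to.
1.1 The default partitioner is hash partitioning: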
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
- The default partition is the key's hashCode modulo the number of ReduceTasks (the & Integer.MAX_VALUE mask keeps the result non-negative);
- The user cannot control which key ends up in which partition.
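A plain-Java copy of the formula (the class name is hypothetical) shows why the mask is there: Java's % keeps the sign of the dividend, so a negative hashCode would otherwise produce a negative partition number.

```java
// Plain-Java copy of the HashPartitioner formula, usable with any key object.
final class HashPartitionSketch {

    // Clearing the sign bit with & Integer.MAX_VALUE keeps the result in [0, numReduceTasks)
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        Object[] keys = {"13736230513", "13846544121", Integer.valueOf(-7)};
        for (Object key : keys) {
            System.out.println(key + " -> partition " + getPartition(key, 4));
        }
        // Without the mask, a negative hashCode gives a negative, illegal partition number
        System.out.println(Integer.valueOf(-7).hashCode() % 4); // -3
    }
}
```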
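1.2 Custom Partitioner
Steps:
1) Define a class that extends Partitioner and override getPartition();
2) In the Job driver, register it with job.setPartitionerClass();
3) Set a matching number of ReduceTasks with job.setNumReduceTasks().
Example:
Write the statistics to different files according to the first three digits of the phone number.
Custom Partitioner class ProvincePartitioner.java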
package Flowsum;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
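// 1 Get the first three digits of the phone number
String prePhoneNum = key.toString().substring(0, 3);
// 2 136 -> 0, 137 -> 1, 138 -> 2; every other prefix goes to partition 3
int partition = 3;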
if ("136".equals(prePhoneNum)) {
partition = 0;
}else if ("137".equals(prePhoneNum)) {
partition = 1;
}else if ("138".equals(prePhoneNum)) {
partition = 2;
}
return partition;
}
}
Write the Mapper and Reducer, and add the following code to the Driver class:
// 5 Set the custom Partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// One ReduceTask per partition (0-3)
job.setNumReduceTasks(4);
- Partition numbers must start at 0 and increase consecutively;
- When setting the number of ReduceTasks:
  - if you forget to set it or set it to 1, only one output file is produced, and the data of all partitions is written into that file;
  - if it is greater than 1 but smaller than the actual number of partitions, the job fails with an IOException ("Illegal partition");
  - if it is greater than the actual number of partitions, the expected results are produced, plus one empty file for each surplus ReduceTask, because at runtime no data is sent to those extra ReduceTasks.
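These three cases can be modeled in a few lines of plain Java. This is a simplified model, not Hadoop code (the class name and the partition numbers in main are hypothetical): it assumes a partitioner that, like ProvincePartitioner, returns partition numbers 0 to 3, and it mirrors two behaviors of the map-side collector, namely that the custom partitioner is not used when there is only one ReduceTask and that an out-of-range partition number aborts the task.

```java
import java.util.Arrays;

// Simplified model of routing map output for a given number of ReduceTasks.
final class ReduceTaskCountSketch {

    // Returns how many records each ReduceTask (output file) receives.
    static int[] distribute(int[] partitionPerRecord, int numReduceTasks) {
        if (numReduceTasks < 1) {
            throw new IllegalArgumentException("sketch assumes at least one ReduceTask");
        }
        int[] counts = new int[numReduceTasks];
        for (int p : partitionPerRecord) {
            // With a single ReduceTask the custom partitioner is bypassed: everything goes to 0
            int partition = (numReduceTasks == 1) ? 0 : p;
            if (partition < 0 || partition >= numReduceTasks) {
                // Hadoop throws IOException("Illegal partition ...") at this point
                throw new IllegalStateException("Illegal partition " + partition);
            }
            counts[partition]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 3, 1}; // hypothetical getPartition() results for six records
        System.out.println(Arrays.toString(distribute(partitions, 1))); // [6]
        System.out.println(Arrays.toString(distribute(partitions, 5))); // [1, 2, 1, 2, 0]
        try {
            distribute(partitions, 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Illegal partition 2
        }
    }
}
```

With 5 ReduceTasks the last count is 0, which corresponds to the extra empty output file.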
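1.3 WritableComparable Sorting
Sorting is one of the most important operations in MapReduce.
Both MapTask and ReduceTask sort data by key; this is Hadoop's default behavior. The default order is the natural order of the key type, which for Text keys is lexicographic (dictionary) order. The map-side in-memory buffer is sorted with quicksort, and spill files and the data fetched by each ReduceTask are then merged with merge sort.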

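Categories of MapReduce sorting:

Custom sorting:
When a bean object is transmitted as the key, it must implement the WritableComparable interface and override the compareTo method; this defines the sort order.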
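Only compareTo decides the order, so the idea can be shown with a plain Comparable class. FlowKey below is a hypothetical stand-in for a WritableComparable bean (the Writable serialization is omitted), and Collections.sort plays the role of the framework's sort. Comparing the other object's value against this one gives descending order, and Long.compare avoids the overflow risk of returning a subtraction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for a WritableComparable key: only the ordering logic is modeled.
final class FlowKey implements Comparable<FlowKey> {
    final long sumFlow;

    FlowKey(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Descending by total flow: compare the other key's value against ours
    @Override
    public int compareTo(FlowKey other) {
        return Long.compare(other.sumFlow, this.sumFlow);
    }

    @Override
    public String toString() {
        return Long.toString(sumFlow);
    }

    public static void main(String[] args) {
        List<FlowKey> keys = new ArrayList<>();
        for (long v : new long[] {2481, 7335, 120, 7335, 9000}) {
            keys.add(new FlowKey(v));
        }
        Collections.sort(keys);
        System.out.println(keys); // [9000, 7335, 7335, 2481, 120]
    }
}
```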
Example:
Store the statistics in different files according to the region of the phone number, and within each file sort by total flow in descending order.
Rewrite FlowBean.java
package Sort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
sumFlow = upFlow + downFlow;
}
// Core comparison: descending by total flow (Long.compare avoids overflow from subtraction)
@Override
public int compareTo(FlowBean bean) {
return Long.compare(bean.getSumFlow(), sumFlow);
}
// Serialization
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
// Deserialization: fields must be read in the same order they were written
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
- The class now implements the WritableComparable interface;
- compareTo is overridden; it is the core of the sort comparison;
Write the Mapper class, FlowCountSortMapper.java
package Sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 Get one line
String line = value.toString();
// 2 Split it
String[] fields = line.split("\t");
// 3 Populate the key and value
k.setUpFlow(Long.parseLong(fields[1])); // the flow values form the key
k.setDownFlow(Long.parseLong(fields[2]));
k.setSumFlow(Long.parseLong(fields[3]));
v.set(fields[0]); // the phone number is the value
// 4 Write out
context.write(k, v);
}
}
- The data must be sorted by total flow, so the key emitted by the Map phase should be the FlowBean object and the value the phone number.
Write the Reducer class, FlowCountSortReducer.java
package Sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
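- After sorting, the output should still show the phone number followed by the flow, so the Reducer's output key is the phone number and its value is the FlowBean. The loop matters: keys with equal total flow compare as equal and are grouped into one reduce call, so every phone number in that group must be written out.
Write the Partitioner class, ProvincePartitioner.java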
package Sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
String prePhoneNum = text.toString().substring(0, 3);
int partition = 3;
if ("136".equals(prePhoneNum)) {
partition= 0;
}else if ("137".equals(prePhoneNum)) {
partition = 1;
}else if ("138".equals(prePhoneNum)) {
partition = 2;
}
return partition;
}
}
- For the Reduce stage to write each region's data to its own file, the map output must be partitioned by region.
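Taken together, partitioning and the descending compareTo mean that each output file holds one region's records sorted by total flow. The plain-Java model below reproduces that end result without Hadoop; the class name and the sample records are hypothetical, and the prefix rule is copied from ProvincePartitioner.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of the job's end result: route by phone prefix, then sort each partition.
final class PartitionSortSketch {

    // One (phone number, total flow) record, standing in for the (FlowBean, Text) map output
    static final class Rec {
        final String phone;
        final long sumFlow;

        Rec(String phone, long sumFlow) {
            this.phone = phone;
            this.sumFlow = sumFlow;
        }

        @Override
        public String toString() {
            return phone + "\t" + sumFlow;
        }
    }

    // Same rule as ProvincePartitioner: 136 -> 0, 137 -> 1, 138 -> 2, anything else -> 3
    static int partitionOf(String phone) {
        switch (phone.substring(0, 3)) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            default: return 3;
        }
    }

    // Route records to partitions, then sort each partition by total flow, descending.
    // numReduceTasks must be at least 4 here, matching job.setNumReduceTasks(4).
    static List<List<Rec>> run(List<Rec> records, int numReduceTasks) {
        List<List<Rec>> outputs = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            outputs.add(new ArrayList<>());
        }
        for (Rec r : records) {
            outputs.get(partitionOf(r.phone)).add(r);
        }
        Comparator<Rec> bySumFlowDesc = Comparator.comparingLong((Rec r) -> r.sumFlow).reversed();
        for (List<Rec> out : outputs) {
            out.sort(bySumFlowDesc);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Rec> records = new ArrayList<>(); // hypothetical sample records
        records.add(new Rec("13736230513", 2481));
        records.add(new Rec("13846544121", 264));
        records.add(new Rec("13509468723", 7335));
        records.add(new Rec("13736230999", 9000));
        records.add(new Rec("13630577991", 690));
        List<List<Rec>> outputs = run(records, 4);
        for (int i = 0; i < outputs.size(); i++) {
            System.out.println("part-r-0000" + i + ": " + outputs.get(i));
        }
    }
}
```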
Write the driver class, FlowCountSortDriver.java
package Sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowCountSortDriver.class);
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(4);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}