MapReduce框架原理

最全的MapReduce框架原理,方便以后復(fù)習(xí)。知識(shí)點(diǎn)來(lái)自尚硅谷的課程學(xué)習(xí)。課程鏈接


一、InputFormat數(shù)據(jù)輸入

1. 切片與MapTask并行度決定機(jī)制

數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片,并不會(huì)在磁盤上將其切分成片進(jìn)行存儲(chǔ)。實(shí)際存儲(chǔ)在磁盤上,還是按照HDFS將數(shù)據(jù)分成一個(gè)一個(gè)Block進(jìn)行存儲(chǔ)。
MapTask的并行速度決定Map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)Job的處理速度。

決定機(jī)制原理

  • 當(dāng)切片大小為100M時(shí),不同節(jié)點(diǎn)之間需要數(shù)據(jù)傳輸,耗費(fèi)大量IO,不高效;
  • 當(dāng)切分時(shí),只針對(duì)單個(gè)文件進(jìn)行切分,不考慮文件之間的大??;
  • MapTask的數(shù)量由切片數(shù)量決定,切片的大小默認(rèn)是塊大小。

2. Job提交流程源碼解析

Job提交流程源碼解析
waitForCompletion();

// 1 建立連接
connect();
  // 1.1 創(chuàng)建提交Job的代理
  new Cluster(getConfiguration());
  // 1.2 判斷是本地yarn還是遠(yuǎn)程yarn
  initialize(jobTrackAddr, conf);

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
  // 2.1 創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // 2.2 獲取jobid,并創(chuàng)建Job路徑
  JobID jobId = submitClient.getNewJobID();
  // 2.3 拷貝jar包到集群
  copyAndConfigureFiles(job, submitJobDir);
  rUpLoader.uploadFiles(job, jobSubmitDir);
  // 2.4 計(jì)算切片,生成切片規(guī)劃文件
  writeSplits(job, submitJobDir);
  maps = writeNewSplits(job, jobSubmitDir);
  input.getSplits(job);
  // 2.5 向Stag路徑寫XML配置文件
  writeConf(conf, submitJobFile);
  conf.writeXml(out);
  // 2.6 提交Job,返回提交狀態(tài)
  status = submitClint.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

3. FileInputFormat切片機(jī)制

  • 根據(jù)計(jì)算公式,切片大小默認(rèn)為塊大小,本地模式切片大小為32M;
  • 公式中,默認(rèn)minSize=1,maxSize=Long.MAXValue
  • 切片大小設(shè)置:maxSize調(diào)的小于blockSize,則切片會(huì)變小。minSize調(diào)的比blockSize大,則可以讓切片變大;
  • MrAppMaster根據(jù)切片數(shù)量計(jì)算MapTask個(gè)數(shù);
  • 獲取切片的文件名:
    String name = inputSplit.getPath().getName();
  • 根據(jù)文件類型獲取切片信息:
    FileSplit inputSplit = (FileSplit) context.getInputSplit();

4. CombineTextInputFormat切片機(jī)制

框架默認(rèn)的TextInputFormat切片機(jī)制是對(duì)任務(wù)按照文件進(jìn)行切片,當(dāng)有大量小文件時(shí),就會(huì)產(chǎn)生大量的MapTask,處理效率及其低下。

CombineTextFormat用于小文件過(guò)多的場(chǎng)景,從邏輯上將多個(gè)小文件規(guī)劃到一個(gè)切片中,交給一個(gè)MapTask處理。


  • 虛擬存儲(chǔ)切片最大值可以任意設(shè)置,但是要根據(jù)實(shí)際的小文件大小來(lái)具體設(shè)置:
    combineTextFormat.setMaxInputSplitSize(job, 4194304); // 4M
  • 主要分為兩個(gè)過(guò)程:虛擬存儲(chǔ)過(guò)程和切片過(guò)程。
  • 當(dāng)存儲(chǔ)時(shí)當(dāng)文件大于4M,會(huì)判斷是否比2*4M大。
  • 在實(shí)際使用時(shí),需要處理很多小文件時(shí),可以按如下操作,在Driver類中增加如下信息:
// 如果不設(shè)置InputFormat,默認(rèn)是TextInputFormat.class
job.setInputFormatClass(CombineTextInputForamt.class);
// 設(shè)置虛擬存儲(chǔ)切片最大值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

5. FileInputFormat實(shí)現(xiàn)類

在運(yùn)行MapReduce程序時(shí),針對(duì)不同格式的輸入文件,MapReduce是如何讀取這些數(shù)據(jù)的?

FileInputFormat常見的接口實(shí)現(xiàn)類包括:

  • TextInputFormat
  • KeyValueTextInputFormat
  • NLineInputFormat
  • CombineTextInputFormat
  • 自定義的InputFormat
5.1 TextInputFormat

TextinputFormat是默認(rèn)的FileInputFormat實(shí)現(xiàn)類。按行讀取每條記錄,健是存儲(chǔ)該行在整個(gè)文件中的起始字節(jié)偏移量,LongWritable類型。值是這行的內(nèi)容,不包括任何終止符(換行,回車等),Text類型。

5.2 KeyValueTextInputFormat

每一行均為一條記錄,被分割符分割為key和value,默認(rèn)的分割符是"\t"??梢栽隍?qū)動(dòng)類中進(jìn)行設(shè)置:
conf.set(KeyValueLineRecoredReader,KEY_VALUE_SEPERATOR, "\t");
此時(shí)的key是每行排在分割符之前的Text序列。

5.3 NLineInputFormat

如果使用NLineInpurFormat,代表每個(gè)MapTask進(jìn)程處理的InputSplit不再按照Block塊去切分,而是按照NLineInputFormat指定的行數(shù)N來(lái)切分。
輸入文件的總行數(shù) \div N=切片數(shù),如果不整除,切片數(shù)=商+1。
此時(shí)的key-value與TextInputFormat生成的一樣??梢栽隍?qū)動(dòng)類中進(jìn)行設(shè)置:
NLineInputFormat.setNumLinesPerSplit(job, 3);

5.4 自定義InputFormat

具體步驟:
1)自定義一個(gè)類繼承FileInputFormat;
2)改寫RecordReader,實(shí)現(xiàn)自定義讀取并封裝為key-value;
3)在輸出時(shí)使用SequenceFileOutPutFormat輸出合并文件。

SequenceFile文件是Hadoop用來(lái)存儲(chǔ)二進(jìn)制形式的key-value對(duì)的文件格式,文件路徑+文件名為key,文件內(nèi)容為value。

自定義InputFormat案例:


實(shí)現(xiàn)步驟

代碼實(shí)現(xiàn):
WholeFileInputFormat.java

package Inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);

        return recordReader;
    }
}

WholeRecordReader.java

package Inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;
    Configuration configuration;
    Text k = new Text();
    BytesWritable v = new BytesWritable();
    boolean isProgress = true;

    // 初始化
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    // 核心的業(yè)務(wù)邏輯
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (isProgress) {
            byte[] buf = new byte[(int) split.getLength()];

            // 1 獲取Fs對(duì)象
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 2 獲取輸入流
            FSDataInputStream fis = fs.open(path);

            // 3 拷貝
            IOUtils.readFully(fis, buf, 0, buf.length);

            // 4 封裝v
            v.set(buf, 0, buf.length);

            // 5 封裝k
            k.set(path.toString());

            // 6 關(guān)閉資源
            IOUtils.closeStream(fis);

            isProgress = false;

            return true;
        }

        return false;
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return v;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {

    }
}

完成相應(yīng)的Mapper和Reducer類,并在Driver中增加兩句:

// 4 設(shè)置輸入的InputFormat
job.setInputFormatClass(WholeFileInputFormat.class);

// 5 設(shè)置輸出的OutputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);

二、Shuffle機(jī)制

Map方法之后,Reduce方法之前的數(shù)據(jù)處理過(guò)程稱之為Shuffle。

1. Partition分區(qū)

分區(qū)就是將計(jì)算結(jié)果按照條件輸出到不同文件中。比如按照手機(jī)號(hào)歸屬地將不同省份輸出到不同文件中。

1.1系統(tǒng)默認(rèn)的分區(qū)是Hash分區(qū):
public class Hashpartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hasCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
  • 默認(rèn)分區(qū)是根據(jù)key的HashCode對(duì)ReduceTasks個(gè)數(shù)取模得到的;
  • 用戶沒法控制那個(gè)Key存儲(chǔ)到那個(gè)分區(qū)。
1.2 自定義Partition分區(qū)

自定義步驟:



案例實(shí)現(xiàn):

按照手機(jī)號(hào)前三位將統(tǒng)計(jì)結(jié)果寫入到不同的文件。

自定義Partition類ProvincePartitioner.java

package Flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // 1 獲取手機(jī)號(hào)前三位
        String prePhoneNum = key.toString().substring(0, 3);

        int partition = 3;
        if ("136".equals(prePhoneNum)) {
            partition = 0;
        }else if ("137".equals(prePhoneNum)) {
            partition = 1;
        }else if ("138".equals(prePhoneNum)) {
            partition = 2;
        }
        
        return partition;
    }
}

編寫Mapper和Reducer,在Driver類中增加如下代碼:

// 5 設(shè)置Partition分區(qū)
 job.setPartitionerClass(ProvincePartitioner.class);
 job.setNumReduceTasks(4);
  • 在定義分區(qū)時(shí),分區(qū)號(hào)一定要嚴(yán)格按照順序從0開始;
  • 在設(shè)置分區(qū)數(shù)時(shí):
    當(dāng)忘記設(shè)置或者設(shè)置為1,則最終只會(huì)產(chǎn)生一個(gè)文件,程序會(huì)將所有的分區(qū)數(shù)據(jù)統(tǒng)統(tǒng)寫入一個(gè)文件;
    當(dāng)設(shè)置的分區(qū)數(shù)在1和實(shí)際分區(qū)數(shù)之間會(huì)報(bào)錯(cuò)(IO異常);
    當(dāng)設(shè)置的分區(qū)數(shù)大于實(shí)際分區(qū)數(shù),會(huì)生成期待的結(jié)果,但是會(huì)多出超出分區(qū)個(gè)數(shù)個(gè)空文件,因?yàn)閷?shí)際運(yùn)行中,沒有對(duì)應(yīng)的數(shù)據(jù)傳給多余的ReduceTask。
1.3 WritableComparable排序

排序是MapReduce中的重要操作。

MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)按照key進(jìn)行排序 ,屬于Hadoop的默認(rèn)行為。默認(rèn)排序是按照字典順序排序,且實(shí)現(xiàn)該排序的方法是快速排序。

MapReduce排序分類:


自定義排序:
bean對(duì)象作為key傳輸,需要實(shí)現(xiàn)WritableComparable接口重寫compareTo方法,就可以實(shí)現(xiàn)排序。

示例編寫:

按照電話所屬區(qū)將統(tǒng)計(jì)數(shù)據(jù)存儲(chǔ)到不同的文件中,并在每個(gè)文件中實(shí)現(xiàn)按總流量的逆序。

重寫編寫FlowBean.java

package Sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    // 核心的比較
    public int compareTo(FlowBean bean) {

        int result;

        if (sumFlow > bean.getSumFlow()) {
            result = -1;
        }else if(sumFlow < bean.getSumFlow()) {
            result = 1;
        }else{
            result = 0;
        }

        return result;
    }

    // 序列化方法
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 反序列化方法
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
  • 此時(shí),要繼承WritableComparable類;
  • 重寫compareTo方法,這是排序比較的核心;

編寫Mapper類,F(xiàn)lowCountSortMapper.java

package Sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 獲取一行
        String line = value.toString();

        // 2 拆分
        String[] fields = line.split("\t");

        // 3 封裝
        k.setUpFlow(Long.parseLong(fields[1]));   // 流量作為key
        k.setDownFlow(Long.parseLong(fields[2]));
        k.setSumFlow(Long.parseLong(fields[3]));

        v.set(fields[0]);  // 電話號(hào)碼作為Value

        // 3 寫出
        context.write(k, v);
    }
}
  • 需要對(duì)數(shù)據(jù)按照總流量排序,所以Map階段結(jié)束后的key應(yīng)該是Flowbean對(duì)象,value為電話號(hào)碼.

編寫Reducer類,F(xiàn)lowCountSortReducer.java

package Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            context.write(value, key);
        }
    }
}
  • 排序結(jié)束后,生成的數(shù)據(jù)還是按照電話,流量展示,所以此時(shí)輸出的key為電話號(hào)碼,value為FlowBean。

編寫Partition類,ProvincePartitioner.java

package Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {

        String prePhoneNum = text.toString().substring(0, 3);

        int partition = 3;

        if ("136".equals(prePhoneNum)) {
            partition= 0;
        }else if ("137".equals(prePhoneNum)) {
            partition = 1;
        }else if ("138".equals(prePhoneNum)) {
            partition = 2;
        }

        return partition;
    }
}
  • 在Reducer階段,需要將不同所屬區(qū)的數(shù)據(jù)寫入不同文件,此時(shí)需要對(duì)數(shù)據(jù)進(jìn)行Partition分區(qū)。

編寫驅(qū)動(dòng)類,F(xiàn)lowCountSortDriver.java

package Sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCountSortDriver.class);
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(4);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容