6_Big Data_MapReduce_1

MapReduce Overview

1️⃣ MapReduce Definition

2️⃣ Advantages and Disadvantages of MapReduce

  1. Advantages
  2. Disadvantages

3️⃣ Core Ideas of MapReduce

1) A distributed computation usually needs to be divided into at least two stages.
2) The concurrent MapTask instances of the first stage run fully in parallel, independently of one another.
3) The concurrent ReduceTask instances of the second stage are also independent of one another, but their data depends on the output of every MapTask instance from the previous stage.
4) The MapReduce programming model contains exactly one Map stage and one Reduce stage; if the business logic is more complex, the only option is to chain multiple MapReduce jobs and run them in series.
Summary: tracing the WordCount data flow is the best way to internalize these core ideas.
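The two-stage flow above can be made concrete with a small plain-Java simulation of the WordCount data flow. This is only a sketch: Hadoop's real MapTask/ReduceTask machinery is far more involved, and every class and method name here is invented for illustration.

```java
import java.util.*;

public class MiniMapReduce {
    // Map phase: each "MapTask" turns its lines into (word, 1) pairs, independently.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.split(" ")) out.add(Map.entry(w, 1));
        return out;
    }

    // Shuffle: group all pairs by key so each "ReduceTask" sees every value for its keys.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs)
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        return grouped;
    }

    // Reduce phase: sum the grouped values for each key.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new TreeMap<>();
        grouped.forEach((k, vs) -> result.put(k, vs.stream().mapToInt(Integer::intValue).sum()));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : new String[]{"hadoop spark", "hadoop hive"})
            pairs.addAll(map(line)); // the MapTasks would run in parallel, independently
        System.out.println(reduce(shuffle(pairs)));
    }
}
```

Note how the reduce step cannot start on a key until the shuffle has gathered that key's values from every map output, which is exactly the dependency described in point 3.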

4️⃣ MapReduce Processes

5️⃣ The Official WordCount Source
Decompiling the official jar shows that the WordCount example consists of a Map class, a Reduce class, and a driver class, and that the data types are Hadoop's own serialization types.

6️⃣ Common Data Serialization Types

7️⃣ MapReduce Programming Conventions: a user-written program is divided into three parts: Mapper, Reducer, and Driver

8️⃣ WordCount Hands-On Example

  1. Requirement: count the total number of occurrences of each word in a given text file
    (1) Input data
ffadsfsda
fasdfsad
fsadfa
Fsadfa
fsadfa
Ffadsfsda

(2) Expected output

Ffadsfsda  1
Fsadfa 1
fasdfsad   1
ffadsfsda  1
fsadfa 2

2. Analysis: following the MapReduce programming conventions, write a Mapper, a Reducer, and a Driver

  3. Write the program
    (1) Create a Maven project
    (2) Add the following dependencies to pom.xml
<dependencies>
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>RELEASE</version>
      </dependency>
      <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-core</artifactId>
          <version>2.8.2</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.7.2</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.7.2</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>2.7.2</version>
      </dependency>
</dependencies>

(3) In the project's src/main/resources directory, create a file named "log4j.properties" and fill it with:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(4) Write the Mapper

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
  
  Text k = new Text();
  IntWritable v = new IntWritable(1);
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {
      
// 1 Read one line
      String line = value.toString();
      
// 2 Split it into words
      String[] words = line.split(" ");
      
// 3 Emit a (word, 1) pair for each word
      for (String word : words) {
          k.set(word);
          context.write(k, v);
      }
  }
}

(5) Write the Reducer

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

     int sum;
     IntWritable v = new IntWritable();

     @Override
     protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
      
// 1 Sum the counts
          sum = 0;
          for (IntWritable count : values) {
              sum += count.get();
          }
      
// 2 Emit the total
          v.set(sum);
          context.write(key,v);
     }
}

(6) Write the Driver class

public class WordcountDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 1 Get the configuration and create the job
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

// 2 Set the jar by class
      job.setJarByClass(WordcountDriver.class);

// 3 Set the Mapper and Reducer classes
      job.setMapperClass(WordcountMapper.class);
      job.setReducerClass(WordcountReducer.class);

// 4 Set the map output kv types
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);

// 5 Set the final output kv types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      
// 6 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 Submit the job
      boolean result = job.waitForCompletion(true);

      System.exit(result ? 0 : 1);
  }
}

9️⃣ Testing on a Cluster
(1) To package with Maven, add the build plugins below (note: the mainClass value must be replaced with your own project's driver class)

<build>
      <plugins>
          <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>2.3.2</version>
              <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
              </configuration>
          </plugin>
          <plugin>
              <artifactId>maven-assembly-plugin </artifactId>
              <configuration>
                  <descriptorRefs>
                      <descriptorRef>jar-with-dependencies</descriptorRef>
                  </descriptorRefs>
                  <archive>
                      <manifest>
                          <mainClass>com.xxx.mr.WordcountDriver</mainClass>
                      </manifest>
                  </archive>
              </configuration>
              <executions>
                  <execution>
                      <id>make-assembly</id>
                      <phase>package</phase>
                      <goals>
                          <goal>single</goal>
                      </goals>
                  </execution>
              </executions>
          </plugin>
      </plugins>
</build>

Note: if the project shows a red cross, right-click the project -> Maven -> Update Project.
(2) Package the program into a jar and copy it to the Hadoop cluster
Steps: right-click -> Run As -> maven install. When the build completes, the jar appears in the project's target folder (if it doesn't, right-click the project -> Refresh). Rename the jar without dependencies to wc.jar and copy it to the Hadoop cluster.
(3) Start the Hadoop cluster
(4) Run the WordCount program

hadoop jar  wc.jar com.xxx.wordcount.WordcountDriver /user/xxx/input /user/xxx/output

Hadoop Serialization

1️⃣ Serialization Overview

2️⃣ Implementing the Serialization Interface (Writable) on a Custom Bean
In real-world development, the common built-in serialization types often cannot satisfy every need; for example, to pass a bean object around inside the Hadoop framework, that object must implement the serialization interface. The seven steps to make a bean serializable are:
(1) Implement the Writable interface
(2) Deserialization uses reflection to call the no-arg constructor, so the class must have one

public FlowBean() {
  super();
}

(3) Override the serialization method

@Override
public void write(DataOutput out) throws IOException {
  out.writeLong(upFlow);
  out.writeLong(downFlow);
  out.writeLong(sumFlow);
}

(4) Override the deserialization method

@Override
public void readFields(DataInput in) throws IOException {
  upFlow = in.readLong();
  downFlow = in.readLong();
  sumFlow = in.readLong();
}

(5) Note that the deserialization order must exactly match the serialization order
(6) To make the result readable in the output file, override toString(); separating fields with "\t" is convenient for downstream use.
(7) If the custom bean is transmitted as a key, it must also implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires keys to be sortable. See the sorting example later.

@Override
public int compareTo(FlowBean o) {
  // Sort descending, largest first; Long.compare also returns 0 for equal
  // keys, which the Comparable contract requires
  return Long.compare(o.getSumFlow(), this.sumFlow);
}
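To see the descending key order in action, here is a self-contained sketch. FlowLike is a hypothetical stand-in for the bean, trimmed to the one field that matters for sorting; it is not the FlowBean class from this article.

```java
import java.util.*;

public class SortDemo {
    // Minimal stand-in for a bean that sorts by sumFlow, descending
    static class FlowLike implements Comparable<FlowLike> {
        final long sumFlow;
        FlowLike(long s) { sumFlow = s; }
        @Override public int compareTo(FlowLike o) {
            return Long.compare(o.sumFlow, this.sumFlow); // larger sumFlow first
        }
    }

    // Wraps the inputs, sorts them via compareTo, and returns the sums in order
    static long[] sortedSums(long... sums) {
        List<FlowLike> list = new ArrayList<>();
        for (long s : sums) list.add(new FlowLike(s));
        Collections.sort(list);
        return list.stream().mapToLong(f -> f.sumFlow).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortedSums(10, 300, 20)));
    }
}
```

This is the same ordering Shuffle applies when such a bean is used as the map output key.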

3️⃣ Serialization Hands-On Example
1. Requirement: for each phone number, compute the total upstream traffic, downstream traffic, and total traffic
(1) Input data

id  phone number   network IP        upFlow   downFlow    status code
1   1363157985066  120.196.100.82    2481     24681       200
2   1363157995052  120.197.40.4      264      0           200
3   1363157991076  120.196.100.99    132      1512        200
4   1363154400022  120.197.40.4      240      0           200
5   1363157993044  120.196.100.99    1527     2106        200
6   1363157995074  120.197.40.4      4116     1432        200
7   1363157993055  120.196.100.99    1116     954         200
8   1363157995033  120.197.40.4      3156     2936        200
9   1363157983019  120.196.100.82    240      0           200
10  1363157984041  120.197.40.4      6960     690         200
11  1363157973098  120.197.40.4      3659     3538        200
12  1363157986029  120.196.100.99    1938     180         200
13  1363157992093  120.196.100.99    918      4938        200
14  1363157986041  120.197.40.4      180      180         200
15  1363157984040  120.197.40.4      1938     2910        200
16  1363157995093  120.196.100.82    3008     3720        200
17  1363157982040  120.196.100.99    7335     110349      200
18  1363157986072  120.196.100.99    9531     2412        200
19  1363157990043  120.196.100.55    11058    48243       200
20  1363157988072  120.196.100.82    120      120         200
21  1363157985066  120.196.100.82    2481     24681       200
22  1363157993055  120.196.100.99    1116     954         200

(2) Expected output format:

phone number   upFlow   downFlow   totalFlow
13560436666    1116     954        2070

2. Analysis


3. Write the MapReduce program
(1) Write the FlowBean for the traffic statistics

package com.xxx.mapreduce.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1 Implement the Writable interface
public class FlowBean implements Writable{

  private long upFlow;
  private long downFlow;
  private long sumFlow;
  
//2 Deserialization uses reflection to call the no-arg constructor, so it must exist
  public FlowBean() {
      super();
  }

  public FlowBean(long upFlow, long downFlow) {
      super();
      this.upFlow = upFlow;
      this.downFlow = downFlow;
      this.sumFlow = upFlow + downFlow;
  }
  
//3 Serialization method
  @Override
  public void write(DataOutput out) throws IOException {
      out.writeLong(upFlow);
      out.writeLong(downFlow);
      out.writeLong(sumFlow);
  }
  
  //4 Deserialization method
  //5 The read order must exactly match the write order of the serialization method
  @Override
  public void readFields(DataInput in) throws IOException {
      this.upFlow  = in.readLong();
      this.downFlow = in.readLong();
      this.sumFlow = in.readLong();
  }

// 6 Override toString so the result can be printed to text conveniently
  @Override
  public String toString() {
      return upFlow + "\t" + downFlow + "\t" + sumFlow;
  }

  public long getUpFlow() {
      return upFlow;
  }

  public void setUpFlow(long upFlow) {
      this.upFlow = upFlow;
  }

  public long getDownFlow() {
      return downFlow;
  }

  public void setDownFlow(long downFlow) {
      this.downFlow = downFlow;
  }

  public long getSumFlow() {
      return sumFlow;
  }

  public void setSumFlow(long sumFlow) {
      this.sumFlow = sumFlow;
  }
}

(2) Write the Mapper

package com.xxx.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
  
  FlowBean v = new FlowBean();
  Text k = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {
      
// 1 Read one line
      String line = value.toString();
      
// 2 Split the fields
      String[] fields = line.split("\t");
      
      // 3 Build the output objects
      // Extract the phone number
      String phoneNum = fields[1];

// Extract the upstream and downstream traffic
      long upFlow = Long.parseLong(fields[fields.length - 3]);
      long downFlow = Long.parseLong(fields[fields.length - 2]);

      k.set(phoneNum);
      // The FlowBean above has no set(down, up) helper, so use its setters
      v.setUpFlow(upFlow);
      v.setDownFlow(downFlow);
      v.setSumFlow(upFlow + downFlow);
      
      // 4 Emit
      context.write(k, v);
  }
}

(3) Write the Reducer

package com.xxx.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

  @Override
  protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

      long sum_upFlow = 0;
      long sum_downFlow = 0;

// 1 Iterate over all beans, accumulating the upstream and downstream traffic separately
      for (FlowBean flowBean : values) {
          sum_upFlow += flowBean.getUpFlow();
          sum_downFlow += flowBean.getDownFlow();
      }

// 2 Build the result object
      FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
      
// 3 Emit
      context.write(key, resultBean);
  }
}

(4) Write the Driver class

package com.xxx.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

  public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
      
// The input/output paths must match the actual paths on your machine
      args = new String[] { "e:/input/inputflow", "e:/output1" };

// 1 Get the configuration and the job instance
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

// 6 Set the local path of this program's jar
      job.setJarByClass(FlowsumDriver.class);

// 2 Set the Mapper/Reducer classes this job uses
      job.setMapperClass(FlowCountMapper.class);
      job.setReducerClass(FlowCountReducer.class);

// 3 Set the mapper output kv types
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(FlowBean.class);

// 4 Set the final output kv types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(FlowBean.class);
      
// 5 Set the directory containing the job's raw input files
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 Submit the job's configuration and the jar containing its classes to YARN
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

How the MapReduce Framework Works

1️⃣ InputFormat Data Input


1.1 Splits and MapTask Parallelism
The number of MapTasks determines the concurrency of the Map phase, and therefore affects the processing speed of the whole job.
Think about it: for 1GB of data, launching 8 MapTasks improves the cluster's concurrency. But for 1KB of data, would launching 8 MapTasks also improve performance? Is more MapTask parallelism always better? Which factors determine MapTask parallelism?
1. How MapTask parallelism is determined
Data block: a Block is how HDFS physically divides data into chunks.
Data split: a split divides the input only logically; it does not physically cut the data into pieces on disk.
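By default each split corresponds to one HDFS block: FileInputFormat computes the split size as `Math.max(minSize, Math.min(maxSize, blockSize))`, where minSize and maxSize come from `mapreduce.input.fileinputformat.split.minsize`/`.maxsize`. A quick sketch of that formula (the 128MB block size is the Hadoop 2.x default):

```java
public class SplitSize {
    // Mirrors FileInputFormat.computeSplitSize(blockSize, minSize, maxSize)
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // HDFS default block size in Hadoop 2.x
        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split size == block size
        System.out.println(computeSplitSize(blockSize, 1, Long.MAX_VALUE) == blockSize);
        // Lowering maxSize shrinks splits (more MapTasks); raising minSize grows them
        System.out.println(computeSplitSize(blockSize, 1, 64L * 1024 * 1024));
    }
}
```

So parallelism is driven by split size, which by default tracks the block size, and both tuning knobs move it in opposite directions.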

2️⃣ Job Submission Flow and Split Computation in the Source
1. Job submission flow (source walkthrough)

waitForCompletion()
submit();
// 1 Establish the connection
  connect();  
      // 1) Create the proxy used to submit the job
      new Cluster(getConfiguration());
          // (1) Decide whether the cluster is local, YARN, or remote
          initialize(jobTrackAddr, conf); 

// 2 Submit the job
submitter.submitJobInternal(Job.this, cluster)
  // 1) Create the staging path used to submit data to the cluster
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // 2) Get the jobid and create the job path
  JobID jobId = submitClient.getNewJobID();

  // 3) Copy the jar to the cluster
copyAndConfigureFiles(job, submitJobDir);  
  rUploader.uploadFiles(job, jobSubmitDir);

// 4) Compute the splits and generate the split plan file
writeSplits(job, submitJobDir);
      maps = writeNewSplits(job, jobSubmitDir);
      input.getSplits(job);

// 5) Write the XML configuration file to the staging path
writeConf(conf, submitJobFile);
  conf.writeXml(out);

// 6) Submit the job and return its status
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2. FileInputFormat split computation in the source (input.getSplits(job))

3️⃣ FileInputFormat Split Mechanism

4️⃣ CombineTextInputFormat Split Mechanism
The framework's default TextInputFormat plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With large numbers of small files this produces large numbers of MapTasks, which is extremely inefficient.
1. Use case:
CombineTextInputFormat is designed for scenarios with many small files. It logically groups multiple small files into one split, so that a single MapTask can process several small files.
2. Setting the maximum virtual-storage split size
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4MB
Note: the maximum virtual-storage split size is best chosen according to the actual sizes of your small files.
3. Split mechanism
Generating splits has two stages: the virtual-storage stage and the split stage.

(1) Virtual-storage stage:
Compare the size of each file in the input directory against the configured setMaxInputSplitSize value. If a file is no larger than the maximum, it logically becomes one block. If a file is larger than the maximum and more than twice it, carve off a block of the maximum size; once the remainder is larger than the maximum but no more than twice it, split the remainder into two equal virtual-storage blocks (this prevents overly small splits).
For example, with setMaxInputSplitSize at 4MB and an 8.02MB input file, a 4MB block is carved off first. Cutting the remaining 4.02MB at 4MB would leave a tiny 0.02MB virtual file, so the 4.02MB remainder is instead split into two 2.01MB blocks.
(2) Split stage:
(a) If a virtual-storage block is at least as large as the setMaxInputSplitSize value, it forms a split on its own.
(b) If it is smaller, it is merged with the next virtual-storage block to form one split together.
(c) Worked example: given 4 small files of 1.7MB, 5.1MB, 3.4MB, and 6.8MB, virtual storage produces 6 blocks of:
1.7MB, (2.55MB, 2.55MB), 3.4MB, and (3.4MB, 3.4MB)
which finally form 3 splits of:
(1.7+2.55)MB, (2.55+3.4)MB, (3.4+3.4)MB
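
The worked example can be checked with a short simulation of the two stages (sizes in MB). This mirrors the rules as described above, not Hadoop's actual CombineTextInputFormat code, and the class and method names are invented for illustration.

```java
import java.util.*;

public class CombineSplitSim {
    // Stage 1: virtual storage — carve each file into blocks no larger than maxSize.
    static List<Double> virtualBlocks(double[] fileSizes, double maxSize) {
        List<Double> blocks = new ArrayList<>();
        for (double s : fileSizes) {
            while (s > 2 * maxSize) { blocks.add(maxSize); s -= maxSize; }
            if (s > maxSize) { blocks.add(s / 2); blocks.add(s / 2); } // avoid tiny leftovers
            else blocks.add(s);
        }
        return blocks;
    }

    // Stage 2: merge consecutive blocks until the running total reaches maxSize.
    static List<Double> splits(List<Double> blocks, double maxSize) {
        List<Double> out = new ArrayList<>();
        double acc = 0;
        for (double b : blocks) {
            acc += b;
            if (acc >= maxSize) { out.add(acc); acc = 0; }
        }
        if (acc > 0) out.add(acc); // any leftover becomes the final split
        return out;
    }

    public static void main(String[] args) {
        double max = 4.0; // MB, i.e. setMaxInputSplitSize(job, 4194304)
        List<Double> blocks = virtualBlocks(new double[]{1.7, 5.1, 3.4, 6.8}, max);
        System.out.println(blocks.size());              // 6 virtual blocks
        System.out.println(splits(blocks, max).size()); // 3 splits
    }
}
```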
5️⃣ CombineTextInputFormat Hands-On Example
1. Requirement: merge a large number of small input files into one split for unified processing.
(1) Input data: prepare 4 small files
(2) Expectation: one split handles all 4 files
2. Implementation
(1) With no changes, run the WordCount example from section 1.6 and observe that the split count is 4.
(2) Add the following code to WordcountDriver, run it, and observe that the split count is 3.
(a) Add to the driver class:

// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

// Set the maximum virtual-storage split size to 4MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

(b) The run now produces 3 splits.

(3) Add the following code to WordcountDriver, run it, and observe that the split count is 1.
(a) Add to the driver:

// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

// Set the maximum virtual-storage split size to 20MB
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

(b) The run now produces 1 split.


6️⃣ FileInputFormat Implementations

7️⃣ KeyValueTextInputFormat Hands-On Example
1. Requirement: count, for each distinct first word, how many lines start with it.
(1) Input data

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

(2) Expected output

banzhang   2
xihuan 2
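
KeyValueTextInputFormat cuts each line at the first occurrence of the configured separator: everything before it becomes the key and everything after becomes the value, so counting keys gives the expected result above. A minimal sketch of that splitting rule (a simulation, not Hadoop's record reader):

```java
public class KVSplit {
    // Splits a line at the first separator; with no separator, the whole line is the key.
    static String[] toKeyValue(String line, String sep) {
        int i = line.indexOf(sep);
        if (i < 0) return new String[]{line, ""};
        return new String[]{line.substring(0, i), line.substring(i + sep.length())};
    }

    public static void main(String[] args) {
        String[] kv = toKeyValue("banzhang ni hao", " ");
        System.out.println(kv[0]); // the key the mapper counts
        System.out.println(kv[1]); // the rest of the line
    }
}
```

With " " as the separator, both "banzhang ni hao" lines yield the key "banzhang", which is why its count is 2.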

2. Analysis

3. Implementation
(1) Write the Mapper

package com.xxx.mapreduce.KeyValueTextInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable>{
  
// 1 Set the value
  LongWritable v = new LongWritable(1);  
   
  @Override
  protected void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {

// banzhang ni hao
       
// 2 Emit
       context.write(key, v);  
  }
}

(2) Write the Reducer

package com.xxx.mapreduce.KeyValueTextInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
  
   LongWritable v = new LongWritable();  
   
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values,  Context context) throws IOException, InterruptedException {
      
       long sum = 0L;  

// 1 Sum up
       for (LongWritable value : values) {  
           sum += value.get();  
       }
        
       v.set(sum);  
        
// 2 Emit
       context.write(key, v);  
  }
}

(3) Write the Driver

package com.xxx.mapreduce.KeyValueTextInputFormat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
      
      Configuration conf = new Configuration();
// Set the separator
      conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
// 1 Get the job object
      Job job = Job.getInstance(conf);
      
// 2 Set the jar location and wire up the mapper and reducer
      job.setJarByClass(KVTextDriver.class);
      job.setMapperClass(KVTextMapper.class);
      job.setReducerClass(KVTextReducer.class);
              
// 3 Set the map output kv types
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(LongWritable.class);

// 4 Set the final output kv types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
      
// 5 Set the input data path
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      
// Set the input format
      job.setInputFormatClass(KeyValueTextInputFormat.class);
      
// 6 Set the output data path
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
// 7 Submit the job
      job.waitForCompletion(true);
  }
}

8️⃣ NLineInputFormat Hands-On Example
1. Requirement: count occurrences of each word, with the number of splits determined by the line count of each input file; in this case every three lines go into one split.
(1) Input data

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

(2) Expected output: Number of splits:4
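The split count follows directly from the rule that NLineInputFormat creates one split per N lines, rounding up: the 11-line input above with N = 3 gives ⌈11/3⌉ = 4 splits. A quick check:

```java
public class NLineSplits {
    // Number of splits NLineInputFormat produces: ceil(totalLines / linesPerSplit)
    static int numSplits(int totalLines, int linesPerSplit) {
        return (totalLines + linesPerSplit - 1) / linesPerSplit;
    }

    public static void main(String[] args) {
        System.out.println(numSplits(11, 3)); // 4
    }
}
```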
2. Analysis

3. Implementation
(1) Write the Mapper

package com.xxx.mapreduce.nline;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
  
  private Text k = new Text();
  private LongWritable v = new LongWritable(1);
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {
      
// 1 Read one line
     String line = value.toString();
       
// 2 Split
       String[] splited = line.split(" ");
       
// 3 Emit each word
       for (int i = 0; i < splited.length; i++) {
          
          k.set(splited[i]);
          
          context.write(k, v);
       }
  }
}

(2) Write the Reducer

package com.xxx.mapreduce.nline;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
  
  LongWritable v = new LongWritable();
  
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values,  Context context) throws IOException, InterruptedException {
      
       long sum = 0l;

// 1 Sum up
       for (LongWritable value : values) {
           sum += value.get();
       }  
       
       v.set(sum);
       
// 2 Emit
       context.write(key, v);
  }
}

(3) Write the Driver

package com.xxx.mapreduce.nline;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineDriver {
  
  public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
      
// The input/output paths must match the actual paths on your machine
   args = new String[] { "e:/input/inputword", "e:/output1" };

// 1 Get the job object
       Configuration configuration = new Configuration();
       Job job = Job.getInstance(configuration);
       
// 7 Put three records in each InputSplit
       NLineInputFormat.setNumLinesPerSplit(job, 3);
         
// 8 Use NLineInputFormat to control records per split
       job.setInputFormatClass(NLineInputFormat.class);  
         
// 2 Set the jar location and wire up the mapper and reducer
       job.setJarByClass(NLineDriver.class);  
       job.setMapperClass(NLineMapper.class);  
       job.setReducerClass(NLineReducer.class);  
       
// 3 Set the map output kv types
       job.setMapOutputKeyClass(Text.class);  
       job.setMapOutputValueClass(LongWritable.class);  
       
// 4 Set the final output kv types
       job.setOutputKeyClass(Text.class);  
       job.setOutputValueClass(LongWritable.class);  
         
// 5 Set the input and output data paths
       FileInputFormat.setInputPaths(job, new Path(args[0]));  
       FileOutputFormat.setOutputPath(job, new Path(args[1]));  
         
// 6 Submit the job
       job.waitForCompletion(true);  
  }
}

9️⃣ Custom InputFormat


Custom InputFormat Hands-On Example
Both HDFS and MapReduce are very inefficient at handling small files, yet processing large numbers of small files is often unavoidable, so a solution is needed. One option is a custom InputFormat that merges the small files.
1. Requirement: merge multiple small files into one SequenceFile (a Hadoop file format for storing key-value pairs in binary form). The SequenceFile stores several files, using file path + name as the key and file contents as the value.
(1) Input data

//one.txt
yongpeng weidong weinan
sanfeng luozong xiaoming
//two.txt
longlong fanfan
mazong kailun yuhang yixin
longlong fanfan
mazong kailun yuhang yixin
//three.txt
shuaige changmo zhenqiang 
dongli lingu xuanxuan

(2) Expected output file format
2. Analysis

3. Implementation
(1) Write the custom InputFormat

package com.xxx.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Subclass FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
  
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
      return false;
  }

  @Override
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)   throws IOException, InterruptedException {
      
      WholeRecordReader recordReader = new WholeRecordReader();
      recordReader.initialize(split, context);
      
      return recordReader;
  }
}

(2) Write the custom RecordReader

package com.xxx.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

  private Configuration configuration;
  private FileSplit split;
  
  private boolean isProgress= true;
  private BytesWritable value = new BytesWritable();
  private Text k = new Text();

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
      
      this.split = (FileSplit)split;
      configuration = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
      
      if (isProgress) {

// 1 Allocate a buffer
          byte[] contents = new byte[(int)split.getLength()];
          
          FileSystem fs = null;
          FSDataInputStream fis = null;
          
          try {
// 2 Get the file system
              Path path = split.getPath();
              fs = path.getFileSystem(configuration);
              
// 3 Open the input stream
              fis = fs.open(path);
              
// 4 Read the file contents
              IOUtils.readFully(fis, contents, 0, contents.length);
              
              // 5 Set the file contents as the value
              value.set(contents, 0, contents.length);

              // 6 Get the file path and name
              String name = split.getPath().toString();

              // 7 Set the output key
              k.set(name);

          } catch (Exception e) {
              
          }finally {
              IOUtils.closeStream(fis);
          }
          
          isProgress = false;
          
          return true;
      }
      
      return false;
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
      return k;
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
      return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
      return 0;
  }

  @Override
  public void close() throws IOException {
  }
}

(3) Write the SequenceFileMapper

package com.xxx.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
  
  @Override
  protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {

      context.write(key, value);
  }
}

(4) Write the SequenceFileReducer

package com.xxx.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

  @Override
  protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

      context.write(key, values.iterator().next());
  }
}

(5) Write the SequenceFileDriver

package com.xxx.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
      
// The input/output paths must match the actual paths on your machine
      args = new String[] { "e:/input/inputinputformat", "e:/output1" };

      // 1 Get the job object
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

// 2 Set the jar location and wire up the custom mapper and reducer
      job.setJarByClass(SequenceFileDriver.class);
      job.setMapperClass(SequenceFileMapper.class);
      job.setReducerClass(SequenceFileReducer.class);

      // 7 Set the InputFormat
      job.setInputFormatClass(WholeFileInputformat.class);

      // 8 Set the OutputFormat
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      
      // 3 Set the map output kv types
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(BytesWritable.class);
      
// 4 Set the final output kv types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(BytesWritable.class);

// 5 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 Submit the job
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

How MapReduce Works
Detailed MapReduce workflow, part 1
Detailed MapReduce workflow, part 2

The flow above is the complete MapReduce workflow, but the Shuffle phase covers only steps 7 through 16. In detail, Shuffle works as follows:
1) MapTask collects the kv pairs emitted by our map() method into an in-memory buffer
2) The buffer repeatedly spills to local disk files; multiple spill files may be produced
3) The spill files are merged into one large spill file
4) During both spilling and merging, the Partitioner is invoked to partition the data and sort it by key
5) Each ReduceTask fetches, from every MapTask machine, the result data for its own partition number
6) A ReduceTask gathers the result files of the same partition from different MapTasks and merges them again (merge sort)
7) Once merged into one large file, Shuffle is over; the ReduceTask's own logic then runs (taking key groups from the file one by one and calling the user-defined reduce() method)

Note
The size of the Shuffle buffer affects the execution efficiency of a MapReduce program: in principle, the larger the buffer, the fewer disk I/O passes and the faster the run. The buffer size is configurable via the io.sort.mb parameter, default 100MB.
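For reference, in Hadoop 2.x the live property name is mapreduce.task.io.sort.mb (io.sort.mb is the deprecated 1.x name). A driver could raise it like this (an illustrative fragment; 200 is an arbitrary example value, not a tuning recommendation):

```java
// Illustrative only: raise the map-side sort buffer from the 100MB default.
Configuration conf = new Configuration();
conf.set("mapreduce.task.io.sort.mb", "200"); // buffer size in MB
Job job = Job.getInstance(conf);
```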

Source-code trace of the flow

context.write(k, NullWritable.get());
output.write(key, value);
collector.collect(key, value,partitioner.getPartition(key, value, partitions));
HashPartitioner();
collect()
close()
collect.flush()
sortAndSpill()
sort()   QuickSort
mergeParts();
collector.close();