OS X下MapReduce程序運(yùn)行的幾種模式

1.MapReduce程序運(yùn)行的模式簡介

  1. 程序運(yùn)行模式

    • 本地模式
      • 利用本地的JVM運(yùn)行,使用本地的IDE進(jìn)行debug
    • 遠(yuǎn)程模式
      • 提交至遠(yuǎn)程的集群上運(yùn)行,使用本地的IDE進(jìn)行debug
      • 提交至遠(yuǎn)程的集群上運(yùn)行,不使用本地IDE進(jìn)行debug
  2. 數(shù)據(jù)存放路徑

    • 遠(yuǎn)程文件系統(tǒng)(hdfs)
    • 本地文件系統(tǒng)(local file system)


2.開發(fā)環(huán)境簡介

  • 操作系統(tǒng):macOS Sierra 10.12.6
  • Java版本:1.8.0_131-b11
  • Hadoop版本:hadoop-2.7.4
  • IDE:IntelliJ IDEA


3.MapReduce程序運(yùn)行例子

3.1 程序需求

學(xué)校里開設(shè)了多門課程,有語文(chinese)、數(shù)學(xué)(math)、英語(english)等。經(jīng)過了一次年級統(tǒng)考后,每個(gè)學(xué)生的成績都被記錄在多個(gè)文本文件中,文本文件格式如下。

  • math.txt
Ben 75
Jack 60
May 85
Tom 91
  • english.txt
Jack 72
May 60
Tom 62
Ben 90
  • chinese.txt
Ben 79
May 88
Tom 68
Jack 70

現(xiàn)需要根據(jù)以上的文本文件,算出每個(gè)學(xué)生在本次統(tǒng)考中的平均分,并將結(jié)果用一個(gè)總的文件averageScore.txt進(jìn)行存儲。averageScore.txt的格式如下。

  • averageScore.txt
#name #score
Ben 0.0
May 0.0
Tom 0.0
Jack 0.0


3.2 程序設(shè)計(jì)思路

3.2.1 Mapper的處理邏輯

Mapper每次從文本文件中讀取1行內(nèi)容,即調(diào)用1次map方法。Mapper需要把原始數(shù)據(jù)中一行的內(nèi)容拆分成學(xué)生姓名(student name)和該門課程的分?jǐn)?shù)(score)。按照需求,本程序最終要算出每一個(gè)學(xué)生的平均分,所以學(xué)生姓名應(yīng)作為一個(gè)key,對應(yīng)的value即為該生的平均分(實(shí)際上是不嚴(yán)謹(jǐn)?shù)?,因?yàn)樵趯?shí)際環(huán)境中會出現(xiàn)多個(gè)學(xué)生重名的現(xiàn)象,若不作特殊處理,key是不允許重復(fù)的。最根本的解決方案是采用學(xué)號作為key,但為了演示直觀,僅采用學(xué)生姓名作為key)

Mapper讀完一行的數(shù)據(jù)后,把{student name,score}這個(gè)key-value寫入中間結(jié)果,準(zhǔn)備傳送給Reducer作下一步的運(yùn)算。

3.2.2 Reducer的處理邏輯

Reducer接收到的數(shù)據(jù),實(shí)際上是一個(gè)key與該key對應(yīng)的value的一個(gè)集合(并不僅僅是一個(gè)value)。在本需求中,傳入reduce方法的參數(shù)是學(xué)生姓名,以及該生多門課程分?jǐn)?shù)的集合,類似于Ben,[60,70,80,...]。所以Reducer需要將集合中的分?jǐn)?shù)求和,然后求出平均值,最終得到一個(gè){student name, average score}key-value對。

3.2.3 具體代碼設(shè)計(jì)

  • AVGMapper類
    用于實(shí)現(xiàn)map方法
package mr;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Created by marco on 2017/8/17.
 */
public class AVGMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(line.length() == 0)  // 文件格式錯誤,出現(xiàn)空行
            return;
        String[] split = line.split(" ");
        String stuName = split[0];
        String stuScore = split[1];
        double score = Double.parseDouble(stuScore);    // 轉(zhuǎn)成double類型,方便后續(xù)求均值計(jì)算
        context.write(new Text(stuName), new DoubleWritable(score));
    }
}
  • AVGReducer類
    用于實(shí)現(xiàn)reduce方法
package mr;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Created by marco on 2017/8/17.
 */
public class AVGReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
    {
        double sum = 0;
        int length = 0;
        for(DoubleWritable value : values)
        {
            sum += value.get();
            length++;
        }

        double avgScore = sum / (double)length;
        context.write(key, new DoubleWritable(avgScore));
    }
}

  • AVGRunner類
    用于關(guān)聯(lián)Mapper與Reducer,并創(chuàng)建MapReduce任務(wù)(Job)提交運(yùn)行。基本代碼如下所示。
package mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Created by marco on 2017/8/17.
 */
public class AVGRunner
{
    static public void main(String[] args) throws Exception
    {
        // 設(shè)置hdfs的handler
        Configuration fsConf = new Configuration();
        fsConf.set("fs.default.name","hdfs://localhost:9000/");
        FileSystem fs = FileSystem.get(fsConf);

        // MapReduce的配置參數(shù)
        Configuration mrConf = new Configuration();

        // 新建一個(gè)求平均值的Job
        Job avgJob = Job.getInstance(mrConf);
        avgJob.setJarByClass(AVGRunner.class);

        // 設(shè)置Mapper類與Reducer類
        avgJob.setMapperClass(AVGMapper.class);
        avgJob.setReducerClass(AVGReducer.class);

        // 設(shè)置輸入輸出的數(shù)據(jù)結(jié)構(gòu)
        avgJob.setMapOutputKeyClass(Text.class);
        avgJob.setMapOutputValueClass(DoubleWritable.class);
        avgJob.setOutputKeyClass(Text.class);
        avgJob.setOutputValueClass(DoubleWritable.class);

        // 檢查結(jié)果輸出目錄,若已存在則刪除輸出目錄
        if(fs.exists(new Path("/avg/output")))
        {
            fs.delete(new Path("/avg/output"), true);
        }

        // 設(shè)置數(shù)據(jù)目錄以及結(jié)果輸出目錄
        FileInputFormat.setInputPaths(avgJob, new Path(""));
        FileOutputFormat.setOutputPath(avgJob, new Path(""));

        // 提交任務(wù),等待完成
        System.exit(avgJob.waitForCompletion(true)?0:1);
    }
}


3.3 MapReduce程序運(yùn)行

若使用本地文件系統(tǒng)的數(shù)據(jù)文件,且在本地模式運(yùn)行,無需配置hdfs相關(guān)的參數(shù),數(shù)據(jù)目錄以及結(jié)果輸出目錄填寫本地路徑即可。(確保結(jié)果輸出文件夾未被創(chuàng)建,否則會報(bào)異常)

// 均填寫本地文件路徑即可
FileInputFormat.setInputPaths(avgJob, new Path(""));
FileOutputFormat.setOutputPath(avgJob, new Path(""));


若使用hdfs上的數(shù)據(jù)文件,且在本地模式運(yùn)行,應(yīng)配置hdfs相關(guān)參數(shù),數(shù)據(jù)目錄以及結(jié)果輸出目錄均填寫hdfs的路徑。(確保結(jié)果輸出文件夾未被創(chuàng)建,否則會報(bào)異常)

// 設(shè)置hdfs參數(shù),并用該配置創(chuàng)建一個(gè)新的Job
Configuration fsConf = new Configuration();
fsConf.set("fs.default.name","hdfs://localhost:9000/");
Job avgJob = Job.getInstance(fsConf);


// 均填寫hdfs路徑即可
FileInputFormat.setInputPaths(avgJob, new Path(""));
FileOutputFormat.setOutputPath(avgJob, new Path(""));

3.3.1 本地模式運(yùn)行

本地模式運(yùn)行,直接編譯執(zhí)行AVGRunner的main方法即可,程序運(yùn)行結(jié)束后會在自行設(shè)置的結(jié)果輸出目錄中生成運(yùn)行結(jié)果。

3.3.2 遠(yuǎn)程集群運(yùn)行

首先使用IDE將程序打成一個(gè)jar包,本例中命名為hadoop.jar

提交到遠(yuǎn)程集群上運(yùn)行分兩種情況

  • 使用本地IDE(IntelliJ IDEA)運(yùn)行,任務(wù)被提交到集群運(yùn)行,但可使用IDE進(jìn)行跟蹤debug

    新建一個(gè)MapReduce的配置對象,將已經(jīng)打包好的jar包傳入配置中

      // MapReduce的配置參數(shù),遠(yuǎn)程運(yùn)行,本地debug
      Configuration mrConf = new Configuration();
      mrConf.set("mapreduce.job.jar","hadoop.jar");
      mrConf.set("mapreduce.framework.name","yarn");
      
      //利用以上配置新建一個(gè)Job
      Job avgJob = Job.getInstance(mrConf);
      avgJob.setJarByClass(AVGRunner.class);
    


  • 在終端直接使用hadoop命令將任務(wù)提交到集群運(yùn)行,無法使用IDE進(jìn)行跟蹤debug

    直接在終端中輸入hadoop命令

    hadoop jar $jar包名稱 $待執(zhí)行的類的名稱
    

    在本例中應(yīng)輸入

    hadoop jar avg.jar mr.AVGRunner
    

####################### 注意?? #######################

在OS X中,使用IntelliJ IDEA打包jar包后,若在終端中直接使用hadoop jar $jar包名稱 $待執(zhí)行的類的名稱提交MapReduce任務(wù),會報(bào)出異常,因?yàn)镺S X系統(tǒng)的文件系統(tǒng)對大小寫不敏感(case-insensitive)。

經(jīng)過對此異常的搜索,暫時(shí)的解決方案是通過刪除jar包中的LICENSE文件,使任務(wù)順利提交。

# 在終端中執(zhí)行以下命令
zip -d $jar包名稱 META-INF/LICENSE
zip -d $jar包名稱 LICENSE

#####################################################

可以看到使用了hadoop命令提交任務(wù)后,系統(tǒng)調(diào)用了RPC框架和Yarn框架中的一些服務(wù),用于遠(yuǎn)程運(yùn)行,而非使用LocalJobSubmitter于本地運(yùn)行。

并且在MapReduce任務(wù)管理頁面可看到任務(wù)已經(jīng)完成的歷史記錄。

4.總結(jié)

MapReduce任務(wù)可在本地運(yùn)行,也可提交到集群上運(yùn)行。

在開發(fā)初期,需要編寫Demo程序時(shí),可在本地進(jìn)行開發(fā)與測試,將數(shù)據(jù)文件放置在本地文件系統(tǒng),直接使用IDE運(yùn)行主類的main方法,觀察運(yùn)行結(jié)果。

上線前調(diào)試,可采用遠(yuǎn)程模式運(yùn)行,不直接使用hadoop命令提交,而是使用IDE進(jìn)行提交與debug,這樣既可以保證程序運(yùn)行在遠(yuǎn)處集群上(生產(chǎn)環(huán)境or開發(fā)環(huán)境),也可以在本地方便跟蹤調(diào)試。

可上線時(shí),使用hadoop命令直接提交到遠(yuǎn)程集群,并通過localhost:50070(默認(rèn)配置)的任務(wù)管理頁面進(jìn)行觀察。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容