Hadoop-Eclipse: Testing MapReduce with WordCountMapReduce

Key points of MapReduce programming

MapReduce core concepts

    Core idea: divide and conquer
    Map means mapping
    A distributed computing model for processing massive amounts of data
    A simple MR program needs to specify:
    map(), reduce(), input, output
    The data to process goes in input; the results go in output
    ![](http://upload-images.jianshu.io/upload_images/2409103-c893fe48912e145a.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    An MR job follows a fixed format:


Data flows through the job as <key, value> pairs:
input -> map() -> reduce() -> output
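
The flow above can be sketched in plain Java without Hadoop. This is only an in-memory model under stated assumptions: the class `MiniMapReduce` and the interfaces `MapFn` and `ReduceFn` are hypothetical names invented for illustration, not part of the Hadoop API, and the toy job (summing string lengths per first letter) is made up. It needs Java 9 or later for `List.of`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical in-memory model of input -> map() -> reduce() -> output. Not the Hadoop API. */
class MiniMapReduce {

    /** map(): turns one input record into zero or more <key, value> pairs. */
    interface MapFn<I, K, V> {
        void map(I record, BiConsumer<K, V> emit);
    }

    /** reduce(): turns one key and all of its values into one output value. */
    interface ReduceFn<K, V, R> {
        R reduce(K key, List<V> values);
    }

    static <I, K extends Comparable<K>, V, R> Map<K, R> run(
            List<I> input, MapFn<I, K, V> mapper, ReduceFn<K, V, R> reducer) {
        // Map phase: collect every emitted pair, grouped by key and sorted by key.
        Map<K, List<V>> grouped = new TreeMap<>();
        for (I record : input) {
            mapper.map(record, (k, v) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        // Reduce phase: one call per distinct key.
        Map<K, R> output = new TreeMap<>();
        for (Map.Entry<K, List<V>> e : grouped.entrySet()) {
            output.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        // Toy job: total length of the strings that share a first letter.
        List<String> input = List.of("apple", "avocado", "banana");
        Map<Character, Integer> out = run(
                input,
                (String s, BiConsumer<Character, Integer> emit) -> emit.accept(s.charAt(0), s.length()),
                (Character key, List<Integer> values) -> values.stream().mapToInt(Integer::intValue).sum());
        System.out.println(out);
    }
}
```

The point of the sketch is that the programmer only supplies the two functions; grouping the map output by key before reduce runs is the framework's job.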

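A few questions to think about

    1. What kind of <key, value> pairs is the input file converted into?
    2. What kind of <key, value> pairs does map() output?
    3. How does reduce() process its <key, value> pairs, and what do the <key, value> pairs it outputs look like?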
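
For the first question, Hadoop's default text input turns each line into a pair of (byte offset of the line, text of the line). Here is a rough plain-Java sketch of that conversion, assuming `\n` line endings and UTF-8; the class `LineRecords` and the method `toRecords` are hypothetical names, and real Hadoop also handles splits and other line endings, which this ignores.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that mimics how a text file becomes <offset, line> records. */
class LineRecords {

    /** Returns <byte offset, line text> for each line, assuming '\n' line endings and UTF-8. */
    static Map<Long, String> toRecords(String fileContent) {
        Map<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            records.put(offset, line);
            // The next line starts after this line's bytes plus one newline byte.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        String file = "hadoop hdfs\nhadoop mapreduce hadoop yarn\nhadoop hello\n";
        toRecords(file).forEach((k, v) -> System.out.println("<" + k + ", " + v + ">"));
    }
}
```

This is why the mapper in the listings in this post declares `LongWritable` as its input key type and `Text` as its input value type.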

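Word count (wordcount): the execution process in detail

By default, each line of the input becomes one <key, value> pair; the key is the line's byte offset in the file and the value is the line's text.
hadoop hdfs -> <0, hadoop hdfs>
hadoop mapreduce hadoop yarn
hadoop hello
mapreduce hadoop
yarn hadoop
To count words: (1) split each line into words on spaces
hadoop hdfs -> hadoop, hdfs
(2) record the occurrences, emitting <word, 1> each time a word appears:
map() <hadoop, 1> <mapreduce, 1>
<hadoop, 1> <yarn, 1>
The input to reduce is the output of map, grouped by key.
reduce() adds up the values that share a key:
<hadoop, list(1,1)>
<mapreduce, list(1)>
<hdfs, list(1)>
Eclipse shortcut for overriding methods: Shift+Alt+S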
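
The walk-through above can be replayed in plain Java on the five sample lines. This is a sketch, not Hadoop code: the class `WordCountWalkthrough` and its methods are hypothetical names, and the `group` step stands in for the grouping the framework does between map and reduce. It needs Java 9 or later for `List.of` and `Map.entry`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory replay of the word count steps. */
class WordCountWalkthrough {

    /** map(): one line in, one <word, 1> pair out per word. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            pairs.add(Map.entry(word, 1));
        }
        return pairs;
    }

    /** Grouping between map and reduce: builds <word, list(1, 1, ...)>, sorted by word. */
    static Map<String, List<Integer>> group(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    /** reduce(): adds up the values that share a key. */
    static int reduce(String word, List<Integer> ones) {
        int sum = 0;
        for (int one : ones) {
            sum += one;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "hadoop hdfs",
                "hadoop mapreduce hadoop yarn",
                "hadoop hello",
                "mapreduce hadoop",
                "yarn hadoop");
        // Print one "word<TAB>count" line per distinct word.
        group(lines).forEach((word, ones) -> System.out.println(word + "\t" + reduce(word, ones)));
    }
}
```

Across all five lines, hadoop appears six times, mapreduce and yarn twice each, and hdfs and hello once each.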

First draft of the MapReduce code skeleton

package com.beifeng.bigdata.senior.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WordCountMapReduce {

    // Step 1: Mapper class
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO: split the line into words and emit <word, 1>
        }
    }

    // Step 2: Reducer class
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // TODO: sum the values for this key and emit <word, count>
        }
    }

    // Step 3: Driver
    public int run(String[] args) throws Exception {
        // read configuration file
        Configuration configuration = new Configuration();
        // create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // set job
        // input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // NOTE: the output path (args[1]) is not set in this first draft, so the job
        // cannot be submitted yet; the complete version later in this post adds it.
        // submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // hard-coded paths for running from Eclipse
        args = new String[] {
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/input/",
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/output9"
        };
        // run job: run() is defined on WordCountMapReduce, not on the reducer
        int status = new WordCountMapReduce().run(args);
        System.exit(status);
    }
}

Processing <key, value> pairs in the map and reduce phases

package com.beifeng.bigdata.senior.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WordCountMapReduce {

    // Step 1: Mapper class
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text mapOutputKey = new Text();
        // every occurrence of a word counts as 1
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split the line into separate words
            String[] strs = lineValue.split(" ");
            // iterate over the words to build <word, 1>
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // Step 2: Reducer class
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // add up all values for this key
            for (IntWritable value : values) {
                sum += value.get();
            }
            // set output value
            outputValue.set(sum);
            // output
            context.write(key, outputValue);
        }
    }

    // Step 3: Driver
    public int run(String[] args) throws Exception {
        // read configuration file
        Configuration configuration = new Configuration();
        // create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // set job
        // input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // NOTE: the output path (args[1]) is still not set at this stage.
        // submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // hard-coded paths for running from Eclipse
        args = new String[] {
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/input/",
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/output"
        };
        // run job
        int status = new WordCountMapReduce().run(args);
        System.exit(status);
    }
}
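
One detail about the `lineValue.split(" ")` call in the mapper: if a line contains two spaces in a row, `split(" ")` returns an empty string between them, and that empty string would then be counted as a word. A possible alternative, shown here only as a suggestion and not as what the original code does, is `java.util.StringTokenizer`, which splits on any run of whitespace and never returns empty tokens. The class name `TokenizeDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

/** Hypothetical demo comparing split(" ") with StringTokenizer. */
class TokenizeDemo {

    /** Splits on any run of whitespace and never returns empty tokens. */
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        // Default delimiters: space, tab, newline, carriage return, form feed.
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            words.add(tokens.nextToken());
        }
        return words;
    }

    public static void main(String[] args) {
        String line = "hadoop  hdfs"; // two spaces between the words
        System.out.println(Arrays.asList(line.split(" "))); // three elements, the middle one empty
        System.out.println(tokenize(line));                 // two elements
    }
}
```

In the mapper, the loop body would stay the same; only the way the words are produced would change.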

Running word count with MapReduce

package com.beifeng.bigdata.senior.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMapReduce {

    // Step 1: Mapper class
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text mapOutputKey = new Text();
        // every occurrence of a word counts as 1
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // debug: print the <key, value> pair that map() receives
            System.out.println("map-in-0-key" + key.get() + " -- " + "map-in-value" + value.toString());
            // line value
            String lineValue = value.toString();
            // split the line into separate words
            String[] strs = lineValue.split(" ");
            // iterate over the words to build <word, 1>
            for (String str : strs) {
                // set map output key
                mapOutputKey.set(str);
                // output
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // Step 2: Reducer class
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // add up all values for this key
            for (IntWritable value : values) {
                sum += value.get();
            }
            // set output value
            outputValue.set(sum);
            // output
            context.write(key, outputValue);
        }
    }

    // Step 3: Driver
    public int run(String[] args) throws Exception {
        // read configuration file
        Configuration configuration = new Configuration();
        // create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // set job
        // input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);
        // output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        // mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Hard-coded paths for running from Eclipse. They override anything passed
        // on the command line, so remove this assignment before packaging the jar.
        args = new String[] {
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/input",
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/output13"
        };
        // run job
        int status = new WordCountMapReduce().run(args);
        System.exit(status);
    }
}

Packaging as a jar and running on YARN


bin/yarn jar jars/mr-wc2.jar /user/beifeng/input /user/beifeng/output121
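
The command above passes the input and output paths as arguments, so `main` has to use them instead of overwriting `args` with hard-coded values. One way to support both the Eclipse run and the command-line run is sketched below. It is only an illustration: the class `ArgsDemo`, the method `resolvePaths` and the two constants are hypothetical names, and the default URIs are simply the ones used earlier in this post.

```java
/** Hypothetical helper: prefer command-line paths, fall back to defaults for local testing. */
class ArgsDemo {
    // Defaults for running from the IDE (assumed values, taken from the listings in this post).
    static final String DEFAULT_INPUT =
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/input";
    static final String DEFAULT_OUTPUT =
            "hdfs://bigdata-senior01.beifeng.com:8020/user/beifeng/output13";

    /** Uses the command-line paths when both are given, otherwise the defaults. */
    static String[] resolvePaths(String[] args) {
        if (args != null && args.length >= 2) {
            return new String[] { args[0], args[1] };
        }
        return new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
    }

    public static void main(String[] args) {
        String[] paths = resolvePaths(args);
        System.out.println("input:  " + paths[0]);
        System.out.println("output: " + paths[1]);
    }
}
```

In the driver, `main` could then call `run(resolvePaths(args))` rather than reassigning `args`.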
