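MapReduce development stages
Developing a MapReduce job involves eight steps in total: 2 in the Map stage, 4 in the Shuffle stage, and 2 in the Reduce stage.
Map stage: 2 steps
- Set the InputFormat class, which splits the data into Key-Value pairs (K1 and V1) and feeds them into the second step
- Write the custom Map logic, which converts the result of the first step into other Key-Value pairs (K2 and V2) and outputs them
Shuffle stage: 4 steps
- Partition the output Key-Value pairs
- Sort the data within each partition by Key
- (Optional) Combine the data locally as a preliminary reduction, which lowers the amount of data copied over the network
- Group the data, placing the Values that share the same Key into one collection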
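To make the optional combine step concrete, here is a minimal plain-Java sketch (no Hadoop classes) of what a combiner does for word counting: it pre-aggregates the (word, 1) pairs of a single map task so that fewer records need to be copied to the reducers. The class name `CombinerSketch` and the sample words are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: local pre-aggregation of one map task's output. */
final class CombinerSketch {

    /** Collapses repeated keys, each carrying an implicit value of 1, into (key, count). */
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String key : mapOutputKeys) {
            Integer old = combined.get(key);
            combined.put(key, old == null ? 1 : old + 1);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hello", "aleen", "hello", "nana", "hello");
        Map<String, Integer> combined = combine(keys);
        // prints "5 records before, 3 after: {aleen=1, hello=3, nana=1}"
        System.out.println(keys.size() + " records before, "
                + combined.size() + " after: " + combined);
    }
}
```

Summation is associative and commutative, which is why partial sums computed per map task can safely be summed again on the reduce side.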
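Reduce stage: 2 steps
- Sort and merge the results of the multiple Map tasks, then write a Reduce function implementing your own logic, which processes the input Key-Value pairs and outputs new Key-Value pairs (K3 and V3)
- Set the OutputFormat, which processes and saves the Key-Value data output by Reduce
These steps add up to 8 in total.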
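The steps above can be walked through in memory with a small plain-Java sketch. It is not Hadoop code: the class `MiniWordCount` and its methods are hypothetical stand-ins for the map, partition, sort/group, and reduce steps, and the optional combine step is left out for brevity.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative in-memory walk-through of map, shuffle, and reduce. */
final class MiniWordCount {

    /** Map step: turn one line (V1) into (word, 1) pairs (K2, V2). */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    /** Partition step: pick a partition from the key's hash code. */
    static int partition(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    /** Reduce step: sum all values that share one key (K3 = word, V3 = total). */
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    /** Runs every step and returns one sorted word-to-count map per partition. */
    static List<Map<String, Integer>> run(List<String> lines, int numPartitions) {
        // Sort + group: a TreeMap per partition keeps keys ordered and collects values per key.
        List<TreeMap<String, List<Integer>>> grouped = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            grouped.add(new TreeMap<String, List<Integer>>());
        }
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                TreeMap<String, List<Integer>> part =
                        grouped.get(partition(pair.getKey(), numPartitions));
                List<Integer> values = part.get(pair.getKey());
                if (values == null) {
                    values = new ArrayList<>();
                    part.put(pair.getKey(), values);
                }
                values.add(pair.getValue());
            }
        }
        List<Map<String, Integer>> results = new ArrayList<>();
        for (TreeMap<String, List<Integer>> part : grouped) {
            Map<String, Integer> counts = new TreeMap<>();
            for (Map.Entry<String, List<Integer>> e : part.entrySet()) {
                counts.put(e.getKey(), reduce(e.getValue()));
            }
            results.add(counts);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello aleen hello nana", "hello city");
        // prints "[{aleen=1, city=1, hello=3, nana=1}]"
        System.out.println(run(lines, 1));
    }
}
```

With more than one partition, each inner map corresponds to the output of one reduce task, so a given word appears in exactly one of them.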
Calling the MapReduce API (WordCount example)
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cyf</groupId>
    <artifactId>MyWordCount</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>cn.itcast.mapreduce.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
WordCountMapper.java
package cn.itcast.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author AllenWoon
 * <p>
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN is the type of the key that the framework reads.
 * With the default InputFormat, the key is the offset of a line of text, so its type is long.
 * <p>
 * VALUEIN is the type of the value that the framework reads.
 * With the default InputFormat, the value is the content of one line of text, so its type is String.
 * <p>
 * KEYOUT is the type of the key returned by the user-defined logic; it is determined by the business logic.
 * In this word count, the word is the output key, so its type is String.
 * <p>
 * VALUEOUT is the type of the value returned by the user-defined logic; it is also determined by the business logic.
 * In this word count, the number of occurrences is the output value, so its type is Integer.
 * <p>
 * However, String and Long are JDK types whose serialization is relatively inefficient,
 * so Hadoop defines its own set of data types to make serialization more efficient.
 * <p>
 * In a Hadoop program, any data that must be serialized (written to disk or sent over the network)
 * has to use a type that implements Hadoop's serialization framework:
 * <p>
 * Long------->LongWritable
 * String----->Text
 * Integer---->IntWritable
 * null------->NullWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * This map method holds the user's business logic and is called by MapTask, the main program of the map phase.
     * MapTask drives the InputFormat to read the data (KEYIN, VALUEIN) and calls this method once for every (k, v) read.
     * With the default InputFormat, key is the starting offset of a line and value is the content of that line.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into words on single spaces
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            // Emit (word, 1) for every occurrence
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
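One detail of the map method is worth checking in isolation: `split(" ")` only behaves well when words are separated by exactly one space. The plain-Java sketch below (the class `Tokenize` and the sample line are hypothetical, and it uses no Hadoop classes) compares it with splitting on runs of whitespace, which may be a safer choice for messier input.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: two ways of splitting a line into words. */
final class Tokenize {

    /** Splits on runs of whitespace (spaces, tabs) and drops empty tokens. */
    static List<String> words(String line) {
        List<String> out = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(w);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String line = "hello  aleen\thello ";   // double space, tab, trailing space
        // split(" ") yields "hello", "" and "aleen\thello": an empty token and a fused one
        System.out.println(line.split(" ").length);   // prints 3
        System.out.println(words(line));              // prints [hello, aleen, hello]
    }
}
```

The sample files in this article use single spaces throughout, so the mapper as written counts them correctly.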
WordCountReducer.java
package cn.itcast.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author AllenWoon
 * <p>
 * The ReduceTask calls our reduce method.
 * <p>
 * Each ReduceTask receives a portion of the data output by all MapTasks of the map stage (the previous stage);
 * with the default HashPartitioner, that portion is the keys for which
 * (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks == the number of this ReduceTask.
 * <p>
 * When the ReduceTask processes the kv data it has received, it calls our reduce method as follows:
 * <p>
 * first it groups all the kv pairs it has received by k (pairs with the same k form one group),
 * <p>
 * then it passes the k of one group to the key parameter of reduce, and all the v of that group,
 * through an iterator, to the values parameter.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts of one word
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}
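The rule that decides which ReduceTask receives a key can be sketched in plain Java. The class and method names below are hypothetical; the formula mirrors what Hadoop's default HashPartitioner computes, where masking with `Integer.MAX_VALUE` clears the sign bit so that a negative hash code cannot produce a negative partition number.

```java
/** Illustrative only: hash partitioning of keys across reduce tasks. */
final class PartitionSketch {

    /** Returns a partition number in the range [0, numReduceTasks). */
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String word : new String[] {"hello", "aleen", "nana", "city"}) {
            System.out.println(word + " -> reducer " + partitionFor(word, 3));
        }
        // Integer.valueOf(-7) has hash code -7; a plain % would give a negative result
        System.out.println(-7 % 3);                                // prints -1
        System.out.println(partitionFor(Integer.valueOf(-7), 3));  // prints 1
    }
}
```

Because the partition depends only on the key, every occurrence of a word lands in the same ReduceTask, no matter which MapTask emitted it.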
WordCountDriver.java
package cn.itcast.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author AllenWoon
 * <p>
 * This class is the client that specifies the many parameters the wordcount job needs at run time,
 * <p>
 * for example: which class holds the business logic of the map stage and which that of the reduce stage,
 * which component reads the data and which one writes the results,
 * the path of the wordcount jar,
 * <p>
 * ....
 * and any other parameters that are required.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Tell the framework where the jar containing our program is located
        job.setJar("/root/wordcount.jar");
        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Tell the framework the data types our program outputs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        // Fixed: the original called setOutputKeyClass a second time here
        job.setOutputValueClass(IntWritable.class);
        // Tell the framework which components read the data and write the results.
        // TextInputFormat is a built-in MapReduce input component that reads text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Tell the framework the path of the data files to process
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        // Submit the job and wait for it to finish
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Alternatively, write the driver a second way, using Tool and ToolRunner:
package cn.leon.mapReduce;

// Assumption: this driver reuses the Mapper and Reducer shown above
import cn.itcast.mapreduce.WordCountMapper;
import cn.itcast.mapreduce.WordCountReducer;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // 1. Create the job object
        Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
        // Required when the job is packaged and run on a cluster: locates the jar through this class
        job.setJarByClass(JobMain.class);
        // 2. Configure the job (8 steps)
        // Step 1: set the input class and the input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/wordcount"));
        // Step 2: set the mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the output types of the map stage (they must match the Mapper's type parameters)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Steps 3, 4, 5, and 6 are omitted (the defaults are used)
        // Step 7: set the reducer class
        job.setReducerClass(WordCountReducer.class);
        // Set the output types of the reduce stage
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Step 8: set the output class and the output path
        Path path = new Path("hdfs://node1:8020/wordcount_out");
        // Check whether the output directory already exists
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), super.getConf());
        if (fileSystem.exists(path)) {
            // Delete the existing output directory
            fileSystem.delete(path, true);
        }
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, path);
        // 3. Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Tool tool = new JobMain();
        // Start the job
        int run = ToolRunner.run(configuration, tool, args);
        System.exit(run);
    }
}
First create two files, 1.txt and 2.txt,
with the following content:
1.txt
hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello
2.txt
hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello
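Create the directory on HDFS
hadoop fs -mkdir -p /wordcount/input
Put 1.txt and 2.txt into the /wordcount/input directory
hadoop fs -put 1.txt 2.txt /wordcount/input
Upload the jar package
Upload wordcount.jar to the cluster node (WordCountDriver above expects it at /root/wordcount.jar)
Run the job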
hadoop jar wordcount.jar cn.itcast.mapreduce.WordCountDriver
View the generated result file
hdfs dfs -cat /wordcount/output/part-r-00000
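For checking the result programmatically, note that TextOutputFormat writes one record per line, with key and value separated by a tab by default. The plain-Java sketch below parses lines of that shape; the class `OutputParser` is hypothetical, and the two sample lines in `main` are invented for illustration, not actual output of this job.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: parses "word<TAB>count" lines such as those in part-r-00000. */
final class OutputParser {

    /** Returns word-to-count entries in file order, skipping lines that contain no tab. */
    static Map<String, Long> parse(Iterable<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue;
            }
            String word = line.substring(0, tab);
            long count = Long.parseLong(line.substring(tab + 1).trim());
            counts.put(word, count);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample lines, not real results
        System.out.println(parse(Arrays.asList("aleen\t2", "hello\t7")));   // prints {aleen=2, hello=7}
    }
}
```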
