MapReduce應用
二次排序
<div class='mdContent'>
二次排序的需求說明
在mapreduce操作時,shuffle階段會多次根據(jù)key值排序。但是在shuffle分組后,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時value值也是排序好的,這種需求就是二次排序。

測試的文件數(shù)據(jù)
a 1
a 5
a 7
a 9
b 3
b 8
b 10
未經過二次排序的輸出結果
a 9
a 7
a 5
a 1
b 10
b 8
b 3
第一種實現(xiàn)思路
直接在reduce端對分組后的values進行排序。
- reduce關鍵代碼
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Integer> valuesList = new ArrayList<Integer>();
// 取出value
for(IntWritable value : values) {
valuesList.add(value.get());
}
// 進行排序
Collections.sort(valuesList);
for(Integer value : valuesList) {
context.write(key, new IntWritable(value));
}
}
- 輸出結果
a 1
a 5
a 7
a 9
b 3
b 8
b 10
很容易發(fā)現(xiàn),這樣把排序工作都放到reduce端完成,當values序列長度非常大的時候,會對CPU和內存造成極大的負載。
- 注意的地方(容易被“坑”)
在reduce端對values進行迭代的時候,不要直接直接存儲value值或者key值,因為reduce方法會反復執(zhí)行多次,但key和value相關的對象只有兩個,reduce會反復重用這兩個對象。需要用相應的數(shù)據(jù)類型.get()取出后再存儲。
第二種實現(xiàn)思路
將map端輸出的<key,value>中的key和value組合成一個新的key(稱為newKey),value值不變。這里就變成<(key,value),value>,在針對newKey排序的時候,如果key相同,就再對value進行排序。
- 需要自定義的地方
- 自定義數(shù)據(jù)類型實現(xiàn)組合key
實現(xiàn)方式:繼承WritableComparable - 自定義partioner,形成newKey后保持分區(qū)規(guī)則任然按照key進行。保證不打亂原來的分區(qū)。
實現(xiàn)方式:繼承partitioner - 自動以分組,保持分組規(guī)則任然按照key進行。不打亂原來的分組
實現(xiàn)方式:繼承RawComparator
- 自定義數(shù)據(jù)類型關鍵代碼
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PairWritable implements WritableComparable<PairWritable> {
// 組合key
private String first;
private int second;
public PairWritable() {
}
public PairWritable(String first, int second) {
this.set(first, second);
}
/**
* 方便設置字段
*/
public void set(String first, int second) {
this.first = first;
this.second = second;
}
/**
* 反序列化
*/
@Override
public void readFields(DataInput arg0) throws IOException {
this.first = arg0.readUTF();
this.second = arg0.readInt();
}
/**
* 序列化
*/
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(first);
arg0.writeInt(second);
}
/*
* 重寫比較器
*/
public int compareTo(PairWritable o) {
int comp = this.first.compareTo(o.first);
if(comp != 0) {
return comp;
} else { // 若第一個字段相等,則比較第二個字段
return Integer.valueOf(this.second).compareTo(
Integer.valueOf(o.getSecond()));
}
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
- 自定義分區(qū)規(guī)則
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {
@Override
public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
/*
* 默認的實現(xiàn) (key.hashCode() & Integer.MAX_VALUE) % numPartitions
* 讓key中first字段作為分區(qū)依據(jù)
*/
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
- 自定義分組比較器
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
public class SecondGroupComparator implements RawComparator<PairWritable> {
/*
* 對象比較
*/
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
/*
* 字節(jié)比較
* arg0,arg3為要比較的兩個字節(jié)數(shù)組
* arg1,arg2表示第一個字節(jié)數(shù)組要進行比較的收尾位置,arg4,arg5表示第二個
* 從第一個字節(jié)比到組合key中second的前一個字節(jié),因為second為int型,所以長度為4
*/
public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
}
}
- map關鍵代碼
private PairWritable mapOutKey = new PairWritable();
private IntWritable mapOutValue = new IntWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineValue = value.toString();
String[] strs = lineValue.split("\t");
//設置組合key和value ==> <(key,value),value>
mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
mapOutValue.set(Integer.valueOf(strs[1]));
context.write(mapOutKey, mapOutValue);
}
- reduce關鍵代碼
private Text outPutKey = new Text();
public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//迭代輸出
for(IntWritable value : values) {
outPutKey.set(key.getFirst());
context.write(outPutKey, value);
}
}
- 輸出結果
a 1
a 5
a 7
a 9
b 3
b 8
b 10
</div>
MapReduce Join
<div class='mdContent'>
對兩份數(shù)據(jù)data1和data2進行關鍵詞連接是一個很通用的問題,如果數(shù)據(jù)量比較小,可以在內存中完成連接。
如果數(shù)據(jù)量比較大,在內存進行連接操會發(fā)生OOM。mapreduce join可以用來解決大數(shù)據(jù)的連接。
1 思路
1.1 reduce join
在map階段, 把關鍵字作為key輸出,并在value中標記出數(shù)據(jù)是來自data1還是data2。因為在shuffle階段已經自然按key分組,reduce階段,判斷每一個value是來自data1還是data2,在內部分成2組,做集合的乘積。
這種方法有2個問題:
1, map階段沒有對數(shù)據(jù)瘦身,shuffle的網絡傳輸和排序性能很低。
2, reduce端對2個集合做乘積計算,很耗內存,容易導致OOM。
1.2 map join
兩份數(shù)據(jù)中,如果有一份數(shù)據(jù)比較小,小數(shù)據(jù)全部加載到內存,按關鍵字建立索引。大數(shù)據(jù)文件作為map的輸入文件,對map()函數(shù)每一對輸入,都能夠方便地和已加載到內存的小數(shù)據(jù)進行連接。把連接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組的,并且連接好了的數(shù)據(jù)。
這種方法,要使用hadoop中的DistributedCache把小數(shù)據(jù)分布到各個計算節(jié)點,每個map節(jié)點都要把小數(shù)據(jù)庫加載到內存,按關鍵字建立索引。
這種方法有明顯的局限性:有一份數(shù)據(jù)比較小,在map端,能夠把它加載到內存,并進行join操作。
1.3 使用內存服務器,擴大節(jié)點的內存空間
針對map join,可以把一份數(shù)據(jù)存放到專門的內存服務器,在map()方法中,對每一個<key,value>的輸入對,根據(jù)key到內存服務器中取出數(shù)據(jù),進行連接
1.4 使用BloomFilter過濾空連接的數(shù)據(jù)
對其中一份數(shù)據(jù)在內存中建立BloomFilter,另外一份數(shù)據(jù)在連接之前,用BloomFilter判斷它的key是否存在,如果不存在,那這個記錄是空連接,可以忽略。
1.5 使用mapreduce專為join設計的包
在mapreduce包里看到有專門為join設計的包,對這些包還沒有學習,不知道怎么使用,只是在這里記錄下來,作個提醒。
jar: mapreduce-client-core.jar
package: org.apache.hadoop.mapreduce.lib.join
2 實現(xiàn)map join
相對而言,map join更加普遍,下面的代碼使用DistributedCache實現(xiàn)map join
2.1 背景
有客戶數(shù)據(jù)customer和訂單數(shù)據(jù)orders。
customer
| 客戶編號 | 姓名 | 地址 | 電話 |
|---|---|---|---|
| 1 | hanmeimei | ShangHai | 110 |
| 2 | leilei | BeiJing | 112 |
| 3 | lucy | GuangZhou | 119 |
** order**
| 訂單編號 | 客戶編號 | 其它字段被忽略 |
|---|---|---|
| 1 | 1 | 50 |
| 2 | 1 | 200 |
| 3 | 3 | 15 |
| 4 | 3 | 350 |
| 5 | 3 | 58 |
| 6 | 1 | 42 |
| 7 | 1 | 352 |
| 8 | 2 | 1135 |
| 9 | 2 | 400 |
| 10 | 2 | 2000 |
| 11 | 2 | 300 |
要求對customer和orders按照客戶編號進行連接,結果要求對客戶編號分組,對訂單編號排序,對其它字段不作要求
| 客戶編號 | 訂單編號 | 訂單金額 | 姓名 | 地址 | 電話 |
|---|---|---|---|---|---|
| 1 | 1 | 50 | hanmeimei | ShangHai | 110 |
| 1 | 2 | 200 | hanmeimei | ShangHai | 110 |
| 1 | 6 | 42 | hanmeimei | ShangHai | 110 |
| 1 | 7 | 352 | hanmeimei | ShangHai | 110 |
| 2 | 8 | 1135 | leilei | BeiJing | 112 |
| 2 | 9 | 400 | leilei | BeiJing | 112 |
| 2 | 10 | 2000 | leilei | BeiJing | 112 |
| 2 | 11 | 300 | leilei | BeiJing | 112 |
| 3 | 3 | 15 | lucy | GuangZhou | 119 |
| 3 | 4 | 350 | lucy | GuangZhou | 119 |
| 3 | 5 | 58 | lucy | GuangZhou | 119 |
- 在提交job的時候,把小數(shù)據(jù)通過DistributedCache分發(fā)到各個節(jié)點。
- map端使用DistributedCache讀到數(shù)據(jù),在內存中構建映射關系--如果使用專門的內存服務器,就把數(shù)據(jù)加載到內存服務器,map()節(jié)點可以只保留一份小緩存;如果使用BloomFilter來加速,在這里就可以構建;
- map()函數(shù)中,對每一對<key,value>,根據(jù)key到第2)步構建的映射里面中找出數(shù)據(jù),進行連接,輸出。
2.2 程序實現(xiàn)
public class Join extends Configured implements Tool {
// customer文件在hdfs上的位置。
// TODO: 改用參數(shù)傳入
private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
private static class CustomerBean {
private int custId;
private String name;
private String address;
private String phone;
public CustomerBean() {}
public CustomerBean(int custId, String name, String address,
String phone) {
super();
this.custId = custId;
this.name = name;
this.address = address;
this.phone = phone;
}
public int getCustId() {
return custId;
}
public String getName() {
return name;
}
public String getAddress() {
return address;
}
public String getPhone() {
return phone;
}
}
private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
private int custId;
private int orderId;
public void set(int custId, int orderId) {
this.custId = custId;
this.orderId = orderId;
}
public int getCustId() {
return custId;
}
public int getOrderId() {
return orderId;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(custId);
out.writeInt(orderId);
}
@Override
public void readFields(DataInput in) throws IOException {
custId = in.readInt();
orderId = in.readInt();
}
@Override
public int compareTo(CustOrderMapOutKey o) {
int res = Integer.compare(custId, o.custId);
return res == 0 ? Integer.compare(orderId, o.orderId) : res;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CustOrderMapOutKey) {
CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
return custId == o.custId && orderId == o.orderId;
} else {
return false;
}
}
@Override
public String toString() {
return custId + "\t" + orderId;
}
}
private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
private final Text outputValue = new Text();
/**
* 在內存中customer數(shù)據(jù)
*/
private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 格式: 訂單編號 客戶編號 訂單金額
String[] cols = value.toString().split("\t");
if (cols.length < 3) {
return;
}
int custId = Integer.parseInt(cols[1]); // 取出客戶編號
CustomerBean customerBean = CUSTOMER_MAP.get(custId);
if (customerBean == null) { // 沒有對應的customer信息可以連接
return;
}
StringBuffer sb = new StringBuffer();
sb.append(cols[2])
.append("\t")
.append(customerBean.getName())
.append("\t")
.append(customerBean.getAddress())
.append("\t")
.append(customerBean.getPhone());
outputValue.set(sb.toString());
outputKey.set(custId, Integer.parseInt(cols[0]));
context.write(outputKey, outputValue);
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));
BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
String line = null;
String[] cols = null;
// 格式:客戶編號 姓名 地址 電話
while ((line = reader.readLine()) != null) {
cols = line.split("\t");
if (cols.length < 4) { // 數(shù)據(jù)格式不匹配,忽略
continue;
}
CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
CUSTOMER_MAP.put(bean.getCustId(), bean);
}
}
}
/**
* reduce
* @author Ivan
*
*/
private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
@Override
protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 什么事都不用做,直接輸出
for (Text value : values) {
context.write(key, value);
}
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
if (args.length < 2) {
new IllegalArgumentException("Usage: <inpath> <outpath>");
return;
}
ToolRunner.run(new Configuration(), new Join(), args);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf, Join.class.getSimpleName());
job.setJarByClass(SecondarySortMapReduce.class);
// 添加customer cache文件
job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// map settings
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(CustOrderMapOutKey.class);
job.setMapOutputValueClass(Text.class);
// reduce settings
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(CustOrderMapOutKey.class);
job.setOutputKeyClass(Text.class);
boolean res = job.waitForCompletion(true);
return res ? 0 : 1;
}
}
運行環(huán)境
- 操作系統(tǒng): Centos 6.4
- Hadoop: Apache Hadoop-2.5.0
==客戶數(shù)據(jù)文件在hdfs上的位置硬編碼為==
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 運行程序之前先把客戶數(shù)據(jù)上傳到這個位置。
- 程序運行結果

</div>
MapReduce自定義分組Group
<div class='mdContent'>
一:背景
在上一篇文章中我們可以對兩列數(shù)據(jù)進行排序,即完成了當?shù)谝涣邢嗤瑫r第二列數(shù)據(jù)升序排列的功能,現(xiàn)在我們需要進一步完善一個功能,那就是當?shù)谝涣邢嗤瑫r求出第二列的最小值或最大值,Hadoop提供了自定義分組的功能,可以滿足我們的需求。
二:技術實現(xiàn)
我們先來看看需求
當?shù)谝涣胁幌嗟葧r,第一列按升序排列,當?shù)谝涣邢嗟葧r,求出對應第二列的最小值
3 3
3 2
3 1
2 2
2 1
1 1
輸出結果應該是:
1 1
2 1
3 1
實現(xiàn):
(1).自定義分組比較器繼承RawComparator,實現(xiàn)compare()方法。
(2).在設置作業(yè)是設置job.setGroupingComparatorClass()。
代碼如下:
public class MyGroupTest {
// 定義輸入路徑
private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
// 定義輸出路徑
private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
public static void main(String[] args) {
try {
// 創(chuàng)建配置信息
Configuration conf = new Configuration();
// 創(chuàng)建文件系統(tǒng)
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
// 如果輸出目錄存在,我們就刪除
if (fileSystem.exists(new Path(OUT_PATH))) {
fileSystem.delete(new Path(OUT_PATH), true);
}
// 創(chuàng)建任務
Job job = new Job(conf, MyGroupTest.class.getName());
// 天龍八部1.1 設置輸入目錄和設置輸入數(shù)據(jù)格式化的類
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
//1.2 設置自定義Mapper類和設置map函數(shù)輸出數(shù)據(jù)的key和value的類型
job.setMapperClass(MyGroupMapper.class);
job.setMapOutputKeyClass(CombineKey.class);
job.setMapOutputValueClass(LongWritable.class);
//一定不要忘記設置自定義分組比較器的類(這一步是關鍵)
job.setGroupingComparatorClass(MyGroupComparator.class);
//1.3 設置分區(qū)和reduce數(shù)量(reduce的數(shù)量,和分區(qū)的數(shù)量對應,因為分區(qū)為一個,所以reduce的數(shù)量也是一個)
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 排序、分組
//1.5 歸約
//2.1 Shuffle把數(shù)據(jù)從Map端拷貝到Reduce端。
//2.2 指定Reducer類和輸出key和value的類型
job.setReducerClass(MyGroupReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
//2.3 指定輸出的路徑和設置輸出的格式化類
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.setOutputFormatClass(TextOutputFormat.class);
// 提交作業(yè) 退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
// 創(chuàng)建聯(lián)合的key
private CombineKey combineKey = new CombineKey();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
InterruptedException {
// 對輸入value進行分割
String[] splits = value.toString().split("\t");
// 設置聯(lián)合的Key
combineKey.setComKey(Long.parseLong(splits[0]));
combineKey.setComVal(Long.parseLong(splits[1]));
// 寫出去
context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
}
}
public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
@Override
protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
long min = Long.MAX_VALUE;
// 遍歷比較求出每個組中的最小值
for (LongWritable val : values) {
if (val.get() < min) {
min = val.get();
}
}
// 把原始數(shù)據(jù)中的第一列中的元素分組后的組號作為key,所求的最小值為value將結果寫出去
context.write(new LongWritable(combineKey.getComKey()), new LongWritable(min));
}
}
}
/**
- 二次排序構造一個新的Key
*/
class CombineKey implements WritableComparable<CombineKey> {
private Long comKey;
private Long comVal;
// 無參構造函數(shù)必須提供,否則Hadoop的反射機制會報錯
public CombineKey() {
}
// 有參構造函數(shù)
public CombineKey(Long comKey, Long comVal) {
this.comKey = comKey;
this.comVal = comVal;
}
public Long getComKey() {
return comKey;
}
public void setComKey(Long comKey) {
this.comKey = comKey;
}
public Long getComVal() {
return comVal;
}
public void setComVal(Long comVal) {
this.comVal = comVal;
}
public void write(DataOutput out) throws IOException {
out.writeLong(this.comKey);
out.writeLong(this.comVal);
}
public void readFields(DataInput in) throws IOException {
this.comKey = in.readLong();
this.comVal = in.readLong();
}
/**
* 第一列按升序排列,第一列相同時,第二列也按升序排列
*/
public int compareTo(CombineKey o) {
long minus = this.comKey - o.comVal;
if (minus != 0) {
return (int) minus;
}
return (int) (this.comVal - o.comVal);
}
}
/**
-
自定義分組比較器
*/
class MyGroupComparator implements RawComparator<CombineKey> {// 分組策略中,這個方法不是重點
public int compare(CombineKey o1, CombineKey o2) {
// TODO Auto-generated method stub
return 0;
}/**
b1 表示第一個參與比較的字節(jié)數(shù)組
s1 表示第一個字節(jié)數(shù)組中開始比較的位置
l1 表示第一個字節(jié)數(shù)組中參與比較的字節(jié)長度
b2 表示第二個參與比較的字節(jié)數(shù)組
s2 表示第二個字節(jié)數(shù)組中開始比較的位置
-
l2 表示第二個字節(jié)數(shù)組參與比較的字節(jié)長度
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {// 這里是按第CombineKey中的第一個元素進行分組,因為是long類型,所以是8個字節(jié)
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
}
</div>
MapReduce自定義輸入輸出
自定義輸入:
其中 :reader.readLine(tmp); 是讀取下一行到tmp中
map的默認輸入key是行的偏移值 value是每一行的數(shù)據(jù)
相對map的輸入key value 以及讀哪些文件我們都可以靈活控制 :
輸入的格式是有FileInputFormat控制的 而對格式的控制是有RecordReader做到的 所以 要想控制輸入格式 首先重寫FileInputFormat的RecordReader 方法,在重寫的RecordReader 中new一個新類(繼承FileInputFormat 實現(xiàn)五個方法),達到控制
上代碼:
//1.繼承FileInputFormat 重寫RecordReader 輸入輸出為map輸入輸出
public class AuthReader extends FileInputFormat<Text,Text>{
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new InputFormat(); //new的新類
}
}
//2.創(chuàng)建新類 繼承RecordReader 輸入輸出為map輸入輸出
public class InputFormat extends RecordReader<Text,Text>{
private FileSplit fs ;
private Text key;
private Text value;
private LineReader reader;
private String fileName;
//初始化方法
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
fs = (FileSplit) split;
fileName = fs.getPath().getName();
Path path = fs.getPath();
Configuration conf = new Configuration();
//獲取文件系統(tǒng)
FileSystem system = path.getFileSystem(conf);
FSDataInputStream in = system.open(path);
reader = new LineReader(in);
}
//知識點1:這個方法會被調用多次 這個方法的返回值如果是true就會被調用一次
// 知識點2:每當nextKeyValue被調用一次 ,getCurrentKey,getCurrentValue也會被跟著調用一次
//知識點3:getCurrentKey,getCurrentValue給Map傳key,value
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//可以定義哪些文件不處理
if(!fileName.startsWith("wo"))return false;
Text tmp = new Text();
int length = reader.readLine(tmp);
if(length==0){
return false;
}else{
value=new Text(tmp+"何睿");
key = new Text("我是雷神托爾");
return true;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
if(reader!=null){
reader.close();
}
}
}
最后 在Driver中
//自定義輸入
job.setInputFormatClass(AuthReader.class);
自定義輸出:
//writer
public class AuthWriter<K,V> extends FileOutputFormat<K,V>{
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
Path path=super.getDefaultWorkFile(job, "");
Configuration conf=job.getConfiguration();
FileSystem fs=path.getFileSystem(conf);
FSDataOutputStream out=fs.create(path);
//新類 的鍵值分割符 行分割符
return new NOutputFormat<K,V>(out,"#|#","\r\n");
}
//實現(xiàn)類
public class NOutputFormat<K,V> extends RecordWriter<K,V>{
private FSDataOutputStream out;
private String keyValueSeparator;//鍵值分隔符
private String lineSeparator; //行與行分隔符
public NOutputFormat(FSDataOutputStream out,String keyValueSeparator,String lineSeparator){
this.out=out;
this.keyValueSeparator=keyValueSeparator;
this.lineSeparator=lineSeparator;
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
out.write(key.toString().getBytes());//key
out.write(keyValueSeparator.getBytes());//鍵值對分隔符
out.write(value.toString().getBytes());//vale
out.write(lineSeparator.getBytes());//行與行分隔符
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if(out!=null)out.close();
}
}
//在Driver中
//自定義輸出
job.setOutputFormatClass(AuthWriter.class);
多輸入源 一個job執(zhí)行
在Driver中
//對A目錄 用A Mapper A Reduce 執(zhí)行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score.txt"),AuthInputFormat.class,ScoreMapper.class);
//對B目錄 用B Mapper B Reduce 執(zhí)行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score-1.txt"),TextInputFormat.class,ScoreMapper2.class);
</div>