MapReduce應用

MapReduce應用

二次排序

<div class='mdContent'>
二次排序的需求說明

在mapreduce操作時,shuffle階段會多次根據(jù)key值排序。但是在shuffle分組后,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時value值也是排序好的,這種需求就是二次排序。

1.jpg

測試的文件數(shù)據(jù)

a 1
a 5
a 7
a 9
b 3
b 8
b 10

未經過二次排序的輸出結果

a   9
a   7
a   5
a   1
b   10
b   8
b   3

第一種實現(xiàn)思路

直接在reduce端對分組后的values進行排序。

  • reduce關鍵代碼
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

             List<Integer> valuesList = new ArrayList<Integer>();

             // 取出value
             for(IntWritable value : values) {
                 valuesList.add(value.get());
             }
             // 進行排序
             Collections.sort(valuesList);

             for(Integer value : valuesList) {
                context.write(key, new IntWritable(value));
             }

        }

  • 輸出結果
a   1
a   5
a   7
a   9
b   3
b   8
b   10

很容易發(fā)現(xiàn),這樣把排序工作都放到reduce端完成,當values序列長度非常大的時候,會對CPU和內存造成極大的負載。

  • 注意的地方(容易被“坑”)
    在reduce端對values進行迭代的時候,不要直接直接存儲value值或者key值,因為reduce方法會反復執(zhí)行多次,但key和value相關的對象只有兩個,reduce會反復重用這兩個對象。需要用相應的數(shù)據(jù)類型.get()取出后再存儲。

第二種實現(xiàn)思路

將map端輸出的<key,value>中的key和value組合成一個新的key(稱為newKey),value值不變。這里就變成<(key,value),value>,在針對newKey排序的時候,如果key相同,就再對value進行排序。

  • 需要自定義的地方
  1. 自定義數(shù)據(jù)類型實現(xiàn)組合key
    實現(xiàn)方式:繼承WritableComparable
  2. 自定義partioner,形成newKey后保持分區(qū)規(guī)則任然按照key進行。保證不打亂原來的分區(qū)。
    實現(xiàn)方式:繼承partitioner
  3. 自動以分組,保持分組規(guī)則任然按照key進行。不打亂原來的分組
    實現(xiàn)方式:繼承RawComparator
  • 自定義數(shù)據(jù)類型關鍵代碼
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

 public class PairWritable implements WritableComparable<PairWritable> {
    // 組合key
      private String first;
      private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    /**
     * 方便設置字段
     */
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    /**
     * 反序列化
     */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.first = arg0.readUTF();
        this.second = arg0.readInt();
    }
    /**
     * 序列化
     */
    @Override
    public void write(DataOutput arg0) throws IOException {
        arg0.writeUTF(first);
        arg0.writeInt(second);
    }

    /*
     * 重寫比較器
     */
    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.first);

        if(comp != 0) {
            return comp;
        } else { // 若第一個字段相等,則比較第二個字段
            return Integer.valueOf(this.second).compareTo(
                    Integer.valueOf(o.getSecond()));
        }
    }

    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public String getFirst() {
        return first;
    }
    public void setFirst(String first) {
        this.first = first;
    }

  • 自定義分區(qū)規(guī)則
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        /* 
         * 默認的實現(xiàn) (key.hashCode() & Integer.MAX_VALUE) % numPartitions
         * 讓key中first字段作為分區(qū)依據(jù)
         */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 
    }
}

  • 自定義分組比較器
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class SecondGroupComparator implements RawComparator<PairWritable> {

    /*
     * 對象比較
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    /*
     * 字節(jié)比較
     * arg0,arg3為要比較的兩個字節(jié)數(shù)組
     * arg1,arg2表示第一個字節(jié)數(shù)組要進行比較的收尾位置,arg4,arg5表示第二個
     * 從第一個字節(jié)比到組合key中second的前一個字節(jié),因為second為int型,所以長度為4
     */
    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
        return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
    }
}

  • map關鍵代碼
        private PairWritable mapOutKey = new PairWritable();
        private IntWritable mapOutValue = new IntWritable();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");

            //設置組合key和value ==> <(key,value),value>
            mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutValue.set(Integer.valueOf(strs[1]));

            context.write(mapOutKey, mapOutValue);
        }

  • reduce關鍵代碼
        private Text outPutKey = new Text(); 
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            //迭代輸出
            for(IntWritable value : values) {
                outPutKey.set(key.getFirst());
                context.write(outPutKey, value);
            }

        }

  • 輸出結果
a   1
a   5
a   7
a   9
b   3
b   8
b   10

</div>

MapReduce Join

<div class='mdContent'>
對兩份數(shù)據(jù)data1和data2進行關鍵詞連接是一個很通用的問題,如果數(shù)據(jù)量比較小,可以在內存中完成連接。

如果數(shù)據(jù)量比較大,在內存進行連接操會發(fā)生OOM。mapreduce join可以用來解決大數(shù)據(jù)的連接。

1 思路

1.1 reduce join

在map階段, 把關鍵字作為key輸出,并在value中標記出數(shù)據(jù)是來自data1還是data2。因為在shuffle階段已經自然按key分組,reduce階段,判斷每一個value是來自data1還是data2,在內部分成2組,做集合的乘積。

這種方法有2個問題:

1, map階段沒有對數(shù)據(jù)瘦身,shuffle的網絡傳輸和排序性能很低。

2, reduce端對2個集合做乘積計算,很耗內存,容易導致OOM。

1.2 map join

兩份數(shù)據(jù)中,如果有一份數(shù)據(jù)比較小,小數(shù)據(jù)全部加載到內存,按關鍵字建立索引。大數(shù)據(jù)文件作為map的輸入文件,對map()函數(shù)每一對輸入,都能夠方便地和已加載到內存的小數(shù)據(jù)進行連接。把連接結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組的,并且連接好了的數(shù)據(jù)。

這種方法,要使用hadoop中的DistributedCache把小數(shù)據(jù)分布到各個計算節(jié)點,每個map節(jié)點都要把小數(shù)據(jù)庫加載到內存,按關鍵字建立索引。

這種方法有明顯的局限性:有一份數(shù)據(jù)比較小,在map端,能夠把它加載到內存,并進行join操作。

1.3 使用內存服務器,擴大節(jié)點的內存空間

針對map join,可以把一份數(shù)據(jù)存放到專門的內存服務器,在map()方法中,對每一個<key,value>的輸入對,根據(jù)key到內存服務器中取出數(shù)據(jù),進行連接

1.4 使用BloomFilter過濾空連接的數(shù)據(jù)

對其中一份數(shù)據(jù)在內存中建立BloomFilter,另外一份數(shù)據(jù)在連接之前,用BloomFilter判斷它的key是否存在,如果不存在,那這個記錄是空連接,可以忽略。

1.5 使用mapreduce專為join設計的包

在mapreduce包里看到有專門為join設計的包,對這些包還沒有學習,不知道怎么使用,只是在這里記錄下來,作個提醒。

jar: mapreduce-client-core.jar

package: org.apache.hadoop.mapreduce.lib.join

2 實現(xiàn)map join

相對而言,map join更加普遍,下面的代碼使用DistributedCache實現(xiàn)map join

2.1 背景

有客戶數(shù)據(jù)customer和訂單數(shù)據(jù)orders。

customer

客戶編號 姓名 地址 電話
1 hanmeimei ShangHai 110
2 leilei BeiJing 112
3 lucy GuangZhou 119

** order**

訂單編號 客戶編號 其它字段被忽略
1 1 50
2 1 200
3 3 15
4 3 350
5 3 58
6 1 42
7 1 352
8 2 1135
9 2 400
10 2 2000
11 2 300

要求對customer和orders按照客戶編號進行連接,結果要求對客戶編號分組,對訂單編號排序,對其它字段不作要求

客戶編號 訂單編號 訂單金額 姓名 地址 電話
1 1 50 hanmeimei ShangHai 110
1 2 200 hanmeimei ShangHai 110
1 6 42 hanmeimei ShangHai 110
1 7 352 hanmeimei ShangHai 110
2 8 1135 leilei BeiJing 112
2 9 400 leilei BeiJing 112
2 10 2000 leilei BeiJing 112
2 11 300 leilei BeiJing 112
3 3 15 lucy GuangZhou 119
3 4 350 lucy GuangZhou 119
3 5 58 lucy GuangZhou 119
  1. 在提交job的時候,把小數(shù)據(jù)通過DistributedCache分發(fā)到各個節(jié)點。
  2. map端使用DistributedCache讀到數(shù)據(jù),在內存中構建映射關系--如果使用專門的內存服務器,就把數(shù)據(jù)加載到內存服務器,map()節(jié)點可以只保留一份小緩存;如果使用BloomFilter來加速,在這里就可以構建;
  3. map()函數(shù)中,對每一對<key,value>,根據(jù)key到第2)步構建的映射里面中找出數(shù)據(jù),進行連接,輸出。

2.2 程序實現(xiàn)

public class Join extends Configured implements Tool {
    // customer文件在hdfs上的位置。
    // TODO: 改用參數(shù)傳入
    private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
    private static class CustomerBean {     
        private int custId;
        private String name;
        private String address;
        private String phone;

        public CustomerBean() {}

        public CustomerBean(int custId, String name, String address,
                String phone) {
            super();
            this.custId = custId;
            this.name = name;
            this.address = address;
            this.phone = phone;
        }

        public int getCustId() {
            return custId;
        }

        public String getName() {
            return name;
        }

        public String getAddress() {
            return address;
        }

        public String getPhone() {
            return phone;
        }
    }

    private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
        private int custId;
        private int orderId;

        public void set(int custId, int orderId) {
            this.custId = custId;
            this.orderId = orderId;
        }

        public int getCustId() {
            return custId;
        }

        public int getOrderId() {
            return orderId;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(custId);
            out.writeInt(orderId);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            custId = in.readInt();
            orderId = in.readInt();
        }

        @Override
        public int compareTo(CustOrderMapOutKey o) {
            int res = Integer.compare(custId, o.custId);
            return res == 0 ? Integer.compare(orderId, o.orderId) : res;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CustOrderMapOutKey) {
                CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
                return custId == o.custId && orderId == o.orderId;
            } else {
                return false;
            }
        }

        @Override
        public String toString() {
            return custId + "\t" + orderId;
        }
    }

    private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
        private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
        private final Text outputValue = new Text();

        /**
         * 在內存中customer數(shù)據(jù)
         */
        private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // 格式: 訂單編號 客戶編號    訂單金額
            String[] cols = value.toString().split("\t");           
            if (cols.length < 3) {
                return;
            }

            int custId = Integer.parseInt(cols[1]);     // 取出客戶編號
            CustomerBean customerBean = CUSTOMER_MAP.get(custId);

            if (customerBean == null) {         // 沒有對應的customer信息可以連接
                return;
            }

            StringBuffer sb = new StringBuffer();
            sb.append(cols[2])
                .append("\t")
                .append(customerBean.getName())
                .append("\t")
                .append(customerBean.getAddress())
                .append("\t")
                .append(customerBean.getPhone());

            outputValue.set(sb.toString());
            outputKey.set(custId, Integer.parseInt(cols[0]));

            context.write(outputKey, outputValue);
        }

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
            FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));

            BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
            String line = null;
            String[] cols = null;

            // 格式:客戶編號  姓名  地址  電話
            while ((line = reader.readLine()) != null) {
                cols = line.split("\t");
                if (cols.length < 4) {              // 數(shù)據(jù)格式不匹配,忽略
                    continue;
                }

                CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
                CUSTOMER_MAP.put(bean.getCustId(), bean);
            }
        }
    }

    /**
     * reduce
     * @author Ivan
     *
     */
    private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
        @Override
        protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 什么事都不用做,直接輸出
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            new IllegalArgumentException("Usage: <inpath> <outpath>");
            return;
        }

        ToolRunner.run(new Configuration(), new Join(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, Join.class.getSimpleName());
        job.setJarByClass(SecondarySortMapReduce.class);

        // 添加customer cache文件
        job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // map settings
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(CustOrderMapOutKey.class);
        job.setMapOutputValueClass(Text.class);

        // reduce settings
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(CustOrderMapOutKey.class);
        job.setOutputKeyClass(Text.class);

        boolean res = job.waitForCompletion(true);

        return res ? 0 : 1;
    }
}

運行環(huán)境

  • 操作系統(tǒng): Centos 6.4
  • Hadoop: Apache Hadoop-2.5.0

==客戶數(shù)據(jù)文件在hdfs上的位置硬編碼為==
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 運行程序之前先把客戶數(shù)據(jù)上傳到這個位置。

  • 程序運行結果
##

</div>

MapReduce自定義分組Group

<div class='mdContent'>
一:背景

在上一篇文章中我們可以對兩列數(shù)據(jù)進行排序,即完成了當?shù)谝涣邢嗤瑫r第二列數(shù)據(jù)升序排列的功能,現(xiàn)在我們需要進一步完善一個功能,那就是當?shù)谝涣邢嗤瑫r求出第二列的最小值或最大值,Hadoop提供了自定義分組的功能,可以滿足我們的需求。

二:技術實現(xiàn)

我們先來看看需求

當?shù)谝涣胁幌嗟葧r,第一列按升序排列,當?shù)谝涣邢嗟葧r,求出對應第二列的最小值

3 3
3 2
3 1
2 2
2 1
1 1

輸出結果應該是:
1 1
2 1
3 1

實現(xiàn):

(1).自定義分組比較器繼承RawComparator,實現(xiàn)compare()方法。

(2).在設置作業(yè)是設置job.setGroupingComparatorClass()。

代碼如下:

public class MyGroupTest {
// 定義輸入路徑
private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
// 定義輸出路徑
private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

public static void main(String[] args) {

    try {
        // 創(chuàng)建配置信息
        Configuration conf = new Configuration();


        // 創(chuàng)建文件系統(tǒng)
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        // 如果輸出目錄存在,我們就刪除
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }

        // 創(chuàng)建任務
        Job job = new Job(conf, MyGroupTest.class.getName());

        // 天龍八部1.1 設置輸入目錄和設置輸入數(shù)據(jù)格式化的類
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        //1.2 設置自定義Mapper類和設置map函數(shù)輸出數(shù)據(jù)的key和value的類型
        job.setMapperClass(MyGroupMapper.class);
        job.setMapOutputKeyClass(CombineKey.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //一定不要忘記設置自定義分組比較器的類(這一步是關鍵)
        job.setGroupingComparatorClass(MyGroupComparator.class);
        
        //1.3 設置分區(qū)和reduce數(shù)量(reduce的數(shù)量,和分區(qū)的數(shù)量對應,因為分區(qū)為一個,所以reduce的數(shù)量也是一個)
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        //1.4 排序、分組
        //1.5 歸約
        //2.1 Shuffle把數(shù)據(jù)從Map端拷貝到Reduce端。
        //2.2 指定Reducer類和輸出key和value的類型
        job.setReducerClass(MyGroupReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 指定輸出的路徑和設置輸出的格式化類
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);

        // 提交作業(yè) 退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
    // 創(chuàng)建聯(lián)合的key
    private CombineKey combineKey = new CombineKey();

    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
            InterruptedException {
        // 對輸入value進行分割
        String[] splits = value.toString().split("\t");
        // 設置聯(lián)合的Key
        combineKey.setComKey(Long.parseLong(splits[0]));
        combineKey.setComVal(Long.parseLong(splits[1]));

        // 寫出去
        context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
    }
}

public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
            Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {

        long min = Long.MAX_VALUE;
        // 遍歷比較求出每個組中的最小值
        for (LongWritable val : values) {

            if (val.get() < min) {
                min = val.get();
            }
        }

        // 把原始數(shù)據(jù)中的第一列中的元素分組后的組號作為key,所求的最小值為value將結果寫出去
        context.write(new LongWritable(combineKey.getComKey()), new LongWritable(min));
    }
}

}

/**

  • 二次排序構造一個新的Key

*/
class CombineKey implements WritableComparable<CombineKey> {

private Long comKey;
private Long comVal;

// 無參構造函數(shù)必須提供,否則Hadoop的反射機制會報錯
public CombineKey() {
}

// 有參構造函數(shù)
public CombineKey(Long comKey, Long comVal) {
    this.comKey = comKey;
    this.comVal = comVal;
}

public Long getComKey() {
    return comKey;
}

public void setComKey(Long comKey) {
    this.comKey = comKey;
}

public Long getComVal() {
    return comVal;
}

public void setComVal(Long comVal) {
    this.comVal = comVal;
}

public void write(DataOutput out) throws IOException {
    out.writeLong(this.comKey);
    out.writeLong(this.comVal);
}

public void readFields(DataInput in) throws IOException {
    this.comKey = in.readLong();
    this.comVal = in.readLong();
}

/**
 * 第一列按升序排列,第一列相同時,第二列也按升序排列
 */
public int compareTo(CombineKey o) {
    long minus = this.comKey - o.comVal;
    if (minus != 0) {
        return (int) minus;
    }

    return (int) (this.comVal - o.comVal);
}

}

/**

  • 自定義分組比較器
    */
    class MyGroupComparator implements RawComparator<CombineKey> {

    // 分組策略中,這個方法不是重點
    public int compare(CombineKey o1, CombineKey o2) {
    // TODO Auto-generated method stub
    return 0;
    }

    /**

    • b1 表示第一個參與比較的字節(jié)數(shù)組

    • s1 表示第一個字節(jié)數(shù)組中開始比較的位置

    • l1 表示第一個字節(jié)數(shù)組中參與比較的字節(jié)長度

    • b2 表示第二個參與比較的字節(jié)數(shù)組

    • s2 表示第二個字節(jié)數(shù)組中開始比較的位置

    • l2 表示第二個字節(jié)數(shù)組參與比較的字節(jié)長度
      */
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

      // 這里是按第CombineKey中的第一個元素進行分組,因為是long類型,所以是8個字節(jié)
      return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
      }

}

</div>

MapReduce自定義輸入輸出

自定義輸入:

其中 :reader.readLine(tmp); 是讀取下一行到tmp中
map的默認輸入key是行的偏移值 value是每一行的數(shù)據(jù)
相對map的輸入key value 以及讀哪些文件我們都可以靈活控制 :

輸入的格式是有FileInputFormat控制的 而對格式的控制是有RecordReader做到的 所以 要想控制輸入格式 首先重寫FileInputFormat的RecordReader 方法,在重寫的RecordReader 中new一個新類(繼承FileInputFormat 實現(xiàn)五個方法),達到控制

上代碼:

//1.繼承FileInputFormat 重寫RecordReader  輸入輸出為map輸入輸出
public class AuthReader extends FileInputFormat<Text,Text>{
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new InputFormat(); //new的新類
    }
}

//2.創(chuàng)建新類 繼承RecordReader  輸入輸出為map輸入輸出
public class InputFormat extends RecordReader<Text,Text>{
    private FileSplit fs ;
    private Text key;
    private Text value;
    private LineReader reader;

    private String fileName;

    //初始化方法
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        fs = (FileSplit) split;
        fileName = fs.getPath().getName();
        Path path = fs.getPath();
        Configuration conf = new Configuration();
        //獲取文件系統(tǒng)
        FileSystem system = path.getFileSystem(conf);
        FSDataInputStream in = system.open(path);
        reader = new LineReader(in);
    }

     //知識點1:這個方法會被調用多次   這個方法的返回值如果是true就會被調用一次
     // 知識點2:每當nextKeyValue被調用一次 ,getCurrentKey,getCurrentValue也會被跟著調用一次
     //知識點3:getCurrentKey,getCurrentValue給Map傳key,value
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        //可以定義哪些文件不處理
        if(!fileName.startsWith("wo"))return false;
        Text tmp = new Text();
        int length = reader.readLine(tmp);
        if(length==0){
            return false;
        }else{
            value=new Text(tmp+"何睿");
            key = new Text("我是雷神托爾");
            return true;
        }

    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        if(reader!=null){
            reader.close();
        }
    }
}

最后 在Driver中

          //自定義輸入
        job.setInputFormatClass(AuthReader.class);

自定義輸出:

//writer
public class AuthWriter<K,V> extends FileOutputFormat<K,V>{
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Path path=super.getDefaultWorkFile(job, "");
        Configuration conf=job.getConfiguration();
        FileSystem fs=path.getFileSystem(conf);
        FSDataOutputStream out=fs.create(path);
        //新類 的鍵值分割符      行分割符
        return new NOutputFormat<K,V>(out,"#|#","\r\n");
    }

//實現(xiàn)類
public class NOutputFormat<K,V> extends RecordWriter<K,V>{
    private FSDataOutputStream out;
    private String keyValueSeparator;//鍵值分隔符
    private String lineSeparator; //行與行分隔符

    public NOutputFormat(FSDataOutputStream out,String keyValueSeparator,String lineSeparator){
        this.out=out;
        this.keyValueSeparator=keyValueSeparator;
        this.lineSeparator=lineSeparator;
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        out.write(key.toString().getBytes());//key
        out.write(keyValueSeparator.getBytes());//鍵值對分隔符
        out.write(value.toString().getBytes());//vale
        out.write(lineSeparator.getBytes());//行與行分隔符
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if(out!=null)out.close();
    }
}

//在Driver中 
        //自定義輸出
        job.setOutputFormatClass(AuthWriter.class);

多輸入源 一個job執(zhí)行

在Driver中

//對A目錄 用A Mapper  A Reduce 執(zhí)行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score.txt"),AuthInputFormat.class,ScoreMapper.class);

//對B目錄 用B Mapper  B Reduce 執(zhí)行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score-1.txt"),TextInputFormat.class,ScoreMapper2.class);

</div>

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 一.簡述如何安裝配置apache 的一個開源的hadoop 1.使用root賬戶登陸 2.修改ip 3.修改hos...
    梔子花_ef39閱讀 5,068評論 0 52
  • 關于Mongodb的全面總結 MongoDB的內部構造《MongoDB The Definitive Guide》...
    中v中閱讀 32,306評論 2 89
  • 三月,初春時節(jié)?;▋簜冞€沒有開始爭奇斗艷,作為哺乳動物的人們也裹著冬衣在大地上行走。在羅江只可以看到一抹抹盎然...
    嘻哈Ray閱讀 160評論 0 1
  • 乾卦第一 坤卦第二 乾下乾上 坤下坤上 乾:元、亨、利、貞。 坤:元、享,利牝馬之貞。 君子有攸往,先迷,后得,主...
    香香純妹子淺淺閱讀 165評論 0 0
  • 今天和你相見我面帶微笑我的手和你的手相握我用我的溫暖接近你請你收下我的關懷和敬意 各自的手剛才還在自己的袖管里沉默...
    王春淶閱讀 245評論 1 7

友情鏈接更多精彩內容