Hadoop學(xué)習(xí)之WritableComparable排序

自定義排序?qū)崿F(xiàn)

bean對象做為key傳輸,需要實現(xiàn)WritableComparable接口重寫compareTo方法,就可以實現(xiàn)排序。

WritableComparable 的底層實現(xiàn)

WritableComparable 本身繼承了 Writable 和 Comparable 接口。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {}

自定義排序選需要同時實現(xiàn) 序列化、反序列化及比較器

 void write(DataOutput out) throws IOException;
 void readFields(DataInput in) throws IOException;
 public int compareTo(T o);

案例實操

數(shù)據(jù)準備

序號  電話號碼      IP地址          域名        上行流量  下行流量  狀態(tài)
1   13736230513 192.196.100.1   www.tenxun.com  2481    24681   200
2   13846544121 192.196.100.2                   264     0       200
3   13956435636 192.196.100.3                   132     1512    200
4   13966251146 192.168.100.1                   240     0       404
5   18271575951 192.168.100.2   www.tenxun.com  1527    2106    200
6   84188413    192.168.100.3   www.tenxun.com  4116    1432    200
7   13590439668 192.168.100.4                   1116    954     200
8   15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9   13729199489 192.168.100.6                   240     0       200
10  13630577991 192.168.100.7   www.shouhu.com  6960    690     200
11  15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12  15959002129 192.168.100.9   www.tenxun.com  1938    180     500
13  13560439638 192.168.100.10                  918     4938    200
14  13470253144 192.168.100.11                  180     180     200
15  13682846555 192.168.100.12  www.qq.com      1938    2910    200
16  13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17  13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18  18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19  13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20  13768778790 192.168.100.17                  120     120     200
21  13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22  13568436656 192.168.100.19                  1116    954     200

需求

求出總流量(上行流量+下行流量)并按照總流量進行排序,最后輸出結(jié)果包含手機號碼、上行流量、下行流量,總流量

輸出結(jié)果

電話號碼     上行下行 總流量
13768778790 120 120 240
13966251146 240 0 240
13729199489 240 0 240
13846544121 264 0 264
13470253144 180 180 360
13956435636 132 1512 1644
13590439668 1116 954 2070
13568436656 1116 954 2070
15959002129 1938 180 2118
18271575951 1527 2106 3633
13682846555 1938 2910 4848
84188413    4116 1432 5548
13560439638 918 4938 5856
15910133277 3156 2936 6092
13992314666 3008 3720 6728
15043685818 3659 3538 7197
13630577991 6960 690 7650
18390173782 9531 2412 11943
13568436656 2481 24681 27162
13736230513 2481 24681 27162
13975057813 11058 48243 59301
13509468723 7335 110349 117684

排序分類

  1. 全排序:

最終輸出結(jié)果只有一個文件,且文件內(nèi)部有序。實現(xiàn)方式是只設(shè)置一個ReduceTask。但該方法在處理大型文件時效率極低,因此一臺機器處理所有文件,完全喪失了MapReduce所提供的并行架構(gòu)。

  1. 部分排序(區(qū)內(nèi)排序):

MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內(nèi)部有序。

  1. 二次排序(多次排序):\color{red}{不做演示}

在自定義排序過程中,如果compareTo的判斷條件為兩個即為二次排序。

全排序

創(chuàng)建實體類

實現(xiàn) WritableComparable 接口、實現(xiàn)compareTo、write、readFields方法
注意點:
1:readFields 讀取數(shù)據(jù)的順序必須與write寫入的順序一致。
2:必須保證有一個空構(gòu)造器。
3:必須實現(xiàn)toString()方法,否則輸出數(shù)據(jù)是對象的地址信息。

/**
 * 流量數(shù)據(jù)bean
 */
public class PhoneComparableBean implements WritableComparable<PhoneComparableBean> {
    /**
     * 上行流量
     */
    private long upFlow;
    /**
     * 下行流量
     */
    private long downFlow;
    /**
     * 總流量
     */
    private long sumFlow;

    public PhoneComparableBean(){}

    public PhoneComparableBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow+downFlow;
    }

    /**
      *  若多個排序條件就可以稱為二次排序或多次排序
      */
    @Override
    public int compareTo(PhoneComparableBean o) {
        return Long.compare(this.sumFlow, o.sumFlow);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();

    }
    
    // getter/setter

    @Override
    public String toString() {
        return String.format("%s %s %s", upFlow, downFlow, sumFlow);
    }
}

mapper

/**
  *  LongWritable:數(shù)據(jù)偏移量,可以理解為文本內(nèi)容所在的行號
  *  Text:默認采用 TextInputFromat,按照一行行內(nèi)容進行讀取
  *  PhoneComparableBean:將自定義的Bean作為key 進行返回
  * Text:把手機號碼作為value返回
  */
public class PhoneComparableMapper extends Mapper<LongWritable, Text, PhoneComparableBean, Text> {
    // 分割條件
    private static Pattern pattern=Pattern.compile("[\\s+]");
    private Text outKey = new Text();
    /**
      * 重寫 Mapper 中的Map方法
      */
     @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 獲取一行內(nèi)容中的分割數(shù)據(jù)
        List<String> list = pattern.splitAsStream(value.toString())
                .filter(s -> s.length()>0).collect(Collectors.toList());
        // 獲取手機號碼
        String phone = list.get(1);
        outKey.set(phone);
        // 由于域名可能為空,為了保證數(shù)據(jù)不能為空,從尾進行獲取
        // 上行流量
        long upFlow = Long.parseLong(list.get(list.size() - 3));
        // 下行流量
        long downFlow = Long.parseLong(list.get(list.size() - 2));
        // 將內(nèi)容包裝成對象,并作為key
        PhoneComparableBean outValue = new PhoneComparableBean(upFlow,downFlow);
        //返回 K,V
        context.write(outValue,outKey);
    }
}

reduce

/**
  *  PhoneComparableBean:Mapper的返回key,自定義的對象
  *  Text:Mapper的返回vlaue,手機號碼
  *  Text:Reducer返回的key,手機號碼
  *   PhoneComparableBean:Reducer返回的Value,自定義的對象
  */
public class PhoneComparableReduce extends Reducer<PhoneComparableBean, Text,Text,PhoneComparableBean> {

    @Override
    protected void reduce(PhoneComparableBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 進行讀取寫出
        Iterator<Text> iterator = values.iterator();
        while (iterator.hasNext()) {
            context.write(iterator.next(), key);
        }
    }
}

main:主類(采用本地測試)

public class PhoneComparableDriver {

    public static void main(String[] args) {

        Configuration configuration = new Configuration();

        try {
            Job job = Job.getInstance(configuration);
            // mapper 返回的key、value數(shù)據(jù)類型
            job.setMapOutputKeyClass(PhoneComparableBean.class);
            job.setMapOutputValueClass(Text.class);
            
            // 調(diào)用的 mapper 及 reduce
            job.setMapperClass(PhoneComparableMapper.class);
            job.setReducerClass(PhoneComparableReduce.class);
            
            // 實際返回(Reduce)的 key、value 數(shù)據(jù)類型
            job.setOutputValueClass(PhoneComparableBean.class);
            job.setOutputKeyClass(Text.class);
            // 數(shù)據(jù)輸入路徑
            FileInputFormat.setInputPaths(job, new Path("D:\\development tool\\document\\temp\\input\\phone_data .txt"));
            // 數(shù)據(jù)輸出路徑
            String str = UUID.randomUUID().toString().replaceAll("-", "");
            FileOutputFormat.setOutputPath(job, new Path("D:\\development tool\\document\\temp\\output\\",str));
            // 提交任務(wù),等待執(zhí)行結(jié)果
            job.waitForCompletion(true);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

測試結(jié)果:

13768778790 120 120 240
13966251146 240 0 240
13729199489 240 0 240
13846544121 264 0 264
13470253144 180 180 360
13956435636 132 1512 1644
13590439668 1116 954 2070
13568436656 1116 954 2070
15959002129 1938 180 2118
18271575951 1527 2106 3633
13682846555 1938 2910 4848
84188413    4116 1432 5548
13560439638 918 4938 5856
15910133277 3156 2936 6092
13992314666 3008 3720 6728
15043685818 3659 3538 7197
13630577991 6960 690 7650
18390173782 9531 2412 11943
13568436656 2481 24681 27162
13736230513 2481 24681 27162
13975057813 11058 48243 59301
13509468723 7335 110349 117684

部分排序(區(qū)內(nèi)排序):

實現(xiàn)區(qū)內(nèi)排序的前提時需要進行先分區(qū),上面的數(shù)據(jù)不需要變動。

自定義分區(qū)

繼承 org.apache.hadoop.mapreduce.Partitioner 類,實現(xiàn) getPartition() 方法

/**
 * 自定義分區(qū)
 *  由于 分區(qū)階段 處理 Mapper -->分區(qū) --> 環(huán)形緩沖區(qū) 之間。
 *  所以 Partitioner<KEY,VALUE> 是 Mapper 返回的數(shù)據(jù)類型(PhoneComparableBean和Text)
 */
public class PhoneComparablePartitioner extends Partitioner<PhoneComparableBean, Text> {

    /**
     * 按照手機號碼開頭進行分區(qū)
     * 分別按照 134、135、137、139、其他號碼 分為5個區(qū)
     * @param phoneComparableBean
     * @param text
     * @param numPartitions
     * @return
     */
    @Override
    public int getPartition(PhoneComparableBean phoneComparableBean, Text text, int numPartitions) {
        String str = text.toString();
        if (str.startsWith("134")) {
            return 0;
        } else if (str.startsWith("135")) {
            return 1;
        } else if (str.startsWith("137")) {
            return 2;
        } else if (str.startsWith("139")) {
            return 3;
        }
        return 4;
    }
}

主類(main):

mapper 和 reduce 都不會有改動,這里只需要更改主類就行。

public class PhoneComparableDriver {

    public static void main(String[] args) {

        Configuration configuration = new Configuration();

        try {
            Job job = Job.getInstance(configuration);
            // mapper 返回的key、value數(shù)據(jù)類型
            job.setMapOutputKeyClass(PhoneComparableBean.class);
            job.setMapOutputValueClass(Text.class);
            
            // 調(diào)用的 mapper 及 reduce
            job.setMapperClass(PhoneComparableMapper.class);
            job.setReducerClass(PhoneComparableReduce.class);

            // 添加自定義分區(qū)類
            job.setPartitionerClass(PhoneComparablePartitioner.class);
            // 設(shè)置Reducetask個數(shù),該數(shù)量應(yīng)該與分區(qū)數(shù)量保持一致
            job.setNumReduceTasks(5);

            // 實際返回(Reduce)的 key、value 數(shù)據(jù)類型
            job.setOutputValueClass(PhoneComparableBean.class);
            job.setOutputKeyClass(Text.class);
            // 數(shù)據(jù)輸入路徑
            FileInputFormat.setInputPaths(job, new Path("D:\\development tool\\document\\temp\\input\\phone_data .txt"));
            // 數(shù)據(jù)輸出路徑
            String str = UUID.randomUUID().toString().replaceAll("-", "");
            FileOutputFormat.setOutputPath(job, new Path("D:\\development tool\\document\\temp\\output\\",str));
            // 提交任務(wù),等待執(zhí)行結(jié)果
            job.waitForCompletion(true);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

運行結(jié)果:


區(qū)類排序運行結(jié)果

生成五個文件,就不一個個區(qū)查看了,在這里查看一下 135 開頭的數(shù)據(jù)分區(qū)號為1(part-r-00001的文件)。

13590439668 1116 954 2070
13568436656 1116 954 2070
13560439638 918 4938 5856
13568436656 2481 24681 27162
13509468723 7335 110349 117684
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容