spark - java 編程實(shí)現(xiàn)Word count

本文通過(guò)一個(gè)demon向讀者展示,如何用spark 實(shí)現(xiàn)word count 功能。

創(chuàng)建項(xiàng)目

創(chuàng)建maven項(xiàng)目,添加spark核心依賴(lài)

   <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>

如果使用java8版本還需要加入

   <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>

否則讀取文件時(shí)候出現(xiàn)異常

JavaRDD rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
java.lang.ArrayIndexOutOfBoundsException: 10582

word count 代碼實(shí)現(xiàn)

main 方法
    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.setAppName("WordCountDemon");
        //設(shè)置master屬性
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        wordCount1(sc);

    }

實(shí)現(xiàn)計(jì)數(shù)方法

public static void wordCount1(JavaSparkContext sc)
    {

        JavaRDD<String> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
        //壓扁
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                List<String> list = new ArrayList<String>();
                String[] arr = s.split(" ");
                for(String ss : arr){
                    list.add(ss) ;
                }
                return list.iterator() ;
            }
        });
        //映射
        JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s,1);
            }
        });
        //聚合
        JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //收集,打印輸出
        for(Object o : rdd4.collect()){
            System.out.println(o);
        }
    }

也可以采用lambda 表達(dá)式更優(yōu)雅的實(shí)現(xiàn)


    public void wordCount2(JavaSparkContext sc){
        JavaPairRDD<String,Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt")
                .flatMap( s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((v1,v2)-> (v1+v2));
        System.out.println();
        rdd1.collect().forEach(t-> System.out.println(t));

    }

結(jié)果如下

(are,1)
(you,1)
(how,1)
(,1)
(river,3)
(hello,3)
(boy,1)
(good,1)

發(fā)現(xiàn)將key 值為 blank 的也統(tǒng)計(jì)了,我們可以用filter去掉不想要的結(jié)果

   public static void wordCount2(JavaSparkContext sc){
        JavaPairRDD<String,Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt")
                .flatMap( s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(s -> new Tuple2<>(s, 1))
                .filter(t-> StringUtils.isNoneBlank(t._1))
                .reduceByKey((v1,v2)-> (v1+v2));
        System.out.println();
        rdd1.collect().forEach(t-> System.out.println(t));

    }

看到結(jié)果已經(jīng)ok啦

(are,1)
(you,1)
(how,1)
(river,3)
(hello,3)
(boy,1)
(good,1)

謝謝你的閱讀。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容