This article walks the reader through a demo of implementing word count with Spark.
Creating the project
Create a Maven project and add the Spark core dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.0</version>
</dependency>
If you are running on Java 8, you also need to add:
<dependency>
    <groupId>com.thoughtworks.paranamer</groupId>
    <artifactId>paranamer</artifactId>
    <version>2.8</version>
</dependency>
Otherwise an exception is thrown when reading a file:

JavaRDD rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
java.lang.ArrayIndexOutOfBoundsException: 10582
Word count implementation
The main method:
public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.setAppName("WordCountDemon");
    // set the master property
    conf.setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    wordCount1(sc);
}
The counting method:
public static void wordCount1(JavaSparkContext sc) {
    JavaRDD<String> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
    // flatten: split each line into words
    JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            List<String> list = new ArrayList<String>();
            String[] arr = s.split(" ");
            for (String ss : arr) {
                list.add(ss);
            }
            return list.iterator();
        }
    });
    // map: pair each word with the count 1
    JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    // aggregate: sum the counts for each key
    JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    // collect and print the results
    for (Tuple2<String, Integer> t : rdd4.collect()) {
        System.out.println(t);
    }
}
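The flatMap → mapToPair → reduceByKey pipeline above is, at its core, just word counting. The same logic can be sketched without a Spark cluster using plain Java streams (the sample lines here are hypothetical stand-ins for hello.txt, not the author's actual file):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    public static void main(String[] args) {
        // Hypothetical input lines standing in for hello.txt.
        List<String> lines = Arrays.asList("hello river", "hello river hello river");

        Map<String, Long> counts = lines.stream()
                // flatten: split each line into words (mirrors flatMap)
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // group identical words and count them
                // (mirrors mapToPair + reduceByKey)
                .collect(Collectors.groupingBy(
                        Function.identity(),
                        Collectors.counting()));

        counts.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

The stream version runs in a single JVM, which makes it handy for checking the counting logic before submitting the Spark job.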
The same logic can be implemented more elegantly with lambda expressions:
public static void wordCount2(JavaSparkContext sc) {
    JavaPairRDD<String, Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt")
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
            .mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((v1, v2) -> v1 + v2);
    rdd1.collect().forEach(t -> System.out.println(t));
}
The output:
(are,1)
(you,1)
(how,1)
(,1)
(river,3)
(hello,3)
(boy,1)
(good,1)
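The blank key in the output comes from how String.split handles delimiters: a leading space or two consecutive spaces in a line produces an empty token, which then gets counted like any other word. A quick check in plain Java (no Spark needed):

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // Two consecutive spaces yield an empty token in the middle.
        String[] tokens = "how  are you".split(" ");
        System.out.println(Arrays.toString(tokens)); // [how, , are, you]

        // A leading space yields an empty token at the start.
        System.out.println(Arrays.toString(" hello".split(" "))); // [, hello]
    }
}
```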
Notice that the blank key was counted too. We can use filter to drop the unwanted entries:
public static void wordCount2(JavaSparkContext sc) {
    JavaPairRDD<String, Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt")
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
            .mapToPair(s -> new Tuple2<>(s, 1))
            .filter(t -> StringUtils.isNoneBlank(t._1))
            .reduceByKey((v1, v2) -> v1 + v2);
    rdd1.collect().forEach(t -> System.out.println(t));
}
Now the result looks right:
(are,1)
(you,1)
(how,1)
(river,3)
(hello,3)
(boy,1)
(good,1)
Thanks for reading.