Spark實(shí)驗(yàn):SparkWordCount第一個(gè)例子

一 實(shí)驗(yàn)?zāi)康?/h1>

通過本實(shí)驗(yàn),了解Scala語言的特點(diǎn)、理解Scala與Java聯(lián)系、熟悉Scala與Java的語法區(qū)別,能夠編寫SparkWordCount程序,并學(xué)會用Spark-shell與編輯器兩種方式執(zhí)行程序。

二 實(shí)驗(yàn)內(nèi)容

本實(shí)驗(yàn)概述了Scala語言的特點(diǎn)并多角度比較Java與Scala的語法特點(diǎn),包括HelloWorld代碼、構(gòu)造函數(shù)(屬性設(shè)置)、WordCount代碼例子,此外,還加入了Java8版本新特性Lamda表達(dá)式實(shí)現(xiàn)WordCount,以供參考比較學(xué)習(xí),最后用Spark-shell提交與編輯器兩種方式實(shí)現(xiàn)代碼的提交與執(zhí)行。

三 實(shí)驗(yàn)要求

以小組為單元進(jìn)行實(shí)驗(yàn),每小組5人,小組自主協(xié)商選一位組長,由組長安排和分配實(shí)驗(yàn)任務(wù),做實(shí)驗(yàn)前應(yīng)確保Spark集群部署正確。

四 準(zhǔn)備知識

4.1 Scala簡介

Scala是一門多范式的編程語言,運(yùn)行在Java虛擬機(jī)(JVM)上,Scala 源代碼被編譯成Java字節(jié)碼,可以輕松實(shí)現(xiàn)和豐富的 Java 類庫互聯(lián)互通,具有面向?qū)ο螅∣O)以及函數(shù)式編程(FP)的特性。它具備動態(tài)語言的靈活簡潔,同時(shí)又保留了靜態(tài)類型帶來的安全保障和執(zhí)行效率。強(qiáng)大的抽象能力,使Scala不僅能處理腳本化的臨時(shí)任務(wù),還能處理高并發(fā)場景下的分布式互聯(lián)網(wǎng)大數(shù)據(jù)應(yīng)用。

4.2 Scala與Java的語法比較

4.2.1 HelloWorld的比較

【1】Java版本HelloWorld:

public class HelloWorld {
    public static void main(String[] args) {
      System.out.println("Hello World!");
 }
}   

【2】Scala版本HelloWorld:

object HelloWorld {
    def main(args: Array[String]): Unit = {
      println("Hello World!")
 }
}  

4.2.2 構(gòu)造函數(shù)的比較

【1】Java版本構(gòu)造函數(shù):

class Person {
    private int age;
    private String name;
    public Person(int age, String name) {
      this.age = age;
      this.name = name;
 }
}

【2】Scala版本構(gòu)造函數(shù):

class Person(age: Int, name: String)

4.2.3 WordCount例子比較Java與Scala語法

【1】Java版本W(wǎng)ordCount實(shí)現(xiàn):

public class WordCount {

 public static void main(String[] args) {

 SparkConf conf = new SparkConf().setAppName("WordCount")

 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDD<String> lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");

 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

 private static final long serialVersionUID = 1L;

 public Iterable<String> call(String line) throws Exception {

 return Arrays.asList(line.split(" "));

 }

 });

 JavaPairRDD<String, Integer> pairs = words.mapToPair(

 new PairFunction<String, String, Integer>() {

 private static final long serialVersionUID = 1L;

 public Tuple2<String, Integer> call(String word) throws Exception {

 return new Tuple2<String, Integer>(word, 1);

 }

 });

 JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(

 new Function2<Integer, Integer, Integer>() {

 private static final long serialVersionUID = 1L;

 public Integer call(Integer v1, Integer v2) throws Exception {

 return v1 + v2;

 }

 });

 wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {

 private static final long serialVersionUID = 1L;

 public void call(Tuple2<String, Integer> wordCount) throws Exception {

 System.out.println(wordCount._1 + "出現(xiàn)了" + wordCount._2 + "次");   

 }

 });

 sc.close();

 }

}

【2】Scala版本W(wǎng)ordCount實(shí)現(xiàn):

object WordCount {

 def main(args: Array[String]) {

 val conf = new SparkConf().setAppName("WordCount");

 val sc = new SparkContext(conf)

 val lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");

 val words = lines.flatMap { line => line.split(" ") }

 val pairs = words.map { word => (word, 1) }

 val wordCounts = pairs.reduceByKey { _ + _ }

 wordCounts.foreach(wordCount => println(wordCount._1 + "出現(xiàn)了" + wordCount._2 + "次")) 

 }

}

主要語法解釋:

word => (word, 1)等同于java版本的匿名內(nèi)部類中,傳入一個(gè)String參數(shù),返回一個(gè)為<String,Integer>類型的對象

_ + _此處等同于java版本中,傳入兩個(gè)Integer參數(shù),合成一個(gè)Integer類型的對象

Java8新特性的Lamda表達(dá)式的實(shí)現(xiàn)比較:

public class WordCount {

 public static void main(String[] args) {

 SparkConf conf = new SparkConf().setAppName("WordCount");

 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDD<String> lines= sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");

 JavaRDD<String> words = lines.flatMap(

 line -> Arrays.asList(line.split(" ")).iterator());

 JavaPairRDD<String, Integer> counts = words

 .mapToPair(word -> new Tuple2<String, Integer>(word, 1))

 .reduceByKey((x, y)-> x+y);

 counts.foreach(wordCount -> System.out.println(wordCount._1() + ":" + wordCount._2()));

 spark.stop();

 }

}

五 實(shí)驗(yàn)步驟

5.1 Spark-shell方式

5.1.1 準(zhǔn)備測試文件wordcount.txt

前提:確保HDFS已啟動,用下面指令查看數(shù)據(jù)是否存在,如不存在則參考實(shí)驗(yàn)19 自行建立并上傳至HDFS。

hadoop fs -cat /wordcount.txt
[圖片上傳失敗...(image-c59fc0-1540979695732)]

5.1.2 啟動Spark-shell(需配置好環(huán)境變量)

spark-shell --master spark://master-30405-30406-30407-h81vl:7077

5.1.3 執(zhí)行程序(可三步一起執(zhí)行,也可以分開執(zhí)行)

val file=sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt")

val count=file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

count.collect()               

說明:sc為Spark-shell默認(rèn)的SparkContext對象。Spark程序必須做的第一件事是創(chuàng)建一個(gè)SparkContext對象,該對象告訴Spark如何訪問集群。要創(chuàng)建SparkContext,您首先需要構(gòu)建一個(gè)包含有關(guān)應(yīng)用程序信息的SparkConf對象。每個(gè)JVM只有一個(gè)SparkContext可能是活動的。 在創(chuàng)建新的SparkContext之前,必須先停止活動狀態(tài)的SparkContext。

[圖片上傳失敗...(image-10070f-1540979695732)]

5.1.4 執(zhí)行結(jié)果

[圖片上傳失敗...(image-6e0b0d-1540979695732)]

5.2 編輯器方式(IDEA編輯器)

5.2.1 將4.2.3相應(yīng)的代碼拷貝到IDEA編輯器(Eclipse、IDEA等)

5.2.2 打包用xftp上傳到master執(zhí)行即可(參考附加實(shí)驗(yàn):項(xiàng)目打包)

六 總結(jié)

本實(shí)驗(yàn)介紹了Spark的第一個(gè)例子,應(yīng)仔細(xì)分析比較實(shí)驗(yàn)提供的Java與Scala例子,做到融會貫通,提供的Java8新特性Lamda表達(dá)式例子可供以后學(xué)習(xí)參考,注意自己所安裝的Java版本。還應(yīng)學(xué)會去比較Spark-shell與Spark-submit兩種方式執(zhí)行代碼的不通。本實(shí)驗(yàn)承上啟下,非常重要,必須多實(shí)操幾遍。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容