一 實(shí)驗(yàn)?zāi)康?/h1>
通過本實(shí)驗(yàn),了解Scala語言的特點(diǎn)、理解Scala與Java聯(lián)系、熟悉Scala與Java的語法區(qū)別,能夠編寫SparkWordCount程序,并學(xué)會用Spark-shell與編輯器兩種方式執(zhí)行程序。
二 實(shí)驗(yàn)內(nèi)容
本實(shí)驗(yàn)概述了Scala語言的特點(diǎn)并多角度比較Java與Scala的語法特點(diǎn),包括HelloWorld代碼、構(gòu)造函數(shù)(屬性設(shè)置)、WordCount代碼例子,此外,還加入了Java8版本新特性Lamda表達(dá)式實(shí)現(xiàn)WordCount,以供參考比較學(xué)習(xí),最后用Spark-shell提交與編輯器兩種方式實(shí)現(xiàn)代碼的提交與執(zhí)行。
三 實(shí)驗(yàn)要求
以小組為單元進(jìn)行實(shí)驗(yàn),每小組5人,小組自主協(xié)商選一位組長,由組長安排和分配實(shí)驗(yàn)任務(wù),做實(shí)驗(yàn)前應(yīng)確保Spark集群部署正確。
四 準(zhǔn)備知識
4.1 Scala簡介
Scala是一門多范式的編程語言,運(yùn)行在Java虛擬機(jī)(JVM)上,Scala 源代碼被編譯成Java字節(jié)碼,可以輕松實(shí)現(xiàn)和豐富的 Java 類庫互聯(lián)互通,具有面向?qū)ο螅∣O)以及函數(shù)式編程(FP)的特性。它具備動態(tài)語言的靈活簡潔,同時(shí)又保留了靜態(tài)類型帶來的安全保障和執(zhí)行效率。強(qiáng)大的抽象能力,使Scala不僅能處理腳本化的臨時(shí)任務(wù),還能處理高并發(fā)場景下的分布式互聯(lián)網(wǎng)大數(shù)據(jù)應(yīng)用。
4.2 Scala與Java的語法比較
4.2.1 HelloWorld的比較
【1】Java版本HelloWorld:
public class HelloWorld {
public static void main(String[] args) {
System.out.println("Hello World!");
}
}
【2】Scala版本HelloWorld:
object HelloWorld {
def main(args: Array[String]): Unit = {
println("Hello World!")
}
}
4.2.2 構(gòu)造函數(shù)的比較
【1】Java版本構(gòu)造函數(shù):
class Person {
private int age;
private String name;
public Person(int age, String name) {
this.age = age;
this.name = name;
}
}
【2】Scala版本構(gòu)造函數(shù):
class Person(age: Int, name: String)
4.2.3 WordCount例子比較Java與Scala語法
【1】Java版本W(wǎng)ordCount實(shí)現(xiàn):
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("WordCount")
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1 + "出現(xiàn)了" + wordCount._2 + "次");
}
});
sc.close();
}
}
【2】Scala版本W(wǎng)ordCount實(shí)現(xiàn):
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount");
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
val words = lines.flatMap { line => line.split(" ") }
val pairs = words.map { word => (word, 1) }
val wordCounts = pairs.reduceByKey { _ + _ }
wordCounts.foreach(wordCount => println(wordCount._1 + "出現(xiàn)了" + wordCount._2 + "次"))
}
}
主要語法解釋:
word => (word, 1)等同于java版本的匿名內(nèi)部類中,傳入一個(gè)String參數(shù),返回一個(gè)為<String,Integer>類型的對象
_ + _此處等同于java版本中,傳入兩個(gè)Integer參數(shù),合成一個(gè)Integer類型的對象
Java8新特性的Lamda表達(dá)式的實(shí)現(xiàn)比較:
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("WordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines= sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
JavaRDD<String> words = lines.flatMap(
line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String, Integer> counts = words
.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
.reduceByKey((x, y)-> x+y);
counts.foreach(wordCount -> System.out.println(wordCount._1() + ":" + wordCount._2()));
spark.stop();
}
}
五 實(shí)驗(yàn)步驟
5.1 Spark-shell方式
5.1.1 準(zhǔn)備測試文件wordcount.txt
前提:確保HDFS已啟動,用下面指令查看數(shù)據(jù)是否存在,如不存在則參考實(shí)驗(yàn)19 自行建立并上傳至HDFS。
hadoop fs -cat /wordcount.txt
[圖片上傳失敗...(image-c59fc0-1540979695732)]
5.1.2 啟動Spark-shell(需配置好環(huán)境變量)
spark-shell --master spark://master-30405-30406-30407-h81vl:7077
5.1.3 執(zhí)行程序(可三步一起執(zhí)行,也可以分開執(zhí)行)
val file=sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt")
val count=file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
count.collect()
說明:sc為Spark-shell默認(rèn)的SparkContext對象。Spark程序必須做的第一件事是創(chuàng)建一個(gè)SparkContext對象,該對象告訴Spark如何訪問集群。要創(chuàng)建SparkContext,您首先需要構(gòu)建一個(gè)包含有關(guān)應(yīng)用程序信息的SparkConf對象。每個(gè)JVM只有一個(gè)SparkContext可能是活動的。 在創(chuàng)建新的SparkContext之前,必須先停止活動狀態(tài)的SparkContext。
[圖片上傳失敗...(image-10070f-1540979695732)]
5.1.4 執(zhí)行結(jié)果
[圖片上傳失敗...(image-6e0b0d-1540979695732)]
5.2 編輯器方式(IDEA編輯器)
5.2.1 將4.2.3相應(yīng)的代碼拷貝到IDEA編輯器(Eclipse、IDEA等)
5.2.2 打包用xftp上傳到master執(zhí)行即可(參考附加實(shí)驗(yàn):項(xiàng)目打包)
六 總結(jié)
本實(shí)驗(yàn)介紹了Spark的第一個(gè)例子,應(yīng)仔細(xì)分析比較實(shí)驗(yàn)提供的Java與Scala例子,做到融會貫通,提供的Java8新特性Lamda表達(dá)式例子可供以后學(xué)習(xí)參考,注意自己所安裝的Java版本。還應(yīng)學(xué)會去比較Spark-shell與Spark-submit兩種方式執(zhí)行代碼的不通。本實(shí)驗(yàn)承上啟下,非常重要,必須多實(shí)操幾遍。