開(kāi)始學(xué)習(xí)用scala寫spark啦,首先當(dāng)然是介紹一下RDD的操作啦:
1、創(chuàng)建SparkSession
在Spark2.1中,SparkSession包括了SparkContext,若想使用SparkContext,用sc.sparkContext即可
//創(chuàng)建一個(gè)SparkSession,在Spark2.1中,SparkSession包括了SparkContext,若想使用SparkContext,用sc.sparkContext即可
val sc = SparkSession
.builder()
.appName("ttt")
//.enableHiveSupport()
.getOrCreate()
2、RDD創(chuàng)建
//創(chuàng)建intRDD
val intRDD = sc.sparkContext.parallelize(List(3,1,2,5,5))
print("打印intRDD的結(jié)果:"+intRDD.collect().mkString(","))
val stringRDD = sc.sparkContext.parallelize(List("Apple","Orange","Banana","Grape","Apple"))
println("打印stringRDD的結(jié)果"+stringRDD.collect().mkString(","))
3、單個(gè)RDD轉(zhuǎn)換運(yùn)算
println("*************************單個(gè)RDD轉(zhuǎn)換運(yùn)算**********************")
def addOne(x:Int):Int =
{
return (x+1)
}
//map運(yùn)算:對(duì)RDD中每一個(gè)元素做一個(gè)轉(zhuǎn)換操作,生成一個(gè)新的RDD
println("使用具體的函數(shù)完成map運(yùn)算:"+intRDD.map(addOne).collect().mkString(","))
println("使用匿名函數(shù)完成map運(yùn)算:"+intRDD.map(x=>x+1).collect().mkString(","))
println("使用匿名函數(shù)和匿名參數(shù)完成map運(yùn)算:"+intRDD.map(_+1).collect().mkString(","))
println("使用匿名函數(shù)完成map運(yùn)算:"+stringRDD.map(x=>"fruit"+x).collect().mkString(","))
//filter運(yùn)算:對(duì)RDD中每一個(gè)元素進(jìn)行篩選,生成一個(gè)新的RDD
println("使用匿名函數(shù)篩選intRDD中小于5的元素:"+intRDD.filter(x=>x < 5).collect().mkString(","))
println("使用匿名函數(shù)和匿名參數(shù)篩選intRDD中小于5的元素:"+intRDD.filter(_ < 5).collect().mkString(","))
println("使用匿名函數(shù)篩選stringRDD中包含ra的元素:"+stringRDD.filter(x=>x.contains('ra)).collect().mkString(","))
//distinct運(yùn)算:對(duì)RDD中元素進(jìn)行去重
println("對(duì)intRDD元素進(jìn)行去重"+intRDD.distinct().collect().mkString(","))
println("對(duì)StringRDD元素進(jìn)行去重"+stringRDD.distinct().collect().mkString(","))
//randomSplit運(yùn)算,按照指定的比例將RDD進(jìn)行劃分
val sRDD = intRDD.randomSplit(Array(0.4,0.6))
println("分割后第一個(gè)RDD為:"+sRDD(0).collect().mkString(","))
println("分割后第二個(gè)RDD為:"+sRDD(1).collect().mkString(","))
//groupBy運(yùn)算:可以按照傳入的匿名函數(shù)規(guī)則,將數(shù)據(jù)分為多個(gè)Array,返回Array[(String,Iterable[Int])]
val gRDD = intRDD.groupBy(x => {if(x%2==0) "even" else "odd"}).collect()
println("偶數(shù)數(shù)組為:"+gRDD(0))
println("奇數(shù)數(shù)組為:"+gRDD(1))
4、多個(gè)RDD轉(zhuǎn)換運(yùn)算
println("*************************多個(gè)RDD轉(zhuǎn)換運(yùn)算**********************")
val intRDD1 = sc.sparkContext.parallelize(List(3,1,2,5,5))
val intRDD2 = sc.sparkContext.parallelize(List(5,6))
val intRDD3 = sc.sparkContext.parallelize(List(2,7))
//并集運(yùn)算,并不會(huì)去重,兩種方式實(shí)現(xiàn)
println("使用union實(shí)現(xiàn)并集運(yùn)算:"+intRDD1.union(intRDD2).union(intRDD3).collect().mkString(","))
println("使用++實(shí)現(xiàn)并集運(yùn)算:"+(intRDD1 ++ intRDD2 ++ intRDD3).collect().mkString(","))
//交集運(yùn)算
println("使用intersection實(shí)現(xiàn)交集運(yùn)算:"+intRDD1.intersection(intRDD2).collect().mkString(","))
//差集運(yùn)算
println("使用subtract實(shí)現(xiàn)差集運(yùn)算:"+intRDD1.subtract(intRDD2).collect().mkString(","))
//笛卡爾積運(yùn)算
println("使用cartesian實(shí)現(xiàn)笛卡爾積運(yùn)算:"+intRDD1.cartesian(intRDD2).collect().mkString(","))
5、基本動(dòng)作運(yùn)算
println("*************************基本動(dòng)作運(yùn)算**********************")
//讀取第1條數(shù)據(jù)
println("intRDD第一條數(shù)據(jù)為:"+intRDD.first())
//讀取前n條數(shù)據(jù)
println("intRDD的前兩條數(shù)據(jù)為:"+intRDD.take(2))
//按照從小到大排序,并讀取前N條數(shù)據(jù)
println("intRDD從小到大排序的前3條數(shù)據(jù)為:"+intRDD.takeOrdered(3))
//按照從大到小排,并讀取前N條數(shù)據(jù)
println("intRDD從大到小排序的前3條數(shù)據(jù)為:"+intRDD.takeOrdered(3)(Ordering[Int].reverse))
//返回RDD的統(tǒng)計(jì)數(shù)據(jù)
println("intRDD的統(tǒng)計(jì)數(shù)據(jù)為:"+intRDD.stats())
//返回最小值
println("intRDD的最小值為:"+intRDD.min())
//返回最大值
println("intRDD的最大值為:"+intRDD.max())
//返回標(biāo)準(zhǔn)差
println("intRDD的標(biāo)準(zhǔn)差為:"+intRDD.stdev())
//返回計(jì)數(shù)
println("intRDD的計(jì)數(shù)為:"+intRDD.count())
//返回求和值
println("intRDD中元素的和為:"+intRDD.sum())
//返回平均值
println("intRDD中元素的平均值為:"+intRDD.mean())
6、基本key-value轉(zhuǎn)換運(yùn)算
println("*************************基本key-value轉(zhuǎn)換運(yùn)算**********************")
//創(chuàng)建一個(gè)key-valueRDD
val kvRDD = sc.sparkContext.parallelize(List((3,4),(3,6),(5,6),(1,2)))
//得到key的集合
println("kvRDD中key的集合為:"+kvRDD.keys.collect().mkString(","))
//得到value的集合
println("kvRDD中value的集合為:"+kvRDD.values.collect().mkString(","))
//使用filter進(jìn)行過(guò)濾
println("kvRDD中key小于5的元素有:"+kvRDD.filter{case (key,value)=>key<5}.collect().mkString(","))
println("kvRDD中value小于5的元素有:"+kvRDD.filter{case (key,value) => value<5}.collect().mkString(","))
//使用mapValues函數(shù)對(duì)每個(gè)value進(jìn)行類似于map的運(yùn)算,返回一個(gè)新的RDD
println("kvRDD中value平方后的結(jié)果為:"+kvRDD.mapValues(x => x * x).collect().mkString(","))
//sortByKey按照key進(jìn)行排序,默認(rèn)參數(shù)為true,即升序排序,false為降序排序
println("kvRDD的升序排序結(jié)果為:"+kvRDD.sortByKey(true).collect().mkString(","))
println("kvRDD的升序排序結(jié)果為:"+kvRDD.sortByKey().collect().mkString(","))
println("kvRDD的降序排序結(jié)果為:"+kvRDD.sortByKey(false).collect().mkString(","))
//reduceByKey會(huì)對(duì)相同key值的元素進(jìn)行操作,比如求和
println("使用匿名函數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:"+kvRDD.reduceByKey((x,y)=>x+y).collect())
println("使用匿名函數(shù)和匿名參數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:"+kvRDD.reduceByKey(_+_).collect())
7、多個(gè)key-value轉(zhuǎn)換運(yùn)算
println("*************************多個(gè)key-value轉(zhuǎn)換運(yùn)算**********************")
val kvRDD1 = sc.sparkContext.parallelize(List((3,4),(3,6),(5,6),(1,2)))
val kvRDD2 = sc.sparkContext.parallelize(List((3,8)))
//join運(yùn)算類似于內(nèi)鏈接
println("兩個(gè)RDD內(nèi)鏈接的結(jié)果為:"+kvRDD1.join(kvRDD2).collect().mkString(","))
//leftOuterJoin是左外鏈接
println("兩個(gè)RDD左外鏈接的結(jié)果為:"+kvRDD1.leftOuterJoin(kvRDD2).collect().mkString(","))
//rightOuterJoin是右外鏈接
println("兩個(gè)RDD右外鏈接的結(jié)果為:"+kvRDD1.rightOuterJoin(kvRDD2).collect().mkString(","))
//subtractByKey運(yùn)算會(huì)刪除相同key值的元素
println("刪除重復(fù)key值元素后結(jié)果為:"+kvRDD1.subtractByKey(kvRDD2).collect().mkString(","))
8、key-value動(dòng)作運(yùn)算
println("*************************key-value動(dòng)作運(yùn)算**********************")
//得到第一條數(shù)據(jù)
println("kvRDD的第一條數(shù)據(jù)為:"+kvRDD.first())
//得到前n條數(shù)據(jù)
println("kvRDD的前兩條數(shù)據(jù)為:"+kvRDD.take(2))
//得到第一條數(shù)據(jù)的key和value值
val kvFirst = kvRDD.first()
println("kvRDD第一條數(shù)據(jù)的key為:"+kvFirst._1)
println("kvRDD第一條數(shù)據(jù)的value為:"+kvFirst._2)
//countByKey得到每一個(gè)Key值的條數(shù)
println("kvRDD每個(gè)key值對(duì)應(yīng)元素的數(shù)量為:"+kvRDD.countByKey().mkString(","))
//創(chuàng)建一個(gè)key-value對(duì)照表,如果有相同的key值,只返回最后一個(gè)的value值
var KV = kvRDD.collectAsMap()
println("kvRDD中3對(duì)應(yīng)的value為:"+KV(3))
println("kvRDD中1對(duì)應(yīng)的value為:"+KV(1))
//使用lookup函數(shù)并輸入key值來(lái)得到value值,與上面不同的是,如果key相同,會(huì)返回所有對(duì)應(yīng)的value值
println("kvRDD中3對(duì)應(yīng)的value為:"+kvRDD.lookup(3))
println("kvRDD中1對(duì)應(yīng)的value為:"+kvRDD.lookup(1))
9、完整輸出
打印intRDD的結(jié)果:3,1,2,5,5
打印stringRDD的結(jié)果Apple,Orange,Banana,Grape,Apple
*************************單個(gè)RDD轉(zhuǎn)換運(yùn)算**********************
使用具體的函數(shù)完成map運(yùn)算:4,2,3,6,6
使用匿名函數(shù)完成map運(yùn)算:4,2,3,6,6
使用匿名函數(shù)和匿名參數(shù)完成map運(yùn)算:4,2,3,6,6
使用匿名函數(shù)完成map運(yùn)算:fruitApple,fruitOrange,fruitBanana,fruitGrape,fruitApple
使用匿名函數(shù)篩選intRDD中小于5的元素:3,1,2
使用匿名函數(shù)和匿名參數(shù)篩選intRDD中小于5的元素:3,1,2
使用匿名函數(shù)篩選stringRDD中包含ra的元素:
對(duì)intRDD元素進(jìn)行去重2,1,3,5
對(duì)StringRDD元素進(jìn)行去重Orange,Apple,Grape,Banana
分割后第一個(gè)RDD為:1,2,5
分割后第二個(gè)RDD為:3,5
偶數(shù)數(shù)組為:(even,CompactBuffer(2))
奇數(shù)數(shù)組為:(odd,CompactBuffer(3, 1, 5, 5))
*************************多個(gè)RDD轉(zhuǎn)換運(yùn)算**********************
使用union實(shí)現(xiàn)并集運(yùn)算:3,1,2,5,5,5,6,2,7
使用++實(shí)現(xiàn)并集運(yùn)算:3,1,2,5,5,5,6,2,7
使用intersection實(shí)現(xiàn)交集運(yùn)算:5
使用subtract實(shí)現(xiàn)差集運(yùn)算:2,1,3
使用cartesian實(shí)現(xiàn)笛卡爾積運(yùn)算:(3,5),(1,5),(3,6),(1,6),(2,5),(5,5),(5,5),(2,6),(5,6),(5,6)
*************************基本動(dòng)作運(yùn)算**********************
intRDD第一條數(shù)據(jù)為:3
intRDD的前兩條數(shù)據(jù)為:[I@65a83d13
intRDD從小到大排序的前3條數(shù)據(jù)為:[I@7b2990cf
intRDD從大到小排序的前3條數(shù)據(jù)為:[I@5ad5dac5
intRDD的統(tǒng)計(jì)數(shù)據(jù)為:(count: 5, mean: 3.200000, stdev: 1.600000, max: 5.000000, min: 1.000000)
intRDD的最小值為:1
intRDD的最大值為:5
intRDD的標(biāo)準(zhǔn)差為:1.6
intRDD的計(jì)數(shù)為:5
intRDD中元素的和為:16.0
intRDD中元素的平均值為:3.2
*************************基本key-value轉(zhuǎn)換運(yùn)算**********************
kvRDD中key的集合為:3,3,5,1
kvRDD中value的集合為:4,6,6,2
kvRDD中key小于5的元素有:(3,4),(3,6),(1,2)
kvRDD中value小于5的元素有:(3,4),(1,2)
kvRDD中value平方后的結(jié)果為:(3,16),(3,36),(5,36),(1,4)
kvRDD的升序排序結(jié)果為:(1,2),(3,4),(3,6),(5,6)
kvRDD的升序排序結(jié)果為:(1,2),(3,4),(3,6),(5,6)
kvRDD的降序排序結(jié)果為:(5,6),(3,4),(3,6),(1,2)
使用匿名函數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:[Lscala.Tuple2;@42cbf60
使用匿名函數(shù)和匿名參數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:[Lscala.Tuple2;@70b78e39
*************************多個(gè)key-value轉(zhuǎn)換運(yùn)算**********************
兩個(gè)RDD內(nèi)鏈接的結(jié)果為:(3,(4,8)),(3,(6,8))
兩個(gè)RDD左外鏈接的結(jié)果為:(1,(2,None)),(3,(4,Some(8))),(3,(6,Some(8))),(5,(6,None))
兩個(gè)RDD右外鏈接的結(jié)果為:(3,(Some(4),8)),(3,(Some(6),8))
刪除重復(fù)key值元素后結(jié)果為:(1,2),(5,6)
*************************key-value動(dòng)作運(yùn)算**********************
kvRDD的第一條數(shù)據(jù)為:(3,4)
kvRDD的前兩條數(shù)據(jù)為:[Lscala.Tuple2;@f28fb14
kvRDD第一條數(shù)據(jù)的key為:3
kvRDD第一條數(shù)據(jù)的value為:4
kvRDD每個(gè)key值對(duì)應(yīng)元素的數(shù)量為:1 -> 1,3 -> 2,5 -> 1
kvRDD中3對(duì)應(yīng)的value為:6
kvRDD中1對(duì)應(yīng)的value為:2
kvRDD中3對(duì)應(yīng)的value為:WrappedArray(4, 6)
kvRDD中1對(duì)應(yīng)的value為:WrappedArray(2)