Scala版RDD基本操作

開(kāi)始學(xué)習(xí)用scala寫spark啦,首先當(dāng)然是介紹一下RDD的操作啦:

1、創(chuàng)建SparkSession

在Spark2.1中,SparkSession包括了SparkContext,若想使用SparkContext,用sc.sparkContext即可

//創(chuàng)建一個(gè)SparkSession,在Spark2.1中,SparkSession包括了SparkContext,若想使用SparkContext,用sc.sparkContext即可
    val sc = SparkSession
      .builder()
      .appName("ttt")
      //.enableHiveSupport()
      .getOrCreate()

2、RDD創(chuàng)建

      //創(chuàng)建intRDD
    val intRDD = sc.sparkContext.parallelize(List(3,1,2,5,5))
    print("打印intRDD的結(jié)果:"+intRDD.collect().mkString(","))

    val stringRDD = sc.sparkContext.parallelize(List("Apple","Orange","Banana","Grape","Apple"))
    println("打印stringRDD的結(jié)果"+stringRDD.collect().mkString(","))

3、單個(gè)RDD轉(zhuǎn)換運(yùn)算

    println("*************************單個(gè)RDD轉(zhuǎn)換運(yùn)算**********************")
    def addOne(x:Int):Int =
    {
      return (x+1)
    }
    //map運(yùn)算:對(duì)RDD中每一個(gè)元素做一個(gè)轉(zhuǎn)換操作,生成一個(gè)新的RDD
    println("使用具體的函數(shù)完成map運(yùn)算:"+intRDD.map(addOne).collect().mkString(","))
    println("使用匿名函數(shù)完成map運(yùn)算:"+intRDD.map(x=>x+1).collect().mkString(","))
    println("使用匿名函數(shù)和匿名參數(shù)完成map運(yùn)算:"+intRDD.map(_+1).collect().mkString(","))

    println("使用匿名函數(shù)完成map運(yùn)算:"+stringRDD.map(x=>"fruit"+x).collect().mkString(","))

    //filter運(yùn)算:對(duì)RDD中每一個(gè)元素進(jìn)行篩選,生成一個(gè)新的RDD
    println("使用匿名函數(shù)篩選intRDD中小于5的元素:"+intRDD.filter(x=>x < 5).collect().mkString(","))
    println("使用匿名函數(shù)和匿名參數(shù)篩選intRDD中小于5的元素:"+intRDD.filter(_ < 5).collect().mkString(","))
    println("使用匿名函數(shù)篩選stringRDD中包含ra的元素:"+stringRDD.filter(x=>x.contains('ra)).collect().mkString(","))

    //distinct運(yùn)算:對(duì)RDD中元素進(jìn)行去重
    println("對(duì)intRDD元素進(jìn)行去重"+intRDD.distinct().collect().mkString(","))
    println("對(duì)StringRDD元素進(jìn)行去重"+stringRDD.distinct().collect().mkString(","))

    //randomSplit運(yùn)算,按照指定的比例將RDD進(jìn)行劃分
    val sRDD = intRDD.randomSplit(Array(0.4,0.6))
    println("分割后第一個(gè)RDD為:"+sRDD(0).collect().mkString(","))
    println("分割后第二個(gè)RDD為:"+sRDD(1).collect().mkString(","))

    //groupBy運(yùn)算:可以按照傳入的匿名函數(shù)規(guī)則,將數(shù)據(jù)分為多個(gè)Array,返回Array[(String,Iterable[Int])]
    val gRDD = intRDD.groupBy(x => {if(x%2==0) "even" else "odd"}).collect()
    println("偶數(shù)數(shù)組為:"+gRDD(0))
    println("奇數(shù)數(shù)組為:"+gRDD(1))

4、多個(gè)RDD轉(zhuǎn)換運(yùn)算

    println("*************************多個(gè)RDD轉(zhuǎn)換運(yùn)算**********************")
    val intRDD1 = sc.sparkContext.parallelize(List(3,1,2,5,5))
    val intRDD2 = sc.sparkContext.parallelize(List(5,6))
    val intRDD3 = sc.sparkContext.parallelize(List(2,7))

    //并集運(yùn)算,并不會(huì)去重,兩種方式實(shí)現(xiàn)
    println("使用union實(shí)現(xiàn)并集運(yùn)算:"+intRDD1.union(intRDD2).union(intRDD3).collect().mkString(","))
    println("使用++實(shí)現(xiàn)并集運(yùn)算:"+(intRDD1 ++ intRDD2 ++ intRDD3).collect().mkString(","))

    //交集運(yùn)算
    println("使用intersection實(shí)現(xiàn)交集運(yùn)算:"+intRDD1.intersection(intRDD2).collect().mkString(","))

    //差集運(yùn)算
    println("使用subtract實(shí)現(xiàn)差集運(yùn)算:"+intRDD1.subtract(intRDD2).collect().mkString(","))

    //笛卡爾積運(yùn)算
    println("使用cartesian實(shí)現(xiàn)笛卡爾積運(yùn)算:"+intRDD1.cartesian(intRDD2).collect().mkString(","))

5、基本動(dòng)作運(yùn)算

    println("*************************基本動(dòng)作運(yùn)算**********************")
    //讀取第1條數(shù)據(jù)
    println("intRDD第一條數(shù)據(jù)為:"+intRDD.first())

    //讀取前n條數(shù)據(jù)
    println("intRDD的前兩條數(shù)據(jù)為:"+intRDD.take(2))

    //按照從小到大排序,并讀取前N條數(shù)據(jù)
    println("intRDD從小到大排序的前3條數(shù)據(jù)為:"+intRDD.takeOrdered(3))

    //按照從大到小排,并讀取前N條數(shù)據(jù)
    println("intRDD從大到小排序的前3條數(shù)據(jù)為:"+intRDD.takeOrdered(3)(Ordering[Int].reverse))

    //返回RDD的統(tǒng)計(jì)數(shù)據(jù)
    println("intRDD的統(tǒng)計(jì)數(shù)據(jù)為:"+intRDD.stats())

    //返回最小值
    println("intRDD的最小值為:"+intRDD.min())

    //返回最大值
    println("intRDD的最大值為:"+intRDD.max())

    //返回標(biāo)準(zhǔn)差
    println("intRDD的標(biāo)準(zhǔn)差為:"+intRDD.stdev())

    //返回計(jì)數(shù)
    println("intRDD的計(jì)數(shù)為:"+intRDD.count())

    //返回求和值
    println("intRDD中元素的和為:"+intRDD.sum())

    //返回平均值
    println("intRDD中元素的平均值為:"+intRDD.mean())

6、基本key-value轉(zhuǎn)換運(yùn)算

    println("*************************基本key-value轉(zhuǎn)換運(yùn)算**********************")
    //創(chuàng)建一個(gè)key-valueRDD
    val kvRDD = sc.sparkContext.parallelize(List((3,4),(3,6),(5,6),(1,2)))

    //得到key的集合
    println("kvRDD中key的集合為:"+kvRDD.keys.collect().mkString(","))

    //得到value的集合
    println("kvRDD中value的集合為:"+kvRDD.values.collect().mkString(","))

    //使用filter進(jìn)行過(guò)濾
    println("kvRDD中key小于5的元素有:"+kvRDD.filter{case (key,value)=>key<5}.collect().mkString(","))
    println("kvRDD中value小于5的元素有:"+kvRDD.filter{case (key,value) => value<5}.collect().mkString(","))

    //使用mapValues函數(shù)對(duì)每個(gè)value進(jìn)行類似于map的運(yùn)算,返回一個(gè)新的RDD
    println("kvRDD中value平方后的結(jié)果為:"+kvRDD.mapValues(x => x * x).collect().mkString(","))

    //sortByKey按照key進(jìn)行排序,默認(rèn)參數(shù)為true,即升序排序,false為降序排序
    println("kvRDD的升序排序結(jié)果為:"+kvRDD.sortByKey(true).collect().mkString(","))
    println("kvRDD的升序排序結(jié)果為:"+kvRDD.sortByKey().collect().mkString(","))
    println("kvRDD的降序排序結(jié)果為:"+kvRDD.sortByKey(false).collect().mkString(","))

    //reduceByKey會(huì)對(duì)相同key值的元素進(jìn)行操作,比如求和
    println("使用匿名函數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:"+kvRDD.reduceByKey((x,y)=>x+y).collect())
    println("使用匿名函數(shù)和匿名參數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:"+kvRDD.reduceByKey(_+_).collect())

7、多個(gè)key-value轉(zhuǎn)換運(yùn)算

    println("*************************多個(gè)key-value轉(zhuǎn)換運(yùn)算**********************")
    val kvRDD1 = sc.sparkContext.parallelize(List((3,4),(3,6),(5,6),(1,2)))
    val kvRDD2 = sc.sparkContext.parallelize(List((3,8)))

    //join運(yùn)算類似于內(nèi)鏈接
    println("兩個(gè)RDD內(nèi)鏈接的結(jié)果為:"+kvRDD1.join(kvRDD2).collect().mkString(","))

    //leftOuterJoin是左外鏈接
    println("兩個(gè)RDD左外鏈接的結(jié)果為:"+kvRDD1.leftOuterJoin(kvRDD2).collect().mkString(","))

    //rightOuterJoin是右外鏈接
    println("兩個(gè)RDD右外鏈接的結(jié)果為:"+kvRDD1.rightOuterJoin(kvRDD2).collect().mkString(","))

    //subtractByKey運(yùn)算會(huì)刪除相同key值的元素
    println("刪除重復(fù)key值元素后結(jié)果為:"+kvRDD1.subtractByKey(kvRDD2).collect().mkString(","))

8、key-value動(dòng)作運(yùn)算

    println("*************************key-value動(dòng)作運(yùn)算**********************")

    //得到第一條數(shù)據(jù)
    println("kvRDD的第一條數(shù)據(jù)為:"+kvRDD.first())

    //得到前n條數(shù)據(jù)
    println("kvRDD的前兩條數(shù)據(jù)為:"+kvRDD.take(2))

    //得到第一條數(shù)據(jù)的key和value值
    val kvFirst = kvRDD.first()
    println("kvRDD第一條數(shù)據(jù)的key為:"+kvFirst._1)
    println("kvRDD第一條數(shù)據(jù)的value為:"+kvFirst._2)

    //countByKey得到每一個(gè)Key值的條數(shù)
    println("kvRDD每個(gè)key值對(duì)應(yīng)元素的數(shù)量為:"+kvRDD.countByKey().mkString(","))

    //創(chuàng)建一個(gè)key-value對(duì)照表,如果有相同的key值,只返回最后一個(gè)的value值
    var KV = kvRDD.collectAsMap()
    println("kvRDD中3對(duì)應(yīng)的value為:"+KV(3))
    println("kvRDD中1對(duì)應(yīng)的value為:"+KV(1))

    //使用lookup函數(shù)并輸入key值來(lái)得到value值,與上面不同的是,如果key相同,會(huì)返回所有對(duì)應(yīng)的value值
    println("kvRDD中3對(duì)應(yīng)的value為:"+kvRDD.lookup(3))
    println("kvRDD中1對(duì)應(yīng)的value為:"+kvRDD.lookup(1))

9、完整輸出


打印intRDD的結(jié)果:3,1,2,5,5
打印stringRDD的結(jié)果Apple,Orange,Banana,Grape,Apple
*************************單個(gè)RDD轉(zhuǎn)換運(yùn)算**********************
使用具體的函數(shù)完成map運(yùn)算:4,2,3,6,6
使用匿名函數(shù)完成map運(yùn)算:4,2,3,6,6
使用匿名函數(shù)和匿名參數(shù)完成map運(yùn)算:4,2,3,6,6
使用匿名函數(shù)完成map運(yùn)算:fruitApple,fruitOrange,fruitBanana,fruitGrape,fruitApple
使用匿名函數(shù)篩選intRDD中小于5的元素:3,1,2
使用匿名函數(shù)和匿名參數(shù)篩選intRDD中小于5的元素:3,1,2
使用匿名函數(shù)篩選stringRDD中包含ra的元素:
對(duì)intRDD元素進(jìn)行去重2,1,3,5
對(duì)StringRDD元素進(jìn)行去重Orange,Apple,Grape,Banana
分割后第一個(gè)RDD為:1,2,5
分割后第二個(gè)RDD為:3,5
偶數(shù)數(shù)組為:(even,CompactBuffer(2))
奇數(shù)數(shù)組為:(odd,CompactBuffer(3, 1, 5, 5))
*************************多個(gè)RDD轉(zhuǎn)換運(yùn)算**********************
使用union實(shí)現(xiàn)并集運(yùn)算:3,1,2,5,5,5,6,2,7
使用++實(shí)現(xiàn)并集運(yùn)算:3,1,2,5,5,5,6,2,7
使用intersection實(shí)現(xiàn)交集運(yùn)算:5
使用subtract實(shí)現(xiàn)差集運(yùn)算:2,1,3
使用cartesian實(shí)現(xiàn)笛卡爾積運(yùn)算:(3,5),(1,5),(3,6),(1,6),(2,5),(5,5),(5,5),(2,6),(5,6),(5,6)
*************************基本動(dòng)作運(yùn)算**********************
intRDD第一條數(shù)據(jù)為:3
intRDD的前兩條數(shù)據(jù)為:[I@65a83d13
intRDD從小到大排序的前3條數(shù)據(jù)為:[I@7b2990cf
intRDD從大到小排序的前3條數(shù)據(jù)為:[I@5ad5dac5
intRDD的統(tǒng)計(jì)數(shù)據(jù)為:(count: 5, mean: 3.200000, stdev: 1.600000, max: 5.000000, min: 1.000000)
intRDD的最小值為:1
intRDD的最大值為:5
intRDD的標(biāo)準(zhǔn)差為:1.6
intRDD的計(jì)數(shù)為:5
intRDD中元素的和為:16.0
intRDD中元素的平均值為:3.2
*************************基本key-value轉(zhuǎn)換運(yùn)算**********************
kvRDD中key的集合為:3,3,5,1
kvRDD中value的集合為:4,6,6,2
kvRDD中key小于5的元素有:(3,4),(3,6),(1,2)
kvRDD中value小于5的元素有:(3,4),(1,2)
kvRDD中value平方后的結(jié)果為:(3,16),(3,36),(5,36),(1,4)
kvRDD的升序排序結(jié)果為:(1,2),(3,4),(3,6),(5,6)
kvRDD的升序排序結(jié)果為:(1,2),(3,4),(3,6),(5,6)
kvRDD的降序排序結(jié)果為:(5,6),(3,4),(3,6),(1,2)
使用匿名函數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:[Lscala.Tuple2;@42cbf60
使用匿名函數(shù)和匿名參數(shù)并按照相同key值對(duì)kvRDD元素進(jìn)行求和操作的結(jié)果為:[Lscala.Tuple2;@70b78e39
*************************多個(gè)key-value轉(zhuǎn)換運(yùn)算**********************
兩個(gè)RDD內(nèi)鏈接的結(jié)果為:(3,(4,8)),(3,(6,8))
兩個(gè)RDD左外鏈接的結(jié)果為:(1,(2,None)),(3,(4,Some(8))),(3,(6,Some(8))),(5,(6,None))
兩個(gè)RDD右外鏈接的結(jié)果為:(3,(Some(4),8)),(3,(Some(6),8))
刪除重復(fù)key值元素后結(jié)果為:(1,2),(5,6)
*************************key-value動(dòng)作運(yùn)算**********************
kvRDD的第一條數(shù)據(jù)為:(3,4)
kvRDD的前兩條數(shù)據(jù)為:[Lscala.Tuple2;@f28fb14
kvRDD第一條數(shù)據(jù)的key為:3
kvRDD第一條數(shù)據(jù)的value為:4
kvRDD每個(gè)key值對(duì)應(yīng)元素的數(shù)量為:1 -> 1,3 -> 2,5 -> 1
kvRDD中3對(duì)應(yīng)的value為:6
kvRDD中1對(duì)應(yīng)的value為:2
kvRDD中3對(duì)應(yīng)的value為:WrappedArray(4, 6)
kvRDD中1對(duì)應(yīng)的value為:WrappedArray(2)

 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容