The combineByKey aggregation function
combineByKey transforms an RDD[K, V] into an RDD[K, C]. With it you can reproduce the behavior of reduceByKey, and you can also implement join-like operations.
Parameters
- createCombiner: V => C
When a partition is being processed and a key is seen for the first time in that partition, createCombiner builds the initial accumulator value for that key; each partition is processed independently.
- mergeValue: (C, V) => C
When a key has already been seen in the current partition, mergeValue folds the new value into that key's accumulator.
- mergeCombiners: (C, C) => C
After all partitions have been processed, mergeCombiners merges the per-partition accumulators for each key into the final result (see the sketch after this list).
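To make the three arguments concrete, here is a minimal sketch (the data and names are illustrative, not from the original) that uses combineByKey to collect all values of each key into a List:

val pairs = sc.parallelize(List(("A", 1), ("B", 2), ("A", 3)))
val grouped = pairs.combineByKey(
  (v: Int) => List(v),                     // createCombiner: the first value of a key in a partition starts a new list
  (acc: List[Int], v: Int) => v :: acc,    // mergeValue: later values in the same partition are prepended
  (a: List[Int], b: List[Int]) => a ::: b  // mergeCombiners: per-partition lists are concatenated
)
grouped.collect().foreach(println)
// e.g. (A,List(1, 3)) and (B,List(2)); element order depends on partitioning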
Implementing reduceByKey
Given List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)), sum the values per key, and count the number of occurrences along the way.
val rdd = sc.parallelize(List(("A", 10.0), ("B", 3.0), ("C", 91.0), ("A", 91.0), ("B", 91.0)))
type MVType = (Int, Double) // tuple type: (occurrence count, score)
val combResult = rdd.combineByKey(
  score => (1, score),                                       // createCombiner: the first score of a key becomes (1, score)
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),   // mergeValue: bump the count and add the new score
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) // mergeCombiners: merge per-partition (count, sum) pairs
)
// print the result
combResult.collect().foreach(println)
// output
(A,(2,101.0))
(B,(2,94.0))
(C,(1,91.0))
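For comparison, the same (count, sum) result can be obtained with reduceByKey itself by first pairing each value with a count of 1; a minimal equivalent sketch:

val reduceResult = rdd
  .mapValues(score => (1, score))                    // pre-shape each value as (count, score)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // sum the counts and the scores per key
reduceResult.collect().foreach(println)              // prints the same (key, (count, sum)) pairs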
Implementing a join
Implementing a join in Spark is very simple: rddA.join(rddB) does it.
def joinTest(sc: SparkContext): Unit = {
  val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
    (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
  val rddB = sc.parallelize(List((1, "songshifan"), (2, "haiyang"), (3, "home")))
  rddA.join(rddB).collect().foreach(println)
}
// output
(1,(www,songshifan))
(1,(iteblog,songshifan))
(1,(com,songshifan))
(2,(bbs,haiyang))
(2,(iteblog,haiyang))
(2,(com,haiyang))
(3,(good,home))
Strictly speaking, rddA.join(rddB) is an inner join, like SQL's INNER JOIN; it only looks like a left join here because every key of rddA also appears in rddB. For a real left join on RDDs, use leftOuterJoin, as the sketch below shows.
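A minimal sketch of the difference, with illustrative data where the right side is missing key 3:

val left = sc.parallelize(List((1, "www"), (3, "good")))
val right = sc.parallelize(List((1, "songshifan")))
left.join(right).collect()          // Array((1,(www,songshifan))); the unmatched key 3 is dropped
left.leftOuterJoin(right).collect() // Array((1,(www,Some(songshifan))), (3,(good,None)))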
- Next, let's try to implement the join with Spark SQL.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Created by songsf on 2017/7/4.
 */
object SparkSqlTest {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder().master("local[*]")
      .appName("Spark SQL data sources example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    val sc = spark.sparkContext
    // build two RDD[Row]s with the same (key, name) shape
    val rddA = sc.parallelize(List(("1", "www"), ("1", "iteblog"), ("1", "com"),
      ("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good")))
      .map(attributes => Row(attributes._1, attributes._2))
    val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3", "home")))
      .map(attributes => Row(attributes._1, attributes._2))
    // both tables share the schema "key name", all string columns
    val schemaString = "key name"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val dataA = spark.createDataFrame(rddA, schema)
    dataA.createOrReplaceTempView("dataA")
    val dataB = spark.createDataFrame(rddB, schema)
    dataB.createOrReplaceTempView("dataB")
    dataA.show()
    dataB.show()
    spark.sql("select * from dataA where key = '1'").show()
    spark.sql("select a.*, b.name name2 from dataA a left join dataB b on a.key = b.key").show()
    spark.stop()
  }
}
// output (of the left join)
+---+-------+----------+
|key| name| name2|
+---+-------+----------+
| 3| good| home|
| 1| www|songshifan|
| 1|iteblog|songshifan|
| 1| com|songshifan|
| 2| bbs| haiyang|
| 2|iteblog| haiyang|
| 2| com| haiyang|
+---+-------+----------+
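The same left join can also be written with the DataFrame API instead of a SQL string; a minimal sketch using the dataA and dataB DataFrames defined above:

val joined = dataA.join(
  dataB.withColumnRenamed("name", "name2"), // rename so the right side's column shows up as name2
  Seq("key"),                               // join on the shared key column
  "left"                                    // left outer join, matching the SQL above
)
joined.show()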
Note: when using SparkSession, we kept getting a "SparkSession class not found" error. This is because our code runs in a local environment and Maven did not include the SparkSession-related classes when packaging, so they are missing from our package; you can confirm this by unpacking the built jar into a directory and looking for them.
Workaround: when running from the editor, explicitly force the dependency jars onto the classpath.
Open question: we tested version 1.4 before, and if the dependency jars were not bundled into our jar, submitting to the cluster failed, so we either (1) packed all dependencies into our jar, or (2) used the --jars option at submit time to find the jars on the submitting machine. There is now a claim that the runtime environment already places the dependencies in the executors it creates, so bundling them is unnecessary. This needs further research and testing.