spark combineByKey常用的數(shù)據(jù)操作

聚合函數(shù)combineByKey

將RDD[k,v]轉(zhuǎn)化為RDD[k,c],利用該函數(shù)可以實(shí)現(xiàn)reduceByKey函數(shù)的功能。也可以實(shí)現(xiàn)類似于join的操作

參數(shù)簡(jiǎn)介

  • createCombiner: V => C

處理每個(gè)分區(qū)數(shù)據(jù)時(shí),如果遇到key沒(méi)有出現(xiàn)的,就會(huì)創(chuàng)建一個(gè)該鍵對(duì)應(yīng)的累加器初始值,每個(gè)分區(qū)相互獨(dú)立。

  • mergeValue: (C, V) => C

處理每個(gè)分區(qū)數(shù)據(jù)時(shí),如果遇到key已經(jīng)出現(xiàn),則利用mergeValue進(jìn)行合并處理。

  • mergeCombiners: (C, C) => C

所有分區(qū)數(shù)據(jù)處理完成后,利用mergeCombiners對(duì)各個(gè)分區(qū)的累加器進(jìn)行再次合并

實(shí)現(xiàn)reduceByKey函數(shù)

將List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0))中的數(shù)據(jù)按照key,對(duì)value做求和計(jì)算,順帶統(tǒng)計(jì)次數(shù)

val rdd = sc.parallelize(List(("A",10.0),("B",3.0),("C",91.0),("A",91.0),("B",91.0)))
type MVType = (Int, Double) //定義一個(gè)元組類型(科目計(jì)數(shù)器,分?jǐn)?shù))
val combReault = rdd.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
)
//打印計(jì)算結(jié)果
combReault.collect().foreach(println)
//結(jié)果
(A,(2,101.0))
(B,(2,94.0))
(C,(1,91.0))

實(shí)現(xiàn)join操作

spark實(shí)現(xiàn)join操作非常簡(jiǎn)單 rddA.join(rddB)即可實(shí)現(xiàn)

def joinTest(sc:SparkContext): Unit ={
val rddA = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
val rddB = sc.parallelize(List((1,"songshifan"),(2,"haiyang"),(3,"home")))
rddA.join(rddB).collect().foreach(println)}
//結(jié)果
(1,(www,songshifan))
(1,(iteblog,songshifan))
(1,(com,songshifan))
(2,(bbs,haiyang))
(2,(iteblog,haiyang))
(2,(com,haiyang))
(3,(good,home))

跟sql的left join類似

  • 下面我們嘗試使用spark sql來(lái)實(shí)現(xiàn)join操作
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
* Created by songsf on 2017/7/4.
*/
object SparkSqlTest {
    def main(args: Array[String]) {
    val spark = SparkSession
                .builder().master("local[*]")
                .appName("Spark SQL data sources example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate()
    val sc = spark.sparkContext
    val rddA = sc.parallelize((List(("1", "www"), ("1", "iteblog"), ("1", "com"),
    ("2", "bbs"), ("2", "iteblog"), ("2", "com"), ("3", "good")))).map(attributes =>         Row(attributes._1, attributes._2))
    val rddB = sc.parallelize(List(("1", "songshifan"), ("2", "haiyang"), ("3",              "home"))).map(attributes => Row(attributes._1, attributes._2))
    val schemaString = "key name"
    val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val dataA = spark.createDataFrame(rddA, schema)
    dataA.createOrReplaceTempView("dataA")
    val dataB = spark.createDataFrame(rddB, schema)
    dataB.createOrReplaceTempView("dataB")
    dataA.show()
    dataB.show()
    val dataA_1 = spark.sql("select * from dataA where key = '1'").show()
    val BLeftJoinA = spark.sql("select a.*,b.name name2 from dataA a left join dataB b on a.key = b.key").show()
    spark.stop()
    }
}
//結(jié)果
+---+-------+----------+
|key|   name|     name2|
+---+-------+----------+
|  3|   good|      home|
|  1|    www|songshifan|
|  1|iteblog|songshifan|
|  1|    com|songshifan|
|  2|    bbs|   haiyang|
|  2|iteblog|   haiyang|
|  2|    com|   haiyang|
+---+-------+----------+
  • 注意:在使用spark-session時(shí),總是會(huì)報(bào)SparkSession類找不到的錯(cuò)誤,這是因?yàn)槲覀兊拇a是運(yùn)行在本地環(huán)境中,maven在打包的時(shí)候沒(méi)有把Spark-session相關(guān)的內(nèi)容打到我們的package中,這一點(diǎn)可以將編譯好的jar包解壓到相應(yīng)的目錄下找找看。

  • 解決辦法:在編輯器運(yùn)行時(shí),強(qiáng)制指定依賴的jar包。

  • 疑問(wèn):之前測(cè)試過(guò)1.4版本的,寫(xiě)好的代碼不把依賴jar包打入我們的jar包中,提交集群時(shí)會(huì)報(bào)錯(cuò),所以1把所有依賴包都打入jar包中,2 在執(zhí)行時(shí)用--jars參數(shù)去提交機(jī)器上找jar包?,F(xiàn)在有一種說(shuō)法是運(yùn)行環(huán)境已經(jīng)把依賴包都放在創(chuàng)建的執(zhí)行器中,不必再加入依賴jar包。這個(gè)需要繼續(xù)研究、測(cè)試。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容