SparkCore(一)(RDD和一些算子)

什么是RDD

RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象。代碼中是一個抽象類,它代表一個不可變、可分區(qū)、里面的元素可并行計算的集合。

  • 一組分區(qū)(Partition),即數(shù)據(jù)集的基本組成單位;

  • 一個計算每個分區(qū)的函數(shù);

  • RDD之間的依賴關(guān)系;

  • 一個Partitioner,即RDD的分片函數(shù);

  • 一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。

RDD創(chuàng)建的方法

  • 從集合中創(chuàng)建 并行度一般為2
##makerdd或parallise都是根據(jù)totalcpucores和2比較最大值
##如果直接覆蓋makerdd或parallise的第二個分區(qū)個數(shù)的參數(shù)可以改變數(shù)量
 override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

#查看源碼所得
sc.parallelize

#makeRDD實際上是在內(nèi)部創(chuàng)建了一個parallelize
sc.makeRDD

  • 從文件中轉(zhuǎn)換
#從文件轉(zhuǎn)換RDD
sc.textFile

#從文件夾拉取多個文件
sc.wholeTextFiles("data/baseinput/ratings100/")

  • textFile在讀取小文件的時候,會參考小文件的個數(shù),文件個數(shù)越多,分區(qū)個數(shù)越多

  • sc.textFile遇到小文件沒有辦法很好合并小文件的,即便重寫第二個參數(shù)也沒有作用

  • 用textFile時,它的partition的數(shù)量是與文件夾下的文件數(shù)量(實例中用3個xxx.log文件)相關(guān),一個文件就是一個partition(既然3個文件就是:partition=3)。

  • wholeTextFiles的partition數(shù)量是根據(jù)用戶指定或者文件大小來(文件內(nèi)的數(shù)據(jù)量少 有hdfs源碼默認確定的)

  • 確定與hdfs目錄下的文件數(shù)量無關(guān)!所以說:wholeTextFile通常用于讀取許多小文件的需求。

查看RDD分區(qū)的shell命令

#從集合中創(chuàng)建
sc.parallelize(Seq(1,2,3,4))

#查看分區(qū)數(shù)量(并行數(shù)量)
res3.getNumPartitions

#查看分區(qū)并行數(shù)量的內(nèi)容
#將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型時RDD[Array[T]]

res3.glom.collect

#查看分區(qū)數(shù)量(并行數(shù)量)
res3.partitions.length

關(guān)于DRR分區(qū)決定因素

  • 第一點:RDD分區(qū)的原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目,這樣可以充分利用CPU的計算資源;

  • 第二點:在實際中為了更加充分的壓榨CPU的計算資源,會把并行度設(shè)置為cpu核數(shù)的2~3倍;

  • 第三點:RDD分區(qū)數(shù)和啟動時指定的核數(shù)、調(diào)用方法時指定的分區(qū)數(shù)、如文件本身分區(qū)數(shù)有關(guān)

partitionBy 改變分區(qū)

解析:

  • 對RDD進行分區(qū)操作,如果原有的partionRDD和現(xiàn)有的partionRDD是一致的話就不進行分區(qū), 否則會生成ShuffleRDD.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> rdd.partitions.size
res24: Int = 4

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

scala> var rdd3 = rdd.partitionBy(new org.apache.spark.RangePartitioner(2,rdd))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25

scala> rdd2.partitions.size
res25: Int = 2
scala> rdd2.glom.collect
res26: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
scala> rdd3.glom.collect
res27: Array[Array[(Int, String)]] = Array(Array((1,aaa), (2,bbb)), Array((3,ccc), (4,ddd)))

注意:Spark采用的分區(qū)有三種:

  • 水平分區(qū),也就是sc.makerdd按照下標元素劃分,

  • Hash劃分根據(jù)數(shù)據(jù)確定性劃分到某個分區(qū),一般只給定分區(qū)數(shù)。

  • Range分區(qū)該方法一般按照元素大小進行劃分不同區(qū)域,每個分區(qū)表示一個數(shù)據(jù)區(qū)域,如數(shù)組中每個數(shù)是[0,100]之間的隨機數(shù),Range劃分首先將區(qū)域劃分為10份,然后將數(shù)組中每個數(shù)字分發(fā)到不同的分區(qū),比如將18分到(10,20]的分區(qū),最后對每個分區(qū)進行排序。

RDD編程

在Spark中,RDD被表示為對象,通過對象上的方法調(diào)用來對RDD進行轉(zhuǎn)換。經(jīng)過一系列的transformations定義RDD之后,就可以調(diào)用actions觸發(fā)RDD的計算,action可以是向應(yīng)用程序返回結(jié)果(count, collect等),或者是向存儲系統(tǒng)保存數(shù)據(jù)(saveAsTextFile等)。在Spark中,只有遇到action,才會執(zhí)行RDD的計算(即延遲計算),這樣在運行時可以通過管道的方式傳輸多個轉(zhuǎn)換。

要使用Spark,開發(fā)者需要編寫一個Driver程序,它被提交到集群以調(diào)度運行Worker,如下圖所示。Driver中定義了一個或多個RDD,并調(diào)用RDD上的action,Worker則執(zhí)行RDD分區(qū)計算任務(wù)。

RDD的轉(zhuǎn)化 ( 重點掌握 )

RDD整體上分為 TRANSFORMATIONS 跟 ACTIONS 兩種

Value類型

map(func) 重點

將RDD創(chuàng)建的集合轉(zhuǎn)換為另外一個映射集合,例如,如果將一個Array中的數(shù)全部 *2 輸出,那么就會用到map方法。例如

//創(chuàng)建一個array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合內(nèi)每個元素*2
scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印輸出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)        
mapPartitions(func)

類似于map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數(shù)類型必須是Iterator[T] => Iterator[U]。假設(shè)有N個元素,有M個分區(qū),那么map的函數(shù)的將被調(diào)用N次,而mapPartitions被調(diào)用M次,一個函數(shù)一次處理所有分區(qū)。同樣以上述的需求為例:

//創(chuàng)建一個array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合內(nèi)每個元素*2
scala> res0.mapPartitions(x=>x.map(_*2))
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印輸出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)       
mapPartitionsWithIndex(func)

類似于mapPartitions,但func帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運行時,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U];

glom

將每一個分區(qū)形成一個數(shù)組,形成新的RDD類型時RDD[Array[T]]

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) 重點

分組,按照傳入函數(shù)的返回值進行分組。將相同的key對應(yīng)的值放入一個迭代器。

scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

上述例子解釋是創(chuàng)建一個1到4的序列,然后把能被2整除的放進一個元祖中,不能被2整除的放入另外一個元祖中。那么分組的條件就是%2

filter(func) 重點

過濾。返回一個新的RDD,該RDD由經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成。比如創(chuàng)建一個RDD(由字符串組成),過濾出一個新RDD(包含”xiao”子串)

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

sortBy(func,[ascending], [numTasks]) 重點

使用func先對數(shù)據(jù)進行處理,按照處理后的數(shù)據(jù)比較結(jié)果排序,默認為正序。

//創(chuàng)建一個RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

//按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)

//按照與3余數(shù)的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)

Key-Value類型

partitionBy

pairRDD進行分區(qū)操作,如果原有的partionRDD和現(xiàn)有的partionRDD是一致的話就不進行分區(qū), 否則會生成ShuffleRDD,即會產(chǎn)生shuffle過程。

groupByKey

作用:groupByKey也是對每個key進行操作,但只生成一個sequence。

//創(chuàng)建一個pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

//將相同key對應(yīng)值聚合到一個sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

//打印結(jié)果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

//計算相同key對應(yīng)值的相加結(jié)果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

//打印結(jié)果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks])

在一個(K,V)的RDD上調(diào)用,返回一個(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,reduce任務(wù)的個數(shù)可以通過第二個可選的參數(shù)來設(shè)置。

//創(chuàng)建一個pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

//算相同key對應(yīng)值的相加結(jié)果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

//打印結(jié)果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))

reduceByKey和groupByKey的區(qū)別

1.reduceByKey:按照key進行聚合,在shuffle之前有combine(預(yù)聚合)操作,返回結(jié)果是RDD[k,v].

2.groupByKey:按照key進行分組,直接進行shuffle。

aggregateByKey

在kv對的RDD中,,按key將value進行分組合并,合并時,將每個value和初始值作為seq函數(shù)的參數(shù),進行計算,返回的結(jié)果作為一個新的kv對,然后再將結(jié)果按照key進行合并,最后將每個分組的value傳遞給combine函數(shù)進行計算(先將前兩個value進行計算,將返回結(jié)果和下一個value傳給combine函數(shù),以此類推),將key與計算結(jié)果作為一個新的kv對輸出。

(1)zeroValue:給每一個分區(qū)中的每一個key一個初始值;

(2)seqOp:函數(shù)用于在每一個分區(qū)中用初始值逐步迭代value;

(3)combOp:函數(shù)用于合并每個分區(qū)中的結(jié)果。

//創(chuàng)建一個pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

//取出每個分區(qū)相同key對應(yīng)值的最大值,然后相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

//打印結(jié)果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
reduceByKey和groupByKey的區(qū)別
  1. reduceByKey:按照key進行聚合,在shuffle之前有combine(預(yù)聚合)操作,返回結(jié)果是RDD[k,v].
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
  1. groupByKey:按照key進行分組,直接進行shuffle。
val createCombiner = (v: V) => CompactBuffer(v) ,它把一個V變成一個C(例    如,創(chuàng)建一個單元素列表)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v ,將一個V合并到一個C中(例如,將它添加到列表的末尾)
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 ,將兩個C合并成一個C。
  1. 開發(fā)指導(dǎo):reduceByKey比groupByKey,建議使用。但是需要注意是否會影響業(yè)務(wù)邏輯。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容