Spark之RDD超詳細總結(三)

1. RDD 詳解

RDD 是一個數(shù)據(jù)集的表示,不僅表示了數(shù)據(jù)集,還表示了這個數(shù)據(jù)集從哪來,如何計算,主要屬性包括:

  1. 分區(qū)列表
  2. 計算函數(shù)
  3. 依賴關系
  4. 分區(qū)函數(shù)(默認是 hash)
  5. 最佳位置

分區(qū)列表、分區(qū)函數(shù)、最佳位置,這三個屬性其實說的就是數(shù)據(jù)集在哪,在哪計算更合適,如何分區(qū);
計算函數(shù)、依賴關系,這兩個屬性其實說的是數(shù)據(jù)集怎么來的。

2. RDD-API

1) RDD 的創(chuàng)建方式

  1. 由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有 Hadoop 支持的數(shù)據(jù)集,比如 HDFS、Cassandra、HBase 等:
    val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

  2. 通過已有的 RDD 經(jīng)過算子轉換生成新的 RDD:
    val rdd2=rdd1.flatMap(_.split(" "))

  3. 由一個已經(jīng)存在的 Scala 集合創(chuàng)建:
    val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))或者
    val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD 方法底層調用了 parallelize 方法:

RDD源碼

2) RDD 的算子分類

RDD 的算子分為兩類:

  1. Transformation轉換操作:返回一個新的 RDD
  2. Action動作操作:返回值不是 RDD(無返回值或返回其他的)

?? 注意:
1、RDD 不實際存儲真正要計算的數(shù)據(jù),而是記錄了數(shù)據(jù)的位置在哪里,數(shù)據(jù)的轉換關系(調用了什么方法,傳入什么函數(shù))。
2、RDD 中的所有轉換都是惰性求值/延遲執(zhí)行的,也就是說并不會直接計算。只有當發(fā)生一個要求返回結果給 Driver 的 Action 動作時,這些轉換才會真正運行。
3、之所以使用惰性求值/延遲執(zhí)行,是因為這樣可以在 Action 時對 RDD 操作形成 DAG 有向無環(huán)圖進行 Stage 的劃分和并行優(yōu)化,這種設計讓 Spark 更加有效率地運行。

3) Transformation 轉換算子

轉換算子 含義
map(func) 返回一個新的 RDD,該 RDD 由每一個輸入元素經(jīng)過 func 函數(shù)轉換后組成
filter(func) 返回一個新的 RDD,該 RDD 由經(jīng)過 func 函數(shù)計算后返回值為 true 的輸入元素組成
flatMap(func) 類似于 map,但是每一個輸入元素可以被映射為 0 或多個輸出元素(所以 func 應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似于 map,但獨立地在 RDD 的每一個分片上運行,因此在類型為 T 的 RDD 上運行時,func 的函數(shù)類型必須是 Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似于 mapPartitions,但 func 帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為 T 的 RDD 上運行時,func 的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù) fraction 指定的比例對數(shù)據(jù)進行采樣,可以選擇是否使用隨機數(shù)進行替換,seed 用于指定隨機數(shù)生成器種子
union(otherDataset) 對源 RDD 和參數(shù) RDD 求并集后返回一個新的 RDD
intersection(otherDataset) 對源 RDD 和參數(shù) RDD 求交集后返回一個新的 RDD
distinct([numTasks])) 對源 RDD 進行去重后返回一個新的 RDD
groupByKey([numTasks]) 在一個(K,V)的 RDD 上調用,返回一個(K, Iterator[V])的 RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的 RDD 上調用,返回一個(K,V)的 RDD,使用指定的 reduce 函數(shù),將相同 key 的值聚合到一起,與 groupByKey 類似,reduce 任務的個數(shù)可以通過第二個可選的參數(shù)來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 對 PairRDD 中相同的 Key 值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和 aggregate 函數(shù)類似,aggregateByKey 返回值的類型不需要和 RDD 中 value 的類型一致
sortByKey([ascending], [numTasks]) 在一個(K,V)的 RDD 上調用,K 必須實現(xiàn) Ordered 接口,返回一個按照 key 進行排序的(K,V)的 RDD
sortBy(func,[ascending], [numTasks]) 與 sortByKey 類似,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素對在一起的(K,(V,W))的 RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個(K,(Iterable ,Iterable ))類型的 RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars]) 對 rdd 進行管道操作
coalesce(numPartitions) 減少 RDD 的分區(qū)數(shù)到指定值。在過濾大量數(shù)據(jù)之后,可以執(zhí)行此操作
repartition(numPartitions) 重新給 RDD 分區(qū)

4) Action 動作算子

動作算子 含義
reduce(func) 通過 func 函數(shù)聚集 RDD 中的所有元素,這個功能必須是可交換且可并聯(lián)的
collect() 在驅動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素
count() 返回 RDD 的元素個數(shù)
first() 返回 RDD 的第一個元素(類似于 take(1))
take(n) 返回一個由數(shù)據(jù)集的前 n 個元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機采樣的 num 個元素組成,可以選擇是否用隨機數(shù)替換不足的部分,seed 用于指定隨機數(shù)生成器種子
takeOrdered(n, [ordering]) 返回自然順序或者自定義順序的前 n 個元素
saveAsTextFile(path) 將數(shù)據(jù)集的元素以 textfile 的形式保存到 HDFS 文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark 將會調用 toString 方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以 Hadoop sequencefile 的格式保存到指定的目錄下,可以使 HDFS 或者其他 Hadoop 支持的文件系統(tǒng)
saveAsObjectFile(path) 將數(shù)據(jù)集的元素,以 Java 序列化的方式保存到指定的目錄下
countByKey() 針對(K,V)類型的 RDD,返回一個(K,Int)的 map,表示每一個 key 對應的元素個數(shù)
foreach(func) 在數(shù)據(jù)集的每一個元素上,運行函數(shù) func 進行更新
foreachPartition(func) 在數(shù)據(jù)集的每一個分區(qū)上,運行函數(shù) func

統(tǒng)計操作:

算子 含義
count 個數(shù)
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 從采樣中計算方差
stdev 標準差:衡量數(shù)據(jù)的離散程度
sampleStdev 采樣的標準差
stats 查看統(tǒng)計結果

4) RDD 算子練習

  • 需求

給定一個鍵值對 RDD:

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))

key 表示圖書名稱,value 表示某天圖書銷量

請計算每個鍵對應的平均值,也就是計算每種圖書的每天平均銷量。

最終結果:("spark",4),("hadoop",5)。

  • 答案 1
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
rdd2.mapValues(v=>v.sum/v.size).collect
Array[(String, Int)] = Array((spark,4), (hadoop,5))
  • 答案 2
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))

val rdd3 = rdd2.map(t=>(t._1,t._2.sum /t._2.size))
rdd3.collect
//Array[(String, Int)] = Array((spark,4), (hadoop,5))

3. RDD 的持久化/緩存

在實際開發(fā)中某些 RDD 的計算或轉換可能會比較耗費時間,如果這些 RDD 后續(xù)還會頻繁的被使用到,那么可以將這些 RDD 進行持久化/緩存,這樣下次再使用到的時候就不用再重新計算了,提高了程序運行的效率。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會去讀取HDFS的文件,rdd2會真正執(zhí)行持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會去讀緩存中的數(shù)據(jù),執(zhí)行速度會比之前快,因為rdd2已經(jīng)持久化到內存中了

持久化/緩存 API 詳解

  • ersist 方法和 cache 方法

RDD 通過 persist 或 cache 方法可以將前面的計算結果緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發(fā)后面的 action 時,該 RDD 將會被緩存在計算節(jié)點的內存中,并供后面重用。
通過查看 RDD 的源碼發(fā)現(xiàn) cache 最終也是調用了 persist 無參方法(默認存儲只存在內存中):

RDD源碼
  • 存儲級別

默認的存儲級別都是僅在內存存儲一份,Spark 的存儲級別還有好多種,存儲級別在 object StorageLevel 中定義的。

持久化級別 說明
MORY_ONLY(默認) 將 RDD 以非序列化的 Java 對象存儲在 JVM 中。如果沒有足夠的內存存儲 RDD,則某些分區(qū)將不會被緩存,每次需要時都會重新計算。這是默認級別
MORY_AND_DISK(開發(fā)中可以使用這個) 將 RDD 以非序列化的 Java 對象存儲在 JVM 中。如果數(shù)據(jù)在內存中放不下,則溢寫到磁盤上.需要時則會從磁盤上讀取
MEMORY_ONLY_SER (Java and Scala) 將 RDD 以序列化的 Java 對象(每個分區(qū)一個字節(jié)數(shù)組)的方式存儲.這通常比非序列化對象(deserialized objects)更具空間效率,特別是在使用快速序列化的情況下,但是這種方式讀取數(shù)據(jù)會消耗更多的 CPU
MEMORY_AND_DISK_SER (Java and Scala) 與 MEMORY_ONLY_SER 類似,但如果數(shù)據(jù)在內存中放不下,則溢寫到磁盤上,而不是每次需要重新計算它們
DISK_ONLY 將 RDD 分區(qū)存儲在磁盤上
MEMORY_ONLY_2, MEMORY_AND_DISK_2 等 與上面的儲存級別相同,只不過將持久化數(shù)據(jù)存為兩份,備份每個分區(qū)存儲在兩個集群節(jié)點上
OFF_HEAP(實驗中) 與 MEMORY_ONLY_SER 類似,但將數(shù)據(jù)存儲在堆外內存中。(即不是直接存儲在 JVM 內存中)

總結:

  1. RDD 持久化/緩存的目的是為了提高后續(xù)操作的速度
  2. 緩存的級別有很多,默認只存在內存中,開發(fā)中使用 memory_and_disk
  3. 只有執(zhí)行 action 操作的時候才會真正將 RDD 數(shù)據(jù)進行持久化/緩存
  4. 實際開發(fā)中如果某一個 RDD 后續(xù)會被頻繁的使用,可以將該 RDD 進行持久化/緩存

4. RDD 容錯機制 Checkpoint

  • 持久化的局限:

持久化/緩存可以把數(shù)據(jù)放在內存中,雖然是快速的,但是也是最不可靠的;也可以把數(shù)據(jù)放在磁盤上,也不是完全可靠的!例如磁盤會損壞等。

  • 問題解決:

Checkpoint 的產(chǎn)生就是為了更加可靠的數(shù)據(jù)持久化,在 Checkpoint 的時候一般把數(shù)據(jù)放在在 HDFS 上,這就天然的借助了 HDFS 天生的高容錯、高可靠來實現(xiàn)數(shù)據(jù)最大程度上的安全,實現(xiàn)了 RDD 的容錯和高可用。

用法:

SparkContext.setCheckpointDir("目錄") //HDFS的目錄RDD.checkpoint
  • 總結:

  • 開發(fā)中如何保證數(shù)據(jù)的安全性性及讀取效率:可以對頻繁使用且重要的數(shù)據(jù),先做緩存/持久化,再做 checkpint 操作。

  • 持久化和 Checkpoint 的區(qū)別:

  1. 位置:Persist 和 Cache 只能保存在本地的磁盤和內存中(或者堆外內存--實驗中) Checkpoint 可以保存數(shù)據(jù)到 HDFS 這類可靠的存儲上。

  2. 生命周期:Cache 和 Persist 的 RDD 會在程序結束后會被清除或者手動調用 unpersist 方法 Checkpoint 的 RDD 在程序結束后依然存在,不會被刪除。

6. DAG 的生成和劃分 Stage

1) DAG 介紹

  • DAG 是什么:

DAG(Directed Acyclic Graph 有向無環(huán)圖)指的是數(shù)據(jù)轉換執(zhí)行的過程,有方向,無閉環(huán)(其實就是 RDD 執(zhí)行的流程);
原始的 RDD 通過一系列的轉換操作就形成了 DAG 有向無環(huán)圖,任務執(zhí)行時,可以按照 DAG 的描述,執(zhí)行真正的計算(數(shù)據(jù)被操作的一個過程)。

  • DAG 的邊界

開始:通過 SparkContext 創(chuàng)建的 RDD;
結束:觸發(fā) Action,一旦觸發(fā) Action 就形成了一個完整的 DAG。

2) DAG 劃分 Stage

DAG劃分Stage

一個 Spark 程序可以有多個 DAG(有幾個 Action,就有幾個 DAG,上圖最后只有一個 Action(圖中未表現(xiàn)),那么就是一個 DAG)。

一個 DAG 可以有多個 Stage(根據(jù)寬依賴/shuffle 進行劃分)。

同一個 Stage 可以有多個 Task 并行執(zhí)行(task 數(shù)=分區(qū)數(shù),如上圖,Stage1 中有三個分區(qū) P1、P2、P3,對應的也有三個 Task)。

可以看到這個 DAG 中只 reduceByKey 操作是一個寬依賴,Spark 內核會以此為邊界將其前后劃分成不同的 Stage。

同時我們可以注意到,在圖中 Stage1 中,從 textFile 到 flatMap 到 map 都是窄依賴,這幾步操作可以形成一個流水線操作,通過 flatMap 操作生成的 partition 可以不用等待整個 RDD 計算結束,而是繼續(xù)進行 map 操作,這樣大大提高了計算的效率

  • 為什么要劃分 Stage? --并行計算

一個復雜的業(yè)務邏輯如果有 shuffle,那么就意味著前面階段產(chǎn)生結果后,才能執(zhí)行下一個階段,即下一個階段的計算要依賴上一個階段的數(shù)據(jù)。那么我們按照 shuffle 進行劃分(也就是按照寬依賴就行劃分),就可以將一個 DAG 劃分成多個 Stage/階段,在同一個 Stage 中,會有多個算子操作,可以形成一個 pipeline 流水線,流水線內的多個平行的分區(qū)可以并行執(zhí)行。

  • 如何劃分 DAG 的 stage?

對于窄依賴,partition 的轉換處理在 stage 中完成計算,不劃分(將窄依賴盡量放在在同一個 stage 中,可以實現(xiàn)流水線計算)。

對于寬依賴,由于有 shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計算,也就是說需要要劃分 stage。

總結:

Spark 會根據(jù) shuffle/寬依賴使用回溯算法來對 DAG 進行 Stage 劃分,從后往前,遇到寬依賴就斷開,遇到窄依賴就把當前的 RDD 加入到當前的 stage/階段中

具體的劃分算法請參見 AMP 實驗室發(fā)表的論文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se

7. RDD 累加器和廣播變量

在默認情況下,當 Spark 在集群的多個不同節(jié)點的多個任務上并行運行一個函數(shù)時,它會把函數(shù)中涉及到的每個變量,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變量,或者在任務(Task)和任務控制節(jié)點(Driver Program)之間共享變量。

為了滿足這種需求,Spark 提供了兩種類型的變量:

  1. 累加器 accumulators:累加器支持在所有不同節(jié)點之間進行累加計算(比如計數(shù)或者求和)。

  2. 廣播變量 broadcast variables:廣播變量用來把變量在所有節(jié)點的內存之間進行共享,在每個機器上緩存一個只讀的變量,而不是為機器上的每個任務都生成一個副本。

1) 累加器

1. 不使用累加器

var counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println("Counter value: "+ counter)

運行結果:

Counter value: 6

如果我們將 data 轉換成 RDD,再來重新計算:

var counter = 0
val data = Seq(1, 2, 3)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println("Counter value: "+ counter)

運行結果:

Counter value: 0

2. 使用累加器

通常在向 Spark 傳遞函數(shù)時,比如使用 map() 函數(shù)或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。這時使用累加器就可以實現(xiàn)我們想要的效果:

val xx: Accumulator[Int] = sc.accumulator(0)

3. 代碼示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6

    println("+++++++++++++++++++++++++")

    //使用RDD進行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0
    //注意:上面的RDD操作運行結果是0
    //因為foreach中的函數(shù)是傳遞給Worker中的Executor執(zhí)行,用到了counter2變量
    //而counter2變量在Driver端定義的,在傳遞給Executor的時候,各個Executor都有了一份counter2
    //最后各個Executor將各自個x加到自己的counter2上面了,和Driver端的counter2沒有關系

    //那這個問題得解決啊!不能因為使用了Spark連累加都做不了了啊!
    //如果解決?---使用累加器
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6
  }
}

2) 廣播變量

1. 不使用廣播變量

2. 使用廣播變量

3. 代碼示例

關鍵詞:sc.broadcast()

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariablesTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //不使用廣播變量
    val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
    val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
    //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
    val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
    //根據(jù)水果編號取水果名稱
    val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
    fruitNames.foreach(println)
    //注意:以上代碼看似一點問題沒有,但是考慮到數(shù)據(jù)量如果較大,且Task數(shù)較多,
    //那么會導致,被各個Task共用到的fruitMap會被多次傳輸
    //應該要減少fruitMap的傳輸,一臺機器上一個,被該臺機器中的Task共用即可
    //如何做到?---使用廣播變量
    //注意:廣播變量的值不能被修改,如需修改可以將數(shù)據(jù)存到外部數(shù)據(jù)源,如MySQL、Redis
    println("=====================")
    val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
    val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
    fruitNames2.foreach(println)

  }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容