1. RDD 詳解
RDD 是一個數(shù)據(jù)集的表示,不僅表示了數(shù)據(jù)集,還表示了這個數(shù)據(jù)集從哪來,如何計算,主要屬性包括:
- 分區(qū)列表
- 計算函數(shù)
- 依賴關系
- 分區(qū)函數(shù)(默認是 hash)
- 最佳位置
分區(qū)列表、分區(qū)函數(shù)、最佳位置,這三個屬性其實說的就是數(shù)據(jù)集在哪,在哪計算更合適,如何分區(qū);
計算函數(shù)、依賴關系,這兩個屬性其實說的是數(shù)據(jù)集怎么來的。
2. RDD-API
1) RDD 的創(chuàng)建方式
由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有 Hadoop 支持的數(shù)據(jù)集,比如 HDFS、Cassandra、HBase 等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")通過已有的 RDD 經(jīng)過算子轉換生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))由一個已經(jīng)存在的 Scala 集合創(chuàng)建:
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
makeRDD 方法底層調用了 parallelize 方法:
2) RDD 的算子分類
RDD 的算子分為兩類:
- Transformation轉換操作:返回一個新的 RDD
- Action動作操作:返回值不是 RDD(無返回值或返回其他的)
?? 注意:
1、RDD 不實際存儲真正要計算的數(shù)據(jù),而是記錄了數(shù)據(jù)的位置在哪里,數(shù)據(jù)的轉換關系(調用了什么方法,傳入什么函數(shù))。
2、RDD 中的所有轉換都是惰性求值/延遲執(zhí)行的,也就是說并不會直接計算。只有當發(fā)生一個要求返回結果給 Driver 的 Action 動作時,這些轉換才會真正運行。
3、之所以使用惰性求值/延遲執(zhí)行,是因為這樣可以在 Action 時對 RDD 操作形成 DAG 有向無環(huán)圖進行 Stage 的劃分和并行優(yōu)化,這種設計讓 Spark 更加有效率地運行。
3) Transformation 轉換算子
| 轉換算子 | 含義 |
|---|---|
| map(func) | 返回一個新的 RDD,該 RDD 由每一個輸入元素經(jīng)過 func 函數(shù)轉換后組成 |
| filter(func) | 返回一個新的 RDD,該 RDD 由經(jīng)過 func 函數(shù)計算后返回值為 true 的輸入元素組成 |
| flatMap(func) | 類似于 map,但是每一個輸入元素可以被映射為 0 或多個輸出元素(所以 func 應該返回一個序列,而不是單一元素) |
| mapPartitions(func) | 類似于 map,但獨立地在 RDD 的每一個分片上運行,因此在類型為 T 的 RDD 上運行時,func 的函數(shù)類型必須是 Iterator[T] => Iterator[U] |
| mapPartitionsWithIndex(func) | 類似于 mapPartitions,但 func 帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為 T 的 RDD 上運行時,func 的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U] |
| sample(withReplacement, fraction, seed) | 根據(jù) fraction 指定的比例對數(shù)據(jù)進行采樣,可以選擇是否使用隨機數(shù)進行替換,seed 用于指定隨機數(shù)生成器種子 |
| union(otherDataset) | 對源 RDD 和參數(shù) RDD 求并集后返回一個新的 RDD |
| intersection(otherDataset) | 對源 RDD 和參數(shù) RDD 求交集后返回一個新的 RDD |
| distinct([numTasks])) | 對源 RDD 進行去重后返回一個新的 RDD |
| groupByKey([numTasks]) | 在一個(K,V)的 RDD 上調用,返回一個(K, Iterator[V])的 RDD |
| reduceByKey(func, [numTasks]) | 在一個(K,V)的 RDD 上調用,返回一個(K,V)的 RDD,使用指定的 reduce 函數(shù),將相同 key 的值聚合到一起,與 groupByKey 類似,reduce 任務的個數(shù)可以通過第二個可選的參數(shù)來設置 |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 對 PairRDD 中相同的 Key 值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和 aggregate 函數(shù)類似,aggregateByKey 返回值的類型不需要和 RDD 中 value 的類型一致 |
| sortByKey([ascending], [numTasks]) | 在一個(K,V)的 RDD 上調用,K 必須實現(xiàn) Ordered 接口,返回一個按照 key 進行排序的(K,V)的 RDD |
| sortBy(func,[ascending], [numTasks]) | 與 sortByKey 類似,但是更靈活 |
| join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個相同 key 對應的所有元素對在一起的(K,(V,W))的 RDD |
| cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W)的 RDD 上調用,返回一個(K,(Iterable ,Iterable ))類型的 RDD |
| cartesian(otherDataset) | 笛卡爾積 |
| pipe(command, [envVars]) | 對 rdd 進行管道操作 |
| coalesce(numPartitions) | 減少 RDD 的分區(qū)數(shù)到指定值。在過濾大量數(shù)據(jù)之后,可以執(zhí)行此操作 |
| repartition(numPartitions) | 重新給 RDD 分區(qū) |
4) Action 動作算子
| 動作算子 | 含義 |
|---|---|
| reduce(func) | 通過 func 函數(shù)聚集 RDD 中的所有元素,這個功能必須是可交換且可并聯(lián)的 |
| collect() | 在驅動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素 |
| count() | 返回 RDD 的元素個數(shù) |
| first() | 返回 RDD 的第一個元素(類似于 take(1)) |
| take(n) | 返回一個由數(shù)據(jù)集的前 n 個元素組成的數(shù)組 |
| takeSample(withReplacement,num, [seed]) | 返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機采樣的 num 個元素組成,可以選擇是否用隨機數(shù)替換不足的部分,seed 用于指定隨機數(shù)生成器種子 |
| takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個元素 |
| saveAsTextFile(path) | 將數(shù)據(jù)集的元素以 textfile 的形式保存到 HDFS 文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark 將會調用 toString 方法,將它裝換為文件中的文本 |
| saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以 Hadoop sequencefile 的格式保存到指定的目錄下,可以使 HDFS 或者其他 Hadoop 支持的文件系統(tǒng) |
| saveAsObjectFile(path) | 將數(shù)據(jù)集的元素,以 Java 序列化的方式保存到指定的目錄下 |
| countByKey() | 針對(K,V)類型的 RDD,返回一個(K,Int)的 map,表示每一個 key 對應的元素個數(shù) |
| foreach(func) | 在數(shù)據(jù)集的每一個元素上,運行函數(shù) func 進行更新 |
| foreachPartition(func) | 在數(shù)據(jù)集的每一個分區(qū)上,運行函數(shù) func |
統(tǒng)計操作:
| 算子 | 含義 |
|---|---|
| count | 個數(shù) |
| mean | 均值 |
| sum | 求和 |
| max | 最大值 |
| min | 最小值 |
| variance | 方差 |
| sampleVariance | 從采樣中計算方差 |
| stdev | 標準差:衡量數(shù)據(jù)的離散程度 |
| sampleStdev | 采樣的標準差 |
| stats | 查看統(tǒng)計結果 |
4) RDD 算子練習
- 需求:
給定一個鍵值對 RDD:
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
key 表示圖書名稱,value 表示某天圖書銷量
請計算每個鍵對應的平均值,也就是計算每種圖書的每天平均銷量。
最終結果:("spark",4),("hadoop",5)。
- 答案 1:
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
rdd2.mapValues(v=>v.sum/v.size).collect
Array[(String, Int)] = Array((spark,4), (hadoop,5))
- 答案 2:
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
val rdd3 = rdd2.map(t=>(t._1,t._2.sum /t._2.size))
rdd3.collect
//Array[(String, Int)] = Array((spark,4), (hadoop,5))
3. RDD 的持久化/緩存
在實際開發(fā)中某些 RDD 的計算或轉換可能會比較耗費時間,如果這些 RDD 后續(xù)還會頻繁的被使用到,那么可以將這些 RDD 進行持久化/緩存,這樣下次再使用到的時候就不用再重新計算了,提高了程序運行的效率。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會去讀取HDFS的文件,rdd2會真正執(zhí)行持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會去讀緩存中的數(shù)據(jù),執(zhí)行速度會比之前快,因為rdd2已經(jīng)持久化到內存中了
持久化/緩存 API 詳解
- ersist 方法和 cache 方法
RDD 通過 persist 或 cache 方法可以將前面的計算結果緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發(fā)后面的 action 時,該 RDD 將會被緩存在計算節(jié)點的內存中,并供后面重用。
通過查看 RDD 的源碼發(fā)現(xiàn) cache 最終也是調用了 persist 無參方法(默認存儲只存在內存中):
- 存儲級別
默認的存儲級別都是僅在內存存儲一份,Spark 的存儲級別還有好多種,存儲級別在 object StorageLevel 中定義的。
| 持久化級別 | 說明 |
|---|---|
| MORY_ONLY(默認) | 將 RDD 以非序列化的 Java 對象存儲在 JVM 中。如果沒有足夠的內存存儲 RDD,則某些分區(qū)將不會被緩存,每次需要時都會重新計算。這是默認級別 |
| MORY_AND_DISK(開發(fā)中可以使用這個) | 將 RDD 以非序列化的 Java 對象存儲在 JVM 中。如果數(shù)據(jù)在內存中放不下,則溢寫到磁盤上.需要時則會從磁盤上讀取 |
| MEMORY_ONLY_SER (Java and Scala) | 將 RDD 以序列化的 Java 對象(每個分區(qū)一個字節(jié)數(shù)組)的方式存儲.這通常比非序列化對象(deserialized objects)更具空間效率,特別是在使用快速序列化的情況下,但是這種方式讀取數(shù)據(jù)會消耗更多的 CPU |
| MEMORY_AND_DISK_SER (Java and Scala) | 與 MEMORY_ONLY_SER 類似,但如果數(shù)據(jù)在內存中放不下,則溢寫到磁盤上,而不是每次需要重新計算它們 |
| DISK_ONLY | 將 RDD 分區(qū)存儲在磁盤上 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2 等 | 與上面的儲存級別相同,只不過將持久化數(shù)據(jù)存為兩份,備份每個分區(qū)存儲在兩個集群節(jié)點上 |
| OFF_HEAP(實驗中) | 與 MEMORY_ONLY_SER 類似,但將數(shù)據(jù)存儲在堆外內存中。(即不是直接存儲在 JVM 內存中) |
總結:
- RDD 持久化/緩存的目的是為了提高后續(xù)操作的速度
- 緩存的級別有很多,默認只存在內存中,開發(fā)中使用 memory_and_disk
- 只有執(zhí)行 action 操作的時候才會真正將 RDD 數(shù)據(jù)進行持久化/緩存
- 實際開發(fā)中如果某一個 RDD 后續(xù)會被頻繁的使用,可以將該 RDD 進行持久化/緩存
4. RDD 容錯機制 Checkpoint
- 持久化的局限:
持久化/緩存可以把數(shù)據(jù)放在內存中,雖然是快速的,但是也是最不可靠的;也可以把數(shù)據(jù)放在磁盤上,也不是完全可靠的!例如磁盤會損壞等。
- 問題解決:
Checkpoint 的產(chǎn)生就是為了更加可靠的數(shù)據(jù)持久化,在 Checkpoint 的時候一般把數(shù)據(jù)放在在 HDFS 上,這就天然的借助了 HDFS 天生的高容錯、高可靠來實現(xiàn)數(shù)據(jù)最大程度上的安全,實現(xiàn)了 RDD 的容錯和高可用。
用法:
SparkContext.setCheckpointDir("目錄") //HDFS的目錄RDD.checkpoint
總結:
開發(fā)中如何保證數(shù)據(jù)的安全性性及讀取效率:可以對頻繁使用且重要的數(shù)據(jù),先做緩存/持久化,再做 checkpint 操作。
持久化和 Checkpoint 的區(qū)別:
位置:Persist 和 Cache 只能保存在本地的磁盤和內存中(或者堆外內存--實驗中) Checkpoint 可以保存數(shù)據(jù)到 HDFS 這類可靠的存儲上。
生命周期:Cache 和 Persist 的 RDD 會在程序結束后會被清除或者手動調用 unpersist 方法 Checkpoint 的 RDD 在程序結束后依然存在,不會被刪除。
6. DAG 的生成和劃分 Stage
1) DAG 介紹
- DAG 是什么:
DAG(Directed Acyclic Graph 有向無環(huán)圖)指的是數(shù)據(jù)轉換執(zhí)行的過程,有方向,無閉環(huán)(其實就是 RDD 執(zhí)行的流程);
原始的 RDD 通過一系列的轉換操作就形成了 DAG 有向無環(huán)圖,任務執(zhí)行時,可以按照 DAG 的描述,執(zhí)行真正的計算(數(shù)據(jù)被操作的一個過程)。
- DAG 的邊界
開始:通過 SparkContext 創(chuàng)建的 RDD;
結束:觸發(fā) Action,一旦觸發(fā) Action 就形成了一個完整的 DAG。
2) DAG 劃分 Stage
一個 Spark 程序可以有多個 DAG(有幾個 Action,就有幾個 DAG,上圖最后只有一個 Action(圖中未表現(xiàn)),那么就是一個 DAG)。
一個 DAG 可以有多個 Stage(根據(jù)寬依賴/shuffle 進行劃分)。
同一個 Stage 可以有多個 Task 并行執(zhí)行(task 數(shù)=分區(qū)數(shù),如上圖,Stage1 中有三個分區(qū) P1、P2、P3,對應的也有三個 Task)。
可以看到這個 DAG 中只 reduceByKey 操作是一個寬依賴,Spark 內核會以此為邊界將其前后劃分成不同的 Stage。
同時我們可以注意到,在圖中 Stage1 中,從 textFile 到 flatMap 到 map 都是窄依賴,這幾步操作可以形成一個流水線操作,通過 flatMap 操作生成的 partition 可以不用等待整個 RDD 計算結束,而是繼續(xù)進行 map 操作,這樣大大提高了計算的效率。
- 為什么要劃分 Stage? --并行計算
一個復雜的業(yè)務邏輯如果有 shuffle,那么就意味著前面階段產(chǎn)生結果后,才能執(zhí)行下一個階段,即下一個階段的計算要依賴上一個階段的數(shù)據(jù)。那么我們按照 shuffle 進行劃分(也就是按照寬依賴就行劃分),就可以將一個 DAG 劃分成多個 Stage/階段,在同一個 Stage 中,會有多個算子操作,可以形成一個 pipeline 流水線,流水線內的多個平行的分區(qū)可以并行執(zhí)行。
- 如何劃分 DAG 的 stage?
對于窄依賴,partition 的轉換處理在 stage 中完成計算,不劃分(將窄依賴盡量放在在同一個 stage 中,可以實現(xiàn)流水線計算)。
對于寬依賴,由于有 shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計算,也就是說需要要劃分 stage。
總結:
Spark 會根據(jù) shuffle/寬依賴使用回溯算法來對 DAG 進行 Stage 劃分,從后往前,遇到寬依賴就斷開,遇到窄依賴就把當前的 RDD 加入到當前的 stage/階段中
具體的劃分算法請參見 AMP 實驗室發(fā)表的論文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
7. RDD 累加器和廣播變量
在默認情況下,當 Spark 在集群的多個不同節(jié)點的多個任務上并行運行一個函數(shù)時,它會把函數(shù)中涉及到的每個變量,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變量,或者在任務(Task)和任務控制節(jié)點(Driver Program)之間共享變量。
為了滿足這種需求,Spark 提供了兩種類型的變量:
累加器 accumulators:累加器支持在所有不同節(jié)點之間進行累加計算(比如計數(shù)或者求和)。
廣播變量 broadcast variables:廣播變量用來把變量在所有節(jié)點的內存之間進行共享,在每個機器上緩存一個只讀的變量,而不是為機器上的每個任務都生成一個副本。
1) 累加器
1. 不使用累加器
var counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println("Counter value: "+ counter)
運行結果:
Counter value: 6
如果我們將 data 轉換成 RDD,再來重新計算:
var counter = 0
val data = Seq(1, 2, 3)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println("Counter value: "+ counter)
運行結果:
Counter value: 0
2. 使用累加器
通常在向 Spark 傳遞函數(shù)時,比如使用 map() 函數(shù)或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。這時使用累加器就可以實現(xiàn)我們想要的效果:
val xx: Accumulator[Int] = sc.accumulator(0)
3. 代碼示例:
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//使用scala集合完成累加
var counter1: Int = 0;
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
println(counter1)//6
println("+++++++++++++++++++++++++")
//使用RDD進行累加
var counter2: Int = 0;
val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
dataRDD.foreach(x => counter2 += x)
println(counter2)//0
//注意:上面的RDD操作運行結果是0
//因為foreach中的函數(shù)是傳遞給Worker中的Executor執(zhí)行,用到了counter2變量
//而counter2變量在Driver端定義的,在傳遞給Executor的時候,各個Executor都有了一份counter2
//最后各個Executor將各自個x加到自己的counter2上面了,和Driver端的counter2沒有關系
//那這個問題得解決啊!不能因為使用了Spark連累加都做不了了啊!
//如果解決?---使用累加器
val counter3: Accumulator[Int] = sc.accumulator(0)
dataRDD.foreach(x => counter3 += x)
println(counter3)//6
}
}
2) 廣播變量
1. 不使用廣播變量
2. 使用廣播變量
3. 代碼示例:
關鍵詞:sc.broadcast()
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariablesTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//不使用廣播變量
val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
//根據(jù)水果編號取水果名稱
val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
fruitNames.foreach(println)
//注意:以上代碼看似一點問題沒有,但是考慮到數(shù)據(jù)量如果較大,且Task數(shù)較多,
//那么會導致,被各個Task共用到的fruitMap會被多次傳輸
//應該要減少fruitMap的傳輸,一臺機器上一個,被該臺機器中的Task共用即可
//如何做到?---使用廣播變量
//注意:廣播變量的值不能被修改,如需修改可以將數(shù)據(jù)存到外部數(shù)據(jù)源,如MySQL、Redis
println("=====================")
val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
fruitNames2.foreach(println)
}
}