RDD的概述
RDD是只讀的、分區(qū)記錄的集合,是Spark編程模型的最主要抽象,它是一種特殊的集合,支持多種數(shù)據(jù)源,有容錯機制、衍生血緣關(guān)系、可被緩存、支持并行操作。
RDD的屬性特征
1)分區(qū)(partition)
數(shù)據(jù)集的基本組成單位。包含一個數(shù)據(jù)分片列表,將數(shù)據(jù)進行切分,并決定并行的計算的粒度。其中分片的個數(shù)可由程序指定(默認值為程序分配的CPU數(shù)量)。每個分區(qū)分配的存儲由BlockManager實現(xiàn),分區(qū)都被邏輯映射成BlockManager的一個Block(Block被一個Task負責(zé)計算)。

另外,從BlockManager的源碼中可以看出,把分區(qū)的數(shù)據(jù)存儲需要制定的幾個參數(shù):
blockId:塊id
data:分區(qū)的數(shù)據(jù)buffer
level:rdd存儲的持久化等級

2)函數(shù)(compute)
一個計算每個分區(qū)的函數(shù),RDD的計算以分片為單位,每個RDD實現(xiàn)compute函數(shù)。通俗來說,compute用于計算每個分片,得出一個可遍歷的結(jié)果,用于描述在父RDD上執(zhí)行的計算。
3)依賴(dependency)
RDD的轉(zhuǎn)換都會生成新的RDD,RDD之間形成子->父的依賴關(guān)系(源RDD沒有依賴),通過依賴關(guān)系描述血緣關(guān)系(lineage),在部分分區(qū)數(shù)據(jù)丟失的,通過lineage重建丟失的分區(qū)數(shù)據(jù),提升整體運算效率。
4)優(yōu)先位置(preferred location)
每個分片的優(yōu)先計算位置,Spark在進行任務(wù)調(diào)度的時候,會盡可能滴將計算任務(wù)分配到所需數(shù)據(jù)塊的存儲位置,滿足“移動計算優(yōu)先移動數(shù)據(jù)”的理念。
5)分區(qū)策略(Partitioner)
RDD分片函數(shù),描述分區(qū)模式和數(shù)據(jù)分片粒度。Partitioner函數(shù)決定RDD本身的分片數(shù)據(jù),同事決定了parent RDD Shuffle輸出時的分片數(shù)據(jù)。
Spark實現(xiàn)兩種類型的分片函數(shù):基于哈希的HashPartitioner和基于范圍的RangePartitioner。區(qū)別在于只有Key-Value類型的RDD才有分區(qū)的,非Key-Value類型的RDD分區(qū)的值是None的。
具體區(qū)別詳看下一篇《Spark RDD分區(qū)策略》
RDD創(chuàng)建
細分來說,Spark有二種方式創(chuàng)建RDD
1、并行化已存在的Scala集合
scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data) //這里關(guān)注slices參數(shù),指定數(shù)據(jù)集切分成幾個分區(qū)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :29
scala> distData.reduce((a,b) => a + b)
res1: Int = 15
2、通過外部文件系統(tǒng)的數(shù)據(jù)集創(chuàng)建
SparkContext類中的textFile用于創(chuàng)建文本類型的RDD
def textFile(path:String, minPartitions: Int = defaultMinPartitions)
path參數(shù):指定文件的URI地址(hdfs://、本地等等)
minPartitions參數(shù):指定分片數(shù)
scala> val distFile = sc.textFile("/Users/irwin/zookeeper.out")
distFile: org.apache.spark.rdd.RDD[String] = /Users/irwin/zookeeper.out MapPartitionsRDD[4] at textFile at :27
scala> distFile.map(s => s.length).reduce((a, b) => (a+b))
res3: Int = 102295
當然SparkContext中包含其他創(chuàng)建RDD的方法,如:
def wholeTextFiles(path:String, minPartitions: Int = defaultMinPartitions)
defhadoopRDD[K,V](conf: JobConf, inputFormatClass:Class[_ <: InputFormat[K,V]], keyClass:Class[K], valueClass:Class[V], minPartitions: Int = defaultMinPartitions)
RDD操作
RDD包含一系列的轉(zhuǎn)換(Transformation)與執(zhí)行(Action)。
轉(zhuǎn)換
所有轉(zhuǎn)換操作都是惰性的,指定處理相互依賴關(guān)系,是數(shù)據(jù)集的邏輯操作,并未真正計算。
執(zhí)行
該操作指定數(shù)據(jù)的形式,當發(fā)生Action操作時,Spark將Action之間的所有Transformation組成的Job會并行計算。
例如從上面例子可以看到,只有在distFile.map的時候,Job才會真正執(zhí)行,且返回最后Action的結(jié)構(gòu)。
這里有個關(guān)鍵點:當每個Job計算完成,其內(nèi)部所有RDD會被清除。所以有RDD需要重復(fù)使用,則使用Persist(或Cache)的方法將RDD持久化,詳見下文RDD緩存介紹。

通過上圖RDD操作列表可以看到,有以下內(nèi)容:
RDD的Transformation操作、Action操作(常用執(zhí)行操作、存儲執(zhí)行操作)、緩存操作、checkpoint操作,具體用法及意義可以詳見官方文檔。
RDD緩存
Spark持久化指的是在不同Transformation過程中,將數(shù)據(jù)集緩存在內(nèi)存中,實現(xiàn)快速重用、故障快速恢復(fù)。
主動持久化
程序主動通過persist()或cache()方法操作標記需被持久化的RDD,事實上cache()使用的是persist()的默認方法。下面我們來看看持久化的等級:

根據(jù)名稱可以理解對應(yīng)Level的意義,其中帶“SER”表示將RDD序列化為Java對象,帶“2”表示將每個分區(qū)賦值到兩個集群節(jié)點。Persist持久化RDD,會修改原來RDD的 meta info 中的StorageLevel,可以看到最后返回的是this,說明返回的是修改的RDD對象本身,而非產(chǎn)生了新的RDD。

另外,從RDD類的源碼中我們可以看到:

默認persist()使用內(nèi)存存儲,而cache()調(diào)用的是persist()默認實現(xiàn)。
自動持久化
指的是Spark自動保存一些Shuffle操作的中間結(jié)果。很容易理解,Spark為了表面Shuffle過程中出現(xiàn)異常的快速恢復(fù)。
再說一點,persist()后不一定說就不丟失,在內(nèi)存不足的情況也是可能被刪除。但是用戶不用關(guān)心這塊,RDD的容錯機制保證了丟失也能計算正確執(zhí)行。RDD通過 meta info 中的 Lineage 可以重算丟失的數(shù)據(jù)。
RDD依賴關(guān)系
Spark根據(jù)提交任務(wù)的計算邏輯(亦即是RDD的Transformation和Action)生成RDD之間的依賴關(guān)系,同時也生成邏輯上的DAG。這里說到的依賴關(guān)系指的是對父RDD依賴,這個關(guān)系包含兩種類型:narrow dependency 和 wide dependency。
窄依賴(narrow dependency)
指每一個 parent RDD 的 Partition 最多被子RDD的一個Partition使用。從數(shù)據(jù)的角度來看,窄依賴的RDD整個操作都可以在同一個集群節(jié)點執(zhí)行,以pinpeline的方式計算所有父分區(qū),不會造成網(wǎng)絡(luò)之間的數(shù)據(jù)混合。

寬依賴(wide dependency)
與窄依賴相反,指的是子RDD的Partition會依賴所有parent RDD的所有或多個Partition。寬依賴RDD會涉及數(shù)據(jù)混合,寬依賴需要首先計算好所有父分區(qū)的數(shù)據(jù),然后在節(jié)點間進行Shuffle。

所有依賴的基類是traid Dependency[T],這是一個純虛類:

其中rdd就是依賴的Parent RDD
對于窄依賴的實現(xiàn)是:

窄依賴有兩種具體的實現(xiàn)
1、OneToOneDependency

2、RangeDependency

UnionRDD將多個RDD合成一個RDD,從上述分析,合成后的RDD每個parent RDD的partition的相對順序是不會變。對于合并后的UnionRDD而言,每個parent RDD與其Partition的其實位置不同。
從RangeDependency類中可以看到,partitionId - outStart(UnionRDD起始位置) + inStart(parent RDD的起始位置),通過上述計算方式找到parent RDD對應(yīng)的Partition。
對于寬依賴的實現(xiàn)是:

子RDD依賴parent RDD的所有Partition,因此需求shuffle過程。Shuffle過程由ShuffleManager控制,而寬依賴包含以下兩種:基于Hash的HashShuffleManager以及基于排序的SortShuffleManager

DAG的構(gòu)建
上述已經(jīng)提到RDD通過一系列Transformation和Action形成了DAG,Spark根據(jù)DAG生成計算任務(wù):
第一步:劃分Stage
根據(jù)依賴關(guān)系的不同將DAG劃分不同的階段。對于窄依賴,由于Partition的去定型,窄依賴劃分到同一個執(zhí)行階段;對于寬依賴,需等待parent RDD Shuffle處理完成,所以Spark根據(jù)寬依賴將DAG劃分不同的Stage。
第二步:Stage內(nèi)部分配Task
每個Partition都會分配一個計算Task(并行執(zhí)行),Stage之間根據(jù)依賴關(guān)系編程一個粗粒度的DAG。
第三步:執(zhí)行順序
DAG的執(zhí)行的順序是從前往后,亦即是Stage只有其parent Stage執(zhí)行完成后才執(zhí)行(當然起始的stage不需要)。
Task的執(zhí)行
上述提到,RDD的Transformation操作是惰性的,只有發(fā)生Action才會生成Job,而這個Job會映射成一個粗粒度的DAG,DAG執(zhí)行每一個Stage會將Partition分配計算Task,這些Task會被提交到集群上執(zhí)行計算,執(zhí)行計算的邏輯部分為:Executor。
Spark的Task有兩類:
org.apache.spark.scheduler.ShuffleMapTask與org.apache.spark.scheduler.ResultTask
回想一下,Transformation和Action的區(qū)別,Action會返回數(shù)據(jù),而Transformation只進行RDD的轉(zhuǎn)換。Spark的Task類型分別對應(yīng)這兩類,下面簡單分析一下Task的執(zhí)行過程。
org.apache.spark.scheduler.Task的run()方法開始執(zhí)行Task




runTask最終調(diào)用RDD的iterator,Task的計算從這里開始。
小結(jié)
至此,RDD的實現(xiàn)分析已經(jīng)介紹完畢,我們回顧一下:
RDD的屬性特征:分區(qū)、函數(shù)、依賴、優(yōu)先位置、分區(qū)策略;
RDD緩存:持久化等級、主動持久化、自動持久化
RDD的操作:Transformation、Action
RDD依賴:窄依賴、寬依賴
DAG與Task
RDD是Spark最基本、最根本的數(shù)據(jù)抽象。