Spark RDD實現(xiàn)分析

RDD的概述

RDD是只讀的、分區(qū)記錄的集合,是Spark編程模型的最主要抽象,它是一種特殊的集合,支持多種數(shù)據(jù)源,有容錯機制、衍生血緣關(guān)系、可被緩存、支持并行操作。

RDD的屬性特征

1)分區(qū)(partition)

數(shù)據(jù)集的基本組成單位。包含一個數(shù)據(jù)分片列表,將數(shù)據(jù)進行切分,并決定并行的計算的粒度。其中分片的個數(shù)可由程序指定(默認值為程序分配的CPU數(shù)量)。每個分區(qū)分配的存儲由BlockManager實現(xiàn),分區(qū)都被邏輯映射成BlockManager的一個Block(Block被一個Task負責(zé)計算)。

RDD Partition存儲和計算模型

另外,從BlockManager的源碼中可以看出,把分區(qū)的數(shù)據(jù)存儲需要制定的幾個參數(shù):

blockId:塊id

data:分區(qū)的數(shù)據(jù)buffer

level:rdd存儲的持久化等級

BlockManager存儲分區(qū)數(shù)據(jù)

2)函數(shù)(compute)

一個計算每個分區(qū)的函數(shù),RDD的計算以分片為單位,每個RDD實現(xiàn)compute函數(shù)。通俗來說,compute用于計算每個分片,得出一個可遍歷的結(jié)果,用于描述在父RDD上執(zhí)行的計算。

3)依賴(dependency)

RDD的轉(zhuǎn)換都會生成新的RDD,RDD之間形成子->父的依賴關(guān)系(源RDD沒有依賴),通過依賴關(guān)系描述血緣關(guān)系(lineage),在部分分區(qū)數(shù)據(jù)丟失的,通過lineage重建丟失的分區(qū)數(shù)據(jù),提升整體運算效率。

4)優(yōu)先位置(preferred location)

每個分片的優(yōu)先計算位置,Spark在進行任務(wù)調(diào)度的時候,會盡可能滴將計算任務(wù)分配到所需數(shù)據(jù)塊的存儲位置,滿足“移動計算優(yōu)先移動數(shù)據(jù)”的理念。

5)分區(qū)策略(Partitioner)

RDD分片函數(shù),描述分區(qū)模式和數(shù)據(jù)分片粒度。Partitioner函數(shù)決定RDD本身的分片數(shù)據(jù),同事決定了parent RDD Shuffle輸出時的分片數(shù)據(jù)。

Spark實現(xiàn)兩種類型的分片函數(shù):基于哈希的HashPartitioner和基于范圍的RangePartitioner。區(qū)別在于只有Key-Value類型的RDD才有分區(qū)的,非Key-Value類型的RDD分區(qū)的值是None的。

具體區(qū)別詳看下一篇《Spark RDD分區(qū)策略

RDD創(chuàng)建

細分來說,Spark有二種方式創(chuàng)建RDD

1、并行化已存在的Scala集合

scala> val data = Array(1,2,3,4,5)

data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data) //這里關(guān)注slices參數(shù),指定數(shù)據(jù)集切分成幾個分區(qū)

distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :29

scala> distData.reduce((a,b) => a + b)

res1: Int = 15

2、通過外部文件系統(tǒng)的數(shù)據(jù)集創(chuàng)建

SparkContext類中的textFile用于創(chuàng)建文本類型的RDD

def textFile(path:String, minPartitions: Int = defaultMinPartitions)

path參數(shù):指定文件的URI地址(hdfs://、本地等等)

minPartitions參數(shù):指定分片數(shù)

scala> val distFile = sc.textFile("/Users/irwin/zookeeper.out")

distFile: org.apache.spark.rdd.RDD[String] = /Users/irwin/zookeeper.out MapPartitionsRDD[4] at textFile at :27

scala> distFile.map(s => s.length).reduce((a, b) => (a+b))

res3: Int = 102295

當然SparkContext中包含其他創(chuàng)建RDD的方法,如:

def wholeTextFiles(path:String, minPartitions: Int = defaultMinPartitions)

defhadoopRDD[K,V](conf: JobConf, inputFormatClass:Class[_ <: InputFormat[K,V]], keyClass:Class[K], valueClass:Class[V], minPartitions: Int = defaultMinPartitions)

RDD操作

RDD包含一系列的轉(zhuǎn)換(Transformation)與執(zhí)行(Action)。

轉(zhuǎn)換

所有轉(zhuǎn)換操作都是惰性的,指定處理相互依賴關(guān)系,是數(shù)據(jù)集的邏輯操作,并未真正計算。

執(zhí)行

該操作指定數(shù)據(jù)的形式,當發(fā)生Action操作時,Spark將Action之間的所有Transformation組成的Job會并行計算。

例如從上面例子可以看到,只有在distFile.map的時候,Job才會真正執(zhí)行,且返回最后Action的結(jié)構(gòu)。

這里有個關(guān)鍵點:當每個Job計算完成,其內(nèi)部所有RDD會被清除。所以有RDD需要重復(fù)使用,則使用Persist(或Cache)的方法將RDD持久化,詳見下文RDD緩存介紹。

RDD操作

通過上圖RDD操作列表可以看到,有以下內(nèi)容:

RDD的Transformation操作、Action操作(常用執(zhí)行操作、存儲執(zhí)行操作)、緩存操作、checkpoint操作,具體用法及意義可以詳見官方文檔。

RDD緩存

Spark持久化指的是在不同Transformation過程中,將數(shù)據(jù)集緩存在內(nèi)存中,實現(xiàn)快速重用、故障快速恢復(fù)。

主動持久化

程序主動通過persist()或cache()方法操作標記需被持久化的RDD,事實上cache()使用的是persist()的默認方法。下面我們來看看持久化的等級:

持久化等級

根據(jù)名稱可以理解對應(yīng)Level的意義,其中帶“SER”表示將RDD序列化為Java對象,帶“2”表示將每個分區(qū)賦值到兩個集群節(jié)點。Persist持久化RDD,會修改原來RDD的 meta info 中的StorageLevel,可以看到最后返回的是this,說明返回的是修改的RDD對象本身,而非產(chǎn)生了新的RDD。

newLevel替換原來storageLevel值

另外,從RDD類的源碼中我們可以看到:

RDD

默認persist()使用內(nèi)存存儲,而cache()調(diào)用的是persist()默認實現(xiàn)。

自動持久化

指的是Spark自動保存一些Shuffle操作的中間結(jié)果。很容易理解,Spark為了表面Shuffle過程中出現(xiàn)異常的快速恢復(fù)。

再說一點,persist()后不一定說就不丟失,在內(nèi)存不足的情況也是可能被刪除。但是用戶不用關(guān)心這塊,RDD的容錯機制保證了丟失也能計算正確執(zhí)行。RDD通過 meta info 中的 Lineage 可以重算丟失的數(shù)據(jù)。

RDD依賴關(guān)系

Spark根據(jù)提交任務(wù)的計算邏輯(亦即是RDD的Transformation和Action)生成RDD之間的依賴關(guān)系,同時也生成邏輯上的DAG。這里說到的依賴關(guān)系指的是對父RDD依賴,這個關(guān)系包含兩種類型:narrow dependency 和 wide dependency。

窄依賴(narrow dependency)

指每一個 parent RDD 的 Partition 最多被子RDD的一個Partition使用。從數(shù)據(jù)的角度來看,窄依賴的RDD整個操作都可以在同一個集群節(jié)點執(zhí)行,以pinpeline的方式計算所有父分區(qū),不會造成網(wǎng)絡(luò)之間的數(shù)據(jù)混合。

窄依賴

寬依賴(wide dependency)

與窄依賴相反,指的是子RDD的Partition會依賴所有parent RDD的所有或多個Partition。寬依賴RDD會涉及數(shù)據(jù)混合,寬依賴需要首先計算好所有父分區(qū)的數(shù)據(jù),然后在節(jié)點間進行Shuffle。

寬依賴

所有依賴的基類是traid Dependency[T],這是一個純虛類:

class Dependency

其中rdd就是依賴的Parent RDD

對于窄依賴的實現(xiàn)是:

窄依賴實現(xiàn)

窄依賴有兩種具體的實現(xiàn)

1、OneToOneDependency

OneToOneDependency

2、RangeDependency

RangeDependency

UnionRDD將多個RDD合成一個RDD,從上述分析,合成后的RDD每個parent RDD的partition的相對順序是不會變。對于合并后的UnionRDD而言,每個parent RDD與其Partition的其實位置不同。

從RangeDependency類中可以看到,partitionId - outStart(UnionRDD起始位置) + inStart(parent RDD的起始位置),通過上述計算方式找到parent RDD對應(yīng)的Partition。

對于寬依賴的實現(xiàn)是:

寬依賴實現(xiàn)

子RDD依賴parent RDD的所有Partition,因此需求shuffle過程。Shuffle過程由ShuffleManager控制,而寬依賴包含以下兩種:基于Hash的HashShuffleManager以及基于排序的SortShuffleManager

ShuffleManager

DAG的構(gòu)建

上述已經(jīng)提到RDD通過一系列Transformation和Action形成了DAG,Spark根據(jù)DAG生成計算任務(wù):

第一步:劃分Stage

根據(jù)依賴關(guān)系的不同將DAG劃分不同的階段。對于窄依賴,由于Partition的去定型,窄依賴劃分到同一個執(zhí)行階段;對于寬依賴,需等待parent RDD Shuffle處理完成,所以Spark根據(jù)寬依賴將DAG劃分不同的Stage。

第二步:Stage內(nèi)部分配Task

每個Partition都會分配一個計算Task(并行執(zhí)行),Stage之間根據(jù)依賴關(guān)系編程一個粗粒度的DAG。

第三步:執(zhí)行順序

DAG的執(zhí)行的順序是從前往后,亦即是Stage只有其parent Stage執(zhí)行完成后才執(zhí)行(當然起始的stage不需要)。

Task的執(zhí)行

上述提到,RDD的Transformation操作是惰性的,只有發(fā)生Action才會生成Job,而這個Job會映射成一個粗粒度的DAG,DAG執(zhí)行每一個Stage會將Partition分配計算Task,這些Task會被提交到集群上執(zhí)行計算,執(zhí)行計算的邏輯部分為:Executor。

Spark的Task有兩類:

org.apache.spark.scheduler.ShuffleMapTask與org.apache.spark.scheduler.ResultTask

回想一下,Transformation和Action的區(qū)別,Action會返回數(shù)據(jù),而Transformation只進行RDD的轉(zhuǎn)換。Spark的Task類型分別對應(yīng)這兩類,下面簡單分析一下Task的執(zhí)行過程。

org.apache.spark.scheduler.Task的run()方法開始執(zhí)行Task

Task
runTask
ShuffleMapTask#runTask
RDD#iterator

runTask最終調(diào)用RDD的iterator,Task的計算從這里開始。

小結(jié)

至此,RDD的實現(xiàn)分析已經(jīng)介紹完畢,我們回顧一下:

RDD的屬性特征:分區(qū)、函數(shù)、依賴、優(yōu)先位置、分區(qū)策略;

RDD緩存:持久化等級、主動持久化、自動持久化

RDD的操作:Transformation、Action

RDD依賴:窄依賴、寬依賴

DAG與Task

RDD是Spark最基本、最根本的數(shù)據(jù)抽象。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容