3.3RDD的轉換和DAG的生成

3.3 RDD的轉換和DAG的生成

Spark會根據用戶提交的計算邏輯中的RDD的轉換和動作來生成RDD之間的依賴關系,同時這個計算鏈也就生成了邏輯上的DAG。接下來以“Word Count”為例,詳細描述這個DAG生成的實現過程。

Spark Scala版本的Word Count程序如下:

1:val file = spark.textFile("hdfs://...")

2:val counts = file.flatMap(line => line.split(" "))

3:? ? ? ? .map(word => (word, 1))

4:? ? ? ? .reduceByKey(_ + _)

5:counts.saveAsTextFile("hdfs://...")

file和counts都是RDD,其中file是從HDFS上讀取文件并創(chuàng)建了RDD,而counts是在file的基礎上通過flatMap、map和reduceByKey這三個RDD轉換生成的。最后,counts調用了動作saveAsTextFile,用戶的計算邏輯就從這里開始提交的集群進行計算。那么上面這5行代碼的具體實現是什么呢?

1)行1:spark是org.apache.spark.SparkContext的實例,它是用戶程序和Spark的交互接口。spark會負責連接到集群管理者,并根據用戶設置或者系統(tǒng)默認設置來申請計算資源,完成RDD的創(chuàng)建等。

spark.textFile("hdfs://...")就完成了一個org.apache.spark.rdd.HadoopRDD的創(chuàng)建,并且完成了一次RDD的轉換:通過map轉換到一個org.apache.spark.rdd.MapPartitions-RDD。也就是說,file實際上是一個MapPartitionsRDD,它保存了文件的所有行的數據內容。

2)行2:將file中的所有行的內容,以空格分隔為單詞的列表,然后將這個按照行構成的單詞列表合并為一個列表。最后,以每個單詞為元素的列表被保存到MapPartitionsRDD。

3)行3:將第2步生成的MapPartitionsRDD再次經過map將每個單詞word轉為(word,1)的元組。這些元組最終被放到一個MapPartitionsRDD中。

4)行4:首先會生成一個MapPartitionsRDD,起到map端combiner的作用;然后會生成一個ShuffledRDD,它從上一個RDD的輸出讀取數據,作為reducer的開始;最后,還會生成一個MapPartitionsRDD,起到reducer端reduce的作用。

5)行5:首先會生成一個MapPartitionsRDD,這個RDD會通過調用org.apache. spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS輸出RDD的數據內容。最后,調用org.apache.spark.SparkContext#runJob向集群提交這個計算任務。

RDD之間的關系可以從兩個維度來理解:一個是RDD是從哪些RDD轉換而來,也就是RDD的parent RDD(s)是什么;還有就是依賴于parent RDD(s)的哪些Partition(s)。這個關系,就是RDD之間的依賴,org.apache.spark.Dependency。根據依賴于parent RDD(s)的Partitions的不同情況,Spark將這種依賴分為兩種,一種是寬依賴,一種是窄依賴。

3.3.1 RDD的依賴關系

RDD和它依賴的parent RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

1)窄依賴指的是每一個parent RDD的Partition最多被子RDD的一個Partition使用,如圖3-6所示。

[插圖]

圖3-6 RDD的窄依賴

2)寬依賴指的是多個子RDD的Partition會依賴同一個parent RDD的Partition,如圖3-7所示。

[插圖]

圖3-7 RDD的寬依賴

接下來可以從不同類型的轉換來進一步理解RDD的窄依賴和寬依賴的區(qū)別,如圖3-8所示。

對于map和filter形式的轉換來說,它們只是將Partition的數據根據轉換的規(guī)則進行轉化,并不涉及其他的處理,可以簡單地認為只是將數據從一個形式轉換到另一個形式。對于union,只是將多個RDD合并成一個,parent RDD的Partition(s)不會有任何的變化,可以認為只是把parent RDD的Partition(s)簡單進行復制與合并。對于join,如果每個Partition僅僅和已知的、特定的Partition進行join,那么這個依賴關系也是窄依賴。對于這種有規(guī)則的數據的join,并不會引入昂貴的Shuffle。對于窄依賴,由于RDD每個Partition依賴固定數量的parent RDD(s)的Partition(s),因此可以通過一個計算任務來處理這些Partition,并且這些Partition相互獨立,這些計算任務也就可以并行執(zhí)行了。

[插圖]

圖3-8 RDD的窄依賴和寬依賴

對于groupByKey,子RDD的所有Partition(s)會依賴于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的結果,因此這兩個R D D是不能通過一個計算任務來完成的。同樣,對于需要parent RDD的所有Partition進行join的轉換,也是需要Shuffle,這類join的依賴就是寬依賴而不是前面提到的窄依賴了。

所有的依賴都要實現trait Dependency[T]:

abstract class Dependency[T] extends Serializable {

def rdd: RDD[T]

}

其中rdd就是依賴的parent RDD。

對于窄依賴的實現是:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

//返回子RDD的partitionId依賴的所有的parent RDD的Partition(s)

def getParents(partitionId: Int): Seq[Int]

override def rdd: RDD[T] = _rdd

}

現在有兩種窄依賴的具體實現,一種是一對一的依賴,即OneToOneDependency:

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

override def getParents(partitionId: Int) = List(partitionId)

}

通過getParents的實現不難看出,RDD僅僅依賴于parent RDD相同ID的Partition。

還有一個是范圍的依賴,即RangeDependency,它僅僅被org.apache.spark.rdd. UnionRDD使用。UnionRDD是把多個RDD合成一個RDD,這些RDD是被拼接而成,即每個parent RDD的Partition的相對順序不會變,只不過每個parent RDD在UnionRDD中的Partition的起始位置不同。因此它的getPartents如下:

override def getParents(partitionId: Int) = {

if(partitionId >= outStart && partitionId < outStart + length) {

List(partitionId - outStart + inStart)

} else {

Nil

}

}

其中,inStart是parent RDD中Partition的起始位置,outStart是在UnionRDD中的起始位置,length就是parent RDD中Partition的數量。

寬依賴的實現只有一種:ShuffleDependency。子RDD依賴于parent RDD的所有Partition,因此需要Shuffle過程:

class ShuffleDependency[K, V, C](

@transient _rdd: RDD[_ <: Product2[K, V]],

val partitioner: Partitioner,

val serializer: Option[Serializer] = None,

val keyOrdering: Option[Ordering[K]] = None,

val aggregator: Option[Aggregator[K, V, C]] = None,

val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {


override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

//獲取新的shuffleId

val shuffleId: Int = _rdd.context.newShuffleId()

//向ShuffleManager注冊Shuffle的信息

val shuffleHandle: ShuffleHandle =

_rdd.context.env.shuffleManager.registerShuffle(

shuffleId, _rdd.partitions.size, this)


_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

寬依賴支持兩種Shuffle Manager,即org.apache.spark.shuffle.hash.HashShuffleManager(基于Hash的Shuffle機制)和org.apache.spark.shuffle.sort.SortShuffleManager(基于排序的Shuffle機制)。

3.3.2 DAG的生成

原始的RDD(s)通過一系列轉換就形成了DAG。RDD之間的依賴關系,包含了RDD由哪些Parent RDD(s)轉換而來和它依賴parent RDD(s)的哪些Partitions,是DAG的重要屬性。借助這些依賴關系,DAG可以認為這些RDD之間形成了Lineage (血統(tǒng))。借助Lineage,能保證一個RDD被計算前,它所依賴的parent RDD都已經完成了計算;同時也實現了RDD的容錯性,即如果一個RDD的部分或者全部的計算結果丟失了,那么就需要重新計算這部分丟失的數據。

那么Spark是如何根據DAG來生成計算任務呢?首先,根據依賴關系的不同將DAG劃分為不同的階段(Stage)。對于窄依賴,由于Partition依賴關系的確定性,Partition的轉換處理就可以在同一個線程里完成,窄依賴被Spark劃分到同一個執(zhí)行階段;對于寬依賴,由于Shuffle的存在,只能在parent RDD(s)Shuffle處理完成后,才能開始接下來的計算,因此寬依賴就是Spark劃分Stage的依據,即Spark根據寬依賴將DAG劃分為不同的Stage。在一個Stage內部,每個Partition都會被分配一個計算任務(Task),這些Task是可以并行執(zhí)行的。Stage之間根據依賴關系變成了一個大粒度的DAG,這個DAG的執(zhí)行順序也是從前向后的。也就是說,Stage只有在它沒有parent Stage或者parent Stage都已經執(zhí)行完成后,才可以執(zhí)行。這個過程可以查詢第4章。

3.3.3 Word Count的RDD轉換和DAG劃分的邏輯視圖

上文分析了在Word Count的RDD轉換時,Spark生成了不同的RDD。這些RDD有的和用戶邏輯直接顯式對應,比如map操作會生成一個org.apache.spark.rdd.Map-PartitionsRDD;而有的RDD則是和Spark的實現原理相關,是Spark隱式生成的,比如org.apache.spark.rdd.ShuffledRDD,這個過程對于用戶來說是透明的,用戶只需要關心RDD的轉換和動作即可。

RDD在創(chuàng)建子RDD的時候,會通過Dependency來定義它們之間的關系。通過Dependency,子RDD也可以獲得它的parent RDD和parent RDD的Partition。

RDD轉換的細節(jié)如圖3-9所示。

[插圖]

圖3-9“Word Count”的RDD轉換

通過圖3-9,可以清晰地看到Spark對于用戶提交的Application所做的處理。用戶定義的RDD被系統(tǒng)顯式和隱式地轉換成多個RDD以及這些RDD之間的依賴,這些依賴構建了這些RDD的處理順序及相互關系。關于這些RDD的轉換時如何在計算節(jié)點上運行的,請參閱第4章。

為了對圖3-9有更加直觀的理解,圖3-10以一個有五個分片的輸入文件為例,詳細描述了“Word Count”的邏輯執(zhí)行過程。之所以稱為邏輯執(zhí)行過程,是因為具體的計算過程可能會有網絡的交互,有頻繁地將處理中間數據寫入磁盤等過程。

[插圖]

圖3-10“Word Count”RDD的邏輯轉換關系圖

需要強調的一點是在轉換操作reduceByKey時會觸發(fā)一個Shuffle(洗牌)的過程。在Shuffle開始之前,有一個本地聚合的過程,比如第三個分片的(e,1)(e,1)聚合成了(e,2)。Shuffle的結果是為下游的Task生成了三個分片,這三個分片就構成了ShuffledRDD。之后在做了一個聚合之后,就生成了結果的RDD。關于Shuffle的具體實現過程,可以參閱第7章。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容