[Spark學習] Spark RDD詳解

什么是RDD

RDD(Resilient Distributed Datasets),彈性分布式數(shù)據(jù)集,是Spark的基本數(shù)據(jù)結構。
它是一個不可變分布式對象集合。
RDD中的每個數(shù)據(jù)集被劃分為邏輯分區(qū),其可以在集群的不同節(jié)點上計算
RDD可以包含任何類型的Python,Java或Scala對象,包括用戶定義的類。

形式上,RDD是只讀的 分區(qū) 記錄集合。 可以通過讀取外部存儲系統(tǒng)中的數(shù)據(jù)集(如HDFS,HBase或提供Hadoop輸入格式的任何數(shù)據(jù)源等)、轉換現(xiàn)有數(shù)據(jù)集合或對其他RDD的數(shù)據(jù)進行轉換來創(chuàng)建RDD。
RDD是一個支持容錯集合,可以并行操作。

RDD的主要屬性

從RDD的內(nèi)部定義來看,每個RDD擁有以下五個主要屬性:

  • 分區(qū)列表
  • 與其他RDD的依賴關系列表
  • 計算分片(split)的函數(shù)
  • (可選) 鍵值RDD中的分區(qū)器Partitioner (例如,hash-partitioner)
  • (可選) 用于計算每個分片的的優(yōu)選位置列表 (例如,HDFS文件的block位置)

RDD的組成

RDD主要由以下四部分組成

  • 分區(qū)(Partitions):數(shù)據(jù)集的原子片段。 每個計算節(jié)點含有一個或多個分區(qū)。
  • 依賴關系(Dependencies):RDD的每個分區(qū)計算時依賴哪些父RDD的分區(qū)(如下圖)
  • 函數(shù)/算子(Functions): 基于其父RDD的用于計算數(shù)據(jù)集的函數(shù)。
  • 元數(shù)據(jù)(Metadata): RDD的分區(qū)方案和數(shù)據(jù)的存儲放置。


    Credit: https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies

    Credit: https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies

RDD的分區(qū)(Partition)

RDD中的數(shù)據(jù)被存儲在多個分區(qū)中。

RDD分區(qū)的特征
  • 分區(qū)永遠不會跨越多臺機器,即同一分區(qū)中的數(shù)據(jù)始終保證在同一臺機器上。
  • 群集中的每個節(jié)點包含一個或多個分區(qū)。
  • 分區(qū)的數(shù)目是可以設置的。 默認情況下,它等于所有執(zhí)行程序節(jié)點上的核心總數(shù)。 例如。 6個工作節(jié)點,每個具有4個核心,RDD將被劃分為24個分區(qū)。
RDD分區(qū)與任務執(zhí)行的關系

在Map階段partition數(shù)目保持不變。
在Reduce階段,RDD的聚合會觸發(fā)shuffle操作,聚合后的RDD的partition數(shù)目跟具體操作有關,例如repartition操作會聚合成指定分區(qū)數(shù),還有一些算子是可配置的。

RDD分區(qū)數(shù)的調整可以通過以下兩個函數(shù)完成:

  • repartition
    repartition函數(shù)相當于coalesce(numPartitions, shuffle = True), 不僅可以調整分區(qū)數(shù)目(增加或減少),也可以將partitioner調整為hash-partitioner,產(chǎn)生shuffle操作
  • coalesce
    coalesce函數(shù)可以控制是否shuffle,但是shuffle為False時,只能減少分區(qū)數(shù),無法增大。

RDD在計算的時候,每個分區(qū)都會啟動一個task,RDD的分區(qū)數(shù)目決定了總的task數(shù)目。

申請的Executor數(shù)和Executor的CPU核數(shù),決定了你同一時刻可以并行執(zhí)行的task數(shù)量。

這里我們舉個例子來加深對RDD分區(qū)數(shù)量與task執(zhí)行的關系的理解</font></b>

比如的RDD有100個分區(qū),那么計算的時候就會生成100個task,你的資源配置為10個計算節(jié)點,每個兩2個核,同一時刻可以并行的task數(shù)目為20,計算這個RDD就需要5個輪次。如果計算資源不變,你有101個task的話,就需要6個輪次,在最后一輪中,只有一個task在執(zhí)行,其余核都在空轉。

<font color=black>partition數(shù)量太少會造成資源利用不夠充分。
例如,在資源不變的情況,你的RDD只有10個分區(qū),那么同一時刻只有10個task運行,其余10個核將空轉。

通常在spark調優(yōu)中,可以增大RDD分區(qū)數(shù)目來增大任務并行度。

但是partition數(shù)量太多則會造成task過多,task的傳輸/序列化開銷增大,也可能會造成輸出過多的(小)文件。

<<b>spark.default.parallelism</b> 和 <b>spark.sql.shuffle.partitions</b> 這兩個參數(shù)很重要

RDD的分區(qū)器(Partitioner)

Spark中提供兩種分區(qū)器:

  • 散列分區(qū) Hash partitioning
  • 范圍分區(qū) Range partitioning

<font color=black>只有PairRDD支持自定義分區(qū)器。

RDD的邏輯執(zhí)行計劃(Lineage)

RDD Lineage,又叫做RDD運算符圖或RDD依賴圖,是包含一個子RDD的所有父RDD的圖。每當我們執(zhí)行RDD轉換(transformation)操作,就會產(chǎn)生RDD Lineage并用于創(chuàng)建 邏輯執(zhí)行計劃。

Spark stages的DAG的執(zhí)行稱作 物理執(zhí)行計劃

邏輯執(zhí)行計劃從最初始的RDD (不依賴于其他RDD或引用緩存數(shù)據(jù)的RDD)開始,以調用可以產(chǎn)生RDD結果的action算子結束。

使用toDebugString函數(shù)可以顯示RDD Lineage

RDD Lineage是Spark中容錯的關鍵
我們可以通過RDD Lineage,追溯到丟失分區(qū)的父RDD,然后根據(jù)父RDD重新計算丟失分區(qū),使其從故障中恢復。

RDD的依賴關系(Dependencies)

RDD的每一個Transformation操作都會生成一個新的RDD,所以RDD之間就會形成類似流水線的前后依賴關系;在Spark中,RDD之間存在兩種類型的依賴關系:窄依賴(Narrow Dependency)和寬依賴(Wide Dependency);


Credit:https://github.com/rohgar/scala-spark-4/wiki/Wide-vs-Narrow-Dependencies
窄依賴(Narrow Dependency)

窄依賴是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操作都會產(chǎn)生窄依賴;

對于窄依賴,由于partition依賴關系的確定性,partition的轉換處理就可以在同一個線程里完成,這種轉換不會引起shuffle操作,速度快!

寬依賴(Wide Dependency)

寬依賴是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產(chǎn)生寬依賴;

這種轉換會引起shuffle操作,速度慢!

Shuffle是MapReduce框架中的一個特定的phase,介于Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希,并且分發(fā)到每一個Reducer上去,這個過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。

RDD與Task/Stage的關系

Task

Task是Spark中最小的任務執(zhí)行單元,每個RDD的transformation操作都會被翻譯成相應的task,分配到相應的executor節(jié)點上對相應的partition執(zhí)行。

credit: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-DAGScheduler-Stage.html

RDD在計算的時候,每個分區(qū)都會啟動一個task,RDD的分區(qū)數(shù)目決定了總的task數(shù)目。

Task的類型分為2種:ShuffleMapTask和ResultTask;
簡單來說,DAG的最后一個階段會為每個結果的partition生成一個ResultTask,即每個Stage里面的Task的數(shù)量是由該Stage中最后一個RDD的Partition的數(shù)量所決定的。

Stage

Stage是程序執(zhí)行時的物理,是物理執(zhí)行計劃中的一個步驟。
Stage由一組有narrow transformation(無要shuffle)構成的task組成, 不需要在節(jié)點間傳輸數(shù)據(jù),可以被高效的執(zhí)行。
一個Stage只能在單個RDD的分區(qū)上工作。

Stage的類型分為2種:

  • ShuffleMapStage
  • ResultStage

參考來源:
Wide vs Narrow Dependencies
Mastering Apache Spark
Partitioning
spark學習之RDD來源解密

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容