Spark的編程核心RDD的實(shí)現(xiàn)詳解

一.什么是RDD

RDD是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset),RDD是只讀的、 分區(qū)記錄的集合。 RDD只能基于在穩(wěn)定物理存儲(chǔ)中的數(shù)據(jù)集和其他已有的RDD上執(zhí)行確定性操作來創(chuàng)建。 這些確定性操作稱為轉(zhuǎn)換, 如map、 filter、 groupBy、 join。RDD含有如何從其他RDD衍生(即計(jì)算)出本RDD的相關(guān)信息(即Lineage) , 因此在RDD部分分區(qū)數(shù)據(jù)丟失的時(shí)候可以從物理存儲(chǔ)的數(shù)據(jù)計(jì)算出相應(yīng)的RDD分區(qū)。

每個(gè)RDD有5個(gè)主要的特點(diǎn):
1.RDD由一系列Partition組成
對(duì)于RDD來說, 每個(gè)分區(qū)都會(huì)被一個(gè)計(jì)算任務(wù)處理, 并決定并行計(jì)算的粒度。 用戶可以在創(chuàng)建RDD時(shí)指定RDD的分區(qū)個(gè)數(shù), 如果沒有指定, 那么就會(huì)采用默認(rèn)值。 默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。在分區(qū)存儲(chǔ)的計(jì)算模型中,每個(gè)分配的存儲(chǔ)是由BlockManager實(shí)現(xiàn)的。 每個(gè)分區(qū)都會(huì)被邏輯映射成BlockManager的一個(gè)Block, 而這個(gè)Block會(huì)被一個(gè)Task負(fù)責(zé)計(jì)算。

2.算子函數(shù)是作用在Partition上的
Spark中RDD的計(jì)算是以Partition為單位的, 每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合, 不需要保存每次計(jì)算的結(jié)果。(即獲取父RDD的迭代器,然后將自定義的函數(shù)作用在該迭代器迭代出的每一條數(shù)據(jù))。

3.RDD之間有依賴關(guān)系
RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD, 所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。 在部分分區(qū)數(shù)據(jù)丟失時(shí), Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù), 而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

4.分區(qū)器作用在K-V 格式RDD上
當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù), 一個(gè)是基于哈希的HashPartitioner, 另外一個(gè)是基于范圍的RangePartitioner。 只有對(duì)于key-value的RDD, 才會(huì)有Partitioner, 非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量, 也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

5.每個(gè)Partition對(duì)外提供最佳計(jì)算位置(preferred location)
一個(gè)列表, 存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location) 。 對(duì)于一個(gè)HDFS文件來說, 這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。 按照“計(jì)算向數(shù)據(jù)移動(dòng)”的理念, Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候, 會(huì)盡可能地將計(jì)算任務(wù)分配到
其所要處理數(shù)據(jù)塊的存儲(chǔ)位置,(數(shù)據(jù)本地化級(jí)別)。

二.如何創(chuàng)建RDD

可通過以下幾種方式創(chuàng)建RDD:

  • 通過讀取外部數(shù)據(jù)集 (本地文件系統(tǒng)/HDFS/Cassandra/HBase/...)
  • 通過一個(gè)已經(jīng)存在的Scala集合創(chuàng)建(List/Set/...)
  • 通過已有的RDD生成新的RDD

三.Spark對(duì)RDD操作方式

Spark對(duì)RDD的算子分為三種,即轉(zhuǎn)換算子(Transformation)和行動(dòng)算子(Action)和控制算子。

1.轉(zhuǎn)換算子
不觸發(fā)實(shí)際計(jì)算,從現(xiàn)有的數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集,返回一個(gè)新的RDD,例如對(duì)數(shù)據(jù)的匹配操作map和過濾操作filter,惰性求值。

2.動(dòng)作算子
會(huì)觸發(fā)實(shí)際計(jì)算,即在數(shù)據(jù)集上進(jìn)行計(jì)算后,會(huì)向Driver程序驅(qū)動(dòng)器返回結(jié)果或?qū)⒔Y(jié)果寫到外部系統(tǒng)。

如何區(qū)別兩種算子?
看返回值類型,返回RDD類型的為轉(zhuǎn)換操作,返回其他數(shù)據(jù)類型的是行動(dòng)操作。

3.控制算子
如persist,cache和checkpoint這三種算子,可以用來做緩存或者持久化,復(fù)用RDD時(shí)避免重復(fù)計(jì)算,或者在應(yīng)用崩潰時(shí)恢復(fù)。

惰性求值?
RDD中的所有轉(zhuǎn)換都是惰性的, 也就是說, 它們并不會(huì)直接計(jì)算結(jié)果。 相反的, 它們只是記住這些應(yīng)用到最原始數(shù)據(jù)集上的轉(zhuǎn)換操作。 只有當(dāng)調(diào)用行動(dòng)算子(Action)返回結(jié)果給Driver的動(dòng)作時(shí), 這些轉(zhuǎn)換才會(huì)真正運(yùn)行。 這個(gè)設(shè)計(jì)讓Spark更加有效率地運(yùn)行。

為何會(huì)有惰性求值?
如果每經(jīng)過一次轉(zhuǎn)換操作都觸發(fā)計(jì)算,將會(huì)有系統(tǒng)負(fù)擔(dān),而惰性求值會(huì)將多個(gè)轉(zhuǎn)換操作合并到一起,抵消不必要的步驟后,在最后必要的時(shí)才進(jìn)行運(yùn)算,獲得性能的提升同時(shí)又減輕系統(tǒng)運(yùn)算負(fù)擔(dān)。

Transformation操作

函數(shù)名 目的 示例 結(jié)果
map(f) 將函數(shù)應(yīng)用于每一個(gè)元素中,返回值構(gòu)成新的RDD rdd.map(x=>x+1) {2,3,4,4}
flatMap(f) 將函數(shù)應(yīng)用于每一個(gè)元素中,并把元素中迭代器內(nèi)所有內(nèi)容一并生成新的RDD,常用于切分單詞 rdd.flatMap(x=>x.to(3)) {1,2,3,,2,3,3,3}
filter(f) 過濾元素 rdd.filter(x=>x!=1) {2,3,3}
distinct() 元素去重 rdd.distinct() {1,2,3}
sample( withReplacement, fraction , [seed] ) 元素采樣,以及是否需要替換 rdd.sample(false,0.5) 不確定值,不確定數(shù)目
union(rdd) 合并兩個(gè)RDD所有元素(不去重) rdd1.union(rdd2) {1,2,3,3,4,5}
intersection(rdd) 求兩個(gè)RDD的交集 rdd1.intersection(rdd2) {3}
substract(rdd) 移除在RDD2中存在的RDD1元素 rdd1.substract(rdd2) {1,2}
cartesian(rdd) 求兩個(gè)RDD的笛卡爾積 rdd1.cartesian(rdd2) {(1,3),(1,4),(1,5)...(3,5)}

Action操作

函數(shù)名 目的 示例 結(jié)果
collect() 收集并返回RDD中所有元素 rdd.collect() {1,2,3,3}
count() RDD中元素的個(gè)數(shù) rdd.count() 4
countByValue() 各元素出現(xiàn)的個(gè)數(shù) rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 從RDD中返回num個(gè)元素 rdd.take(2) {1,2}
top(num) 返回最前面的num個(gè)元素 rdd.take(2) {3,3}
takeOrdered(num,[ordering]) 按提供的順序返回前num個(gè)元素 rdd.takeOrdered(2,[myOrdering]) {3,3}
takeSample(withReplacement, num ,[seed]) 返回任意元素 takeSample(false,1) 不確定值
reduce(f) 并行整合RDD中所有元素,返回一個(gè)同一類型元素 rdd.reduce((x,y) => x+y ) 9
fold(zeroValue)(f) 與reduce一樣,不過需要提供初始值 rdd.fold(0)((x,y) => x+y ) 9
aggregate(zeroValue)(seqOp , combOp) 與reduce相似,不過返回不同類型的元素 rdd. aggregate(( 0, 0)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) {9,4}
foreach(f) 給每個(gè)元素使用給定的函數(shù),結(jié)果不需發(fā)回本地 rdd.foreach(f)

四.RDD的持久化(緩存)

Spark速度非??斓脑蛑唬?就是在不同操作中在內(nèi)存中持久化(或緩存) 一個(gè)數(shù)據(jù)集。 當(dāng)持久化一個(gè)RDD后, 每一個(gè)節(jié)點(diǎn)
都將把計(jì)算的分片結(jié)果保存在內(nèi)存中, 并在對(duì)此數(shù)據(jù)集(或者衍生出的數(shù)據(jù)集) 進(jìn)行的其他動(dòng)作(action) 中重用。 這使得后
續(xù)的動(dòng)作變得更加迅速(通常快10倍) 。 RDD相關(guān)的持久化和緩存, 是Spark最重要的特征之一。 可以說, 緩存是Spark構(gòu)建迭代式算法和快速交互式查詢的關(guān)鍵。

出于不同目的和場景需求,我們可選擇的持久化級(jí)別有:

級(jí)別 使用空間 CPU時(shí)間 是否在內(nèi)存中 是否在磁盤上
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 部分 部分
MEMORY_AND_DISK_SER 部分 部分
DISK_ONLY

我們可以通過persist() 或cache() 方法可以標(biāo)記一個(gè)要被持久化的RDD, 一旦首次被觸發(fā), 該RDD將會(huì)被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中并重用。

persist的源碼實(shí)現(xiàn)如下

 /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet. Local checkpointing is an exception.
   */
  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

代碼示例

scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd1 = rdd.map(_+5)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:26

scala> rdd1 = rdd1.persist(StorageLevel.MEMORY_ONLY)
res2: rdd1.type = MapPartitionsRDD[6] at map at <console>:26

scala> rdd1.reduce(_+_)
res3: Int = 40

scala> rdd1.count()
res4: Long = 5

scala> rdd1.first()
res5: Int = 6

scala> rdd1.unpersist()
res7: rdd1.type = MapPartitionsRDD[6] at map at <console>:26

如果要緩存的數(shù)據(jù)太多,內(nèi)存放不下,Spark會(huì)自動(dòng)使用LRU(最近最小使用)的緩存策略把最老的分區(qū)從內(nèi)存中移除。同時(shí)緩存有可能丟失,RDD的緩存的容錯(cuò)機(jī)制保證了即使緩存丟失也能保證計(jì)算的正確執(zhí)行。 通過基于RDD的一系列的轉(zhuǎn)換, 丟失的數(shù)據(jù)會(huì)被重算。 RDD的各個(gè)Partition是相對(duì)獨(dú)立的, 因此只需要計(jì)算丟失的部分即可, 并不需要重算全部Partition。

最后,可調(diào)用rdd.unpersist()方法手動(dòng)移除RDD緩存。

五.RDD之間的依賴關(guān)系

Spark會(huì)根據(jù)用戶提交的計(jì)算邏輯中的RDD的轉(zhuǎn)換和動(dòng)作來生成RDD之間的依賴關(guān)系, RDD之間的關(guān)系可以從兩個(gè)維度來理解:

  • RDD是從哪些RDD轉(zhuǎn)換而來, 也就是RDD的parent RDD是什么
  • RDD中的Partition,依賴于parent RDD中 的哪些Partition

根據(jù)依賴于parentRDD的Partitions的不同情況, Spark將這種依賴分為兩種, 一種是寬依賴, 一種是窄依賴。

  • 窄依賴指的是子RDD依賴于父RDD中固定的Partitions
  • 寬依賴指的是子RDD對(duì)父RDD中的所有partition都有依賴,或者說依賴于父RDD的數(shù)量不能明確
窄依賴和寬依賴

從上面的圖中我們可以理解下這兩種依賴關(guān)系之間的區(qū)別

對(duì)于map和filter形式的轉(zhuǎn)換來說, 它們只是將Partition的數(shù)據(jù)根據(jù)轉(zhuǎn)換的規(guī)則進(jìn)行轉(zhuǎn)化, 并不涉及其他的處理, 可以簡單地認(rèn)為
只是將數(shù)據(jù)從一個(gè)形式轉(zhuǎn)換到另一個(gè)形式。

對(duì)于union, 只是將多個(gè)RDD合并成一個(gè),parent RDD的Partition不會(huì)有任何的變化, 可以認(rèn)為只是把parent RDD的Partition 簡單進(jìn)行復(fù)制與合并。

對(duì)于join, 如果每個(gè)Partition僅僅和已知的、 特定的Partition進(jìn)行join, 那么這個(gè)依賴關(guān)系也是窄依賴。 對(duì)于這種有規(guī)則的數(shù)據(jù)的join, 并不會(huì)引入昂貴的Shuffle。 對(duì)于窄依賴, 由于RDD每個(gè)Partition依賴固定數(shù)量的parent RDD的Partition, 因此可以通過一個(gè)計(jì)算任務(wù)來處理這些Partition, 并且這些Partition相互獨(dú)立, 這些計(jì)算任務(wù)也就可以并行執(zhí)行了。

對(duì)于groupByKey, 子RDD的所有Partition會(huì)依賴于parent RDD的所有Partition, 子RDD的Partition是parent RDD的所有Partition Shuffle的結(jié)果, 因此這兩個(gè)RDD是不能通過一個(gè)計(jì)算任務(wù)來完成的。 同樣, 對(duì)于需要parent RDD的所有Partition進(jìn)行join的轉(zhuǎn)換, 也是需要Shuffle, 這類join的依賴就是寬依賴而不是前面提到的窄依賴了。

五.RDD依賴關(guān)系的具體代碼實(shí)現(xiàn)

RDD的依賴關(guān)系繼承圖

Spark中對(duì)應(yīng)窄依賴的的抽象類為NarrowDependency,具體實(shí)現(xiàn)有兩種。

一種是一對(duì)一的依賴, 即OneToOneDependency:

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
     override def getParents(partitionId: Int) = List(partitionId)
}

通過OneToOneDependency的源碼中的getParents的實(shí)現(xiàn)不難看出, RDD僅僅依賴于parent RDD相同ID的Partition。

還有一個(gè)是范圍的依賴, 即RangeDependency:
它僅僅被UnionRDD使用,UnionRDD是把多個(gè)RDD合成一個(gè)RDD,這些RDD是被拼接而成, 即每個(gè)parent RDD的Partition的相對(duì)順序不會(huì)變, 只不過每個(gè)parent RDD在UnionRDD中的Partition的起始位置不同。 因此它的getPartents如下:

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    // inStart是parent RDD中Partition的起始位置
    // outStart是在UnionRDD中的起始位置
    // length就是parent RDD中Partition的數(shù)量。
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

寬依賴的實(shí)現(xiàn)只有一種,ShuffleDependency的實(shí)現(xiàn)相對(duì)前面幾種較為復(fù)雜,會(huì)在后續(xù)的文章中詳細(xì)講解...

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

六.區(qū)分兩種依賴的作用

1.劃分 Stage
根據(jù)RDD之間的依賴關(guān)系將DAG圖劃分為不同的階段Stage( Stage之間的依賴關(guān)系可以認(rèn)為就是Lineage)。

對(duì)于窄依賴,由于partition依賴關(guān)系的確定性,partition的轉(zhuǎn)換處理就可以在同一個(gè)線程里完成,窄依賴就被spark劃分到同一個(gè)stage中。

而對(duì)于寬依賴,只能等父RDD shuffle處理完成后,下一個(gè)stage才能開始接下來的計(jì)算,因此寬依賴要單獨(dú)劃分一個(gè)Stage。

Stage 之間做 shuffle,Stage 之內(nèi)做 pipeline(流水線)。方便stage內(nèi)優(yōu)化。

2.解決數(shù)據(jù)容錯(cuò)的高效性

假如某個(gè)節(jié)點(diǎn)出故障了,窄依賴只需重新計(jì)算丟失RDD分區(qū)的父分區(qū),而且不同節(jié)點(diǎn)之間可以并行計(jì)算;而對(duì)于一個(gè)寬依賴關(guān)系的Lineage圖,單個(gè)節(jié)點(diǎn)失效可能導(dǎo)致這個(gè)RDD的所有父RDD都要進(jìn)行重新計(jì)算。

七.RDD的檢查點(diǎn)(checkpoint)機(jī)制

RDD的緩存能夠在第一次計(jì)算完成后,將計(jì)算結(jié)果保存到內(nèi)存、本地文件系統(tǒng)或者Tachyon中。通過緩存,Spark避免了RDD上的重復(fù)計(jì)算,能夠極大地提升計(jì)算速度。但是,如果緩存丟失了,則需要重新計(jì)算。如果計(jì)算特別復(fù)雜或者計(jì)算耗時(shí)特別多,那么緩存丟失對(duì)于整個(gè)Job的影響是不容忽視的。

為了避免緩存丟失重新計(jì)算帶來的開銷,Spark又引入了檢查點(diǎn)(checkpoint)機(jī)制。

緩存是在計(jì)算結(jié)束后,直接將計(jì)算結(jié)果寫入不同的介質(zhì)。而檢查點(diǎn)不同,它是在計(jì)算完成后,為數(shù)據(jù)創(chuàng)建一個(gè)目錄,并且將計(jì)算結(jié)果寫入新創(chuàng)建的目錄,之后重新建立一個(gè)Job來計(jì)算。接著創(chuàng)建一個(gè)CheckpointRDD,RDD變成CheckPointRDD后,前邊的所有RDD依賴都會(huì)被移除。這就意味著RDD的轉(zhuǎn)換的計(jì)算鏈(compute chain) 等信息都被清除。

一般推薦先將RDD緩存,這樣就能保證檢查點(diǎn)的操作可以快速完成。

設(shè)置檢查點(diǎn):

//設(shè)置檢查點(diǎn)目錄 存儲(chǔ)在HDFS上,并使用checkpoint設(shè)置檢查點(diǎn),該操作屬于懶加載
sc.setCheckpointDir("hdfs://xxxx:9000/checkpoint/")
rdd.checkpoint()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容