[spark] RDD解析

RDD(Resilient Distributed Dataset):彈性分布式數(shù)據(jù)集。

特性

  • A list of partitions (可分片)
  • A function for computing each split (compute func)
  • A list of dependencies on other RDDs (依賴)
  • A Partitioner for key-value RDDs (分片器,決定一條數(shù)據(jù)屬于某分片)
  • A list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (位置優(yōu)先)

Partition

  1. RDD能并行計(jì)算的原因就是Partition,一個(gè)RDD可有多個(gè)partition,每個(gè)partition一個(gè)task任務(wù),每個(gè)partition代表了該RDD一部分?jǐn)?shù)據(jù),分區(qū)內(nèi)部并不會(huì)存儲(chǔ)具體的數(shù)據(jù),訪問(wèn)數(shù)據(jù)時(shí)是通過(guò)partition的迭代器,iterator 可遍歷到所有數(shù)據(jù)。
  2. partition的個(gè)數(shù)需要視情況而定,RDD 可以通過(guò)創(chuàng)建操作或者轉(zhuǎn)換操作得到,轉(zhuǎn)換操作中,分區(qū)的個(gè)數(shù)會(huì)根據(jù)轉(zhuǎn)換操作對(duì)應(yīng)多個(gè) RDD 之間的依賴關(guān)系確定,窄依賴子 RDD 由父 RDD 分區(qū)個(gè)數(shù)決定,Shuffle 依賴由子 RDD 分區(qū)器決定,從集合中創(chuàng)建RDD時(shí)默認(rèn)個(gè)數(shù)為defaultParallelism,當(dāng)該值沒(méi)有設(shè)定時(shí):
    • 本地模式: conf.getInt("spark.default.parallelism", totalCores) // CPU cores
    • Mesos: conf.getInt("spark.default.parallelism", 8) // 8
    • Standalone&Yarn: conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  3. 特質(zhì)Partition只有一個(gè)返回index的方法,很多具體的 RDD 也會(huì)有自己實(shí)現(xiàn)的 partition。
  trait Partition extends Serializable { 
    def index: Int
    override def hashCode(): Int = index
    override def equals(other: Any): Boolean = super.equals(other)
  }

compute func

每個(gè)具體的RDD都得實(shí)現(xiàn)compute 方法,該方法接受的參數(shù)之一是一個(gè)Partition 對(duì)象,目的是計(jì)算該分區(qū)中的數(shù)據(jù)。
我們通過(guò)map方法來(lái)看具體的實(shí)現(xiàn):

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

調(diào)用map時(shí)都會(huì)new一個(gè)MapPartitionsRDD實(shí)例,并且接收一個(gè)方法作為參數(shù),該方法接收一個(gè)迭代器(后面會(huì)細(xì)講),對(duì)該RDD的map操作函數(shù)f將作用于這個(gè)迭代器的每一條數(shù)據(jù)。在MapPartitionsRDD中是通過(guò)compute方法來(lái)計(jì)算對(duì)應(yīng)分區(qū)的數(shù)據(jù):

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

這里將調(diào)用該RDD(MapPartitionsRDD)內(nèi)的第一個(gè)父 RDD 的 iterator 方法,該方的目的是拉取父 RDD 對(duì)應(yīng)分區(qū)內(nèi)的數(shù)據(jù)。iterator方法會(huì)返回一個(gè)迭代器,對(duì)應(yīng)的是父RDD計(jì)算完成的數(shù)據(jù),該迭代器將作為 f 方法的一個(gè)參數(shù),該f 方法就是上面提到的創(chuàng)建MapPartitionsRDD實(shí)例時(shí)傳入的方法。

其實(shí)RDD的compute方法也類似。接下來(lái)我們看看iterator方法究竟都做了什么事:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

RDD的iterator方法即遍歷對(duì)應(yīng)分區(qū)的數(shù)據(jù),先判斷改RDD的存儲(chǔ)級(jí)別若不為NONE,則說(shuō)明該數(shù)據(jù)已經(jīng)存在于緩存中,RDD 經(jīng)過(guò)持久化操作并經(jīng)歷了一次計(jì)算過(guò)程 ,可直接將數(shù)據(jù)返回。

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
      readCachedBlock = false
      computeOrReadCheckpoint(partition, context)
    }) match {
      case Left(blockResult) =>
        if (readCachedBlock) {
          val existingMetrics = context.taskMetrics().inputMetrics
          existingMetrics.incBytesRead(blockResult.bytes)
          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        } else {
          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
       ...
    }
  }

通過(guò)RDD_id和partition_index唯一表示一個(gè)block,先從緩存中取數(shù)據(jù),也有可能取不到數(shù)據(jù)

  • 數(shù)據(jù)丟失
  • RDD 經(jīng)過(guò)持久化操作,但是是當(dāng)前分區(qū)數(shù)據(jù)是第一次被計(jì)算,因此會(huì)出現(xiàn)拉取得到數(shù)據(jù)為 None

取不到的時(shí)候則調(diào)用computeOrReadCheckpoint來(lái)獲取并加入緩存。
當(dāng)RDD的存儲(chǔ)級(jí)別若為NONE,則需要直接通過(guò)computeOrReadCheckpoint方法來(lái)計(jì)算。

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

該方法會(huì)先檢查當(dāng)前RDD是否被checkpoint,若是則直接從依賴的checkpointRDD中獲取迭代對(duì)象,若不是則需要通過(guò)compute方法計(jì)算。

dependency

RDD的容錯(cuò)機(jī)制就是通過(guò)dependency實(shí)現(xiàn)的,在外部成為血統(tǒng)(Lineage)關(guān)系,在源碼里面實(shí)為dependency,抽象類Dependency只有一個(gè)返回對(duì)應(yīng)RDD的方法。

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

每個(gè)RDD都有一個(gè)返回其所依賴的dependences:Seq[Dependency[_]] 的dependencies方法,Dependency里面存的就是父RDD,遞歸RDD+遍歷這個(gè)dependences將可得到整個(gè)DAG。

依賴分為兩種,分別是窄依賴(Narrow Dependency)和 Shuffle 依賴(Shuffle Dependency,也稱即寬依賴)。在窄依賴中,父RDD的一個(gè)分區(qū)至多被一個(gè)子RDD的一個(gè)分區(qū)所依賴,分區(qū)數(shù)據(jù)不可被拆分:

在寬依賴中,父RDD的一個(gè)分區(qū)被子RDD的多個(gè)分區(qū)所依賴,分區(qū)數(shù)據(jù)被拆分:

一次轉(zhuǎn)換操作可同時(shí)包含窄依賴和寬依賴:


窄依賴的抽象類為NarrowDependency,對(duì)應(yīng)實(shí)現(xiàn)分別是 OneToOneDependency (一對(duì)一依賴)類和RangeDependency (范圍依賴)類。

一對(duì)一依賴表示子 RDD 分區(qū)的編號(hào)與父 RDD 分區(qū)的編號(hào)完全一致的情況,若兩個(gè) RDD 之間存在著一對(duì)一依賴,則子 RDD 的分區(qū)個(gè)數(shù)、分區(qū)內(nèi)記錄的個(gè)數(shù)都將繼承自父 RDD。

范圍依賴是依賴關(guān)系中的一個(gè)特例,只被用于表示 UnionRDD 與父 RDD 之間的依賴關(guān)系。

寬依賴的對(duì)應(yīng)實(shí)現(xiàn)為 ShuffleDependency 類,寬依賴支持兩種 Shuffle Manager,即 HashShuffleManager 和 SortShuffleManager

Partitioner

partitioner就是決定一條數(shù)據(jù)應(yīng)該數(shù)據(jù)哪個(gè)分區(qū)的分區(qū)器,但只有 k, v 類型的 RDD 才能有 partitioner,因?yàn)槎际怯善?k 來(lái)決定的。
特質(zhì) Partitioner提供了一個(gè)返回分區(qū)index的方法,通過(guò)傳入k及指定的分區(qū)個(gè)數(shù):

trait Partitioner { 
  def partition(key: Any, numPartitions: Int): Int
}

Spark 內(nèi)置了兩種分區(qū)器,分別是哈希分區(qū)器(Hash Partitioner)和范圍分區(qū)器(Range Partitioner)。

Hash Partitioner

我們來(lái)看HashPartitioner的定義,主要是getPartition方法,當(dāng)key為null時(shí)直接返回null

class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }
  override def hashCode: Int = numPartitions
}

不為null時(shí)將調(diào)用Utils的nonNegativeMod方法,即將key的hashcode和mod取余,若結(jié)果為正,則返回該結(jié)果;若結(jié)果為負(fù),返回結(jié)果加上 mod。

 def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

RangePartitioner

在key的hashcode分布不均的情況下會(huì)到導(dǎo)致通過(guò)HashPartitioner分出來(lái)的分區(qū)數(shù)據(jù)傾斜不均勻,這是就需要用到RangePartitioner分區(qū)器,該分區(qū)器運(yùn)行速度相對(duì)HashPartitioner較慢,原理復(fù)雜。

HashPartitioner會(huì)將一個(gè)范圍的key直接映射到一個(gè)partition,也就是一個(gè)partition的key一定比另一個(gè)partition的key都大或者都小,而怎么具體劃分這個(gè)范圍的邊界成為關(guān)鍵,既要保證分布均勻又要減少遍歷次數(shù)。具體實(shí)現(xiàn)可見 Spark分區(qū)器HashPartitioner和RangePartitioner代碼詳解

preferred locations

每個(gè)具體的RDD實(shí)例都需要實(shí)現(xiàn)自己的getPreferredLocations方法,RDD位置優(yōu)先即返回partition的存儲(chǔ)位置,該位置和spark的任務(wù)調(diào)度有關(guān),盡量將計(jì)算移到該partition對(duì)應(yīng)的地方。
以從Hadoop中讀取數(shù)據(jù)生成RDD為例,preferredLocations返回每一個(gè)數(shù)據(jù)塊所在的機(jī)器名或者IP地址,如果每一個(gè)數(shù)據(jù)塊是多份存儲(chǔ)的(HDFS副本數(shù)),那么就會(huì)返回多個(gè)機(jī)器地址。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容