RDD(Resilient Distributed Dataset):彈性分布式數(shù)據(jù)集。
特性
- A list of partitions (可分片)
- A function for computing each split (compute func)
- A list of dependencies on other RDDs (依賴)
- A Partitioner for key-value RDDs (分片器,決定一條數(shù)據(jù)屬于某分片)
- A list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (位置優(yōu)先)
Partition
- RDD能并行計(jì)算的原因就是Partition,一個(gè)RDD可有多個(gè)partition,每個(gè)partition一個(gè)task任務(wù),每個(gè)partition代表了該RDD一部分?jǐn)?shù)據(jù),分區(qū)內(nèi)部并不會(huì)存儲(chǔ)具體的數(shù)據(jù),訪問(wèn)數(shù)據(jù)時(shí)是通過(guò)partition的迭代器,iterator 可遍歷到所有數(shù)據(jù)。
- partition的個(gè)數(shù)需要視情況而定,RDD 可以通過(guò)創(chuàng)建操作或者轉(zhuǎn)換操作得到,轉(zhuǎn)換操作中,分區(qū)的個(gè)數(shù)會(huì)根據(jù)轉(zhuǎn)換操作對(duì)應(yīng)多個(gè) RDD 之間的依賴關(guān)系確定,窄依賴子 RDD 由父 RDD 分區(qū)個(gè)數(shù)決定,Shuffle 依賴由子 RDD 分區(qū)器決定,從集合中創(chuàng)建RDD時(shí)默認(rèn)個(gè)數(shù)為defaultParallelism,當(dāng)該值沒(méi)有設(shè)定時(shí):
- 本地模式: conf.getInt("spark.default.parallelism", totalCores) // CPU cores
- Mesos: conf.getInt("spark.default.parallelism", 8) // 8
- Standalone&Yarn: conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
- 特質(zhì)Partition只有一個(gè)返回index的方法,很多具體的 RDD 也會(huì)有自己實(shí)現(xiàn)的 partition。
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
compute func
每個(gè)具體的RDD都得實(shí)現(xiàn)compute 方法,該方法接受的參數(shù)之一是一個(gè)Partition 對(duì)象,目的是計(jì)算該分區(qū)中的數(shù)據(jù)。
我們通過(guò)map方法來(lái)看具體的實(shí)現(xiàn):
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
調(diào)用map時(shí)都會(huì)new一個(gè)MapPartitionsRDD實(shí)例,并且接收一個(gè)方法作為參數(shù),該方法接收一個(gè)迭代器(后面會(huì)細(xì)講),對(duì)該RDD的map操作函數(shù)f將作用于這個(gè)迭代器的每一條數(shù)據(jù)。在MapPartitionsRDD中是通過(guò)compute方法來(lái)計(jì)算對(duì)應(yīng)分區(qū)的數(shù)據(jù):
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
這里將調(diào)用該RDD(MapPartitionsRDD)內(nèi)的第一個(gè)父 RDD 的 iterator 方法,該方的目的是拉取父 RDD 對(duì)應(yīng)分區(qū)內(nèi)的數(shù)據(jù)。iterator方法會(huì)返回一個(gè)迭代器,對(duì)應(yīng)的是父RDD計(jì)算完成的數(shù)據(jù),該迭代器將作為 f 方法的一個(gè)參數(shù),該f 方法就是上面提到的創(chuàng)建MapPartitionsRDD實(shí)例時(shí)傳入的方法。
其實(shí)RDD的compute方法也類似。接下來(lái)我們看看iterator方法究竟都做了什么事:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
RDD的iterator方法即遍歷對(duì)應(yīng)分區(qū)的數(shù)據(jù),先判斷改RDD的存儲(chǔ)級(jí)別若不為NONE,則說(shuō)明該數(shù)據(jù)已經(jīng)存在于緩存中,RDD 經(jīng)過(guò)持久化操作并經(jīng)歷了一次計(jì)算過(guò)程 ,可直接將數(shù)據(jù)返回。
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// This method is called on executors, so we need call SparkEnv.get instead of sc.env.
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
existingMetrics.incBytesRead(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
...
}
}
通過(guò)RDD_id和partition_index唯一表示一個(gè)block,先從緩存中取數(shù)據(jù),也有可能取不到數(shù)據(jù)
- 數(shù)據(jù)丟失
- RDD 經(jīng)過(guò)持久化操作,但是是當(dāng)前分區(qū)數(shù)據(jù)是第一次被計(jì)算,因此會(huì)出現(xiàn)拉取得到數(shù)據(jù)為 None
取不到的時(shí)候則調(diào)用computeOrReadCheckpoint來(lái)獲取并加入緩存。
當(dāng)RDD的存儲(chǔ)級(jí)別若為NONE,則需要直接通過(guò)computeOrReadCheckpoint方法來(lái)計(jì)算。
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
該方法會(huì)先檢查當(dāng)前RDD是否被checkpoint,若是則直接從依賴的checkpointRDD中獲取迭代對(duì)象,若不是則需要通過(guò)compute方法計(jì)算。
dependency
RDD的容錯(cuò)機(jī)制就是通過(guò)dependency實(shí)現(xiàn)的,在外部成為血統(tǒng)(Lineage)關(guān)系,在源碼里面實(shí)為dependency,抽象類Dependency只有一個(gè)返回對(duì)應(yīng)RDD的方法。
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
每個(gè)RDD都有一個(gè)返回其所依賴的dependences:Seq[Dependency[_]] 的dependencies方法,Dependency里面存的就是父RDD,遞歸RDD+遍歷這個(gè)dependences將可得到整個(gè)DAG。
依賴分為兩種,分別是窄依賴(Narrow Dependency)和 Shuffle 依賴(Shuffle Dependency,也稱即寬依賴)。在窄依賴中,父RDD的一個(gè)分區(qū)至多被一個(gè)子RDD的一個(gè)分區(qū)所依賴,分區(qū)數(shù)據(jù)不可被拆分:

在寬依賴中,父RDD的一個(gè)分區(qū)被子RDD的多個(gè)分區(qū)所依賴,分區(qū)數(shù)據(jù)被拆分:

一次轉(zhuǎn)換操作可同時(shí)包含窄依賴和寬依賴:

窄依賴的抽象類為NarrowDependency,對(duì)應(yīng)實(shí)現(xiàn)分別是 OneToOneDependency (一對(duì)一依賴)類和RangeDependency (范圍依賴)類。
一對(duì)一依賴表示子 RDD 分區(qū)的編號(hào)與父 RDD 分區(qū)的編號(hào)完全一致的情況,若兩個(gè) RDD 之間存在著一對(duì)一依賴,則子 RDD 的分區(qū)個(gè)數(shù)、分區(qū)內(nèi)記錄的個(gè)數(shù)都將繼承自父 RDD。
范圍依賴是依賴關(guān)系中的一個(gè)特例,只被用于表示 UnionRDD 與父 RDD 之間的依賴關(guān)系。
寬依賴的對(duì)應(yīng)實(shí)現(xiàn)為 ShuffleDependency 類,寬依賴支持兩種 Shuffle Manager,即 HashShuffleManager 和 SortShuffleManager
Partitioner
partitioner就是決定一條數(shù)據(jù)應(yīng)該數(shù)據(jù)哪個(gè)分區(qū)的分區(qū)器,但只有 k, v 類型的 RDD 才能有 partitioner,因?yàn)槎际怯善?k 來(lái)決定的。
特質(zhì) Partitioner提供了一個(gè)返回分區(qū)index的方法,通過(guò)傳入k及指定的分區(qū)個(gè)數(shù):
trait Partitioner {
def partition(key: Any, numPartitions: Int): Int
}
Spark 內(nèi)置了兩種分區(qū)器,分別是哈希分區(qū)器(Hash Partitioner)和范圍分區(qū)器(Range Partitioner)。
Hash Partitioner
我們來(lái)看HashPartitioner的定義,主要是getPartition方法,當(dāng)key為null時(shí)直接返回null
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
不為null時(shí)將調(diào)用Utils的nonNegativeMod方法,即將key的hashcode和mod取余,若結(jié)果為正,則返回該結(jié)果;若結(jié)果為負(fù),返回結(jié)果加上 mod。
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
RangePartitioner
在key的hashcode分布不均的情況下會(huì)到導(dǎo)致通過(guò)HashPartitioner分出來(lái)的分區(qū)數(shù)據(jù)傾斜不均勻,這是就需要用到RangePartitioner分區(qū)器,該分區(qū)器運(yùn)行速度相對(duì)HashPartitioner較慢,原理復(fù)雜。
HashPartitioner會(huì)將一個(gè)范圍的key直接映射到一個(gè)partition,也就是一個(gè)partition的key一定比另一個(gè)partition的key都大或者都小,而怎么具體劃分這個(gè)范圍的邊界成為關(guān)鍵,既要保證分布均勻又要減少遍歷次數(shù)。具體實(shí)現(xiàn)可見 Spark分區(qū)器HashPartitioner和RangePartitioner代碼詳解
preferred locations
每個(gè)具體的RDD實(shí)例都需要實(shí)現(xiàn)自己的getPreferredLocations方法,RDD位置優(yōu)先即返回partition的存儲(chǔ)位置,該位置和spark的任務(wù)調(diào)度有關(guān),盡量將計(jì)算移到該partition對(duì)應(yīng)的地方。
以從Hadoop中讀取數(shù)據(jù)生成RDD為例,preferredLocations返回每一個(gè)數(shù)據(jù)塊所在的機(jī)器名或者IP地址,如果每一個(gè)數(shù)據(jù)塊是多份存儲(chǔ)的(HDFS副本數(shù)),那么就會(huì)返回多個(gè)機(jī)器地址。