1. 概述
按照官方文檔,RDD是表示不可變的、分區(qū)的、可并行計(jì)算的數(shù)據(jù)集合,有五個(gè)特點(diǎn)
| 編號 | 特點(diǎn) | 變量名 |
|---|---|---|
| 1 | 由若干個(gè)分區(qū)組成 | getPartitions |
| 2 | 一個(gè)函數(shù)用來計(jì)算每個(gè)分區(qū) | compute |
| 3 | 有若干個(gè)依賴的RDD
|
deps |
| 4 | 對于RDD[(K,V)],可能會有一個(gè)Partitioner |
partitioner |
| 5 | 基于移動計(jì)算優(yōu)于移動數(shù)據(jù)原則,每個(gè)分區(qū)都有優(yōu)先的執(zhí)行位置 | getPreferredLocations |
RDD的操作函數(shù)分為2類,一類為transform,一類為action,前者是惰性的,不觸發(fā)任務(wù)執(zhí)行,每一個(gè)transform操作都會生成新的RDD,后者則相反;操作形成了下游RDD依賴于上游RDD的依賴關(guān)系,依賴通過Dependency來抽象,一類為窄依賴NarrowDependency,它有OneToOneDependency、RangeDependency兩個(gè)子類,另一類稱為寬依賴,由ShuffleDependency表示。
Spark通過RDD的優(yōu)先計(jì)算位置,將RDD的計(jì)算本地性從高往低分為如下(在類TaskLocality中定義,在最后一個(gè)小節(jié)會詳細(xì)討論):
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
2. 常用的transform操作
| 輸出的RDD | 算子 | 描述 |
|---|---|---|
| MapPartitionsRDD |
map、flatMap、filter
|
輸入的RDD與輸出的RDD分區(qū)不會變 |
| CoalescedRDD |
coalesce、repartition
|
用來合并分區(qū) |
| PartitionwiseSampledRDD | sample |
用來對輸入的RDD中的分區(qū)數(shù)據(jù)進(jìn)行采樣 |
| PartitionerAwareUnionRDD | union |
當(dāng)兩個(gè)RDD的Partitioner相同時(shí)會生成,分區(qū)數(shù)不變 |
| UnionRDD | union |
當(dāng)兩個(gè)RDD的Partitioner不同或者為null時(shí)會生成,分區(qū)數(shù)為2者之和 |
| MapPartitionsRDD | intersection |
取兩個(gè)RDD之間的交集,有Shuffle,底層通過cogroup實(shí)現(xiàn) |
| CartesianRDD | cartesian |
取兩個(gè)RDD的笛卡爾積 |
| PipedRDD | pipe |
形參是shell命令,表示將rdd中的數(shù)據(jù)當(dāng)成shell命令的輸入,獲取處理后的輸出 |
| MapPartitionsRDD | mapPartitionsWithIndex |
將該分區(qū)的索引與整個(gè)分區(qū)作為輸入 |
| ZippedPartitionsRDD* | zipPartitions |
將所有RDD的相同分區(qū)的數(shù)據(jù)聚合在一起,注意所有RDD分區(qū)數(shù)要相同 |
| SubtractedRDD | subtract |
獲取不在形參RDD中的元素 |
| CoGroupedRDD | cogroup |
將多個(gè)RDD key相同的值聚合在一起 |
3. PairRDDFunctions
PairRDDFunctions里面包含了RDD非常核心的操作,其中以combineByKeyWithClassTag最為關(guān)鍵,因?yàn)樗?code>reduceByKey(reduceBy的底層實(shí)現(xiàn)也是reduceByKey)、groupByKey(groupBy)、aggregateByKey、foldByKey的底層實(shí)現(xiàn)
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
/*驗(yàn)證代碼,忽略*/
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
整個(gè)流程如下:
- 對一個(gè)分區(qū),創(chuàng)建一個(gè)map,當(dāng)key不在map中是,執(zhí)行
createCombiner,map的類型為[K, C]- 當(dāng)key在map中時(shí),調(diào)用
mergeValue,將value合并到C- 在不同分區(qū)的相同key進(jìn)行shuffle聚合時(shí),調(diào)用
mergeCombiners,將map中的C合并為另一個(gè)C
4. OrderedRDDFunctions
OrderedRDDFunctions包含了所有的排序操作
| 輸出的RDD | 算子 | 描述 |
|---|---|---|
| ShuffledRDD | sortByKey |
將分區(qū)內(nèi)的數(shù)據(jù)按Key進(jìn)行排序,Paritioner變?yōu)镽angePartitioner |
| ShuffledRDD | repartitionAndSortWithinPartitions |
將分區(qū)內(nèi)的數(shù)據(jù)按key進(jìn)行排序,比sortByKey靈活,可以定義key的分區(qū)規(guī)則,按value排序時(shí)可以使用 |
filterByRange |
取范圍之內(nèi)的數(shù)據(jù),范圍為[lower, upper],而不是(lower, upper] |
其它RDDFunctions
除了上述的Functions,還有DoubleRDDFunctions、AsyncRDDActions、SequenceFileRDDFunctions.
DoubleRDDFunctions可以用來作一些數(shù)學(xué)運(yùn)算,比如平均值(mean)、方差(variance)、總和(sum)、直方圖(histogram)AsyncRDDActions可以用來異步計(jì)算RDDSequenceFileRDDFunctions用來將RDD的數(shù)據(jù)寫入到SequenceFile中
Partitioner
最常用的Partitioner是HashPartitioner(默認(rèn)分區(qū)器)、RangePartitioner,HashPartitioner實(shí)現(xiàn)邏輯簡單,這里介紹下RangePartitioner
RangePartitioner的構(gòu)建流程
- RangePartitioner盡量使每個(gè)分區(qū)的數(shù)據(jù)大致相同
- 對RDD的Key通過池塘采樣(reservoir sample,從n個(gè)元素中取隨機(jī)取k個(gè)元素作為樣本,保證每個(gè)元素被選取的概率為k/n,采樣總數(shù)不超過一百萬,默認(rèn)為3 * 20 *
原RDD的分區(qū)數(shù))在每個(gè)分區(qū)中進(jìn)行采樣- 對每個(gè)分區(qū)中的樣本進(jìn)行合并,形成一個(gè)候選集合,然后排序,根據(jù)分區(qū)數(shù)來均勻的從候選集合中取值,形成一個(gè)大小為指定分區(qū)數(shù)的數(shù)組來對key進(jìn)行分區(qū)
- 在對某個(gè)key分區(qū)時(shí),找到最接近自己的樣本,該樣本的索引既是key所在分區(qū)
- 所以,使用了RangePartitioner的算子(如sortByKey)得到的RDD,如果將key按照分區(qū)索引從小到大合并在一起形成一個(gè)集合,該集合的數(shù)據(jù)是有序的
5. 本地性
任務(wù)本地性級別獲取分為如下幾個(gè)步驟:
- 獲取計(jì)算資源(
ExecutorData)- 獲取待計(jì)算RDD每個(gè)分區(qū)的輸入數(shù)據(jù)所在的位置(即偏好位置,用
TaskLocation抽象)- 通過
ExecutorData與TaskLocation來獲取整個(gè)待計(jì)算RDD(TaskSetManager)的可能的本地性級別- 通過調(diào)度規(guī)則來決定
TaskSetManager中每個(gè)任務(wù)的本地性級別
5.1 獲取計(jì)算資源
這里以yarn-client模式為例
大致過程如下:
- 在創(chuàng)建
SparkContext時(shí),調(diào)用YarnScheduler的start方法,里面會接著執(zhí)行YarnClientSchedulerBackend的start方法,YarnClientSchedulerBackend會創(chuàng)建一個(gè)Client與RM進(jìn)行通信,啟動AM- AM啟動后,則會向RM申請資源,申請到資源后,啟動executor
- executor啟動后,會向Driver端注冊
- 此時(shí),
YarnClientSchedulerBackend的成員變量executorDataMap就有了exector的信息,其中在計(jì)算本地性最重要的就是executor所在的host
RDD優(yōu)先計(jì)算位置
DAGScheduler在提交stage時(shí),會通過方法getPreferredLocs來獲取該stage的優(yōu)先計(jì)算位置
// getPreferredLocs實(shí)際調(diào)用了getPreferredLocsInternal(rdd, partition. new HashSet), 深度遍歷RDD的窄依賴
private def getPreferredLocsInternal(rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// 訪問過的分區(qū)就不再重復(fù)訪問了
if (!visited.add((rdd, partition))) {
return Nil
}
//如果該分區(qū)被緩存了,返回緩存地址,類型為ExecutorCacheTaskLocation
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// 通過子rdd重寫的的preferredLocations方法獲取偏好位置,比如HadoopRDD、
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// 從尾往頭開始深度遍歷,直到找到不為空的偏好位置,否則返回空
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
計(jì)算任務(wù)本地性

如圖,在創(chuàng)建TaskSetManager時(shí),先通過
addPendingTask獲取任務(wù)的候選本地性,然后通過computeValidLocalityLevels獲取真正的本地性
private def addPendingTask(index: Int) {
for (loc <- tasks(index).preferredLocations) {
loc match {
// 被Executor緩存的分區(qū)
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
// 輸入的數(shù)據(jù)在HDFS上,且數(shù)據(jù)節(jié)點(diǎn)的host在申請的資源之中(HadoopRDD, ReliableCheckpointRDD)
case e: HDFSCacheTaskLocation => {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) => {
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
case None => _
}
}
case _ => Unit
}
// 比如網(wǎng)絡(luò)上的某個(gè)輸入流或文件
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
// 如果輸入數(shù)據(jù)的主機(jī)在某個(gè)機(jī)架上
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
// 沒有偏好位置
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
allPendingTasks += index // 表示分區(qū)為index對應(yīng)的任務(wù)還未提交
}
計(jì)算任務(wù)的本地性
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
levels.toArray
}
最后TaskSetManager通過resourceOffer方法來決定任務(wù)的本地性