RDD總結(jié)

1. 概述

按照官方文檔,RDD是表示不可變的、分區(qū)的、可并行計(jì)算的數(shù)據(jù)集合,有五個(gè)特點(diǎn)

編號 特點(diǎn) 變量名
1 由若干個(gè)分區(qū)組成 getPartitions
2 一個(gè)函數(shù)用來計(jì)算每個(gè)分區(qū) compute
3 有若干個(gè)依賴的RDD deps
4 對于RDD[(K,V)],可能會有一個(gè)Partitioner partitioner
5 基于移動計(jì)算優(yōu)于移動數(shù)據(jù)原則,每個(gè)分區(qū)都有優(yōu)先的執(zhí)行位置 getPreferredLocations

RDD的操作函數(shù)分為2類,一類為transform,一類為action,前者是惰性的,不觸發(fā)任務(wù)執(zhí)行,每一個(gè)transform操作都會生成新的RDD,后者則相反;操作形成了下游RDD依賴于上游RDD的依賴關(guān)系,依賴通過Dependency來抽象,一類為窄依賴NarrowDependency,它有OneToOneDependencyRangeDependency兩個(gè)子類,另一類稱為寬依賴,由ShuffleDependency表示。
Spark通過RDD的優(yōu)先計(jì)算位置,將RDD的計(jì)算本地性從高往低分為如下(在類TaskLocality中定義,在最后一個(gè)小節(jié)會詳細(xì)討論):

PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY

2. 常用的transform操作

輸出的RDD 算子 描述
MapPartitionsRDD map、flatMap、filter 輸入的RDD與輸出的RDD分區(qū)不會變
CoalescedRDD coalescerepartition 用來合并分區(qū)
PartitionwiseSampledRDD sample 用來對輸入的RDD中的分區(qū)數(shù)據(jù)進(jìn)行采樣
PartitionerAwareUnionRDD union 當(dāng)兩個(gè)RDD的Partitioner相同時(shí)會生成,分區(qū)數(shù)不變
UnionRDD union 當(dāng)兩個(gè)RDD的Partitioner不同或者為null時(shí)會生成,分區(qū)數(shù)為2者之和
MapPartitionsRDD intersection 取兩個(gè)RDD之間的交集,有Shuffle,底層通過cogroup實(shí)現(xiàn)
CartesianRDD cartesian 取兩個(gè)RDD的笛卡爾積
PipedRDD pipe 形參是shell命令,表示將rdd中的數(shù)據(jù)當(dāng)成shell命令的輸入,獲取處理后的輸出
MapPartitionsRDD mapPartitionsWithIndex 將該分區(qū)的索引與整個(gè)分區(qū)作為輸入
ZippedPartitionsRDD* zipPartitions 將所有RDD的相同分區(qū)的數(shù)據(jù)聚合在一起,注意所有RDD分區(qū)數(shù)要相同
SubtractedRDD subtract 獲取不在形參RDD中的元素
CoGroupedRDD cogroup 將多個(gè)RDD key相同的值聚合在一起

3. PairRDDFunctions

PairRDDFunctions里面包含了RDD非常核心的操作,其中以combineByKeyWithClassTag最為關(guān)鍵,因?yàn)樗?code>reduceByKey(reduceBy的底層實(shí)現(xiàn)也是reduceByKey)、groupByKey(groupBy)、aggregateByKey、foldByKey的底層實(shí)現(xiàn)

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    /*驗(yàn)證代碼,忽略*/
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

整個(gè)流程如下:

  1. 對一個(gè)分區(qū),創(chuàng)建一個(gè)map,當(dāng)key不在map中是,執(zhí)行createCombiner,map的類型為[K, C]
  2. 當(dāng)key在map中時(shí),調(diào)用mergeValue,將value合并到C
  3. 在不同分區(qū)的相同key進(jìn)行shuffle聚合時(shí),調(diào)用mergeCombiners,將map中的C合并為另一個(gè)C

4. OrderedRDDFunctions

OrderedRDDFunctions包含了所有的排序操作

輸出的RDD 算子 描述
ShuffledRDD sortByKey 將分區(qū)內(nèi)的數(shù)據(jù)按Key進(jìn)行排序,Paritioner變?yōu)镽angePartitioner
ShuffledRDD repartitionAndSortWithinPartitions 將分區(qū)內(nèi)的數(shù)據(jù)按key進(jìn)行排序,比sortByKey靈活,可以定義key的分區(qū)規(guī)則,按value排序時(shí)可以使用
filterByRange 取范圍之內(nèi)的數(shù)據(jù),范圍為[lower, upper],而不是(lower, upper]

其它RDDFunctions

除了上述的Functions,還有DoubleRDDFunctionsAsyncRDDActions、SequenceFileRDDFunctions.

  1. DoubleRDDFunctions可以用來作一些數(shù)學(xué)運(yùn)算,比如平均值(mean)、方差(variance)、總和(sum)、直方圖(histogram)
  2. AsyncRDDActions可以用來異步計(jì)算RDD
  3. SequenceFileRDDFunctions用來將RDD的數(shù)據(jù)寫入到SequenceFile中

Partitioner

最常用的Partitioner是HashPartitioner(默認(rèn)分區(qū)器)、RangePartitioner,HashPartitioner實(shí)現(xiàn)邏輯簡單,這里介紹下RangePartitioner

RangePartitioner的構(gòu)建流程
  1. RangePartitioner盡量使每個(gè)分區(qū)的數(shù)據(jù)大致相同
  2. 對RDD的Key通過池塘采樣(reservoir sample,從n個(gè)元素中取隨機(jī)取k個(gè)元素作為樣本,保證每個(gè)元素被選取的概率為k/n,采樣總數(shù)不超過一百萬,默認(rèn)為3 * 20 * 原RDD的分區(qū)數(shù))在每個(gè)分區(qū)中進(jìn)行采樣
  3. 對每個(gè)分區(qū)中的樣本進(jìn)行合并,形成一個(gè)候選集合,然后排序,根據(jù)分區(qū)數(shù)來均勻的從候選集合中取值,形成一個(gè)大小為指定分區(qū)數(shù)的數(shù)組來對key進(jìn)行分區(qū)
  4. 在對某個(gè)key分區(qū)時(shí),找到最接近自己的樣本,該樣本的索引既是key所在分區(qū)
  5. 所以,使用了RangePartitioner的算子(如sortByKey)得到的RDD,如果將key按照分區(qū)索引從小到大合并在一起形成一個(gè)集合,該集合的數(shù)據(jù)是有序的

5. 本地性

任務(wù)本地性級別獲取分為如下幾個(gè)步驟:

  1. 獲取計(jì)算資源(ExecutorData)
  2. 獲取待計(jì)算RDD每個(gè)分區(qū)的輸入數(shù)據(jù)所在的位置(即偏好位置,用TaskLocation抽象)
  3. 通過ExecutorDataTaskLocation來獲取整個(gè)待計(jì)算RDD(TaskSetManager)的可能的本地性級別
  4. 通過調(diào)度規(guī)則來決定TaskSetManager中每個(gè)任務(wù)的本地性級別
5.1 獲取計(jì)算資源

這里以yarn-client模式為例


spark_獲取資源時(shí)序圖.PNG

大致過程如下:

  1. 在創(chuàng)建SparkContext時(shí),調(diào)用YarnSchedulerstart方法,里面會接著執(zhí)行YarnClientSchedulerBackend的start方法,YarnClientSchedulerBackend會創(chuàng)建一個(gè)Client與RM進(jìn)行通信,啟動AM
  2. AM啟動后,則會向RM申請資源,申請到資源后,啟動executor
  3. executor啟動后,會向Driver端注冊
  4. 此時(shí),YarnClientSchedulerBackend的成員變量executorDataMap就有了exector的信息,其中在計(jì)算本地性最重要的就是executor所在的host
RDD優(yōu)先計(jì)算位置

DAGScheduler在提交stage時(shí),會通過方法getPreferredLocs來獲取該stage的優(yōu)先計(jì)算位置

// getPreferredLocs實(shí)際調(diào)用了getPreferredLocsInternal(rdd, partition. new HashSet), 深度遍歷RDD的窄依賴
private def getPreferredLocsInternal(rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// 訪問過的分區(qū)就不再重復(fù)訪問了
    if (!visited.add((rdd, partition))) {
      return Nil
    }
    //如果該分區(qū)被緩存了,返回緩存地址,類型為ExecutorCacheTaskLocation 
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 通過子rdd重寫的的preferredLocations方法獲取偏好位置,比如HadoopRDD、
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    // 從尾往頭開始深度遍歷,直到找到不為空的偏好位置,否則返回空
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }
    Nil
  }
計(jì)算任務(wù)本地性

spark_計(jì)算本地性級別時(shí)序圖

如圖,在創(chuàng)建TaskSetManager時(shí),先通過addPendingTask獲取任務(wù)的候選本地性,然后通過computeValidLocalityLevels獲取真正的本地性

private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        // 被Executor緩存的分區(qū)
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
       // 輸入的數(shù)據(jù)在HDFS上,且數(shù)據(jù)節(jié)點(diǎn)的host在申請的資源之中(HadoopRDD, ReliableCheckpointRDD)
        case e: HDFSCacheTaskLocation => {
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) => {
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
            case None => _
          }
        }
        case _ => Unit
      }
// 比如網(wǎng)絡(luò)上的某個(gè)輸入流或文件
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
// 如果輸入數(shù)據(jù)的主機(jī)在某個(gè)機(jī)架上
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }
// 沒有偏好位置
    if (tasks(index).preferredLocations == Nil) {
      pendingTasksWithNoPrefs += index
    }

    allPendingTasks += index  // 表示分區(qū)為index對應(yīng)的任務(wù)還未提交
  }

計(jì)算任務(wù)的本地性

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    levels += ANY
    levels.toArray
  }

最后TaskSetManager通過resourceOffer方法來決定任務(wù)的本地性

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容