Spark RDD in Depth

Spark RDD

In Spark, an RDD has five main properties:

  • A list of partitions
  • A function (compute) for computing each split (partition)
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs
  • Optionally, a list of preferred locations to compute each split on

A Partition is the basic unit of the dataset: each partition is processed by one task, so the number of partitions determines the granularity of parallel computation. The number of partitions of an RDD can be specified at creation time; otherwise the default value spark.default.parallelism is used, which in local mode is the n in local[n], in Mesos mode is 8, and in other modes is max(coreNumber, 2), where coreNumber is the number of CPU cores allocated to the application (virtual cores, not necessarily physical CPU cores).

compute is the function that computes each partition. It composes iterators, so the result of each intermediate computation does not need to be materialized.

Dependencies describe the relationships between RDDs; in effect they record the information of all parent RDDs. These dependencies make up the lineage of an RDD, and Spark uses them when dividing a job into stages.

Partitioner is the partitioning function of an RDD. Spark currently ships two implementations, the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for non key-value RDDs it is None. The Partitioner determines both the number of partitions of the RDD itself and the number of partitions of the parent RDD's shuffle output.

Preferred locations store the preferred location of each partition; for an HDFS file this list holds the locations of the blocks backing each partition. Following the principle that moving computation is cheaper than moving data, Spark tries to schedule each task onto the node that stores the data block it needs to process.

The RDD abstract class has five members corresponding to these properties:

/**
* Compute the data represented by the given Partition
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* How the data is split into partitions
*/
protected def getPartitions: Array[Partition]
/**
* The dependencies of this RDD, i.e. its parent RDDs
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* The preferred locations for computing each partition of this RDD
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/**
* The partitioner of a key-value RDD
*/
@transient val partitioner: Option[Partitioner] = None
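
To make the five properties concrete, here is a minimal sketch of a custom RDD (not from the Spark source; the class names and the generated values are made up for illustration). It implements only the two required members and inherits the defaults for the three optional ones:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One Partition implementation per RDD implementation.
class TensPartition(override val index: Int) extends Partition

// Each of the numSlices partitions produces the ten numbers [index * 10, index * 10 + 9].
class TensRDD(sc: SparkContext, numSlices: Int) extends RDD[Int](sc, Nil) {

  // Property 1: the list of partitions.
  override protected def getPartitions: Array[Partition] =
    (0 until numSlices).map(i => new TensPartition(i): Partition).toArray

  // Property 2: how to compute a single partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val start = split.index * 10
    (start until start + 10).iterator
  }

  // Properties 3-5 keep their defaults: no parent dependencies (the Nil passed above),
  // partitioner = None, and getPreferredLocations = Nil.
}

// Usage (e.g. in spark-shell, where sc is available): new TensRDD(sc, 3).collect() returns 0..29.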

Compute

The RDD abstract class requires every subclass to implement the compute method, whose parameters are a Partition and a TaskContext and whose job is to compute the data of that partition.
Below is the compute implementation of BlockRDD; it simply fetches the concrete data behind the given block from the blockManager.

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        assertValid()
        val blockManager = SparkEnv.get.blockManager
        val blockId = split.asInstanceOf[BlockRDDPartition].blockId
        blockManager.get[T](blockId) match {
          case Some(block) => block.data.asInstanceOf[Iterator[T]]
          case None =>
            throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
        }
      }

The compute implementation of MapPartitionsRDD looks like this:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
    
      override def clearDependencies() {
        super.clearDependencies()
        prev = null
      }

MapPartitionsRDD's compute calls the iterator method of the current RDD's first parent (iterator pulls the data of the corresponding parent partition and returns an Iterator whose elements are the records of that partition). The function f is applied to each partition as a whole (not record by record), and compute finally returns a new iterator over all of the transformed records, i.e. the new partition.
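
A quick user-side sketch of the same idea (assuming a spark-shell session where sc is available): map produces a MapPartitionsRDD whose function wraps the parent partition's iterator, and nothing is evaluated until that iterator is consumed.

val rdd = sc.parallelize(1 to 10, 2)    // parent RDD with 2 partitions
val mapped = rdd.map(_ * 2)             // becomes a MapPartitionsRDD over the parent's iterators

println(mapped.getClass.getSimpleName)                     // MapPartitionsRDD
println(mapped.dependencies.head.getClass.getSimpleName)   // OneToOneDependency
println(mapped.collect().mkString(","))                    // 2,4,6,...,20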

Partition

A partition(split) is a logical chunk of large distributed data set.
The raw data represented by an RDD is split into N pieces according to some logic, and each piece corresponds to one partition of the RDD. The number of partitions determines the number of tasks and therefore the degree of parallelism of the program. Managing data through partitions lets Spark parallelize processing conveniently and reduce network transfer between executors, since Spark usually reads data from nearby nodes.
The partitions of an RDD are exposed through def getPartitions: Array[Partition], and their count can be checked with someRDD.partitions.size.
Spark can run at most one concurrent task per partition, up to the number of cores in the cluster. For example, on a cluster with 50 cores an RDD should have at least 50 partitions (and ideally 2-3 times that, i.e. 100-150).
In addition, the number of partitions determines how many files are created when the RDD is saved, and the size of each partition is bounded by the executor's memory.
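
For example, in a spark-shell session (sc available) the partition count can be inspected directly:

val rdd = sc.parallelize(1 to 1000)
println(rdd.partitions.length)     // number of partitions, here driven by spark.default.parallelism
println(rdd.getNumPartitions)      // same value, via the convenience method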

Tip: when sc.textFile reads a compressed file (file.txt.gz, demo.gz), the resulting RDD has only one partition; in that case you should repartition it explicitly.

rdd = sc.textFile("demo.gz")
nrdd = rdd.repartition(100)

Definition of Partition

Partition is defined in the Spark source as follows:

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Sequence number, starting at 0 and increasing by 1
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}

Partition and RDD implementations come in pairs: every kind of RDD has a corresponding Partition implementation, so analyzing an RDD mostly means analyzing its concrete subclasses. For example, HadoopPartition is implemented as follows:

/**
 * A Spark split class that wraps around a Hadoop InputSplit.
 */
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
  extends Partition {

  val inputSplit = new SerializableWritable[InputSplit](s)

  override def hashCode(): Int = 31 * (31 + rddId) + index

  override def equals(other: Any): Boolean = super.equals(other)

  /**
   * Get any environment variables that should be added to the users environment when running pipes
   * @return a Map with the environment variables and corresponding values, it could be empty
   */
  def getPipeEnvVars(): Map[String, String] = {
      ...
  }
}

How the number of partitions affects performance

  1. Too few partitions

    Resources cannot be fully utilized.

  2. Too many partitions

    Too many tasks are created, and serialization and transfer overhead increases.

    The Spark documentation suggests: Typically you want 2-4 partitions for each CPU in your cluster (see the config sketch below).
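
As a rough configuration sketch (the application name and the values 16 and 48 are only illustrative), the default partition count can be set through spark.default.parallelism or overridden per RDD:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("partition-tuning-demo")
  .set("spark.default.parallelism", "16")   // used when no partition count is given explicitly
val sc = new SparkContext(conf)

val a = sc.parallelize(1 to 1000)        // 16 partitions, from spark.default.parallelism
val b = sc.parallelize(1 to 1000, 48)    // explicit override: 48 partitions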

Adjusting the number of partitions

  1. repartition

    repartition is simply coalesce(numPartitions, shuffle = true). It not only changes the number of partitions but also redistributes the data with a HashPartitioner, which incurs a shuffle.

   /**
      * Return a new RDD that has exactly numPartitions partitions.
      *
      * Can increase or decrease the level of parallelism in this RDD; internally this uses a shuffle.
      *
      * If you are decreasing the number of partitions, consider using coalesce, which can avoid a shuffle.
      */
   def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
       coalesce(numPartitions, shuffle = true)
   }
  2. coalesce

    coalesce lets you control whether a shuffle happens; with shuffle = false it can only decrease the number of partitions, not increase it.

   /**
   * Return a new RDD that is reduced into numPartitions partitions.
   * Decreasing the number of partitions does not trigger a shuffle.
   * When increasing numPartitions you should set shuffle = true.
   */
   def coalesce(numPartitions: Int, shuffle: Boolean = false,
                  partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                 (implicit ord: Ordering[T] = null)
         : RDD[T] = withScope {
       require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
       if (shuffle) {
         /** Distributes elements evenly across output partitions, starting from a random partition. */
         val distributePartition = (index: Int, items: Iterator[T]) => {
           var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
           items.map { t =>
             // Note that the hash code of the key will just be the key itself. The HashPartitioner
             // will mod it with the number of total partitions.
             position = position + 1
             (position, t)
           }
         } : Iterator[(Int, T)]
   
         // include a shuffle step so that our upstream tasks are still distributed
         new CoalescedRDD(
           new ShuffledRDD[Int, T, T](
             mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
             new HashPartitioner(numPartitions)),
           numPartitions,
           partitionCoalescer).values
       } else {
         new CoalescedRDD(this, numPartitions, partitionCoalescer)
       }
     }
   

Dependency

A Dependency records how partitions evolve across a transformation; its type tells Spark how to process the data, i.e. pipelined or shuffled.
Dependencies in Spark are divided into narrow dependencies and wide dependencies. In a narrow dependency, a partition of the parent RDD corresponds to at most one partition of the child RDD, so no shuffle is needed and the work can be completed within the same map task. In a wide dependency, one partition of the parent RDD may correspond to several partitions of the child RDD, so a shuffle is required; wide dependencies are what the DAGScheduler uses to split a job into stages.
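
A small spark-shell sketch (sc assumed) of how the dependency type exposes this distinction:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)

val narrow = pairs.mapValues(_ + 1)      // narrow: pipelined, no shuffle
println(narrow.dependencies.head)        // OneToOneDependency

val wide = pairs.reduceByKey(_ + _)      // wide: requires a shuffle
println(wide.dependencies.head)          // ShuffleDependency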

Definition of Dependency

The rdd field refers to the parent RDD. A Dependency is essentially a wrapper around the parent RDD, and its concrete type indicates how the data of the current transformation is processed.

    @DeveloperApi
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }

Narrow Dependency


@DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /**
       * Get the parent partitions for a given child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }
  1. OneToOneDependency

OneToOneDependency means the partitions of the child RDD and of the parent RDD are in a one-to-one relationship, i.e. a child partition has the same partition id as the parent partition it depends on.

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
  2. RangeDependency

RangeDependency means the partitions of the child and parent RDDs correspond one-to-one within a contiguous range.

    /**
     * :: DeveloperApi ::
     * For example, if the child RDD has partition indices   3  4  5  6
     * and the parent RDD has partition indices              9 10 11 12,
     * then the parent partition index for child partition index 4 is
     * 4 - 3 + 9 = 10,
     * which is partitionId - outStart + inStart in the code below.
     *
     * @param rdd the parent RDD
     * @param inStart the start of the range in the parent RDD
     * @param outStart the start of the range in the child RDD
     * @param length the length of the range
     */
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int): List[Int] = {
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    }
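
union is the typical producer of RangeDependency: the resulting UnionRDD holds one RangeDependency per parent, each covering that parent's slice of the output partitions. A small sketch (spark-shell, sc assumed):

val r1 = sc.parallelize(1 to 10, 3)    // becomes partitions 0..2 of the union
val r2 = sc.parallelize(11 to 20, 4)   // becomes partitions 3..6 of the union
val u  = r1.union(r2)

println(u.getNumPartitions)                                      // 7
u.dependencies.foreach(d => println(d.getClass.getSimpleName))   // RangeDependency, printed twice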

ShuffleDependency

The implementation of ShuffleDependency is somewhat more involved. A shuffle involves network transfer, so a Serializer is needed to reduce the amount of data moved; map-side aggregation can be enabled, controlled by mapSideCombine and aggregator; keyOrdering describes how keys are sorted; the Partitioner decides how the shuffle output is partitioned; the remaining fields carry the key, value and combiner class names and the shuffleId. The partition-to-partition relationship is cut off at a shuffle, which is why shuffles are the boundary used to split stages.

/**
     * :: DeveloperApi ::
     * Represents a dependency on the output of a shuffle stage. Note that in the case of a shuffle,
     * the RDD is transient since it is not needed on the executor side.
     *
     * @param _rdd the parent RDD
     * @param partitioner the partitioner used to partition the shuffle output
     * @param serializer the [[org.apache.spark.serializer.Serializer Serializer]] to use; can be configured via spark.serializer
     * @param keyOrdering key ordering for the shuffle output
     * @param aggregator map/reduce-side aggregator for the RDD's shuffle
     * @param mapSideCombine whether to perform partial aggregation (map-side combine)
     */
    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      val shuffleId: Int = _rdd.context.newShuffleId()

      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.length, this)
     _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }

Partitioner

The Partitioner comes into play during the shuffle phase: when the map-side results are written out, the RDD's Partitioner decides which bucket each record goes to, and each bucket is later consumed by a different reducer.

Definition of Partitioner

The abstract class Partitioner has two methods: numPartitions returns how many partitions the RDD has, and getPartition returns the partition id that an element should be written to, based on its key.

/**
     * An object that defines how the elements in a key-value pair RDD are partitioned by key.
     * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
     *
     * Note that a partitioner must be deterministic, i.e. it must return the same partition id
     * given the same partition key.
     */
    abstract class Partitioner extends Serializable {
      def numPartitions: Int
      def getPartition(key: Any): Int
    }

The companion object of Partitioner defines the defaultPartitioner method, which decides how operations such as cogroup pick a Partitioner from their parent RDDs.
The strategy is: if any of the RDDs already has a Partitioner, take the one with the largest number of partitions, and use it if that number is within a single order of magnitude of the maximum number of upstream partitions, or if it is larger than the default number of partitions; otherwise fall back to a HashPartitioner with the default number of partitions (spark.default.parallelism if it is set, else the largest upstream partition count). A join sketch follows the source listing below.

object Partitioner {
    def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
        val rdds = (Seq(rdd) ++ others)
        val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    
        val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
          Some(hasPartitioner.maxBy(_.partitions.length))
        } else {
          None
        }

        val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
          rdd.context.defaultParallelism
        } else {
          rdds.map(_.partitions.length).max
        }

        // If the existing max partitioner is an eligible one, or its partitions number is larger
        // than the default number of partitions, use the existing partitioner.
        if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
            defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
          hasMaxPartitioner.get.partitioner.get
        } else {
          new HashPartitioner(defaultNumPartitions)
        }
      }

      /**
       * Returns true if the number of partitions of the RDD is either greater than or is less than and
       * within a single order of magnitude of the max number of upstream partitions, otherwise returns
       * false.
       */
      private def isEligiblePartitioner(
         hasMaxPartitioner: RDD[_],
         rdds: Seq[RDD[_]]): Boolean = {
        val maxPartitions = rdds.map(_.partitions.length).max
        log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
      }
}
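
A sketch of the effect (spark-shell, sc assumed; the exact outcome also depends on spark.default.parallelism): because a already carries a HashPartitioner with 8 partitions, the join reuses it instead of creating a new one.

import org.apache.spark.HashPartitioner

val a = sc.parallelize(Seq(("x", 1), ("y", 2))).partitionBy(new HashPartitioner(8))
val b = sc.parallelize(Seq(("x", "foo"), ("z", "bar")))

val joined = a.join(b)                // join calls defaultPartitioner(a, b) internally
println(joined.partitioner)           // Some(HashPartitioner), the one from a
println(joined.getNumPartitions)      // 8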

Spark ships with two concrete Partitioner implementations, HashPartitioner and RangePartitioner.

HashPartitioner

numPartitions returns the number of partitions passed in; getPartition takes the key's hashCode modulo numPartitions to obtain the partition id (null keys go to partition 0).

class HashPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    
      def numPartitions: Int = partitions

      def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
      }

      override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
          h.numPartitions == numPartitions
        case _ =>
          false
      }
    
      override def hashCode: Int = numPartitions
    }
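
Usage sketch (spark-shell, sc assumed; the fruit keys are only illustrative):

import org.apache.spark.HashPartitioner

val pairs  = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("cherry", 3)))
val byHash = pairs.partitionBy(new HashPartitioner(4))

println(byHash.getNumPartitions)                         // 4
println(new HashPartitioner(4).getPartition("apple"))    // nonNegativeMod("apple".hashCode, 4)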

RangePartitioner

The behavior of RangePartitioner boils down to how the partition boundaries (range bounds) are chosen.
Spark uses these boundaries to decide which partition a key belongs to; keys are unsorted within a partition but sorted across partitions. The steps are as follows (a usage sketch comes after this list):

  1. Sample every partition using reservoir sampling.
  2. Compute a weight for each sample, and re-sample partitions that hold more data than expected (more than sampleSizePerPartition).
  3. Compute the partition boundaries rangeBounds from the weighted samples.
  4. Use rangeBounds to derive the number of partitions and the partition each key belongs to.
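
Usage sketch (spark-shell, sc assumed): a RangePartitioner can be built directly, and sortByKey constructs one internally in the same way.

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((5, "e"), (1, "a"), (9, "i"), (3, "c")), 2)

val rp = new RangePartitioner(2, pairs)    // samples `pairs` to choose the range bounds
println(rp.getPartition(1))                // falls in the lower range
println(rp.getPartition(9))                // falls in the upper range

val sorted = pairs.sortByKey()             // sortByKey range-partitions the data the same way
println(sorted.partitioner)                // Some(RangePartitioner)
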
/**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  * Note that the actual number of partitions created by the RangePartitioner might not be the
  * same as the `partitions` parameter, in the case where the number of sampled records is less
  * than the value of `partitions`.
  * @param partitions the number of partitions
  * @param rdd the RDD to sample
  * @param ascending ascending or descending order
  * @param samplePointsPerPartitionHint the number of sample points per partition
  */
class RangePartitioner[K : Ordering : ClassTag, V](
        partitions: Int,
        rdd: RDD[_ <: Product2[K, V]],
        private var ascending: Boolean = true,
        val samplePointsPerPartitionHint: Int = 20)
      extends Partitioner {
    
      def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
        this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
      }
    
      // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
      require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
      require(samplePointsPerPartitionHint > 0,
        s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")
    
      private var ordering = implicitly[Ordering[K]]

The numPartitions and getPartition methods of RangePartitioner are shown below. getPartition locates the key's partition with either a linear scan or a binary search; rangeBounds holds the upper bound of each partition, i.e. the boundaries.

      // Returns the number of partitions; note that `partitions` and rangeBounds.length + 1 may differ
      def numPartitions: Int = rangeBounds.length + 1
      // Binary search strategy
      private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
      // Returns the partition index for the given key
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[K]
        var partition = 0
        if (rangeBounds.length <= 128) {
          // If there are no more than 128 bounds, use a linear scan; otherwise use binary search
          while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
            partition += 1
          }
        } else {
          // Determine which binary search method to use only once.
          partition = binarySearch(rangeBounds, k)
          // binarySearch either returns the match location or -[insertion point]-1
          if (partition < 0) {
            partition = -partition-1
          }
          if (partition > rangeBounds.length) {
            partition = rangeBounds.length
          }
        }
        // Return the partition id according to ascending or descending order
        if (ascending) {
          partition
        } else {
          rangeBounds.length - partition
        }
      }

The partition boundaries are computed by the following method:

  // An array of upper bounds for the first (partitions - 1) partitions
      private var rangeBounds: Array[K] = {
        if (partitions <= 1) {
          Array.empty
        } else {
          // Determine the total sample size, capped at 1e6
          // Cast to double to avoid overflowing ints or longs
          val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
          // Assume the input partitions are roughly balanced and over-sample a little, so that enough samples are collected even with few partitions
          val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
          // Sample each partition
          val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
          if (numItems == 0L) {
            Array.empty
          } else {
            // If a partition contains far more than the average number of items (records * fraction > sampleSizePerPartition),
            // re-sample it to make sure enough samples are collected from that partition.
            val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
            val candidates = ArrayBuffer.empty[(K, Float)]
            // Record such partitions in imbalancedPartitions and re-sample them later, so that data-heavy partitions contribute enough samples
            val imbalancedPartitions = mutable.Set.empty[Int]
            sketched.foreach { case (idx, n, sample) =>
              if (fraction * n > sampleSizePerPartition) {
                imbalancedPartitions += idx
              } else {
                // The weight is 1 over the sampling probability.
                val weight = (n.toDouble / sample.length).toFloat
                for (key <- sample) {
                  candidates += ((key, weight))
                }
              }
            }
            if (imbalancedPartitions.nonEmpty) {
              // Re-sample imbalanced partitions with the desired sampling probability and adjust the weights accordingly
              val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
              val seed = byteswap32(-rdd.id - 1)
              val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
              // Adjust the weight
              val weight = (1.0 / fraction).toFloat
              candidates ++= reSampled.map(x => (x, weight))
            }
            // Determine the bounds from the (key, weight) candidates
            RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
          }
        }
      }

The sampling method sketch derives a seed from the RDD's id (used to generate random numbers in reservoir sampling) and then calls SamplingUtils.reservoirSampleAndCount on every partition.


private[spark] object RangePartitioner {
      /**
       * Core sampling algorithm: reservoir sampling.
       * Sketches the input RDD via reservoir sampling on each partition.
       *
       * @param rdd the input RDD to sketch
       * @param sampleSizePerPartition max sample size per partition
       * @return (total number of items, an array of (partitionId, number of items, sample))
       */
      def sketch[K : ClassTag](
          rdd: RDD[K],
          sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
        val shift = rdd.id
        // val classTagK = classTag[K] 
        // to avoid serializing the entire partitioner object
        val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
          val seed = byteswap32(idx ^ (shift << 16))
          val (sample, n) = SamplingUtils.reservoirSampleAndCount(
            iter, sampleSizePerPartition, seed)
          Iterator((idx, n, sample))
        }.collect()
        val numItems = sketched.map(_._2).sum
        (numItems, sketched)
      }

reservoir sampling

  1. First fill a reservoir array with the first k records as the sample; if the input has fewer than k records, return the sample directly.
  2. Otherwise create a random number generator from the seed and keep iterating over the input; for every record draw a random index, and if it is smaller than k, replace the element at that index of the sample from step 1.
      /**
       * Reservoir sampling implementation that also returns the input size.
       *
       * @param input the input iterator
       * @param k reservoir size (number of samples per partition)
       * @param seed random seed
       * @return (samples, input size)
       */
      def reservoirSampleAndCount[T: ClassTag](
          input: Iterator[T],
          k: Int,
          seed: Long = Random.nextLong())
        : (Array[T], Long) = {
        val reservoir = new Array[T](k)
        // Put the first k records into the reservoir array; k is the sample size per partition
        var i = 0
        while (i < k && input.hasNext) {
          val item = input.next()
          reservoir(i) = item
          i += 1
        }
    
        // If the partition has fewer than k records, return directly; otherwise continue sampling with random replacement
        if (i < k) {
          // If input size < k, trim the array to return only an array of input size.
          val trimReservoir = new Array[T](i)
          System.arraycopy(reservoir, 0, trimReservoir, 0, i)
          (trimReservoir, i)
        } else {
          // If input size > k, continue the sampling process.
          var l = i.toLong
          val rand = new XORShiftRandom(seed)
          while (input.hasNext) {
            val item = input.next()
            l += 1
            // There are k elements in the reservoir, and the l-th element has been
            // consumed. It should be chosen with probability k/l. The expression
            // below is a random long chosen uniformly from [0,l)
            val replacementIndex = (rand.nextDouble() * l).toLong
            if (replacementIndex < k) {
              reservoir(replacementIndex.toInt) = item
            }
          }
          (reservoir, l)
        }
      }

determineBounds returns the partition boundaries. Its logic is as follows:
first sort the candidates (Array[(key, weight)]) by key;
then compute the total weight sumWeights;
divide the total weight by the number of partitions to get the average weight per partition, step;
loop over the sorted candidates while accumulating their weights into cumWeight; every time the accumulated weight reaches another step, take the current key as a partition boundary;
finally return all collected boundaries. When determineBounds finishes, rangeBounds (the partition boundaries) has been computed. A worked trace follows the source below.

      /**
       * Computes the partition boundaries.
       * Determines the bounds for range partitioning from candidates with weights indicating how many
       * items each represents. Usually this is 1 over the probability used to sample this candidate.
       *
       * @param candidates unordered candidates with weights
       * @param partitions number of partitions
       * @return selected bounds
       */
      def determineBounds[K : Ordering : ClassTag](
          candidates: ArrayBuffer[(K, Float)],
          partitions: Int): Array[K] = {
        val ordering = implicitly[Ordering[K]]
        // Sort the candidates by key
        val ordered = candidates.sortBy(_._1)
        val numCandidates = ordered.size
        // Compute the total weight
        val sumWeights = ordered.map(_._2.toDouble).sum
        // Average weight per partition
        val step = sumWeights / partitions
        var cumWeight = 0.0
        var target = step
        val bounds = ArrayBuffer.empty[K]
        var i = 0
        var j = 0
        var previousBound = Option.empty[K]
        // Walk over the sorted candidates, accumulating their weights into cumWeight; each time the
        // accumulated weight reaches another step, take the current key as a boundary; finally return all boundaries
        while ((i < numCandidates) && (j < partitions - 1)) {
          val (key, weight) = ordered(i)
          cumWeight += weight
          if (cumWeight >= target) {
            // Skip duplicate values.
            if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
              bounds += key
              target += step
              j += 1
              previousBound = Some(key)
            }
          }
          i += 1
        }
        bounds.toArray
      }
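
A quick worked trace of the above (values made up for illustration): with sorted candidates (a, 1.0), (b, 1.0), (c, 1.0), (d, 1.0) and partitions = 2, sumWeights is 4.0 and step is 2.0. The cumulative weight first reaches 2.0 at key b, so bounds = [b] and the loop stops because j has reached partitions - 1. getPartition then sends keys <= b to partition 0 and keys > b to partition 1.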

Custom Partitioner

The main job of a Partitioner is to repartition the data of an RDD during a shuffle. A custom Partitioner mainly needs to implement:

  • the number of partitions after repartitioning
  • how a key-value pair is assigned to a partition based on its key
class MyPartitioner(partitions: Int) extends Partitioner{
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    return k.length() % partitions
  }
}

It is used like this:

val data = sc.textFile("demo.txt")
val dataAry = data.flatMap(_.split(",")).map((_,1))
              .partitionBy(new MyPartitioner(10)).reduceByKey(_ + _)
              .collect()

PreferredLocations

Since each partition is computed by exactly one task, a partition's preferred locations become that task's preferred locations; this is data-locality-aware scheduling, following the principle that moving computation is more efficient than moving data.
Below is HadoopRDD's implementation of getPreferredLocations. It returns the locations reported by the underlying Hadoop InputSplit, filtering out the uninformative "localhost" entries, i.e. the nodes that hold the corresponding block.

 override def getPreferredLocations(split: Partition): Seq[String] = {
        val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
        val locs = hsplit match {
          case lsplit: InputSplitWithLocationInfo =>
            HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
          case _ => None
        }
        locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
      }
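
A small sketch of querying locality from the user side (spark-shell, sc assumed; the HDFS path is only illustrative):

val logs = sc.textFile("hdfs:///data/demo.txt")
logs.partitions.foreach { p =>
  println(s"partition ${p.index} -> ${logs.preferredLocations(p).mkString(", ")}")
}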

