Spark Partitioner Source Code Analysis

Partitioner

Only RDDs whose elements are key/value pairs have a partitioner. The partitioner decides which partition each record goes to based on its key. It is defined as follows:

abstract class Partitioner extends Serializable {
  def numPartitions: Int // total number of partitions
  def getPartition(key: Any): Int // index of the partition that the key maps to
}

Spark ships with two built-in partitioning strategies: HashPartitioner and RangePartitioner.

HashPartitioner

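Takes the key's hashCode modulo numPartitions (null keys always go to partition 0). If the key hash codes are spread fairly evenly, each partition ends up with roughly the same amount of data.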

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
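
Utils.nonNegativeMod is internal to Spark, so the snippet below is only a minimal standalone sketch of the same idea, not Spark's code. The object name HashPartitionSketch and the helper partitionFor are hypothetical names introduced for illustration. It shows why a plain % is not enough: on the JVM the remainder takes the sign of the dividend, so a negative hashCode would otherwise yield a negative partition index.

```scala
object HashPartitionSketch {
  // Standalone re-implementation (assumption: same behavior as Spark's
  // Utils.nonNegativeMod). A negative remainder is shifted up by mod so the
  // result always lies in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Hypothetical helper that mirrors HashPartitioner.getPartition.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

For example, HashPartitionSketch.partitionFor(-7, 4) evaluates -7 % 4 to -3 and then shifts it to 1, while partitionFor(10, 4) is simply 2.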

RangePartitioner

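RangePartitioner first takes one sample of the data. If that sample is not balanced enough, it samples again. Each sampling pass calls collect(), so in the worst case a sortByKey needs 2 extra jobs, and the corresponding stage has to run three times before the sort completes.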

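The rough steps are:

  • Compute how many samples to take from each partition
  • Run reservoir sampling, which returns the total number of elements in the RDD plus, for each partition, its element count and its sample (collect() triggers a job)
  • Compute the overall sampling rate
  • If a partition's sampling rate is too low, mark that partition for re-sampling
  • If the sampling rate is acceptable, give each sampled key a weight equal to the inverse of that partition's sampling rate, that is, partition element count / number of samples
  • Re-sample the marked partitions (collect() triggers another job), this time using the overall sampling rate directly; again, each sampled key gets a weight
  • Once sampling is finished, compute the total weight and from it the weight each output partition should hold
  • Sort the (key, weight) pairs by key and split them into ranges according to the weights
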
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {
  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  }

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // Total sample size: 20 samples per output partition by default
      // (samplePointsPerPartitionHint), capped at 1e6
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Over-sample by a factor of 3, assuming the input partitions are roughly balanced
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      // Reservoir-sample every partition. Returns the total number of elements in the RDD and
      // an Array[(Int, Long, Array[K])] of (partition id, element count of the partition, sample)
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // Re-sample partitions that contain too many elements so that enough data is collected.
        // fraction is the target average sampling rate; the sketch above over-samples by 3x.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          // This partition holds too many elements, so its actual sampling rate
          // is below fraction; record it for re-sampling
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The sampling rate is sufficient. Each sampled key gets the weight
            // (partition element count / partition sample count), the inverse of the sampling rate
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample the imbalanced partitions with the desired sampling probability.
          // A PartitionPruningRDD restricts the job to those partitions only.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          // Each re-sampled key gets the weight 1 / fraction, the inverse of the sampling rate
          candidates ++= reSampled.map(x => (x, weight))
        }
        // Determine the partition boundaries
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }
}
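
To make the arithmetic above concrete, here is a small sketch that reproduces only the sizing and imbalance test, without Spark. The object RangeSamplingSketch and its methods plan, imbalanced and weight are hypothetical names, and the partition counts in the usage note are made-up illustration values.

```scala
object RangeSamplingSketch {
  // Returns (sampleSizePerPartition, fraction) using the same formulas as the
  // RangePartitioner code above. `hint` plays the role of samplePointsPerPartitionHint.
  def plan(partitions: Int, inputPartitions: Int, numItems: Long, hint: Int = 20): (Int, Double) = {
    val sampleSize = math.min(hint.toDouble * partitions, 1e6)
    val perPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt
    val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
    (perPartition, fraction)
  }

  // Indices of input partitions whose expected sample count at `fraction`
  // exceeds what the reservoir could hold, i.e. the ones that get re-sampled.
  def imbalanced(counts: Seq[Long], perPartition: Int, fraction: Double): Seq[Int] =
    counts.zipWithIndex.collect { case (n, idx) if fraction * n > perPartition => idx }

  // Weight of each sampled key in a balanced partition: elements / samples.
  def weight(n: Long, sampleLength: Int): Float = (n.toDouble / sampleLength).toFloat
}
```

With 10 output partitions, 4 input partitions and 100000 elements, plan returns (150, 0.002). If the input partitions hold 1000, 1000, 1000 and 97000 elements, only the last one is flagged, because 0.002 * 97000 = 194 exceeds 150. Keys sampled from a 1000-element partition each carry a weight of 1000 / 150, roughly 6.67, while keys from the re-sampled partition carry 1 / 0.002 = 500.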

Determining the Bounds

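The partitions are split according to the weights of the candidates. A weight can be read as the number of elements that the sampled key stands for.
The method returns an array of at most partitions - 1 elements, where the i-th element is the upper bound of the keys in the i-th partition. The array can be shorter when duplicate keys leave too few distinct split points.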

  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // Sort by key in ascending order
    val ordered = candidates.sortBy(_._1)
    val numCandidates = ordered.size
    // Compute the total weight and the average weight per partition (step)
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
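      // Accumulate the weight
      cumWeight += weight
      // The cumulative weight has reached the current split target
      if (cumWeight >= target) {
        // Equal keys must stay in the same partition, so only split on a key
        // that is strictly greater than the previous bound
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key // record the bound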
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }
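
RangePartitioner.determineBounds is not callable from user code, so the following is a simplified sketch of the same loop specialized to Int keys, intended only for tracing small inputs by hand. The object name BoundsSketch is hypothetical. It tracks the previous bound as the last element of the output buffer and the bound count as the buffer size, which is equivalent to the previousBound and j variables above.

```scala
import scala.collection.mutable.ArrayBuffer

object BoundsSketch {
  // Simplified variant of determineBounds for Int keys (illustration only).
  def bounds(candidates: Seq[(Int, Float)], partitions: Int): Seq[Int] = {
    val ordered = candidates.sortBy(_._1)
    // Average weight that each output partition should receive.
    val step = ordered.map(_._2.toDouble).sum / partitions
    var cumWeight = 0.0
    var target = step
    val out = ArrayBuffer.empty[Int]
    var i = 0
    while (i < ordered.size && out.size < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      // Split only when the target is reached and the key differs from the last bound.
      if (cumWeight >= target && (out.isEmpty || key > out.last)) {
        out += key
        target += step
      }
      i += 1
    }
    out.toList
  }
}
```

With six candidates 1 to 6 of weight 1 each and 3 partitions, step is 2, so the bounds are 2 and 4. With keys 1, 1, 1, 1, 2, 3 the second target is reached while the key is still 1, so the split is deferred until key 2, giving bounds 1 and 2.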

Getting the Partition

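getPartition: when there are at most 128 bounds, it scans the bounds array linearly, comparing the key with each bound to find the partition index. Otherwise it uses binary search. Finally, depending on whether the order is ascending or descending, it returns the corresponding partition id.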

 private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }
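
The two lookup paths can be compared on a tiny bounds array. This is a sketch for Int keys only; the object RangeLookupSketch is a hypothetical name, and java.util.Arrays.binarySearch stands in here for the function that CollectionsUtils.makeBinarySearch returns, on the assumption that both follow the "match index or -(insertion point) - 1" convention described in the comment above.

```scala
object RangeLookupSketch {
  // Linear scan: advance while the key is strictly greater than the bound.
  def linear(bounds: Array[Int], key: Int): Int = {
    var p = 0
    while (p < bounds.length && key > bounds(p)) p += 1
    p
  }

  // Binary search: a negative result encodes the insertion point.
  def binary(bounds: Array[Int], key: Int): Int = {
    val r = java.util.Arrays.binarySearch(bounds, key)
    val p = if (r < 0) -r - 1 else r
    math.min(p, bounds.length)
  }

  // Descending order mirrors the index, as in RangePartitioner.getPartition.
  def partitionFor(bounds: Array[Int], key: Int, ascending: Boolean = true): Int = {
    val p = linear(bounds, key)
    if (ascending) p else bounds.length - p
  }
}
```

With bounds Array(2, 4), keys 1 and 2 map to partition 0, key 3 to partition 1, and key 5 to partition 2. In descending mode the same keys map to 2, 2, 1 and 0. A key equal to a bound belongs to the partition for which that bound is the upper limit.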

Reservoir Sampling

Reservoir sampling selects k samples from a collection of n items, where n is very large or unknown in advance.

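The principle: given n objects, put the first k objects into the reservoir. Starting from object k+1, select that object with probability k/(k+1), select object k+2 with probability k/(k+2), and in general select the m-th object with probability k/m (m > k). If the m-th object is selected, it replaces a uniformly random object in the reservoir. In the end, every object is in the reservoir with probability k/n.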
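
The k/n claim can be checked with a short derivation. An object in the reservoir is evicted at step t only if object t is selected (probability k/t) and then happens to pick that object's slot (probability 1/k), so it survives step t with probability (t-1)/t. The product telescopes:

```latex
\begin{aligned}
P(\text{object } m \text{ is in the final reservoir})
  &= \frac{k}{m} \prod_{t=m+1}^{n} \left(1 - \frac{k}{t}\cdot\frac{1}{k}\right)
   = \frac{k}{m} \prod_{t=m+1}^{n} \frac{t-1}{t}
   = \frac{k}{m}\cdot\frac{m}{n}
   = \frac{k}{n}, \qquad m > k \\
P(\text{object } i \text{ is in the final reservoir})
  &= \prod_{t=k+1}^{n} \frac{t-1}{t}
   = \frac{k}{n}, \qquad i \le k
\end{aligned}
```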

  /**
   * Runs reservoir sampling on every partition. The sampling actually triggers a job.
   *
   * @param rdd the RDD to scan, containing only the keys
   * @param sampleSizePerPartition maximum number of samples per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16)) // random seed, distinct per partition and RDD
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect() // triggers a job
    val numItems = sketched.map(_._2).sum // total element count across all partitions
    (numItems, sketched)
  }

The core sampling method returns the sample together with the total number of input elements:

  def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k) // the reservoir holds k elements
    // Put the first k elements into the reservoir
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    if (i < k) {
      // The input has fewer than k elements: trim the array and return it directly
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // The reservoir is full: keep reading and decide by chance whether each
      // new element replaces an existing sample
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // Draw a random double in [0, l) and truncate it to a Long index
        val replacementIndex = (rand.nextDouble() * l).toLong
        // The new element is selected with probability k/l and replaces the element at that index
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }
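
SamplingUtils and XORShiftRandom are internal to Spark, so the sketch below is a compact standalone variant for Int input that can be used to observe the k/n behavior empirically. It is an illustration under stated assumptions: scala.util.Random replaces XORShiftRandom, and the names ReservoirSketch, sample and frequencies are hypothetical.

```scala
import scala.util.Random

object ReservoirSketch {
  // Compact reservoir sampler over Ints (assumption: scala.util.Random
  // stands in for Spark's XORShiftRandom). Returns (sample, element count).
  def sample(input: Iterator[Int], k: Int, rand: Random): (Array[Int], Long) = {
    val reservoir = new Array[Int](k)
    var n = 0L // number of elements seen so far
    input.foreach { item =>
      if (n < k) {
        reservoir(n.toInt) = item // fill phase
      } else {
        // This is element number n + 1, so it is kept with probability k / (n + 1).
        val idx = (rand.nextDouble() * (n + 1)).toLong
        if (idx < k) reservoir(idx.toInt) = item
      }
      n += 1
    }
    (reservoir.take(math.min(n, k.toLong).toInt), n)
  }

  // Fraction of runs in which each value of 0 until n ended up in the sample.
  def frequencies(n: Int, k: Int, runs: Int, seed: Long): Array[Double] = {
    val rand = new Random(seed)
    val hits = new Array[Int](n)
    for (_ <- 0 until runs; x <- sample(Iterator.range(0, n), k, rand)._1) hits(x) += 1
    hits.map(_.toDouble / runs)
  }
}
```

Calling ReservoirSketch.frequencies(20, 5, 4000, 1L) should give values close to k/n = 0.25 for every position, including the first five elements that were initially placed in the reservoir.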