Spark中RangePartitioner的實現(xiàn)機制分析

一.分區(qū)器的區(qū)別

  • HashPartitioner分區(qū)可能HashPartitioner導(dǎo)致每個分區(qū)中數(shù)據(jù)量的不均勻。
  • RangePartitioner分區(qū)盡量保證每個分區(qū)中數(shù)據(jù)量的均勻,將一定范圍內(nèi)的數(shù)映射到某一個分區(qū)內(nèi)。分區(qū)與分區(qū)之間數(shù)據(jù)是有序的,但分區(qū)內(nèi)的元素是不能保證順序的。

二.RangePartitioner分區(qū)執(zhí)行原理概述

1.計算總體的數(shù)據(jù)抽樣大小sampleSize,計算規(guī)則是:至少每個分區(qū)抽取20個數(shù)據(jù)或者最多1e6的樣本的數(shù)據(jù)量。

2.根據(jù)sampleSize和分區(qū)數(shù)量計算每個分區(qū)的數(shù)據(jù)抽樣樣本數(shù)量最大值sampleSizePrePartition。

3.根據(jù)以上兩個值進(jìn)行水塘抽樣,返回RDD的總數(shù)據(jù)量,分區(qū)中總元素的個數(shù)和每個分區(qū)的采樣數(shù)據(jù)。

4.計算出數(shù)據(jù)量較大的分區(qū)通過RDD.sample進(jìn)行重新抽樣。

5.通過抽樣數(shù)組 candidates: ArrayBuffer[(K, wiegth)]計算出分區(qū)邊界的數(shù)組BoundsArray

6.在取數(shù)據(jù)時,如果分區(qū)數(shù)小于128則直接獲取,如果大于128則通過二分法,獲取當(dāng)前Key屬于那個區(qū)間,返回對應(yīng)的BoundsArray下標(biāo)即為partitionsID。

源碼分析可參考以下幾篇博客

下面只對RanagePartitioner的核心機制進(jìn)行分析總結(jié)。

三.RangePartitioner的實現(xiàn)機制

1.在總數(shù)不知道的情況下如何等概率地從中抽取N行?

類比水塘抽樣法,該方法可以解決在總數(shù)不知道的情況下如何等概率地從中抽取一行數(shù)據(jù)
定義取出的行號為choice,第一次直接以第一行作為取出行 choice ,而后第二次以二分之一概率決定是否用第二行替換 choice ,第三次以三分之一的概率決定是否以第三行替換 choice ……,以此類推。

由上面的分析我們可以得出結(jié)論,在取第n個數(shù)據(jù)的時候,我們生成一個0到1的隨機數(shù)p,如果p小于1/n,保留第n個數(shù)。大于1/n,繼續(xù)保留前面的數(shù)。直到數(shù)據(jù)流結(jié)束,返回此數(shù),算法結(jié)束。

解決方案:在RangePartition中如何實現(xiàn)在總數(shù)不知道的情況下如何等概率地從中抽取N行數(shù)據(jù):

采樣算法是RangePartitioner分區(qū)的核心,其內(nèi)部使用的就是水塘抽樣,而這個抽樣特別適合那種總數(shù)很大而且未知,并無法將所有的數(shù)據(jù)全部存放到主內(nèi)存中的情況。也就是我們不需要事先知道RDD中元素的個數(shù)。

def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // 把前K個元素放入到數(shù)組reservoir中,k為設(shè)置的每個分區(qū)的樣本數(shù),及sampleSizePerPartition
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // 如果分區(qū)記錄數(shù)少于設(shè)置的分區(qū)樣本數(shù),則直接返回
    // 否則使用迭代器,每次迭代出的數(shù)據(jù),為其生成一個0至 l 的隨機數(shù),如果隨機數(shù)小于K,則把reservoir數(shù)組中的對應(yīng)記錄替換
    if (i < k) {
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
       // l 的值不斷迭代的
        l += 1
        val replacementIndex = (rand.nextDouble() * l).toLong
         //如果隨機數(shù)小于K,則把reservoir數(shù)組中的對應(yīng)記錄替換
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }

分析
RangePartition中水塘抽樣的理解,首先從池塘中先從前往后取出足夠的樣本數(shù)據(jù),暫且稱這批數(shù)據(jù)為舊數(shù)據(jù),之后對未遍歷的數(shù)據(jù)進(jìn)行遍歷,此次i值為該分區(qū)元素的第 i 條記錄,i = k。

之后 i 值不斷的迭代,然后取出(0,i)的隨機數(shù),如果該隨機數(shù)小于k則,進(jìn)行替換,保留第 i 個數(shù),隨著不斷的迭代,i 值是一定是逐漸變大的,k 值是不變的,所以取出的新數(shù)據(jù)替換舊數(shù)據(jù)的幾率就越來越小,例如,一開始,i = k,隨機值小于 k 的概率為 (k-1)/k,迭代 n 次之后,該幾率為 (k-1)/k+n。

用偽代碼表示如下所示

從S中抽取首k項放入「水塘」中
對于每一個S[j]項(j ≥ k):
   隨機產(chǎn)生一個范圍0到j(luò)的整數(shù)r
   若 r < k 則把水塘中的第r項換成S[j]項
2.計算樣本權(quán)重,對數(shù)據(jù)多的分區(qū)再進(jìn)行抽樣

父RDD各分區(qū)中的數(shù)據(jù)量可能不均勻,在極端情況下,有些分區(qū)內(nèi)的數(shù)據(jù)量會占有整個RDD的絕大多數(shù)的數(shù)據(jù),如果按照水塘抽樣進(jìn)行采樣,會導(dǎo)致該分區(qū)所采樣的數(shù)據(jù)量不足,因此需要對取樣數(shù)不足的分區(qū)還需要重新進(jìn)行采樣。

通過(采樣因子*分區(qū)記錄數(shù))得到每個分區(qū)應(yīng)采樣本數(shù)。

如果fraction * 分區(qū)內(nèi)記錄數(shù) > sampleSizePerPartition,則該分區(qū)會再進(jìn)行一次抽樣,否則計算權(quán)重weight為 1 /(總樣本數(shù)/總記錄總數(shù)),因為sample中的比例就是(總樣本數(shù)/總記錄總數(shù))。

如果fraction * 分區(qū)內(nèi)記錄數(shù) < sampleSizePerPartition,權(quán)重weight 為(分區(qū)總數(shù) / 采樣總數(shù)),為該分區(qū)的取出的樣本的真實權(quán)重,可能會比平均權(quán)重大,因為有可能在上面的reservoirSampleAndCount水塘抽樣中采樣總數(shù)已經(jīng)達(dá)到了該分區(qū)記錄數(shù)的最大值。

// 計算總樣本數(shù)量和總記錄數(shù)的占比,占比最大為1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存樣本數(shù)據(jù)的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存數(shù)據(jù)分布不均衡的分區(qū)id(數(shù)據(jù)量超過fraction比率的分區(qū))
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 計算抽取出來的樣本數(shù)據(jù)
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以當(dāng)前分區(qū)中的數(shù)據(jù)量大于之前計算的每個分區(qū)的抽象數(shù)據(jù)大小,
            // 那么表示當(dāng)前分區(qū)抽取的數(shù)據(jù)太少了,該分區(qū)數(shù)據(jù)分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 當(dāng)前分區(qū)不屬于數(shù)據(jù)分布不均衡的分區(qū),計算占比權(quán)重,并添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        // 對于數(shù)據(jù)分布不均衡的RDD分區(qū),重新進(jìn)行數(shù)據(jù)抽樣
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 獲取數(shù)據(jù)分布不均衡的RDD分區(qū),并構(gòu)成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 隨機種子
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽樣函數(shù)API進(jìn)行數(shù)據(jù)抽樣
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

        // 將最終的抽樣數(shù)據(jù)計算出rangeBounds出來
        RangePartitioner.determineBounds(candidates, partitions)
3.根據(jù)樣本權(quán)重解決分區(qū)邊界問題

先將candidate(Array[(key, weight)])按照key排序,計算總權(quán)重sumWeights,除以分區(qū)數(shù),得到每個分區(qū)的平均權(quán)重step,接下來while循環(huán)遍歷已排序的candidate,累加其權(quán)重cumWeight,每當(dāng)累加權(quán)重達(dá)到一個分區(qū)的平均權(quán)重step,就獲取一個key作為分區(qū)間隔符,最后返回所有獲取到的分隔符,determineBounds執(zhí)行完畢,也就返回了變量rangeBounds作為每個分區(qū)邊界的key的集合。

def determineBounds[K: Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // 按照數(shù)據(jù)進(jìn)行數(shù)據(jù)排序,默認(rèn)升序排列
    val ordered = candidates.sortBy(_._1)
    // 獲取總的樣本數(shù)量大小
    val numCandidates = ordered.size
    // 計算總的權(quán)重大小
    val sumWeights = ordered.map(_._2.toDouble).sum
    // 計算步長
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      // 獲取排序后的第i個數(shù)據(jù)及權(quán)重
      val (key, weight) = ordered(i)
      // 累計權(quán)重
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        // 權(quán)重已經(jīng)達(dá)到一個步長的范圍,計算出一個分區(qū)id的值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          // 上一個邊界值為空,或者當(dāng)前邊界key數(shù)據(jù)大于上一個邊界的值,那么當(dāng)前key有效,進(jìn)行計算
          // 添加當(dāng)前key到邊界集合中
          bounds += key
          // 累計target步長界限
          target += step
          // 分區(qū)數(shù)量加1
          j += 1
          // 上一個邊界的值重置為當(dāng)前邊界的值
          previousBound = Some(key)
        }
      }
      i += 1
    }
    // 返回結(jié)果
    bounds.toArray
  }
4.由rangeBounds計算分區(qū)數(shù)和key屬于哪個分區(qū)

rangeBounds少于128,直接遍歷比較key和分隔符,得到PartitionId,否則使用二分查找,并做了邊界條件的判斷,最后,根據(jù)升序還是降序返回PartitionId。

5.父RDD中每個分區(qū)采樣樣本數(shù)的確定
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

父RDD每個分區(qū)需要采樣的數(shù)據(jù)量是正常數(shù)的3倍。

因為父RDD各分區(qū)中的數(shù)據(jù)量可能會出現(xiàn)傾斜的情況,乘于3的目的就是保證數(shù)據(jù)量小的分區(qū)能夠采樣到足夠的數(shù)據(jù),而對于數(shù)據(jù)量大的分區(qū)會進(jìn)行第二次采樣。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容