spark2以后對(duì)limit的優(yōu)化和存在問(wèn)題

問(wèn)題

假如我們?cè)趕park-shell上執(zhí)行:
spark.sql("select * from table limit 1000").collect()
spark會(huì)開(kāi)多少多個(gè)任務(wù)去跑這個(gè)任務(wù)呢?

實(shí)驗(yàn)

OK,我們來(lái)做一個(gè)實(shí)驗(yàn)吧!


job

通過(guò)實(shí)驗(yàn)結(jié)果我們可以看到就開(kāi)了一個(gè)Task執(zhí)行,but, 是這樣的嘛?
其實(shí)開(kāi)多少Task還真不是固定的,這個(gè)取決于我們take的條數(shù)和這張表底層每個(gè)分區(qū)數(shù)據(jù)量的大小,怎么說(shuō)呢,我們舉個(gè)??。
首先spark2后,spark默認(rèn)會(huì)先去讀取一個(gè)分區(qū)的數(shù)據(jù),假如我limit 1000條,那我就從這個(gè)分區(qū)去取1000條數(shù)據(jù)就好了,但是如果這個(gè)分區(qū)的數(shù)據(jù)不過(guò)1000條怎么辦,這時(shí)spark會(huì)通一個(gè)公式去計(jì)算出下次讀取的分區(qū)個(gè)數(shù)。

limit 操作最終會(huì)調(diào)用 SparkPlan.executeTake(n: Int) 來(lái)獲取至多 n 條 records, 待我貼出源碼

def executeTake(n: Int): Array[InternalRow] = {
    if (n == 0) {
      return new Array[InternalRow](0)
    }

    val childRDD = getByteArrayRdd(n).map(_._2)

    val buf = new ArrayBuffer[InternalRow]
    val totalParts = childRDD.partitions.length
    var partsScanned = 0
    # 通過(guò)while循環(huán)去runJob獲取records, 直到獲取的records達(dá)到take條數(shù)
    while (buf.size < n && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * limitScaleUpFactor
        } else {
          val left = n - buf.size
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
        }
      }

      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val sc = sqlContext.sparkContext
      val res = sc.runJob(childRDD,
        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)

      buf ++= res.flatMap(decodeUnsafeRows)

      partsScanned += p.size
    }

    if (buf.size > n) {
      buf.take(n).toArray
    } else {
      buf.toArray
    }
  }

默認(rèn)情況下每次 runJob 掃描的 partitions 數(shù):

1
4
20
100
500
2500
6875

通過(guò)讀取的partitions的個(gè)數(shù)我們可以發(fā)現(xiàn)最初讀取的partition數(shù)量太少,后面讀取的partition數(shù)據(jù)量太多。

其實(shí)這邊我們可以通過(guò)計(jì)算每次讀取partitions得到的records估算出下去應(yīng)該讀取的分區(qū),這樣會(huì)比較靠譜些。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容