Question
Suppose we run the following in spark-shell:
spark.sql("select * from table limit 1000").collect()
How many tasks will Spark launch to run this query?
Experiment
OK, let's run an experiment!

From the experiment result we can see that only one task was launched. But is that always the case?
Actually, the number of tasks is not fixed at all. It depends on how many rows we take and on how much data each partition of the underlying table holds. Let's walk through an example.
首先spark2后,spark默認(rèn)會(huì)先去讀取一個(gè)分區(qū)的數(shù)據(jù),假如我limit 1000條,那我就從這個(gè)分區(qū)去取1000條數(shù)據(jù)就好了,但是如果這個(gè)分區(qū)的數(shù)據(jù)不過(guò)1000條怎么辦,這時(shí)spark會(huì)通一個(gè)公式去計(jì)算出下次讀取的分區(qū)個(gè)數(shù)。
The limit operation ultimately calls SparkPlan.executeTake(n: Int) to fetch at most n records. Here is the source code:
def executeTake(n: Int): Array[InternalRow] = {
  if (n == 0) {
    return new Array[InternalRow](0)
  }

  val childRDD = getByteArrayRdd(n).map(_._2)

  val buf = new ArrayBuffer[InternalRow]
  val totalParts = childRDD.partitions.length
  var partsScanned = 0
  // Keep calling runJob in a loop until the collected records reach the take count.
  while (buf.size < n && partsScanned < totalParts) {
    // The number of partitions to try in this iteration. It is ok for this number to be
    // greater than totalParts because we actually cap it at totalParts in runJob.
    var numPartsToTry = 1L
    if (partsScanned > 0) {
      // If we didn't find any rows after the previous iteration, quadruple and retry.
      // Otherwise, interpolate the number of partitions we need to try, but overestimate
      // it by 50%. We also cap the estimation in the end.
      val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
      if (buf.isEmpty) {
        numPartsToTry = partsScanned * limitScaleUpFactor
      } else {
        val left = n - buf.size
        // As left > 0, numPartsToTry is always >= 1
        numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
        numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
      }
    }

    val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
    val sc = sqlContext.sparkContext
    val res = sc.runJob(childRDD,
      (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)

    buf ++= res.flatMap(decodeUnsafeRows)

    partsScanned += p.size
  }

  if (buf.size > n) {
    buf.take(n).toArray
  } else {
    buf.toArray
  }
}
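How aggressively the scan grows between iterations is governed by the spark.sql.limit.scaleUpFactor configuration (default 4, floored at 2 in the code above). A minimal spark-shell sketch, reusing the hypothetical table from the question:

// Scan more partitions per retry by raising the scale-up factor.
spark.conf.set("spark.sql.limit.scaleUpFactor", "8")
spark.sql("select * from table limit 1000").collect()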
By default, the number of partitions scanned by each successive runJob call is:
1
4
20
100
500
2500
6875
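These numbers can be replayed from the loop above. Here is a minimal sketch, assuming the default limitScaleUpFactor of 4, that every scanned partition returns no rows (so the buf.isEmpty branch is always taken), and that the table has 10000 partitions in total, which is what caps the last scan at 6875:

// Simulate the partition schedule of executeTake when no rows are ever found.
object LimitScanSchedule {
  def schedule(totalParts: Int, scaleUpFactor: Int = 4): Seq[Int] = {
    var partsScanned = 0
    val scans = scala.collection.mutable.ArrayBuffer[Int]()
    while (partsScanned < totalParts) {
      // First iteration tries 1 partition; afterwards multiply what was scanned so far.
      val numPartsToTry = if (partsScanned == 0) 1L else partsScanned.toLong * scaleUpFactor
      val p = math.min(partsScanned + numPartsToTry, totalParts).toInt - partsScanned
      scans += p
      partsScanned += p
    }
    scans
  }

  def main(args: Array[String]): Unit = {
    // Prints: 1, 4, 20, 100, 500, 2500, 6875
    println(schedule(10000).mkString(", "))
  }
}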
Looking at these partition counts, we can see that the first few scans read too few partitions, while the later scans read far too many.
In fact, we could estimate how many partitions to read next from the number of records each scan actually returned, which would be more reliable.
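As a rough sketch of that idea (not what Spark currently does; the helper name and the 20% margin are made up for illustration), once some rows have come back we can size the next batch from the observed rows per partition:

// Hypothetical estimator: given `left` rows still needed, `rowsSoFar` rows collected,
// and `partsScanned` partitions already read, guess how many partitions to try next.
def estimateNextParts(left: Long, rowsSoFar: Long, partsScanned: Int): Long = {
  if (rowsSoFar == 0) {
    partsScanned.toLong * 4  // nothing to go on yet, fall back to scaling up
  } else {
    val rowsPerPart = rowsSoFar.toDouble / partsScanned
    // Overestimate by 20% so we usually finish in one more pass.
    math.max(math.ceil(1.2 * left / rowsPerPart).toLong, 1L)
  }
}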