1. 前言
spark sql中使用DataFrame/DataSet來(lái)抽象表示結(jié)構(gòu)化數(shù)據(jù)(關(guān)系數(shù)據(jù)庫(kù)中的table),DataSet上支持和RDD類似的操作,和RDD上的操作生成新的RDD一樣,DataSet上的操作生成新的DataSet來(lái)表示新的數(shù)據(jù)抽象。最終DataSet上的這些操作經(jīng)過(guò):
logical plan -> analyzed logical plan -> optimized logical pan -> physical plan -> rdd dag的轉(zhuǎn)化提交rdd 運(yùn)行。這里plan(執(zhí)行計(jì)劃)就是DataSet上的轉(zhuǎn)換操作,一個(gè)DataSet也就是對(duì)應(yīng)一個(gè)logical plan生成的數(shù)據(jù)。
cacheTable也就是緩存DataSet抽象表示的數(shù)據(jù),也就是DataSet的plan生成的數(shù)據(jù)。
2. cacheTable
從上面的介紹可以看出DataSet只是數(shù)據(jù)的抽象,它描述了從數(shù)據(jù)源頭開(kāi)始經(jīng)過(guò)怎樣的執(zhí)行計(jì)劃(plan)才能得到當(dāng)前的DataSet表示的真實(shí)數(shù)據(jù),也就是必須等到執(zhí)行計(jì)劃提交spark job運(yùn)行結(jié)束后才能得到數(shù)據(jù)。spark實(shí)現(xiàn)cacheTable時(shí),并沒(méi)有立即提交table(DataSet)對(duì)應(yīng)的plan去運(yùn)行,然后得到運(yùn)行結(jié)果數(shù)據(jù)去緩存,而是采用一種lazy模式:最終在DataSet上調(diào)用一些觸發(fā)任務(wù)提交的方法時(shí)(類似RDD的action操作),發(fā)現(xiàn)plan對(duì)應(yīng)的抽象語(yǔ)法樹(shù)中發(fā)現(xiàn)子樹(shù)是表緩存plan,如果這個(gè)時(shí)候數(shù)據(jù)已經(jīng)緩存了,直接使用緩存的數(shù)據(jù),沒(méi)有則觸發(fā)緩存表的plan去執(zhí)行,然后采用按列緩存的方式緩存數(shù)據(jù)。
看看代碼實(shí)現(xiàn):
調(diào)用SQLContext # cacheTable(tableName : String)最終會(huì)走到下面的調(diào)用:
// query 即緩存的Dataset
// storageLevel 可以使用memory和disk緩存
def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
// 拿到dataset的plan
val planToCache = query.logicalPlan
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sparkSession = query.sparkSession
// 緩存是建立一個(gè)plan到InMemoryRelation的映射
cachedData.add(CachedData(
planToCache,
InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName)))
}
}
上面InMemoryRelation是執(zhí)行計(jì)劃中一個(gè)節(jié)點(diǎn),當(dāng)出現(xiàn)select * from table_a語(yǔ)句(或者任何邏輯執(zhí)行計(jì)劃中有table_a出現(xiàn)),假設(shè)table_a被緩存了,那么這條語(yǔ)句生成的邏輯執(zhí)行計(jì)劃中,table_a對(duì)應(yīng)的Relation節(jié)點(diǎn)會(huì)被執(zhí)行計(jì)劃優(yōu)化器(optimizer)替換成InMemoryRelation。
InMemoryRelation構(gòu)造參數(shù):
- columnBatchSize,后面會(huì)提到,table緩存是按列緩存的,然后數(shù)據(jù)又被按行分為一個(gè)個(gè)batch,這個(gè)參數(shù)用來(lái)控制一個(gè)batch里行數(shù),通過(guò)配置項(xiàng)
spark.sql.inMemoryColumnarStorage.batchSize設(shè)置,默認(rèn)是10000行。
下面是InMemoryRelation中和緩存相關(guān)的代碼:
private def buildBuffers(): Unit = {
// output輸出的是Seq[Attribute],也就是表的schema,包含所有列名,列類型等信息
// child也就是緩存的dataset對(duì)應(yīng)的plan
val output = child.output
// 調(diào)用邏輯執(zhí)行計(jì)劃的execute返回的是RDD[InternalRow],返回RDD是整個(gè)執(zhí)行計(jì)劃分析的最后一步了,接下來(lái)就rdd的提交運(yùn)行。那么這個(gè)rdd也就是dataset表示的數(shù)據(jù)的rdd形式的抽象。
// 這里在rdd上調(diào)用mapPartitionsInternal,實(shí)現(xiàn)的是將遍歷每一行數(shù)據(jù),然后按列緩存。
// 這個(gè)地方返回新的RDD的數(shù)據(jù)類型CachedBatch,CachedBatch是一個(gè)batch內(nèi)若干行上的按列緩存。
val cached = child.execute().mapPartitionsInternal { rowIterator =>
new Iterator[CachedBatch] {
def next(): CachedBatch = {
// 按照每一列的類型生成ColumnBuilder,內(nèi)部使用數(shù)組來(lái)保存列數(shù)據(jù)
val columnBuilders = output.map { attribute =>
ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
}.toArray
var rowCount = 0
var totalSize = 0L
// 遍歷每一行數(shù)據(jù),控制當(dāng)前batch行數(shù) rowCount不超過(guò)batchSize,且同時(shí)batch中數(shù)據(jù)大小不超過(guò)MAX_BATCH_SIZE_IN_BYTE(4MB)
while (rowIterator.hasNext && rowCount < batchSize
&& totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
val row = rowIterator.next()
assert(
row.numFields == columnBuilders.length,
s"Row column number mismatch, expected ${output.size} columns, " +
s"but got ${row.numFields}." +
s"\nRow content: $row")
var i = 0
totalSize = 0
while (i < row.numFields) {
columnBuilders(i).appendFrom(row, i)
totalSize += columnBuilders(i).columnStats.sizeInBytes
i += 1
}
rowCount += 1
}
batchStats.add(totalSize)
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}
def hasNext: Boolean = rowIterator.hasNext
}
// 調(diào)用persist緩存RDD,所以cacheTable最終還是調(diào)用rdd的緩存接口完成緩存的
}.persist(storageLevel)
cached.setName(
tableName.map(n => s"In-memory table $n")
.getOrElse(StringUtils.abbreviate(child.toString, 1024)))
_cachedColumnBuffers = cached
}
上面代碼可以看出cacheTable實(shí)際上還是通過(guò)cache rdd實(shí)現(xiàn)的。上面InMemoryRelation只是邏輯執(zhí)行計(jì)劃中一個(gè)節(jié)點(diǎn),邏輯執(zhí)行計(jì)劃需要轉(zhuǎn)換成物理執(zhí)行計(jì)劃,再轉(zhuǎn)換成RDD dag才能執(zhí)行,加上spark中RDD的計(jì)算是lazy模式的,所以上面的緩存rdd并沒(méi)有提交運(yùn)行,所以數(shù)據(jù)還沒(méi)有緩存下來(lái)。
真正緩存還得看InMemoryRelation所在的執(zhí)行計(jì)劃真正提交后,這個(gè)緩存rdd被計(jì)算,數(shù)據(jù)才會(huì)被緩存在內(nèi)存中。