Spark Sql 之 cacheTable

1. 前言

spark sql中使用DataFrame/DataSet來(lái)抽象表示結(jié)構(gòu)化數(shù)據(jù)(關(guān)系數(shù)據(jù)庫(kù)中的table),DataSet上支持和RDD類似的操作,和RDD上的操作生成新的RDD一樣,DataSet上的操作生成新的DataSet來(lái)表示新的數(shù)據(jù)抽象。最終DataSet上的這些操作經(jīng)過(guò):
logical plan -> analyzed logical plan -> optimized logical pan -> physical plan -> rdd dag的轉(zhuǎn)化提交rdd 運(yùn)行。這里plan(執(zhí)行計(jì)劃)就是DataSet上的轉(zhuǎn)換操作,一個(gè)DataSet也就是對(duì)應(yīng)一個(gè)logical plan生成的數(shù)據(jù)。

cacheTable也就是緩存DataSet抽象表示的數(shù)據(jù),也就是DataSet的plan生成的數(shù)據(jù)。

2. cacheTable

從上面的介紹可以看出DataSet只是數(shù)據(jù)的抽象,它描述了從數(shù)據(jù)源頭開(kāi)始經(jīng)過(guò)怎樣的執(zhí)行計(jì)劃(plan)才能得到當(dāng)前的DataSet表示的真實(shí)數(shù)據(jù),也就是必須等到執(zhí)行計(jì)劃提交spark job運(yùn)行結(jié)束后才能得到數(shù)據(jù)。spark實(shí)現(xiàn)cacheTable時(shí),并沒(méi)有立即提交table(DataSet)對(duì)應(yīng)的plan去運(yùn)行,然后得到運(yùn)行結(jié)果數(shù)據(jù)去緩存,而是采用一種lazy模式:最終在DataSet上調(diào)用一些觸發(fā)任務(wù)提交的方法時(shí)(類似RDD的action操作),發(fā)現(xiàn)plan對(duì)應(yīng)的抽象語(yǔ)法樹(shù)中發(fā)現(xiàn)子樹(shù)是表緩存plan,如果這個(gè)時(shí)候數(shù)據(jù)已經(jīng)緩存了,直接使用緩存的數(shù)據(jù),沒(méi)有則觸發(fā)緩存表的plan去執(zhí)行,然后采用按列緩存的方式緩存數(shù)據(jù)。

看看代碼實(shí)現(xiàn):
調(diào)用SQLContext # cacheTable(tableName : String)最終會(huì)走到下面的調(diào)用:

// query 即緩存的Dataset
// storageLevel 可以使用memory和disk緩存
def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    // 拿到dataset的plan
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      // 緩存是建立一個(gè)plan到InMemoryRelation的映射
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }

上面InMemoryRelation是執(zhí)行計(jì)劃中一個(gè)節(jié)點(diǎn),當(dāng)出現(xiàn)select * from table_a語(yǔ)句(或者任何邏輯執(zhí)行計(jì)劃中有table_a出現(xiàn)),假設(shè)table_a被緩存了,那么這條語(yǔ)句生成的邏輯執(zhí)行計(jì)劃中,table_a對(duì)應(yīng)的Relation節(jié)點(diǎn)會(huì)被執(zhí)行計(jì)劃優(yōu)化器(optimizer)替換成InMemoryRelation。

InMemoryRelation構(gòu)造參數(shù):

  • columnBatchSize,后面會(huì)提到,table緩存是按列緩存的,然后數(shù)據(jù)又被按行分為一個(gè)個(gè)batch,這個(gè)參數(shù)用來(lái)控制一個(gè)batch里行數(shù),通過(guò)配置項(xiàng)spark.sql.inMemoryColumnarStorage.batchSize設(shè)置,默認(rèn)是10000行。

下面是InMemoryRelation中和緩存相關(guān)的代碼:


  private def buildBuffers(): Unit = {
    // output輸出的是Seq[Attribute],也就是表的schema,包含所有列名,列類型等信息
    // child也就是緩存的dataset對(duì)應(yīng)的plan
    val output = child.output
    // 調(diào)用邏輯執(zhí)行計(jì)劃的execute返回的是RDD[InternalRow],返回RDD是整個(gè)執(zhí)行計(jì)劃分析的最后一步了,接下來(lái)就rdd的提交運(yùn)行。那么這個(gè)rdd也就是dataset表示的數(shù)據(jù)的rdd形式的抽象。
   // 這里在rdd上調(diào)用mapPartitionsInternal,實(shí)現(xiàn)的是將遍歷每一行數(shù)據(jù),然后按列緩存。
   // 這個(gè)地方返回新的RDD的數(shù)據(jù)類型CachedBatch,CachedBatch是一個(gè)batch內(nèi)若干行上的按列緩存。
    val cached = child.execute().mapPartitionsInternal { rowIterator =>
      new Iterator[CachedBatch] {
        def next(): CachedBatch = {
          // 按照每一列的類型生成ColumnBuilder,內(nèi)部使用數(shù)組來(lái)保存列數(shù)據(jù)
          val columnBuilders = output.map { attribute =>
            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
          }.toArray

          var rowCount = 0
          var totalSize = 0L
         // 遍歷每一行數(shù)據(jù),控制當(dāng)前batch行數(shù) rowCount不超過(guò)batchSize,且同時(shí)batch中數(shù)據(jù)大小不超過(guò)MAX_BATCH_SIZE_IN_BYTE(4MB)
          while (rowIterator.hasNext && rowCount < batchSize
            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
            val row = rowIterator.next()

            assert(
              row.numFields == columnBuilders.length,
              s"Row column number mismatch, expected ${output.size} columns, " +
                s"but got ${row.numFields}." +
                s"\nRow content: $row")

            var i = 0
            totalSize = 0
            while (i < row.numFields) {
              columnBuilders(i).appendFrom(row, i)
              totalSize += columnBuilders(i).columnStats.sizeInBytes
              i += 1
            }
            rowCount += 1
          }

          batchStats.add(totalSize)

          val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
            .flatMap(_.values))
          CachedBatch(rowCount, columnBuilders.map { builder =>
            JavaUtils.bufferToArray(builder.build())
          }, stats)
        }

        def hasNext: Boolean = rowIterator.hasNext
      }
      // 調(diào)用persist緩存RDD,所以cacheTable最終還是調(diào)用rdd的緩存接口完成緩存的
    }.persist(storageLevel)

    cached.setName(
      tableName.map(n => s"In-memory table $n")
        .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
  
    _cachedColumnBuffers = cached
  }

上面代碼可以看出cacheTable實(shí)際上還是通過(guò)cache rdd實(shí)現(xiàn)的。上面InMemoryRelation只是邏輯執(zhí)行計(jì)劃中一個(gè)節(jié)點(diǎn),邏輯執(zhí)行計(jì)劃需要轉(zhuǎn)換成物理執(zhí)行計(jì)劃,再轉(zhuǎn)換成RDD dag才能執(zhí)行,加上spark中RDD的計(jì)算是lazy模式的,所以上面的緩存rdd并沒(méi)有提交運(yùn)行,所以數(shù)據(jù)還沒(méi)有緩存下來(lái)。

真正緩存還得看InMemoryRelation所在的執(zhí)行計(jì)劃真正提交后,這個(gè)緩存rdd被計(jì)算,數(shù)據(jù)才會(huì)被緩存在內(nèi)存中。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容