spark sql 2.3 源碼解讀 - 架構(gòu)概覽 (1)

? spark sql 的前身是shark,類似于 hive, 用戶可以基于spark引擎使用sql語(yǔ)句對(duì)數(shù)據(jù)進(jìn)行分析,而不用去編寫程序代碼。

屏幕快照 2018-08-12 下午2.55.58

? 上圖很好的展示了spark sql的功能,提供了 jdbc,console,編程接口三種方式來操作RDD(Resilient Distributed Datasets),用戶只需要編寫sql即可,不需要編寫程序代碼。

? spark sql的運(yùn)行流程如下:

屏幕快照 2018-08-12 下午2.56.38

大概有6步:

  1. sql 語(yǔ)句經(jīng)過 SqlParser 解析成 Unresolved Logical Plan;
  2. analyzer 結(jié)合 catalog 進(jìn)行綁定,生成 Logical Plan;
  3. optimizer 對(duì) Logical Plan 優(yōu)化,生成 Optimized LogicalPlan;
  4. SparkPlan 將 Optimized LogicalPlan 轉(zhuǎn)換成 Physical Plan;
  5. prepareForExecution()將 Physical Plan 轉(zhuǎn)換成 executed Physical Plan;
  6. execute()執(zhí)行可執(zhí)行物理計(jì)劃,得到RDD;

上述流程在spark中對(duì)應(yīng)的源碼部分:

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner an optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.executeAndCheck(logical)
  }

  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    assertSupported()
    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
  }

  lazy val optimizedPlan: LogicalPlan = {
    sparkSession.sessionState.optimizer.execute(withCachedData)
  }

  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

? 邏輯交代的非常清楚,從最后一行的 lazy val toRdd: RDD[InternalRow] = executedPlan.execute() 往前推便可清晰看到整個(gè)流程。細(xì)心的同學(xué)也可以看到 所有步驟都是lazy的,只有調(diào)用了execute才會(huì)觸發(fā)執(zhí)行,這也是spark的重要設(shè)計(jì)思想。

? 后面的文章將對(duì)這些流程進(jìn)行詳細(xì)的介紹。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容