1、Catalyst源碼解讀之SqlParser

本篇文章基于Spark1.6.1源碼解讀Catalyst下的SqlParser

  1. spark sql中可以分三種sql語(yǔ)句
    第一種DDL語(yǔ)句,DDL語(yǔ)句包含以下三種操作,代碼見DDLParser
createTable | describeTable | refreshTable

第二種是spark自身的sql語(yǔ)句,spark自身的sql語(yǔ)句包含以下六種操作,代碼見SparkSQLParser

cache | uncache | set | show | desc | others

第三種是真正的SQL語(yǔ)句,如select語(yǔ)句,SQL語(yǔ)句包含以下三種操作,代碼見SqlParser

start1 | insert | cte

以上這些用"|"分隔的操作會(huì)生成一個(gè)Parser[LogicalPlan],最終變成LogicalPlan

  1. 從熟悉的sqlContext.sql("....")方法開始,一步一步分析sql語(yǔ)句是怎樣被解析生成LogicalPlan。
    第一步從SqlContext的sql方法開始,代碼如下
def sql(sqlText: String): DataFrame = {  DataFrame(this, parseSql(sqlText))}

這里調(diào)用了parseSql(sqlText)方法,代碼如下

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

第二步調(diào)用DDLParser的parse方法,代碼如下

  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
    try {
      parse(input)
    } catch {
      case ddlException: DDLException => throw ddlException
      case _ if !exceptionOnError => parseQuery(input)
      case x: Throwable => throw x
    }
  }

這里有兩步操作,第一步是try語(yǔ)句中的parse(input)語(yǔ)句,他的作用是解析DDL語(yǔ)句,如果成功直接返回。否則看異常,異常中的語(yǔ)句不要忽略了,如果ddl語(yǔ)句解析失敗調(diào)用parseQuery(input),那么parseQuery(input)是從哪里來(lái)的呢?他是在DDLParser實(shí)例化的時(shí)候傳遞進(jìn)來(lái)的

class DDLParser(parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with DataTypeParser with Logging {
  1. 這里插入一個(gè)知識(shí)點(diǎn)
    parse()方法并不是DDLParser的方法,他是父類AbstractSparkSQLParser中的方法,接下來(lái)介紹的SparkSQLParser和SqlParser都繼承自AbstractSparkSQLParser,看一下parse方法的代碼
  def parse(input: String): LogicalPlan = synchronized {
    //初始化并加載關(guān)鍵詞,關(guān)鍵詞是在子類中定義的,比如DDLParser、SparkSQLParser、SqlParser這三個(gè)類中分別定義了自己的關(guān)鍵詞
    //initLexical方法本身很簡(jiǎn)單,這里就不說(shuō)了
    initLexical
    //phrase就是根據(jù)輸入的語(yǔ)句(input)按照規(guī)則(start)來(lái)解析
    //start就是第1段中介紹的三種操作,start方法被子類重寫
    //所以DDLParser中調(diào)用了父類的parse方法后會(huì)回調(diào)子類DDLParser中的start方法(或是變量,因?yàn)榉椒ɑ蚝瘮?shù)也可以賦值給變量)
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
  }
  1. 回到SqlContext中看DDLParser實(shí)例化的代碼
  @transient
  protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
  @transient
  protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

在DDLParser實(shí)例化的時(shí)候傳入了SparkSQLParser中的parse方法,parse方法就是SparkSQLParser父類AbstractSparkSQLParser中的方法,在第3段中介紹過(guò)??吹竭@里終于明白了第2段中的parseQuery(input)就是這里的parse方法。

  1. 通過(guò)parse方法的調(diào)用回調(diào)SparkSQLParser中的start變量 ,start變量代碼如下
override protected lazy val start: Parser[LogicalPlan] =  cache | uncache | set | show | desc | others

這里有五種操作,前面四種對(duì)應(yīng)了spark自身操作的sql語(yǔ)句,這里就不展開說(shuō)明了,以后有時(shí)間再具體分析。如果Spark自身操作的sql沒(méi)有匹配成功會(huì)調(diào)用others規(guī)則,others操作其實(shí)是調(diào)用了SqlParser中的parse方法,他是怎么被調(diào)用的呢?接著住下看,這個(gè)調(diào)用有點(diǎn)繞。首先看others變量,代碼如下

  private lazy val others: Parser[LogicalPlan] =
    wholeInput ^^ {
      case input => fallback(input)
    }

這里回調(diào) fallback(input)方法,fallback方法就在SparkSQLParser實(shí)例的時(shí)候傳遞進(jìn)來(lái)的,我們看一下fallback方法是怎樣產(chǎn)生的

  1. 回到SQLContext中看SparkSQLParser的實(shí)例化
  @transient
  protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))

這里調(diào)用getSQLDialect()方法生成一個(gè)ParserDialect(方言),為什么需要方言呢,是為了區(qū)分spark sql 和hive sql。這里看DefaultParserDialect(ParserDialect的子類,默認(rèn)是spark sql的方言)的parse方法,代碼如下

 override def parse(sqlText: String): LogicalPlan = {
    sqlParser.parse(sqlText)
  }

看到這里sqlParser終于出現(xiàn)了,調(diào)用了SqlParser的parse方法(SqlParser父類AbstractSparkSQLParser中的parse方法)。這個(gè)parse方法就是傳遞給SparkSQLParser中的fallback(input)函數(shù)

  1. 這里重點(diǎn)看SqlParser是怎樣解析sql語(yǔ)句的,根據(jù)前面介紹的內(nèi)容知道調(diào)用SqlParser中的parse方法后,會(huì)回調(diào)SqlParser中的start變量,start變量代碼如下
protected lazy val start: Parser[LogicalPlan] =  start1 | insert | cte

這里有三種操作,看一下start1 的代碼

 protected lazy val start1: Parser[LogicalPlan] =
    (select | ("(" ~> select <~ ")")) *
    ( UNION ~ ALL        ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) }
    | INTERSECT          ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) }
    | EXCEPT             ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)}
    | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
    )

這一堆是什么玩意兒,慢慢的來(lái)分析一下
select 會(huì)生成一個(gè)Parser,其他代碼如下

 protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> DISTINCT.? ~
      repsep(projection, ",") ~
      (FROM   ~> relations).? ~
      (WHERE  ~> expression).? ~
      (GROUP  ~  BY ~> rep1sep(expression, ",")).? ~
      (HAVING ~> expression).? ~
      sortType.? ~
      (LIMIT  ~> expression).? ^^ {
        case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
          val base = r.getOrElse(OneRowRelation)
          val withFilter = f.map(Filter(_, base)).getOrElse(base)
          val withProjection = g
            .map(Aggregate(_, p.map(UnresolvedAlias(_)), withFilter))
            .getOrElse(Project(p.map(UnresolvedAlias(_)), withFilter))
          val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
          val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
          val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
          val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
          withLimit
      }

通過(guò)上面簡(jiǎn)單的一二十行代碼就完成了sql語(yǔ)句的解析,太有魔力了。感慨一下scala語(yǔ)句強(qiáng)大的表達(dá)能力。
想看懂上面的代碼,我們先來(lái)看一下那些符號(hào)>、、^^等是什么意思

|  左邊算子和右邊的算子只要有一個(gè)成功了,就返回succeed,類似or
~  左邊的算子成功后,右邊的算子對(duì)后續(xù)的輸入也計(jì)算成功,就返回succeed
.?  如果p算子成功則返回則返回Some(x) 如果p算子失敗,返回fails
^^^  如果左邊的算子成功,取消左邊算子的結(jié)果,返回右邊算子。
~> 如果左邊的算子和右邊的算子都成功了,返回的結(jié)果中不包含左邊的返回值。  
<~ 這個(gè)和~>操作符的意思相反,如果左邊的算子和右邊的算子都成功了,返回的結(jié)果中不包含右邊的
^^{} 或者 ^^=> 變形連接符,意思是如果左邊的算子成功了,用^^右邊的算子函數(shù)作用于返回的結(jié)果

這些符號(hào)究竟是什么東西,又代表的是什么語(yǔ)法,其實(shí)就是Parser的一個(gè)個(gè)方法而已,原來(lái)還是scala的語(yǔ)法,差點(diǎn)被迷惑了。
這個(gè)語(yǔ)句就是根據(jù)關(guān)鍵字、操作符號(hào)、函數(shù)生成一個(gè)parser[LogicalPlan]類型的withLimit。
關(guān)鍵字代碼如下

protected val ALL = Keyword("ALL")
  protected val AND = Keyword("AND")
  protected val APPROXIMATE = Keyword("APPROXIMATE")
  protected val AS = Keyword("AS")
  protected val ASC = Keyword("ASC")
  protected val BETWEEN = Keyword("BETWEEN")
  protected val BY = Keyword("BY")
  protected val CASE = Keyword("CASE")
  protected val CAST = Keyword("CAST")
  protected val DESC = Keyword("DESC")
  protected val DISTINCT = Keyword("DISTINCT")
  protected val ELSE = Keyword("ELSE")
  protected val END = Keyword("END")
  protected val EXCEPT = Keyword("EXCEPT")
  protected val FALSE = Keyword("FALSE")
  protected val FROM = Keyword("FROM")
  protected val FULL = Keyword("FULL")
  protected val GROUP = Keyword("GROUP")
  protected val HAVING = Keyword("HAVING")
  protected val IN = Keyword("IN")
  protected val INNER = Keyword("INNER")
  protected val INSERT = Keyword("INSERT")
  protected val INTERSECT = Keyword("INTERSECT")
  protected val INTERVAL = Keyword("INTERVAL")
  protected val INTO = Keyword("INTO")
  protected val IS = Keyword("IS")
  protected val JOIN = Keyword("JOIN")
  protected val LEFT = Keyword("LEFT")
  protected val LIKE = Keyword("LIKE")
  protected val LIMIT = Keyword("LIMIT")
  protected val NOT = Keyword("NOT")
  protected val NULL = Keyword("NULL")
  protected val ON = Keyword("ON")
  protected val OR = Keyword("OR")
  protected val ORDER = Keyword("ORDER")
  protected val SORT = Keyword("SORT")
  protected val OUTER = Keyword("OUTER")
  protected val OVERWRITE = Keyword("OVERWRITE")
  protected val REGEXP = Keyword("REGEXP")
  protected val RIGHT = Keyword("RIGHT")
  protected val RLIKE = Keyword("RLIKE")
  protected val SELECT = Keyword("SELECT")
  protected val SEMI = Keyword("SEMI")
  protected val TABLE = Keyword("TABLE")
  protected val THEN = Keyword("THEN")
  protected val TRUE = Keyword("TRUE")
  protected val UNION = Keyword("UNION")
  protected val WHEN = Keyword("WHEN")
  protected val WHERE = Keyword("WHERE")
  protected val WITH = Keyword("WITH")

根據(jù)關(guān)鍵詞我們可以知道在寫sql語(yǔ)句的時(shí)候哪些操作可以使用,哪些操作是不支持的

  1. withLimit:Parser[LogicalPlan]是怎么變成LogicalPlan的呢?



    Parser[LogicalPlan]繼承自一個(gè)函數(shù),最終返回ParseResult[T]類型,ParseResult[T]有兩個(gè)子類,分別是Success和NoSuccess,代碼如下

case class Success[+T](result: T, override val next: Input) extends ParseResult[T]
sealed abstract class NoSuccess(val msg: String, override val next: Input) extends ParseResult[Nothing] 

當(dāng)sql解析成功后會(huì)返回Success。
再次看一下調(diào)用關(guān)系,最后調(diào)用的是start



接著看start的調(diào)用

def parse(input: String): LogicalPlan = synchronized {
    // Initialize the Keywords.
    // 初始化分詞器的關(guān)鍵字
    initLexical
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
}

用模式匹配去匹配結(jié)果是ParseResult[LogicalPlan]的哪一個(gè)子類,如果是Success,看一下Success的代碼

case class Success[+T](result: T, override val next: Input) extends ParseResult[T]

這里的T就是LogicalPlan

  1. 雖然還有很多其他操作,但解析的步驟都是一樣的
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容