本篇文章基于Spark1.6.1源碼解讀Catalyst下的SqlParser
- spark sql中可以分三種sql語(yǔ)句
第一種DDL語(yǔ)句,DDL語(yǔ)句包含以下三種操作,代碼見DDLParser
createTable | describeTable | refreshTable
第二種是spark自身的sql語(yǔ)句,spark自身的sql語(yǔ)句包含以下六種操作,代碼見SparkSQLParser
cache | uncache | set | show | desc | others
第三種是真正的SQL語(yǔ)句,如select語(yǔ)句,SQL語(yǔ)句包含以下三種操作,代碼見SqlParser
start1 | insert | cte
以上這些用"|"分隔的操作會(huì)生成一個(gè)Parser[LogicalPlan],最終變成LogicalPlan
- 從熟悉的sqlContext.sql("....")方法開始,一步一步分析sql語(yǔ)句是怎樣被解析生成LogicalPlan。
第一步從SqlContext的sql方法開始,代碼如下
def sql(sqlText: String): DataFrame = { DataFrame(this, parseSql(sqlText))}
這里調(diào)用了parseSql(sqlText)方法,代碼如下
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
第二步調(diào)用DDLParser的parse方法,代碼如下
def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parse(input)
} catch {
case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => parseQuery(input)
case x: Throwable => throw x
}
}
這里有兩步操作,第一步是try語(yǔ)句中的parse(input)語(yǔ)句,他的作用是解析DDL語(yǔ)句,如果成功直接返回。否則看異常,異常中的語(yǔ)句不要忽略了,如果ddl語(yǔ)句解析失敗調(diào)用parseQuery(input),那么parseQuery(input)是從哪里來(lái)的呢?他是在DDLParser實(shí)例化的時(shí)候傳遞進(jìn)來(lái)的
class DDLParser(parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with DataTypeParser with Logging {
- 這里插入一個(gè)知識(shí)點(diǎn)
parse()方法并不是DDLParser的方法,他是父類AbstractSparkSQLParser中的方法,接下來(lái)介紹的SparkSQLParser和SqlParser都繼承自AbstractSparkSQLParser,看一下parse方法的代碼
def parse(input: String): LogicalPlan = synchronized {
//初始化并加載關(guān)鍵詞,關(guān)鍵詞是在子類中定義的,比如DDLParser、SparkSQLParser、SqlParser這三個(gè)類中分別定義了自己的關(guān)鍵詞
//initLexical方法本身很簡(jiǎn)單,這里就不說(shuō)了
initLexical
//phrase就是根據(jù)輸入的語(yǔ)句(input)按照規(guī)則(start)來(lái)解析
//start就是第1段中介紹的三種操作,start方法被子類重寫
//所以DDLParser中調(diào)用了父類的parse方法后會(huì)回調(diào)子類DDLParser中的start方法(或是變量,因?yàn)榉椒ɑ蚝瘮?shù)也可以賦值給變量)
phrase(start)(new lexical.Scanner(input)) match {
case Success(plan, _) => plan
case failureOrError => sys.error(failureOrError.toString)
}
}
- 回到SqlContext中看DDLParser實(shí)例化的代碼
@transient
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
@transient
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
在DDLParser實(shí)例化的時(shí)候傳入了SparkSQLParser中的parse方法,parse方法就是SparkSQLParser父類AbstractSparkSQLParser中的方法,在第3段中介紹過(guò)??吹竭@里終于明白了第2段中的parseQuery(input)就是這里的parse方法。
- 通過(guò)parse方法的調(diào)用回調(diào)SparkSQLParser中的start變量 ,start變量代碼如下
override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | desc | others
這里有五種操作,前面四種對(duì)應(yīng)了spark自身操作的sql語(yǔ)句,這里就不展開說(shuō)明了,以后有時(shí)間再具體分析。如果Spark自身操作的sql沒(méi)有匹配成功會(huì)調(diào)用others規(guī)則,others操作其實(shí)是調(diào)用了SqlParser中的parse方法,他是怎么被調(diào)用的呢?接著住下看,這個(gè)調(diào)用有點(diǎn)繞。首先看others變量,代碼如下
private lazy val others: Parser[LogicalPlan] =
wholeInput ^^ {
case input => fallback(input)
}
這里回調(diào) fallback(input)方法,fallback方法就在SparkSQLParser實(shí)例的時(shí)候傳遞進(jìn)來(lái)的,我們看一下fallback方法是怎樣產(chǎn)生的
- 回到SQLContext中看SparkSQLParser的實(shí)例化
@transient
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
這里調(diào)用getSQLDialect()方法生成一個(gè)ParserDialect(方言),為什么需要方言呢,是為了區(qū)分spark sql 和hive sql。這里看DefaultParserDialect(ParserDialect的子類,默認(rèn)是spark sql的方言)的parse方法,代碼如下
override def parse(sqlText: String): LogicalPlan = {
sqlParser.parse(sqlText)
}
看到這里sqlParser終于出現(xiàn)了,調(diào)用了SqlParser的parse方法(SqlParser父類AbstractSparkSQLParser中的parse方法)。這個(gè)parse方法就是傳遞給SparkSQLParser中的fallback(input)函數(shù)
- 這里重點(diǎn)看SqlParser是怎樣解析sql語(yǔ)句的,根據(jù)前面介紹的內(nèi)容知道調(diào)用SqlParser中的parse方法后,會(huì)回調(diào)SqlParser中的start變量,start變量代碼如下
protected lazy val start: Parser[LogicalPlan] = start1 | insert | cte
這里有三種操作,看一下start1 的代碼
protected lazy val start1: Parser[LogicalPlan] =
(select | ("(" ~> select <~ ")")) *
( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) }
| INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) }
| EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)}
| UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
這一堆是什么玩意兒,慢慢的來(lái)分析一下
select 會(huì)生成一個(gè)Parser,其他代碼如下
protected lazy val select: Parser[LogicalPlan] =
SELECT ~> DISTINCT.? ~
repsep(projection, ",") ~
(FROM ~> relations).? ~
(WHERE ~> expression).? ~
(GROUP ~ BY ~> rep1sep(expression, ",")).? ~
(HAVING ~> expression).? ~
sortType.? ~
(LIMIT ~> expression).? ^^ {
case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
val base = r.getOrElse(OneRowRelation)
val withFilter = f.map(Filter(_, base)).getOrElse(base)
val withProjection = g
.map(Aggregate(_, p.map(UnresolvedAlias(_)), withFilter))
.getOrElse(Project(p.map(UnresolvedAlias(_)), withFilter))
val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
withLimit
}
通過(guò)上面簡(jiǎn)單的一二十行代碼就完成了sql語(yǔ)句的解析,太有魔力了。感慨一下scala語(yǔ)句強(qiáng)大的表達(dá)能力。
想看懂上面的代碼,我們先來(lái)看一下那些符號(hào)>、、^^等是什么意思
| 左邊算子和右邊的算子只要有一個(gè)成功了,就返回succeed,類似or
~ 左邊的算子成功后,右邊的算子對(duì)后續(xù)的輸入也計(jì)算成功,就返回succeed
.? 如果p算子成功則返回則返回Some(x) 如果p算子失敗,返回fails
^^^ 如果左邊的算子成功,取消左邊算子的結(jié)果,返回右邊算子。
~> 如果左邊的算子和右邊的算子都成功了,返回的結(jié)果中不包含左邊的返回值。
<~ 這個(gè)和~>操作符的意思相反,如果左邊的算子和右邊的算子都成功了,返回的結(jié)果中不包含右邊的
^^{} 或者 ^^=> 變形連接符,意思是如果左邊的算子成功了,用^^右邊的算子函數(shù)作用于返回的結(jié)果
這些符號(hào)究竟是什么東西,又代表的是什么語(yǔ)法,其實(shí)就是Parser的一個(gè)個(gè)方法而已,原來(lái)還是scala的語(yǔ)法,差點(diǎn)被迷惑了。
這個(gè)語(yǔ)句就是根據(jù)關(guān)鍵字、操作符號(hào)、函數(shù)生成一個(gè)parser[LogicalPlan]類型的withLimit。
關(guān)鍵字代碼如下
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val ELSE = Keyword("ELSE")
protected val END = Keyword("END")
protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTERVAL = Keyword("INTERVAL")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val SORT = Keyword("SORT")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val TABLE = Keyword("TABLE")
protected val THEN = Keyword("THEN")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
protected val WITH = Keyword("WITH")
根據(jù)關(guān)鍵詞我們可以知道在寫sql語(yǔ)句的時(shí)候哪些操作可以使用,哪些操作是不支持的
-
withLimit:Parser[LogicalPlan]是怎么變成LogicalPlan的呢?
Parser[LogicalPlan]繼承自一個(gè)函數(shù),最終返回ParseResult[T]類型,ParseResult[T]有兩個(gè)子類,分別是Success和NoSuccess,代碼如下
case class Success[+T](result: T, override val next: Input) extends ParseResult[T]
sealed abstract class NoSuccess(val msg: String, override val next: Input) extends ParseResult[Nothing]
當(dāng)sql解析成功后會(huì)返回Success。
再次看一下調(diào)用關(guān)系,最后調(diào)用的是start

接著看start的調(diào)用
def parse(input: String): LogicalPlan = synchronized {
// Initialize the Keywords.
// 初始化分詞器的關(guān)鍵字
initLexical
phrase(start)(new lexical.Scanner(input)) match {
case Success(plan, _) => plan
case failureOrError => sys.error(failureOrError.toString)
}
}
用模式匹配去匹配結(jié)果是ParseResult[LogicalPlan]的哪一個(gè)子類,如果是Success,看一下Success的代碼
case class Success[+T](result: T, override val next: Input) extends ParseResult[T]
這里的T就是LogicalPlan
- 雖然還有很多其他操作,但解析的步驟都是一樣的
