為了批流統(tǒng)一,F(xiàn)link提供了兩種關(guān)系型API,Table API和SQL。Table API是一種語言集成的查詢API,由多個比如selection,filter,join關(guān)系operator組合而成。Flink SQL是基于Calcite來實現(xiàn)的。無論是在streaming還是在batch上,Table API和SQL具有相同的語義并且能夠得到相同的結(jié)果。Table API、SQL以及DataStream API之間都能無縫集成,可以方便的使用。本篇文章主要介紹下Flink SQL的整體流程。本文內(nèi)容是基于Flink 1.12來講解。
1. 整體執(zhí)行流程介紹
Flink SQL的執(zhí)行一般分為四個階段,主要依賴 Calcite 來完成
Parse
?? 語法解析,Calcite使用JavaCC把用戶提交的Sql String 轉(zhuǎn)換成一個抽象語法樹(AST),對應(yīng)SqlNode節(jié)點。Validate
?? 語法校驗,根據(jù)元數(shù)據(jù)信息進行驗證,例如查詢的表、使用的函數(shù)是否存在,會分別對 from / where / group by / having / select / order by 等子句進行validate ,校驗之后仍然是 SqlNode 構(gòu)成的語法樹;Optimize
?? 查詢計劃優(yōu)化,這里其實包含了兩部分,1)首先將 SqlNode 語法樹轉(zhuǎn)換成關(guān)系表達式 RelNode 構(gòu)成的邏輯樹,2)然后使用優(yōu)化器基于規(guī)則進行等價變換,例如我們比較熟悉的謂詞下推、列裁剪等,經(jīng)過優(yōu)化器優(yōu)化后得到最優(yōu)的查詢計劃;Execute
?? 將邏輯查詢計劃翻譯成物理執(zhí)行計劃,生成對應(yīng)的可執(zhí)行代碼,提交運行。
2. 源碼分析
Flink1.12 默認的Planner是Blink Planner,本篇文章也是基于Blink Planner來分析。
為了方便講解,咱們給個具體regular join例子來說明,將 Orders 訂單表和 Shipments 運輸單表依據(jù)訂單id進行Regular Join,并且以name進行分組,然后對name進行orderby操作,最后取出name以及sum(price)
SELECT name, SUM(price)
FROM Orders o
JOIN Shipments s ON o.id = s.orderId
GROUP BY name
HAVING SUM(o.price) > 10
ORDER BY name
2.1 Parse階段
源碼入口TableEnvironment#sqlQuery方法,會調(diào)用實現(xiàn)類TableEnvironmentImpl#sqlQuery方法,看下該方法源碼
@Override
public Table sqlQuery(String query) {
List<Operation> operations = parser.parse(query);
if (operations.size() != 1) {
throw new ValidationException(
"Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
}
Operation operation = operations.get(0);
if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
return createTable((QueryOperation) operation);
} else {
throw new ValidationException(
"Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "
+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
}
}
然后看下parser.parse(query) --> ParserImpl#parse
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation =
SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
首先是基于Calcite的Parse解析,Calcite涉及的東西比較多,這里就不展開來說了,最終返回一個SqlNode節(jié)點。
然后調(diào)用convert方法,在這個方法中,會對解析之后的SqlNode進行validate
2.2 Validate階段
2.2.1 Scope背景知識
-
首先給出SqlValidatorScope的繼承關(guān)系,該圖來自http://www.itdecent.cn/p/83e88fdc04ec
Scope繼承關(guān)系.png SqlValidatorScope是所有Scope的父接口,是做名字轉(zhuǎn)換用的,代表在Parse Tree的位置。當(dāng)驗證如"foo"."bar"這個表達式的時候,會首先使用對應(yīng)Scope實現(xiàn)的resolve方法來定位"foo",如果成功,會返回描述結(jié)果類型的SqlValidatorNamespace對象。
- 這個概念可能有點抽象,咱們看下其中一個具體實現(xiàn)SelectScope的官方解釋
/**
* The name-resolution scope of a SELECT clause. The objects visible are those
* in the FROM clause, and objects inherited from the parent scope.
*
*
* <p>This object is both a {@link SqlValidatorScope} and a
* {@link SqlValidatorNamespace}. In the query</p>
*
* <blockquote>
* <pre>SELECT name FROM (
* SELECT *
* FROM emp
* WHERE gender = 'F')</pre></blockquote>
*
* <p>we need to use the {@link SelectScope} as a
* {@link SqlValidatorNamespace} when resolving 'name', and
* as a {@link SqlValidatorScope} when resolving 'gender'.</p>
*
* <h2>Scopes</h2>
*
* <p>In the query</p>
*
* <blockquote>
* <pre>
* SELECT expr1
* FROM t1,
* t2,
* (SELECT expr2 FROM t3) AS q3
* WHERE c1 IN (SELECT expr3 FROM t4)
* ORDER BY expr4</pre>
* </blockquote>
*
* <p>The scopes available at various points of the query are as follows:</p>
*
* <ul>
* <li>expr1 can see t1, t2, q3</li>
* <li>expr2 can see t3</li>
* <li>expr3 can see t4, t1, t2</li>
* <li>expr4 can see t1, t2, q3, plus (depending upon the dialect) any aliases
* defined in the SELECT clause</li>
* </ul>
*
* <h2>Namespaces</h2>
*
* <p>In the above query, there are 4 namespaces:</p>
*
* <ul>
* <li>t1</li>
* <li>t2</li>
* <li>(SELECT expr2 FROM t3) AS q3</li>
* <li>(SELECT expr3 FROM t4)</li>
* </ul>
*
* @see SelectNamespace
*/
- 簡單理解一下,scope直接翻譯就是范圍,表示SQL不同部分可以看到的數(shù)據(jù)源,類似于Java的作用域。Scope中重要的方法包括 1. 獲取Scope的root SqlNode :SqlNode getNode(); 2. 獲取Scope中所有的columns: void findAllColumnNames(List<SqlMoniker> result); 3. 獲取Scope中所有的table aliases:void findAliases(Collection<SqlMoniker> result); 4. 將field轉(zhuǎn)成fully-qualified identifier的方法: SqlQualified fullyQualify(SqlIdentifier identifier); 等
2.2.2 Namespace背景知識
與Scope類似,SqlValidatorNamespace是所有Namespace的父接口,一個Namespace描述了一個SQL查詢的關(guān)系。
比如1,對于 SELECT emp.deptno, age FROM emp,dept 這樣一個查詢,from子句代表了一個Namespace,這個Namespace由emp和dept兩個table組成,row type由這兩個emp、dept table的column組成。
另外一個例子2,如果一個查詢的from子句中,包含一個table和一個sub-query,那該Namespace就包含了該table的columns和sub-query的select中的columns字段。SqlValidatorNamespace也有多種實現(xiàn),比如table name對應(yīng)IdentifierNamespace,SELECT queries對應(yīng)SelectNamespace,UNION / EXCEPT / INTERSECT對應(yīng)SetopNamespace等
簡單理解一下,namespace表示scope中的一個數(shù)據(jù)源(數(shù)據(jù)源是一個邏輯概念,可以是table,field或子查詢),一個scope中可以有多個namespace。
2.2.3 Validate源碼分析
源碼入口 ParserImpl#parse --> SqlToOperationConverter#convert --> FlinkPlannerImpl#validate --> SqlValidatorImpl#validate 方法
public SqlNode validate(SqlNode topNode) {
SqlValidatorScope scope = new EmptyScope(this);
scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
final SqlNode topNode2 = validateScopedExpression(topNode, scope);
final RelDataType type = getValidatedNodeType(topNode2);
Util.discard(type);
return topNode2;
}
該方法主要的邏輯都在validateScopedExpression中,處理流程如下:
- performUnconditionalRewrites
?? - 對表達式進行重寫使其更標準化,簡化接下來的validate邏輯。
SELECT name, SUM(price)
FROM Orders o
JOIN Shipments s ON o.id = s.orderId
GROUP BY name
HAVING SUM(o.price) > 10
ORDER BY name
下面針對這個例子具體調(diào)試跟下源碼
-
在經(jīng)過Calcite parse之后,得到的SqlNode是一個SqlOrderBy實例,并且join --> inner join
parse之后的SqlNode節(jié)點.png 然后看下performUnconditionalRewrites方法
上面提到parse之后,SqlNode是一個SqlOrderBy實例,performUnconditionalRewrites方法首先會先拿到SqlOrderBy對應(yīng)的OperandList,具體都有哪些Operands,可以看下SqlOrderBy#getOperandList,對應(yīng)四個Operands: query, orderList, offset, fetch。(note:不同SqlNode實例,有不同的getOperandList實現(xiàn))
1. query: 是個SqlSelect實例
SELECT `name`, SUM(`price`)
FROM `Orders` AS `o`
INNER JOIN `Shipments` AS `s` ON `o`.`id` = `s`.`orderId`
GROUP BY `name`
HAVING SUM(`o`.`price`) > 10
2. orderList:是個SqlNodeList實例,對應(yīng) name
3. offset: null
4. fetch: null
然后performUnconditionalRewrites方法會對每個Operand遞歸的執(zhí)行performUnconditionalRewrites。
如果operand對應(yīng)的SqlKind是 VALUES / ORDER_BY / EXPLICIT_TABLE / DELETE / UPDATE / MERGE , performUnconditionalRewrites會對其進行rewrite。
performUnconditionalRewrites方法執(zhí)行之后,SqlNode從最開始的SqlOrderBy實例,變成了SqlSelect實例

?? - performUnconditionalRewrites方法,把SqlOrderBy節(jié)點的orderList, offset, fetch這三個operand賦值給SqlSelect對象,然后直接把SqlSelect返回。
- registerQuery
?? - 創(chuàng)建scope以及namespace,namespace和scope對象中都會包含SqlNode。
上面例子執(zhí)行完registerQuery方法,最終FlinkCalciteSqlValidator對象為

??從圖中可以看出,測試sql共有兩個scopes和4個namespaces
- validateQuery
?? - 首先根據(jù)node和scope來獲取namespace
?? - 對namespace進行validate,validateNamespace(ns, targetRowType);
?? - 最后通過調(diào)用validateSelect來進行SQL驗證
調(diào)用關(guān)系鏈如下圖所示:

validateSelect處理分為8個部分:
- validateFrom
?? - 在調(diào)用validateFrom之前,首先會去校驗from后面的names有沒有重復(fù)table name,如果有重復(fù),直接報錯。如果沒有from子句,也會直接報錯。
/**
* Validates the FROM clause of a query, or (recursively) a child node of
* the FROM clause: AS, OVER, JOIN, VALUES, or sub-query.
*
* @param node Node in FROM clause, typically a table or derived
* table
* @param targetRowType Desired row type of this expression, or
* {@link #unknownType} if not fussy. Must not be null.
* @param scope Scope
*/
protected void validateFrom(
SqlNode node,
RelDataType targetRowType,
SqlValidatorScope scope) {
Objects.requireNonNull(targetRowType);
switch (node.getKind()) {
case AS:
case TABLE_REF:
validateFrom(
((SqlCall) node).operand(0),
targetRowType,
scope);
break;
case VALUES:
validateValues((SqlCall) node, targetRowType, scope);
break;
case JOIN:
validateJoin((SqlJoin) node, scope);
break;
case OVER:
validateOver((SqlCall) node, scope);
break;
case UNNEST:
validateUnnest((SqlCall) node, scope, targetRowType);
break;
default:
validateQuery(node, scope, targetRowType);
break;
}
// Validate the namespace representation of the node, just in case the
// validation did not occur implicitly.
getNamespace(node, scope).validate(targetRowType);
}
?? 對于我們的例子,會先調(diào)用validateJoin
?? ?? - validateJoin方法中,又包括了validateFrom(left, unknownType, joinScope); 和 validateFrom(right, unknownType, joinScope); 這兩個方法首先完成的功能是驗證,其次會把left節(jié)點和right節(jié)點的名字補齊,即full qualified name,比如
`Orders` AS `o` --> `default_catalog`.`default_database`.`Orders` AS `o`
?? ?? - validateJoin會去校驗是等值連接還是使用on關(guān)鍵字。如果使用on關(guān)鍵字,那on的條件是否為空,如果為空直接拋異常。
?? - 驗證from中的table是否存在,將sqlNode name擴展為full qualified name并得到數(shù)據(jù)源的數(shù)據(jù)類型可以理解該數(shù)據(jù)源所有field type組成的row type。
validateWhereClause:
?? - 驗證where條件中的字段是否存在,如果不存在就返回;
?? - 擴展field name為full qualified name;
?? - 驗證where clause中是否存在aggregate function,如果where clause中存在 Windowed aggregate 表達式 或者 Aggregate表達式,就直接報錯。
?? - 驗證where clause的類型是否是boolean類型,如果不是,直接報錯。validateGroupClause
?? - 驗證group by field是否存在,如果不存在就返回;
?? - 如果存在就擴展該field name
?? - 驗證group by中是否存在aggregate function,如果group by中存在 Windowed aggregate 表達式 或者 Aggregate表達式,就直接報錯。validateHavingClause
?? - 驗證having field是否存在,如果不存在就返回;
?? - 擴展having中的field name
?? - 驗證having中不在聚合函數(shù)中的field是否和group by中的field相同;
?? - 驗證having中的field是否存在;
?? - 驗證having clause的類型是否是boolean類型,如果不是,直接報錯。
?? - 驗證having clause中是否有嵌套Aggregate expressions。對于非窗口agg(expr),expr中不能包含嵌套aggregate function,比如 SUM(2 * MAX(x)) 就是非法的;如果是windowed aggregate "agg(expr)",expr可以包含aggregate function,比如下面的例子就是合法的。
SELECT AVG(2 * MAX(x)) OVER (PARTITION BY y)
FROM t
GROUP BY y
validateWindowClause:暫略
handleOffsetFetch: 暫略
validateSelectList
?? - 驗證projection list中的aliases是否是唯一的,對于 * 和 TABLE.* 不做該檢查
?? - 如果select clause中包含 * 或 TABLE.* ,對字段進行補齊
?? - 擴展field name;并且抽取aliases,使用selectItems(List<SqlNode>)和 aliases(Set<String>)分別存儲field的node信息和別名;然后獲得field數(shù)據(jù)類型,組成鍵值對<alias, type> 放入 List<Map.Entry<String, RelDataType>> fields 對象中。
?? - 驗證projection list中的field是否存在
?? - 判斷projection list中不在聚合函數(shù)中的field是否和group by的field相同validateOrderList
?? - 驗證order by field是否存在,如果不存在就返回;
?? - 如果order by field不在select projection list中,擴展field name,否則不進行擴展;
?? - 驗證order by中field是否是group by的field子集,如果出現(xiàn)不在group by中的field,直接報錯。
到這里,validate驗證階段基本就完成了。
2.3 Optimize階段
2.3.1 Convert轉(zhuǎn)換
convert階段主要的工作是把 SqlNode 語法樹轉(zhuǎn)換成關(guān)系表達式 RelNode 構(gòu)成的邏輯樹,也就是生成相應(yīng)的邏輯計劃(Logical Plan),然后返回根結(jié)點 RelRoot。
?? 即:語義分析,根據(jù) SqlNode及元信息構(gòu)建 RelNode 樹,也就是最初版本的邏輯計劃(Logical Plan),根據(jù)這個已經(jīng)生成的Flink的logical Plan,將它轉(zhuǎn)換成calcite的logicalPlan,這樣我們才能用到calcite強大的優(yōu)化規(guī)則。
SqlNode有很多種,既包括MIN、MAX這種表達式型的,也包括SELECT、JOIN這種關(guān)系型的,轉(zhuǎn)化過程中,將這兩種分離成RexNode表達式型和RelNode關(guān)系型。
介紹下 RelRoot 出現(xiàn)的必要性,這里說兩個場景,都需要借助 RelRoot 來解決。
場景1:
SELECT name
FROM emp
ORDER BY empno DESC
Calcite知道結(jié)果必須sort排序,但不能將其排序順序表示為排序法,因為empno并不在結(jié)果字段中,這種場景我們需要使用RelRoot這么表達
RelRoot: {
rel: Sort($1 DESC)
Project(name, empno)
TableScan(EMP)
fields: [0]
collation: [1 DESC]
}
可以看到,empno字段會出現(xiàn)在結(jié)果字段中,只是 fields 標識給consumer暴露什么字段。
場景2:
SELECT name AS n, name AS n2, empno AS n FROM emp
這里重復(fù)使用了name字段,并且有多列的別名都叫 n ,這種場景應(yīng)該使用RelRoot這么表達
RelRoot: {
rel: Project(name, empno)
TableScan(EMP)
fields: [(0, "n"), (0, "n2"), (1, "n")]
collation: []
}
2.3.1.1 源碼入口:SqlToRelConverter#convertQuery
public RelRoot convertQuery(
SqlNode query,
final boolean needsValidation,
final boolean top) {
if (needsValidation) {
query = validator.validate(query);
}
RelNode result = convertQueryRecursive(query, top, null).rel;
可以看出在convertQueryRecursive采取了遞歸遍歷的方式來解析query。對于給定的例子,會去調(diào)用 convertSelect --> convertSelectImpl
/**
* Implementation of {@link #convertSelect(SqlSelect, boolean)};
* derived class may override.
*/
protected void convertSelectImpl(
final Blackboard bb,
SqlSelect select) {
convertFrom(
bb,
select.getFrom());
convertWhere(
bb,
select.getWhere());
final List<SqlNode> orderExprList = new ArrayList<>();
final List<RelFieldCollation> collationList = new ArrayList<>();
gatherOrderExprs(
bb,
select,
select.getOrderList(),
orderExprList,
collationList);
final RelCollation collation =
cluster.traitSet().canonize(RelCollations.of(collationList));
if (validator.isAggregate(select)) {
convertAgg(
bb,
select,
orderExprList);
} else {
convertSelectList(
bb,
select,
orderExprList);
}
if (select.isDistinct()) {
distinctify(bb, true);
}
convertOrder(
select, bb, collation, orderExprList, select.getOffset(),
select.getFetch());
if (select.hasHints()) {
final List<RelHint> hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
// Attach the hints to the first Hintable node we found from the root node.
bb.setRoot(bb.root
.accept(
new RelShuttleImpl() {
boolean attached = false;
@Override public RelNode visitChild(RelNode parent, int i, RelNode child) {
if (parent instanceof Hintable && !attached) {
attached = true;
return ((Hintable) parent).attachHints(hints);
} else {
return super.visitChild(parent, i, child);
}
}
}), true);
} else {
bb.setRoot(bb.root, true);
}
}
也就是在對select這種SqlNode進行convert的時候,會分別對from,where,aggregate,select,order等進行轉(zhuǎn)換,最終生成的邏輯計劃如下:
== Abstract Syntax Tree ==
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
+- LogicalFilter(condition=[>($1, 10)])
+- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+- LogicalProject(name=[$1], price=[$2])
+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
+- LogicalTableScan(table=[[default_catalog, default_database, Shipments]])
轉(zhuǎn)成一個關(guān)系樹之后:
- 首先 會生成一個operation,對應(yīng)代碼 SqlToOperationConverter#toQueryOperation。operation包含兩個成員變量,分別是RelNode calciteTree(就是剛才convert之后的RelNode)和 TableSchema tableSchema(包含字段名,字段類型 列表,對于本文例子這里是select之后到的VARCHAR(2147483647) name, FLOAT EXPR$1),在tableSchema的build方法中,還會對field name是否重復(fù)進行validate,以及驗證watermark對應(yīng)的rowtime屬性類型的root type是否是TIMESTAMP_WITHOUT_TIME_ZONE。
- 根據(jù)上面生成的operation,生成一個Table。table包含的成員變量主要包括QueryOperation,TableEnvironmentInternal以及OperationTreeBuilder和LookupCallResolver。
2.3.2 Optimize
// TODO,sql優(yōu)化的東西有點多,這塊內(nèi)容后面再補充
2.4 Execute階段
2.4.1 轉(zhuǎn)換成ExecNode
入口 PlannerBase#translateToExecNodePlan,完成FlinkPhysicalRel DAG 到 ExecNode DAG的轉(zhuǎn)換,并且嘗試reuse重復(fù)的sub-plans。
// TODO,源碼過程盡量補充下

