Flink源碼閱讀(八)--- Flink SQL 整體執(zhí)行流程

為了批流統(tǒng)一,F(xiàn)link提供了兩種關(guān)系型API,Table API和SQL。Table API是一種語言集成的查詢API,由多個比如selection,filter,join關(guān)系operator組合而成。Flink SQL是基于Calcite來實現(xiàn)的。無論是在streaming還是在batch上,Table API和SQL具有相同的語義并且能夠得到相同的結(jié)果。Table API、SQL以及DataStream API之間都能無縫集成,可以方便的使用。本篇文章主要介紹下Flink SQL的整體流程。本文內(nèi)容是基于Flink 1.12來講解。

1. 整體執(zhí)行流程介紹

Flink SQL的執(zhí)行一般分為四個階段,主要依賴 Calcite 來完成

  1. Parse
    ?? 語法解析,Calcite使用JavaCC把用戶提交的Sql String 轉(zhuǎn)換成一個抽象語法樹(AST),對應(yīng)SqlNode節(jié)點。

  2. Validate
    ?? 語法校驗,根據(jù)元數(shù)據(jù)信息進行驗證,例如查詢的表、使用的函數(shù)是否存在,會分別對 from / where / group by / having / select / order by 等子句進行validate ,校驗之后仍然是 SqlNode 構(gòu)成的語法樹;

  3. Optimize
    ?? 查詢計劃優(yōu)化,這里其實包含了兩部分,1)首先將 SqlNode 語法樹轉(zhuǎn)換成關(guān)系表達式 RelNode 構(gòu)成的邏輯樹,2)然后使用優(yōu)化器基于規(guī)則進行等價變換,例如我們比較熟悉的謂詞下推、列裁剪等,經(jīng)過優(yōu)化器優(yōu)化后得到最優(yōu)的查詢計劃;

  4. Execute
    ?? 將邏輯查詢計劃翻譯成物理執(zhí)行計劃,生成對應(yīng)的可執(zhí)行代碼,提交運行。

2. 源碼分析

Flink1.12 默認的Planner是Blink Planner,本篇文章也是基于Blink Planner來分析。

為了方便講解,咱們給個具體regular join例子來說明,將 Orders 訂單表和 Shipments 運輸單表依據(jù)訂單id進行Regular Join,并且以name進行分組,然后對name進行orderby操作,最后取出name以及sum(price)

SELECT name, SUM(price)
FROM Orders o
    JOIN Shipments s ON o.id = s.orderId
GROUP BY name
HAVING SUM(o.price) > 10
ORDER BY name

2.1 Parse階段

源碼入口TableEnvironment#sqlQuery方法,會調(diào)用實現(xiàn)類TableEnvironmentImpl#sqlQuery方法,看下該方法源碼

    @Override
    public Table sqlQuery(String query) {
        List<Operation> operations = parser.parse(query);

        if (operations.size() != 1) {
            throw new ValidationException(
                    "Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
        }

        Operation operation = operations.get(0);

        if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
            return createTable((QueryOperation) operation);
        } else {
            throw new ValidationException(
                    "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "
                            + "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
        }
    }

然后看下parser.parse(query) --> ParserImpl#parse

    @Override
    public List<Operation> parse(String statement) {
        CalciteParser parser = calciteParserSupplier.get();
        FlinkPlannerImpl planner = validatorSupplier.get();
        // parse the sql query
        SqlNode parsed = parser.parse(statement);

        Operation operation =
                SqlToOperationConverter.convert(planner, catalogManager, parsed)
                        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
        return Collections.singletonList(operation);
    }

首先是基于Calcite的Parse解析,Calcite涉及的東西比較多,這里就不展開來說了,最終返回一個SqlNode節(jié)點。
然后調(diào)用convert方法,在這個方法中,會對解析之后的SqlNode進行validate

2.2 Validate階段

2.2.1 Scope背景知識
  • 首先給出SqlValidatorScope的繼承關(guān)系,該圖來自http://www.itdecent.cn/p/83e88fdc04ec

    Scope繼承關(guān)系.png

  • SqlValidatorScope是所有Scope的父接口,是做名字轉(zhuǎn)換用的,代表在Parse Tree的位置。當(dāng)驗證如"foo"."bar"這個表達式的時候,會首先使用對應(yīng)Scope實現(xiàn)的resolve方法來定位"foo",如果成功,會返回描述結(jié)果類型的SqlValidatorNamespace對象。

  • 這個概念可能有點抽象,咱們看下其中一個具體實現(xiàn)SelectScope的官方解釋
/**
 * The name-resolution scope of a SELECT clause. The objects visible are those
 * in the FROM clause, and objects inherited from the parent scope.
 *
 *
 * <p>This object is both a {@link SqlValidatorScope} and a
 * {@link SqlValidatorNamespace}. In the query</p>
 *
 * <blockquote>
 * <pre>SELECT name FROM (
 *     SELECT *
 *     FROM emp
 *     WHERE gender = 'F')</pre></blockquote>
 *
 * <p>we need to use the {@link SelectScope} as a
 * {@link SqlValidatorNamespace} when resolving 'name', and
 * as a {@link SqlValidatorScope} when resolving 'gender'.</p>
 *
 * <h2>Scopes</h2>
 *
 * <p>In the query</p>
 *
 * <blockquote>
 * <pre>
 * SELECT expr1
 * FROM t1,
 *     t2,
 *     (SELECT expr2 FROM t3) AS q3
 * WHERE c1 IN (SELECT expr3 FROM t4)
 * ORDER BY expr4</pre>
 * </blockquote>
 *
 * <p>The scopes available at various points of the query are as follows:</p>
 *
 * <ul>
 * <li>expr1 can see t1, t2, q3</li>
 * <li>expr2 can see t3</li>
 * <li>expr3 can see t4, t1, t2</li>
 * <li>expr4 can see t1, t2, q3, plus (depending upon the dialect) any aliases
 * defined in the SELECT clause</li>
 * </ul>
 *
 * <h2>Namespaces</h2>
 *
 * <p>In the above query, there are 4 namespaces:</p>
 *
 * <ul>
 * <li>t1</li>
 * <li>t2</li>
 * <li>(SELECT expr2 FROM t3) AS q3</li>
 * <li>(SELECT expr3 FROM t4)</li>
 * </ul>
 *
 * @see SelectNamespace
 */
  • 簡單理解一下,scope直接翻譯就是范圍,表示SQL不同部分可以看到的數(shù)據(jù)源,類似于Java的作用域。Scope中重要的方法包括 1. 獲取Scope的root SqlNode :SqlNode getNode(); 2. 獲取Scope中所有的columns: void findAllColumnNames(List<SqlMoniker> result); 3. 獲取Scope中所有的table aliases:void findAliases(Collection<SqlMoniker> result); 4. 將field轉(zhuǎn)成fully-qualified identifier的方法: SqlQualified fullyQualify(SqlIdentifier identifier); 等
2.2.2 Namespace背景知識
  • 與Scope類似,SqlValidatorNamespace是所有Namespace的父接口,一個Namespace描述了一個SQL查詢的關(guān)系。
    比如1,對于 SELECT emp.deptno, age FROM emp,dept 這樣一個查詢,from子句代表了一個Namespace,這個Namespace由emp和dept兩個table組成,row type由這兩個emp、dept table的column組成。
    另外一個例子2,如果一個查詢的from子句中,包含一個table和一個sub-query,那該Namespace就包含了該table的columns和sub-query的select中的columns字段。

  • SqlValidatorNamespace也有多種實現(xiàn),比如table name對應(yīng)IdentifierNamespace,SELECT queries對應(yīng)SelectNamespace,UNION / EXCEPT / INTERSECT對應(yīng)SetopNamespace等

  • 簡單理解一下,namespace表示scope中的一個數(shù)據(jù)源(數(shù)據(jù)源是一個邏輯概念,可以是table,field或子查詢),一個scope中可以有多個namespace。

2.2.3 Validate源碼分析

源碼入口 ParserImpl#parse --> SqlToOperationConverter#convert --> FlinkPlannerImpl#validate --> SqlValidatorImpl#validate 方法

  public SqlNode validate(SqlNode topNode) {
    SqlValidatorScope scope = new EmptyScope(this);
    scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
    final SqlNode topNode2 = validateScopedExpression(topNode, scope);
    final RelDataType type = getValidatedNodeType(topNode2);
    Util.discard(type);
    return topNode2;
  }

該方法主要的邏輯都在validateScopedExpression中,處理流程如下:

  1. performUnconditionalRewrites
    ?? - 對表達式進行重寫使其更標準化,簡化接下來的validate邏輯。
SELECT name, SUM(price)
FROM Orders o
    JOIN Shipments s ON o.id = s.orderId
GROUP BY name
HAVING SUM(o.price) > 10
ORDER BY name

下面針對這個例子具體調(diào)試跟下源碼

  • 在經(jīng)過Calcite parse之后,得到的SqlNode是一個SqlOrderBy實例,并且join --> inner join


    parse之后的SqlNode節(jié)點.png
  • 然后看下performUnconditionalRewrites方法
    上面提到parse之后,SqlNode是一個SqlOrderBy實例,performUnconditionalRewrites方法首先會先拿到SqlOrderBy對應(yīng)的OperandList,具體都有哪些Operands,可以看下SqlOrderBy#getOperandList,對應(yīng)四個Operands: query, orderList, offset, fetch。(note:不同SqlNode實例,有不同的getOperandList實現(xiàn))

1. query: 是個SqlSelect實例
SELECT `name`, SUM(`price`)
FROM `Orders` AS `o`
INNER JOIN `Shipments` AS `s` ON `o`.`id` = `s`.`orderId`
GROUP BY `name`
HAVING SUM(`o`.`price`) > 10

2. orderList:是個SqlNodeList實例,對應(yīng) name
3. offset: null
4. fetch: null

然后performUnconditionalRewrites方法會對每個Operand遞歸的執(zhí)行performUnconditionalRewrites。

如果operand對應(yīng)的SqlKind是 VALUES / ORDER_BY / EXPLICIT_TABLE / DELETE / UPDATE / MERGE , performUnconditionalRewrites會對其進行rewrite。

performUnconditionalRewrites方法執(zhí)行之后,SqlNode從最開始的SqlOrderBy實例,變成了SqlSelect實例


performUnconditionalRewrites方法執(zhí)行前后節(jié)點.png

?? - performUnconditionalRewrites方法,把SqlOrderBy節(jié)點的orderList, offset, fetch這三個operand賦值給SqlSelect對象,然后直接把SqlSelect返回。

  1. registerQuery
    ?? - 創(chuàng)建scope以及namespace,namespace和scope對象中都會包含SqlNode。

上面例子執(zhí)行完registerQuery方法,最終FlinkCalciteSqlValidator對象為

創(chuàng)建Scope和namespace之后的validate.png

??從圖中可以看出,測試sql共有兩個scopes和4個namespaces

  1. validateQuery
    ?? - 首先根據(jù)node和scope來獲取namespace
    ?? - 對namespace進行validate,validateNamespace(ns, targetRowType);
    ?? - 最后通過調(diào)用validateSelect來進行SQL驗證

調(diào)用關(guān)系鏈如下圖所示:

validateQuery方法調(diào)用鏈.png

validateSelect處理分為8個部分:

  • validateFrom

?? - 在調(diào)用validateFrom之前,首先會去校驗from后面的names有沒有重復(fù)table name,如果有重復(fù),直接報錯。如果沒有from子句,也會直接報錯。

  /**
   * Validates the FROM clause of a query, or (recursively) a child node of
   * the FROM clause: AS, OVER, JOIN, VALUES, or sub-query.
   *
   * @param node          Node in FROM clause, typically a table or derived
   *                      table
   * @param targetRowType Desired row type of this expression, or
   *                      {@link #unknownType} if not fussy. Must not be null.
   * @param scope         Scope
   */
  protected void validateFrom(
      SqlNode node,
      RelDataType targetRowType,
      SqlValidatorScope scope) {
    Objects.requireNonNull(targetRowType);
    switch (node.getKind()) {
    case AS:
    case TABLE_REF:
      validateFrom(
          ((SqlCall) node).operand(0),
          targetRowType,
          scope);
      break;
    case VALUES:
      validateValues((SqlCall) node, targetRowType, scope);
      break;
    case JOIN:
      validateJoin((SqlJoin) node, scope);
      break;
    case OVER:
      validateOver((SqlCall) node, scope);
      break;
    case UNNEST:
      validateUnnest((SqlCall) node, scope, targetRowType);
      break;
    default:
      validateQuery(node, scope, targetRowType);
      break;
    }

    // Validate the namespace representation of the node, just in case the
    // validation did not occur implicitly.
    getNamespace(node, scope).validate(targetRowType);
  }

?? 對于我們的例子,會先調(diào)用validateJoin
?? ?? - validateJoin方法中,又包括了validateFrom(left, unknownType, joinScope); 和 validateFrom(right, unknownType, joinScope); 這兩個方法首先完成的功能是驗證,其次會把left節(jié)點和right節(jié)點的名字補齊,即full qualified name,比如

`Orders` AS `o` --> `default_catalog`.`default_database`.`Orders` AS `o`

?? ?? - validateJoin會去校驗是等值連接還是使用on關(guān)鍵字。如果使用on關(guān)鍵字,那on的條件是否為空,如果為空直接拋異常。
?? - 驗證from中的table是否存在,將sqlNode name擴展為full qualified name并得到數(shù)據(jù)源的數(shù)據(jù)類型可以理解該數(shù)據(jù)源所有field type組成的row type。

  • validateWhereClause:
    ?? - 驗證where條件中的字段是否存在,如果不存在就返回;
    ?? - 擴展field name為full qualified name;
    ?? - 驗證where clause中是否存在aggregate function,如果where clause中存在 Windowed aggregate 表達式 或者 Aggregate表達式,就直接報錯。
    ?? - 驗證where clause的類型是否是boolean類型,如果不是,直接報錯。

  • validateGroupClause
    ?? - 驗證group by field是否存在,如果不存在就返回;
    ?? - 如果存在就擴展該field name
    ?? - 驗證group by中是否存在aggregate function,如果group by中存在 Windowed aggregate 表達式 或者 Aggregate表達式,就直接報錯。

  • validateHavingClause
    ?? - 驗證having field是否存在,如果不存在就返回;
    ?? - 擴展having中的field name
    ?? - 驗證having中不在聚合函數(shù)中的field是否和group by中的field相同;
    ?? - 驗證having中的field是否存在;
    ?? - 驗證having clause的類型是否是boolean類型,如果不是,直接報錯。
    ?? - 驗證having clause中是否有嵌套Aggregate expressions。對于非窗口agg(expr),expr中不能包含嵌套aggregate function,比如 SUM(2 * MAX(x)) 就是非法的;如果是windowed aggregate "agg(expr)",expr可以包含aggregate function,比如下面的例子就是合法的。

    SELECT AVG(2 * MAX(x)) OVER (PARTITION BY y)
    FROM t
    GROUP BY y
  • validateWindowClause:暫略

  • handleOffsetFetch: 暫略

  • validateSelectList
    ?? - 驗證projection list中的aliases是否是唯一的,對于 * 和 TABLE.* 不做該檢查
    ?? - 如果select clause中包含 * 或 TABLE.* ,對字段進行補齊
    ?? - 擴展field name;并且抽取aliases,使用selectItems(List<SqlNode>)和 aliases(Set<String>)分別存儲field的node信息和別名;然后獲得field數(shù)據(jù)類型,組成鍵值對<alias, type> 放入 List<Map.Entry<String, RelDataType>> fields 對象中。
    ?? - 驗證projection list中的field是否存在
    ?? - 判斷projection list中不在聚合函數(shù)中的field是否和group by的field相同

  • validateOrderList
    ?? - 驗證order by field是否存在,如果不存在就返回;
    ?? - 如果order by field不在select projection list中,擴展field name,否則不進行擴展;
    ?? - 驗證order by中field是否是group by的field子集,如果出現(xiàn)不在group by中的field,直接報錯。

到這里,validate驗證階段基本就完成了。

2.3 Optimize階段

2.3.1 Convert轉(zhuǎn)換

convert階段主要的工作是把 SqlNode 語法樹轉(zhuǎn)換成關(guān)系表達式 RelNode 構(gòu)成的邏輯樹,也就是生成相應(yīng)的邏輯計劃(Logical Plan),然后返回根結(jié)點 RelRoot。
?? 即:語義分析,根據(jù) SqlNode及元信息構(gòu)建 RelNode 樹,也就是最初版本的邏輯計劃(Logical Plan),根據(jù)這個已經(jīng)生成的Flink的logical Plan,將它轉(zhuǎn)換成calcite的logicalPlan,這樣我們才能用到calcite強大的優(yōu)化規(guī)則。

SqlNode有很多種,既包括MIN、MAX這種表達式型的,也包括SELECT、JOIN這種關(guān)系型的,轉(zhuǎn)化過程中,將這兩種分離成RexNode表達式型和RelNode關(guān)系型。

介紹下 RelRoot 出現(xiàn)的必要性,這里說兩個場景,都需要借助 RelRoot 來解決。
場景1:

SELECT name
FROM emp
ORDER BY empno DESC

Calcite知道結(jié)果必須sort排序,但不能將其排序順序表示為排序法,因為empno并不在結(jié)果字段中,這種場景我們需要使用RelRoot這么表達

RelRoot: {
    rel: Sort($1 DESC)
           Project(name, empno)
             TableScan(EMP)
    fields: [0]
    collation: [1 DESC]
  }

可以看到,empno字段會出現(xiàn)在結(jié)果字段中,只是 fields 標識給consumer暴露什么字段。

場景2:

SELECT name AS n, name AS n2, empno AS n FROM emp

這里重復(fù)使用了name字段,并且有多列的別名都叫 n ,這種場景應(yīng)該使用RelRoot這么表達

RelRoot: {
    rel: Project(name, empno)
           TableScan(EMP)
    fields: [(0, "n"), (0, "n2"), (1, "n")]
    collation: []
  }
2.3.1.1 源碼入口:SqlToRelConverter#convertQuery
 public RelRoot convertQuery(
      SqlNode query,
      final boolean needsValidation,
      final boolean top) {
    if (needsValidation) {
      query = validator.validate(query);
    }

    RelNode result = convertQueryRecursive(query, top, null).rel;

可以看出在convertQueryRecursive采取了遞歸遍歷的方式來解析query。對于給定的例子,會去調(diào)用 convertSelect --> convertSelectImpl

 /**
   * Implementation of {@link #convertSelect(SqlSelect, boolean)};
   * derived class may override.
   */
  protected void convertSelectImpl(
      final Blackboard bb,
      SqlSelect select) {
    convertFrom(
        bb,
        select.getFrom());
    convertWhere(
        bb,
        select.getWhere());

    final List<SqlNode> orderExprList = new ArrayList<>();
    final List<RelFieldCollation> collationList = new ArrayList<>();
    gatherOrderExprs(
        bb,
        select,
        select.getOrderList(),
        orderExprList,
        collationList);
    final RelCollation collation =
        cluster.traitSet().canonize(RelCollations.of(collationList));

    if (validator.isAggregate(select)) {
      convertAgg(
          bb,
          select,
          orderExprList);
    } else {
      convertSelectList(
          bb,
          select,
          orderExprList);
    }

    if (select.isDistinct()) {
      distinctify(bb, true);
    }

    convertOrder(
        select, bb, collation, orderExprList, select.getOffset(),
        select.getFetch());

    if (select.hasHints()) {
      final List<RelHint> hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
      // Attach the hints to the first Hintable node we found from the root node.
      bb.setRoot(bb.root
          .accept(
              new RelShuttleImpl() {
                boolean attached = false;
                @Override public RelNode visitChild(RelNode parent, int i, RelNode child) {
                  if (parent instanceof Hintable && !attached) {
                    attached = true;
                    return ((Hintable) parent).attachHints(hints);
                  } else {
                    return super.visitChild(parent, i, child);
                  }
                }
              }), true);
    } else {
      bb.setRoot(bb.root, true);
    }
  }

也就是在對select這種SqlNode進行convert的時候,會分別對from,where,aggregate,select,order等進行轉(zhuǎn)換,最終生成的邏輯計劃如下:

== Abstract Syntax Tree ==
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
+- LogicalFilter(condition=[>($1, 10)])
   +- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
      +- LogicalProject(name=[$1], price=[$2])
         +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
            :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
            +- LogicalTableScan(table=[[default_catalog, default_database, Shipments]])

轉(zhuǎn)成一個關(guān)系樹之后:

  1. 首先 會生成一個operation,對應(yīng)代碼 SqlToOperationConverter#toQueryOperation。operation包含兩個成員變量,分別是RelNode calciteTree(就是剛才convert之后的RelNode)和 TableSchema tableSchema(包含字段名,字段類型 列表,對于本文例子這里是select之后到的VARCHAR(2147483647) name, FLOAT EXPR$1),在tableSchema的build方法中,還會對field name是否重復(fù)進行validate,以及驗證watermark對應(yīng)的rowtime屬性類型的root type是否是TIMESTAMP_WITHOUT_TIME_ZONE。
  2. 根據(jù)上面生成的operation,生成一個Table。table包含的成員變量主要包括QueryOperation,TableEnvironmentInternal以及OperationTreeBuilder和LookupCallResolver。
2.3.2 Optimize

// TODO,sql優(yōu)化的東西有點多,這塊內(nèi)容后面再補充

2.4 Execute階段

2.4.1 轉(zhuǎn)換成ExecNode

入口 PlannerBase#translateToExecNodePlan,完成FlinkPhysicalRel DAG 到 ExecNode DAG的轉(zhuǎn)換,并且嘗試reuse重復(fù)的sub-plans。
// TODO,源碼過程盡量補充下

2.4.2 轉(zhuǎn)換成Transformation DAG
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容