【Flink SQL】如何利用 Calcite 擴展 Flink Function 語法

1.Calcite 如何實現(xiàn) sql 語法的解析

Calcite 使用 javacc 作為語法解析器。如下所示,freemarker 將配置文件 config.fmpp 即指定文件為 codegen/data/Parser.tdd、附加模板文件 codegen/includes/parserImpls.ftl、模板文件 codegen/templates/Parser.jj。上述文件輸入 FMPP 后, 會組合生成一個可用的 Parser.jj 文件,即Calcite 的 SQL 解析器語法規(guī)則文件。Parser.jj 文件輸入 javacc 會生成一個繼承自 SqlAbstractParserImplSqlParserImpl 類,即 Calcite 中真正負(fù)責(zé)解析 SQL 語句并生成 SqlNode 樹的類。

codegen
├── config.fmpp
├── default_config.fmpp
├── includes
│   ├── compoundIdentifier.ftl
│   └── parserImpls.ftl
└── templates
    └── Parser.jj
calcite擴展sql語法.JPG

擴展內(nèi)容JavaCC 語法解析并生成抽象語法樹

2.如何擴展 Flink Function SQL

如何擴展語法 CREAT FUNCTION ?

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

在指定 catalog.database 中創(chuàng)建 function ,需指定一個 identifier ,可指定 language。 若 catalog 中,已經(jīng)有同名的函數(shù)注冊了,則無法注冊。

① 如果 language 是 JAVA 或者 SCALA ,則 identifier 是 UDF 實現(xiàn)類的全限定名。關(guān)于 JAVA/SCALA UDF 的實現(xiàn),請參考 自定義函數(shù)。
② 如果 language 是 PYTHON,則 identifier 是 UDF 對象的全限定名,例如 pyflink.table.tests.test_udf.add。關(guān)于 PYTHON UDF 的實現(xiàn),請參考 Python UDFs。
③ 如果 language 是 PYTHON,而當(dāng)前程序是 Java/Scala 程序或者純 SQL 程序,則需要配置 Python 相關(guān)的依賴。

步驟 1:添加 codegen 文件夾到 src/main 目錄,如下所示。

codegen
├── config.fmpp
├── default_config.fmpp
├── includes
│   ├── compoundIdentifier.ftl
│   └── parserImpls.ftl
└── templates
    └── Parser.jj

步驟 2:擴展語法 CREAT FUNCTION
① 在 codegen/includes/parserImpls.ftl 定義 create function 的語法規(guī)則,如下所示

SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
{
    SqlIdentifier functionIdentifier = null;
    SqlCharStringLiteral functionClassName = null;
    String functionLanguage = null;
    boolean ifNotExists = false;
    boolean isSystemFunction = false;
}
{
    (
        <SYSTEM> <FUNCTION>
        ifNotExists = IfNotExistsOpt()
        functionIdentifier = SimpleIdentifier()
        {  isSystemFunction = true; }
    |
        <FUNCTION>
        ifNotExists = IfNotExistsOpt()
        functionIdentifier = CompoundIdentifier()
    )

    <AS> <QUOTED_STRING> {
        String p = SqlParserUtil.parseString(token.image);
        functionClassName = SqlLiteral.createCharString(p, getPos());
    }
    [<LANGUAGE>
        (
            <JAVA>  { functionLanguage = "JAVA"; }
        |
            <SCALA> { functionLanguage = "SCALA"; }
        |
            <SQL>   { functionLanguage = "SQL"; }
        |
            <PYTHON>   { functionLanguage = "PYTHON"; }
        )
    ]
    {
        return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, functionLanguage,
                ifNotExists, isTemporary, isSystemFunction);
    }
}

② 擴展 SqlCreate

規(guī)則匹配成功返回一個 SqlCreateFunction 節(jié)點,作為解析樹中的 SqlNode

public class SqlCreateFunction extends SqlCreate {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("CREATE FUNCTION", SqlKind.CREATE_FUNCTION);

    private final SqlIdentifier functionIdentifier;

    private final SqlCharStringLiteral functionClassName;

    private final String functionLanguage;

    private final boolean isTemporary;

    private final boolean isSystemFunction;

    public SqlCreateFunction(
            SqlParserPos pos,
            SqlIdentifier functionIdentifier,
            SqlCharStringLiteral functionClassName,
            String functionLanguage,
            boolean ifNotExists,
            boolean isTemporary,
            boolean isSystemFunction) {
        super(OPERATOR, pos, false, ifNotExists);
        this.functionIdentifier = requireNonNull(functionIdentifier);
        this.functionClassName = requireNonNull(functionClassName);
        this.isSystemFunction = isSystemFunction;
        this.isTemporary = isTemporary;
        this.functionLanguage = functionLanguage;
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Nonnull
    @Override
    public List<SqlNode> getOperandList() {
        return ImmutableNullableList.of(functionIdentifier, functionClassName);
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("CREATE");
        if (isTemporary) {
            writer.keyword("TEMPORARY");
        }
        if (isSystemFunction) {
            writer.keyword("SYSTEM");
        }
        writer.keyword("FUNCTION");
        if (ifNotExists) {
            writer.keyword("IF NOT EXISTS");
        }
        functionIdentifier.unparse(writer, leftPrec, rightPrec);
        writer.keyword("AS");
        functionClassName.unparse(writer, leftPrec, rightPrec);
        if (functionLanguage != null) {
            writer.keyword("LANGUAGE");
            writer.keyword(functionLanguage);
        }
    }
   //...省略
}

③ 將新增的語法規(guī)則,添加到配置文件 codegen/data/Parser.tdd
imports 增加 SqlCreateFunction 類,statementParserMethods 增加定義的規(guī)則方法

  package: "org.apache.flink.sql.parser.impl",
  class: "FlinkSqlParserImpl",

 imports: [
    "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
 ]

  statementParserMethods: [
    "SqlCreateFunction()"
  ]

  implementationFiles: [
    "parserImpls.ftl"
  ]

mvn clean compile 編譯后,自動生成 FlinkSqlParserImpl.java

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容