如何在Calcite中注冊(cè)函數(shù)

很多同學(xué)在使用Calcite的過程中需要自定義函數(shù), 現(xiàn)在講講如何定自義函數(shù)

1. Calcite 內(nèi)置函數(shù)和對(duì)應(yīng)的流程

Calcite中內(nèi)置的函數(shù)主要在SqlStdOperatorTable中, 包括常見的算術(shù)運(yùn)算符、時(shí)間函數(shù)等?,F(xiàn)在就以一個(gè)列子來說明在SqlStdOperatorTable 中添加函數(shù)以達(dá)到注冊(cè)函數(shù)的功能

  1. SqlStdOperatorTable.java 中添加對(duì)應(yīng)函數(shù)
public static final SqlFunction TIMESTR2LONG = new SqlFunction(
          new SqlIdentifier("TIMESTR2LONG", SqlParserPos.ZERO),
          //返回值為L(zhǎng)ong, 可以為NULL
          ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.BIGINT), SqlTypeTransforms.TO_NULLABLE),
          //輸入類型推測(cè)為Varchar, 即java中的String
          InferTypes.VARCHAR_1024,
         //類型檢查,如果類型不是String, 報(bào)錯(cuò)
          OperandTypes.family(SqlTypeFamily.STRING),
          Lists.newArrayList(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT).createSqlType(SqlTypeName.VARCHAR)),
          SqlFunctionCategory.USER_DEFINED_FUNCTION);
  1. 在自已的代碼中將Parser的OperatorTable 設(shè)置成上述 SqlStdOperatorTable
public class FunctionTestOne {
    public static final SqlTypeFactoryImpl TYPE_FACTORY = new SqlTypeFactoryImpl(
            RelDataTypeSystem.DEFAULT);
    public static final RelDataTypeSystem TYPE_SYSTEM = RelDataTypeSystem.DEFAULT;

    public static void main(String[] args) {
        CalciteSchema rootSchema = CalciteSchema
                .createRootSchema(false, false);

        //添加表Test
        rootSchema.add("test", new AbstractTable() {
            @Override
            public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.Builder builder = new RelDataTypeFactory
                        .Builder(TYPE_FACTORY);
                //列id, 類型int
                builder.add("id", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.INTEGER));
                //列name, 類型為varchar
                builder.add("name", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                builder.add("time_str", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                return builder.build();
            }
        });

        SqlParser.ConfigBuilder builder = SqlParser.configBuilder();
        //以下需要設(shè)置成大寫并且忽略大小寫
        builder.setQuotedCasing(Casing.TO_UPPER);
        builder.setUnquotedCasing(Casing.TO_UPPER);
        builder.setCaseSensitive(false);

        final FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema.plus())
                .parserConfig(builder.build())
                //注意用你自已的SqlStdOperatorTable, 此處之所以同名
                //目的是覆蓋calcite中SqlStdOperatorTable
                .operatorTable(SqlStdOperatorTable.instance())
                .build();

        Planner planner = Frameworks.getPlanner(config);

        //now start to parser

        try {
            SqlNode originSqlNode = planner.parse("select name, timestr2long(time_str) from test where id < 5");
            SqlNode sqlNode = planner.validate(originSqlNode);
            RelRoot root = planner.rel(sqlNode);
            System.out.println(RelOptUtil.toString(root.rel, ALL_ATTRIBUTES));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

稍微解釋一下上述代碼中主要步驟

  1. 創(chuàng)建rootSchema, 你可以把其理解為L(zhǎng)inux中root目錄, 當(dāng)往rootSchema添加table是相當(dāng)于添加一個(gè)普通文件,當(dāng)往rootSchema添加schema時(shí)相當(dāng)于添加一個(gè)目錄, 相應(yīng)的rootSchema就是整個(gè)數(shù)據(jù)庫(kù)的根,可以往根數(shù)據(jù)庫(kù)中添加函數(shù)、數(shù)據(jù)庫(kù)、表等,添加的子數(shù)據(jù)庫(kù)又可以把函數(shù)、數(shù)據(jù)庫(kù)、表放置其中,不斷遞歸

  2. 然后往rootSchema添加表后,然后設(shè)置Parser和OperatorTable 后生成Planner, 至于中間是怎么用的,后面會(huì)詳細(xì)說明

  3. 最后進(jìn)行Parser/Validate并轉(zhuǎn)化成RelNode

現(xiàn)在簡(jiǎn)要說明一下函數(shù)注冊(cè)主要過程

Parser階段,在Parser.jj文件中 有以下內(nèi)容:

LOOKAHEAD( [<SPECIFIC>] FunctionName() <LPAREN>)
e = NamedFunctionCall()

// NamedFunctionCall的主要內(nèi)容如下
...
SqlNode NamedFunctionCall() :
    qualifiedName = FunctionName() 
    createCall(qualifiedName, s.end(this), funcType, quantifier, args);
...

FunctionName() 主要是獲取函數(shù)名,如例中的函數(shù)名 timestr2long或者是Calcite內(nèi)置的一些固定的函數(shù)名,如ABS, AVG等
createCall()是生成SqlCall(函數(shù)的主要形式), 而createCall()最終會(huì)調(diào)用

 protected SqlCall createCall(
      SqlIdentifier funName,
      SqlParserPos pos,
      SqlFunctionCategory funcType,
      SqlLiteral functionQualifier,
      SqlNode[] operands) {
    SqlOperator fun = null;

    // First, try a half-hearted resolution as a builtin function.
    // If we find one, use it; this will guarantee that we
    // preserve the correct syntax (i.e. don't quote builtin function
    /// name when regenerating SQL).
    if (funName.isSimple()) {
      final List<SqlOperator> list = new ArrayList<>();
      //這里opTab的值為SqlStdOperatorTable.instance();
      opTab.lookupOperatorOverloads(funName, funcType, SqlSyntax.FUNCTION, list);
      if (list.size() == 1) {
        fun = list.get(0);
      }
    }

    // Otherwise, just create a placeholder function.  Later, during
    // validation, it will be resolved into a real function reference.
    if (fun == null) {
      fun = new SqlUnresolvedFunction(funName, null, null, null, null,
          funcType);
    }

    return fun.createCall(functionQualifier, pos, operands);
  }

經(jīng)過以上步驟可以發(fā)現(xiàn)注冊(cè)過程流程為:

  1. 在SqlStdOperatorTable按照格式注冊(cè)自已的函數(shù)名
  2. 在Parser過程中,會(huì)根據(jù)函數(shù)名自動(dòng)在SqlStdOperatorTable查找對(duì)應(yīng)函數(shù)
  3. 如果沒有找到,會(huì)自動(dòng)將函數(shù)解析成SqlUnresolvedFunction

那問題來了,SqlUnresolvedFunction什么時(shí)候會(huì)解析成具體的函數(shù)?答案是: Validate階段, 在Validate階段,Calcite 會(huì)針對(duì)函數(shù)做以下工作

  1. 根據(jù)函數(shù)名在所有函數(shù)注冊(cè)表中查找函數(shù)(之所以說是所有函數(shù)注冊(cè)表是因?yàn)镃alcite還可以在其它地方注冊(cè)函數(shù), 后面的ChainedSqlOperatorTable, ListSqlOperatorTable等)查找函數(shù)
  2. 驗(yàn)證函數(shù)的入?yún)?包話參數(shù)類型、個(gè)數(shù)、函數(shù)類型等
  3. 遞歸驗(yàn)證每個(gè)入?yún)⑹欠窈戏ǎ唧w過程后續(xù)會(huì)詳細(xì)說明

其它方式注冊(cè)函數(shù)

上述注冊(cè)函數(shù)略微tricky, 需要要本地搭建一個(gè)org.apache.calcite.sql.fun包并且從calcite-core中復(fù)制SqlStdOperatorTable.java的內(nèi)容以覆蓋內(nèi)置的SqlStdOperatorTable,最后添加自已的函數(shù),通過這種方式實(shí)現(xiàn)并不那么優(yōu)雅,其實(shí)還有其它相對(duì)優(yōu)雅的方式進(jìn)行函數(shù)注冊(cè),主要有以下兩種方式

1. 修改Parser.jj 注冊(cè)

假如我要實(shí)現(xiàn)以下兩個(gè)方法

    public static Integer func1(String s) {
        return s == null ? null : Integer.valueOf(s);
    }

    public static String func2(Integer i) {
        return i == null ? null : String.valueOf(i);
    }

要在Calcite中注冊(cè)以上兩個(gè)函數(shù),只需要在parser.jj中添加以下內(nèi)容, 見代碼

/*
*   Create User-defined function
*/
SqlNode extraFunction() :
{
    SqlNode node;
    SqlNode e;
    List<SqlNode> args = null;
    final Span s;
}
{   (
        (
            <FUNC1> { s = span(); }
            <LPAREN>
            e = Expression(ExprContext.ACCEPT_NON_QUERY)
            {
                startList(e);
            }
            <LPAREN>
            {
                node = FunctionUtil.FUNC1.createCall(s.end(this), args);
            }
        )
        |
        (
            <FUNC2> { s = span(); }
            <LPAREN>
            e = Expression(ExprContext.ACCEPT_NON_QUERY)
            {
                startList(e);
            }
            <LPAREN>
            {
                node = FunctionUtil.FUNC2.createCall(s.end(this), args);
            }

        )
    )
    {
        return node;
    }
}


<DEFAULT, DQID, BTID> TOKEN :
...
|   < FUNCTION: "FUNCTION" >
|   < FUNC1: "FUNC1"> //添加的內(nèi)容
|   < FUNC2: "FUNC2"> //添加的內(nèi)容
...

FunctionUtil 內(nèi)容如下, 見代碼

public class FunctionUtil {

    public static final SqlFunction FUNC1 = new SqlFunction(
            new SqlIdentifier("FUNC1", SqlParserPos.ZERO),
            ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.INTEGER), SqlTypeTransforms.TO_NULLABLE),
            InferTypes.VARCHAR_1024,
            OperandTypes.family(SqlTypeFamily.STRING),
            Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR)),
            SqlFunctionCategory.USER_DEFINED_FUNCTION);

    public static final SqlFunction FUNC2 = new SqlFunction(
            new SqlIdentifier("FUNC2", SqlParserPos.ZERO),
            ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
            InferTypes.FIRST_KNOWN,
            OperandTypes.family(SqlTypeFamily.INTEGER),
            Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)),
            SqlFunctionCategory.USER_DEFINED_FUNCTION);
}

最后的測(cè)試代碼如下,見代碼

            //now test func
            //將創(chuàng)建的函數(shù)放入SqlOperatorTable()中
            ListSqlOperatorTable listSqlOperatorTable = new ListSqlOperatorTable();
            listSqlOperatorTable.add(FUNC1);
            listSqlOperatorTable.add(FUNC2);

            final FrameworkConfig funcConfig = Frameworks.newConfigBuilder()
                    .defaultSchema(rootSchema.plus())
                    .parserConfig(builder.build())
                    //添加一個(gè)專們用于添加函數(shù)的 listSqlOperatorTable
                    .operatorTable(ChainedSqlOperatorTable.of(listSqlOperatorTable,
                            SqlStdOperatorTable.instance()))
                    .build();

            Planner planner2 = Frameworks.getPlanner(funcConfig);
            SqlNode func1SqlNodeOrg = planner2.parse("select func1(name) from test where id > 4");
            SqlNode func1SqlNode = planner2.validate(func1SqlNodeOrg);
            RelRoot func1Root = planner2.rel(func1SqlNode);
            System.out.println("-------- func1 test -------");
            System.out.println(RelOptUtil.toString(func1Root.rel, ALL_ATTRIBUTES));

            Planner planner3 = Frameworks.getPlanner(funcConfig);
            SqlNode func2SqlNodeOrg = planner3.parse("select func2(id) from test where id > 4");
            SqlNode func2SqlNode = planner3.validate(func2SqlNodeOrg);
            RelRoot func2Root = planner3.rel(func2SqlNode);
            System.out.println("-------- func2 test -------");
            System.out.println(RelOptUtil.toString(func2Root.rel, ALL_ATTRIBUTES));

2. Schema 注冊(cè)

細(xì)心的讀者可以發(fā)現(xiàn),以上兩種方式都要對(duì)calcite做侵入式修改。內(nèi)置方法需要覆蓋SqlStdOpeartorTable, 而修改Parser.jj則需要自已書寫Parser邏輯,并在Validate階段注冊(cè)函數(shù)。考慮到一下場(chǎng)景,需要類似于Hive 注冊(cè)UDF那樣,動(dòng)態(tài)的注冊(cè)函數(shù),上述兩種方式是無法實(shí)現(xiàn)的,那如何實(shí)現(xiàn)動(dòng)態(tài)注冊(cè)函數(shù)呢? 用schema 注冊(cè), 下面以一個(gè)例子說明, 代碼在

 //主要代碼
 public static RelRoot sqlToRelNode(String sql) {

        try {
            SchemaPlus plus = ROOT_SCHEMA.plus();
            plus.add("FUNC1", ScalarFunctionImpl.create(
                    FunctionUtil.class, "func1"));

            plus.add("FUNC2", ScalarFunctionImpl.create(
                    FunctionUtil.class, "func2"));


            SqlParser parser = SqlParser.create(sql, config.getParserConfig());
            SqlNode sqlNode = parser.parseStmt();

            //這里需要注意大小寫問題,否則表會(huì)無法找到
            Properties properties = new Properties();
            properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
                    String.valueOf(config.getParserConfig().caseSensitive()));

            CalciteCatalogReader calciteCatalogReader =  new CalciteCatalogReader(
                    CalciteSchema.from(rootSchema(plus)),
                    CalciteSchema.from(config.getDefaultSchema()).path(null),
                    TYPE_FACTORY,
                    new CalciteConnectionConfigImpl(properties));

            //to supported user' define function
            SqlOperatorTable sqlOperatorTable = ChainedSqlOperatorTable
                    .of(config.getOperatorTable(), calciteCatalogReader);

            TestSqlValidatorImpl validator = new TestSqlValidatorImpl(
                    sqlOperatorTable,
                    calciteCatalogReader,
                    TYPE_FACTORY,
                    SqlConformanceEnum.DEFAULT);

            //try to union trait set
            //addRelTraitDef for is HepPlanner has not effect in fact
            VolcanoPlanner volcanoPlanner = new VolcanoPlanner();
            SqlNode validateSqlNode = validator.validate(sqlNode);
            final RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
            RelOptCluster cluster = RelOptCluster.create(volcanoPlanner, rexBuilder);

            final SqlToRelConverter.Config sqlToRelConverterConfig
                    = SqlToRelConverter.configBuilder()
                    .withConfig(config.getSqlToRelConverterConfig())
                    .withTrimUnusedFields(false)
                    .withConvertTableAccess(false)
                    .build();

            final SqlToRelConverter sqlToRelConverter =
                    new SqlToRelConverter(null, validator,
                            calciteCatalogReader, cluster, config.getConvertletTable(),
                            sqlToRelConverterConfig);

            RelRoot root =
                    sqlToRelConverter.convertQuery(validateSqlNode, false, true);
            root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
            final RelBuilder relBuilder = sqlToRelConverterConfig
                    .getRelBuilderFactory().create(cluster, null);

            //change trait set of TableScan
            return root.withRel(
                    RelDecorrelator.decorrelateQuery(root.rel, relBuilder));

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

Schema 注冊(cè)相對(duì)于前兩者來說過程復(fù)雜一點(diǎn),相好處是可以定制化, 主要有以下特點(diǎn)

  1. 隨時(shí)注冊(cè) 隨時(shí)使用
  2. 引入了schema空間,同一個(gè)function可以注冊(cè)到不同的schema并可以隔離

采用Schema 注冊(cè)的主要步驟在CalciteCatalogReader讀取schema, 限于篇幅,我會(huì)專門用一個(gè)篇文章詳細(xì)說明CalciteCatalogReader

3. 總結(jié)

以上為三種方式在Calcite注冊(cè)函數(shù),那么這三種有什么區(qū)別

  1. 從靈活度來說,前兩種不如通過schema注冊(cè),而且通過schema注冊(cè)也不需要去修改calcite核心代碼,適合于初學(xué)者使用。更重要的是通過schema注冊(cè)可以實(shí)現(xiàn)函數(shù)隔離,可以實(shí)現(xiàn)不同數(shù)據(jù)庫(kù)級(jí)別的函數(shù)之持。
  2. 從實(shí)現(xiàn)容易度來說, 第一種更為簡(jiǎn)單,只需要簡(jiǎn)單修改一個(gè)內(nèi)置的表即可

那么對(duì)于一個(gè)項(xiàng)目來說 采用哪里方式更好?我的建議是

  1. 如果需要?jiǎng)討B(tài)注冊(cè)或者schema級(jí)別函數(shù)隔離,建議采用第三種
  2. 如果函數(shù)相對(duì)固定而且函數(shù)數(shù)量較多,建議采用第一種
  3. 如果需要更加細(xì)致定制函數(shù)且函數(shù)數(shù)量不多,可以采用第二種
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容