Many Calcite users need to define their own functions. This post explains how to define and register a custom function.
1. Calcite built-in functions and how they are resolved
Calcite's built-in functions mostly live in SqlStdOperatorTable, including the common arithmetic operators, time functions, and so on. The example below registers a new function by adding it to SqlStdOperatorTable.
- Add the function to SqlStdOperatorTable.java
public static final SqlFunction TIMESTR2LONG = new SqlFunction(
    new SqlIdentifier("TIMESTR2LONG", SqlParserPos.ZERO),
    // Return type is BIGINT (a Java Long) and may be NULL
    ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.BIGINT), SqlTypeTransforms.TO_NULLABLE),
    // Operand type is inferred as VARCHAR, i.e. a Java String
    InferTypes.VARCHAR_1024,
    // Type check: report an error if the operand is not a string
    OperandTypes.family(SqlTypeFamily.STRING),
    Lists.newArrayList(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT).createSqlType(SqlTypeName.VARCHAR)),
    SqlFunctionCategory.USER_DEFINED_FUNCTION);
- In your own code, set the parser's operator table to the SqlStdOperatorTable modified above
public class FunctionTestOne {
    public static final SqlTypeFactoryImpl TYPE_FACTORY = new SqlTypeFactoryImpl(
        RelDataTypeSystem.DEFAULT);
    public static final RelDataTypeSystem TYPE_SYSTEM = RelDataTypeSystem.DEFAULT;

    public static void main(String[] args) {
        CalciteSchema rootSchema = CalciteSchema
            .createRootSchema(false, false);
        // Add the table "test"
        rootSchema.add("test", new AbstractTable() {
            @Override
            public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                RelDataTypeFactory.Builder builder = new RelDataTypeFactory
                    .Builder(TYPE_FACTORY);
                // Column id, type INTEGER
                builder.add("id", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.INTEGER));
                // Column name, type VARCHAR
                builder.add("name", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                // Column time_str, type VARCHAR
                builder.add("time_str", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                return builder.build();
            }
        });
        SqlParser.ConfigBuilder builder = SqlParser.configBuilder();
        // Identifiers must be converted to upper case and matched case-insensitively
        builder.setQuotedCasing(Casing.TO_UPPER);
        builder.setUnquotedCasing(Casing.TO_UPPER);
        builder.setCaseSensitive(false);
        final FrameworkConfig config = Frameworks.newConfigBuilder()
            .defaultSchema(rootSchema.plus())
            .parserConfig(builder.build())
            // Note: this must be your own SqlStdOperatorTable. It has the same
            // name so that it shadows the SqlStdOperatorTable shipped with Calcite.
            .operatorTable(SqlStdOperatorTable.instance())
            .build();
        Planner planner = Frameworks.getPlanner(config);
        // Now start parsing
        try {
            SqlNode originSqlNode = planner.parse("select name, timestr2long(time_str) from test where id < 5");
            SqlNode sqlNode = planner.validate(originSqlNode);
            RelRoot root = planner.rel(sqlNode);
            // ALL_ATTRIBUTES is a static import of SqlExplainLevel.ALL_ATTRIBUTES
            System.out.println(RelOptUtil.toString(root.rel, ALL_ATTRIBUTES));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
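A brief explanation of the main steps in the code above:
Create the rootSchema. Think of it as the root directory in Linux: adding a table to rootSchema is like adding a regular file, and adding a schema is like adding a directory. The rootSchema is therefore the root of the whole database. You can add functions, sub-schemas (databases), and tables to it, and each sub-schema can in turn hold its own functions, sub-schemas, and tables, recursively.
After adding the table to rootSchema, configure the parser and the operator table and create the Planner. How these are used internally is covered in detail later.
Finally, parse, validate, and convert the statement to a RelNode.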
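To make the directory analogy concrete, here is a minimal sketch of a schema tree in plain Java. ToySchema, addSubSchema, addTable, and findTable are hypothetical names invented for this illustration; they are not Calcite's CalciteSchema or SchemaPlus API, and a table is reduced to a list of column names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a schema tree. This is NOT Calcite's CalciteSchema/SchemaPlus. */
final class ToySchema {
    private final Map<String, ToySchema> subSchemas = new LinkedHashMap<>();
    private final Map<String, List<String>> tables = new LinkedHashMap<>();

    /** Adding a sub-schema is like creating a directory; returns the child. */
    public ToySchema addSubSchema(String childName) {
        return subSchemas.computeIfAbsent(childName, k -> new ToySchema());
    }

    /** Adding a table is like creating a regular file; the value is its column list. */
    public void addTable(String tableName, List<String> columns) {
        tables.put(tableName, columns);
    }

    /** Resolves a path such as ["sales", "orders"], descending one schema per segment. */
    public List<String> findTable(List<String> path) {
        if (path.isEmpty()) {
            return null;
        }
        if (path.size() == 1) {
            return tables.get(path.get(0));
        }
        ToySchema child = subSchemas.get(path.get(0));
        return child == null ? null : child.findTable(path.subList(1, path.size()));
    }

    public static void main(String[] args) {
        ToySchema root = new ToySchema();
        root.addTable("test", List.of("id", "name", "time_str"));
        root.addSubSchema("sales").addTable("orders", List.of("order_id", "amount"));

        System.out.println(root.findTable(List.of("test")));
        System.out.println(root.findTable(List.of("sales", "orders")));
        System.out.println(root.findTable(List.of("sales", "missing"))); // null
    }
}
```

The recursion in findTable mirrors the nesting described above: each path segment except the last selects a sub-schema, and the last one selects a table.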
Now a brief look at the main steps of function registration.
In the parse phase, Parser.jj contains the following:
LOOKAHEAD( [<SPECIFIC>] FunctionName() <LPAREN>)
e = NamedFunctionCall()
// The main content of NamedFunctionCall is as follows
...
SqlNode NamedFunctionCall() :
qualifiedName = FunctionName()
createCall(qualifiedName, s.end(this), funcType, quantifier, args);
...
FunctionName() obtains the function name, for example timestr2long in this example, or one of Calcite's fixed built-in function names such as ABS or AVG.
createCall() builds a SqlCall (the main representation of a function call). It eventually calls:
protected SqlCall createCall(
    SqlIdentifier funName,
    SqlParserPos pos,
    SqlFunctionCategory funcType,
    SqlLiteral functionQualifier,
    SqlNode[] operands) {
    SqlOperator fun = null;
    // First, try a half-hearted resolution as a builtin function.
    // If we find one, use it; this will guarantee that we
    // preserve the correct syntax (i.e. don't quote builtin function
    // name when regenerating SQL).
    if (funName.isSimple()) {
        final List<SqlOperator> list = new ArrayList<>();
        // Here opTab is SqlStdOperatorTable.instance()
        opTab.lookupOperatorOverloads(funName, funcType, SqlSyntax.FUNCTION, list);
        if (list.size() == 1) {
            fun = list.get(0);
        }
    }
    // Otherwise, just create a placeholder function. Later, during
    // validation, it will be resolved into a real function reference.
    if (fun == null) {
        fun = new SqlUnresolvedFunction(funName, null, null, null, null,
            funcType);
    }
    return fun.createCall(functionQualifier, pos, operands);
}
From the steps above, the registration flow is:
- Register your function in SqlStdOperatorTable in the required format
- During parsing, Calcite looks the function up by name in SqlStdOperatorTable
- If no match is found, the function is parsed as a SqlUnresolvedFunction
So when does a SqlUnresolvedFunction get resolved to a concrete function? In the validate phase, where Calcite does the following for each function:
- Looks the function up by name in all registered operator tables ("all" because functions can also be registered elsewhere, for example through the ChainedSqlOperatorTable and ListSqlOperatorTable shown later)
- Validates the function's operands, including their types and number, and the function category
- Recursively validates each operand; the details are covered later
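The two-phase lookup described above can be sketched in plain Java. This is only a toy model under simplifying assumptions: ToyResolution, FunctionRef, parse, and validate are hypothetical names, an operator table is reduced to a Map from upper-cased function name to a label, and overloads and operand checks are ignored. It is not how Calcite's classes are actually written.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Toy model of two-phase function resolution. These are NOT Calcite classes. */
final class ToyResolution {

    /** A function reference; resolved == false plays the role of a placeholder. */
    static final class FunctionRef {
        final String name;
        final boolean resolved;
        final String source; // label of the table that supplied the function, or null

        FunctionRef(String name, boolean resolved, String source) {
            this.name = name;
            this.resolved = resolved;
            this.source = source;
        }
    }

    /** Parse phase: consult only the standard table, else return a placeholder. */
    static FunctionRef parse(String name, Map<String, String> stdTable) {
        String key = name.toUpperCase(Locale.ROOT);
        String source = stdTable.get(key);
        return new FunctionRef(key, source != null, source);
    }

    /** Validate phase: try every table in order; fail if none knows the name. */
    static FunctionRef validate(FunctionRef ref, List<Map<String, String>> tables) {
        if (ref.resolved) {
            return ref;
        }
        for (Map<String, String> table : tables) {
            String source = table.get(ref.name);
            if (source != null) {
                return new FunctionRef(ref.name, true, source);
            }
        }
        throw new IllegalArgumentException("No match found for function " + ref.name);
    }

    public static void main(String[] args) {
        Map<String, String> std = Map.of("ABS", "standard table");
        Map<String, String> udf = Map.of("FUNC1", "user table");

        FunctionRef parsed = parse("func1", std);
        System.out.println(parsed.resolved); // false: still a placeholder after parsing

        FunctionRef validated = validate(parsed, List.of(udf, std));
        System.out.println(validated.source); // user table
    }
}
```

The point of the sketch is the ordering: a name unknown to the standard table survives parsing as a placeholder and only fails, or gets bound, once validation has consulted every table in the chain.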
Other ways to register functions
The approach above is a little tricky: you have to create an org.apache.calcite.sql.fun package locally, copy SqlStdOperatorTable.java from calcite-core into it so that it shadows the built-in SqlStdOperatorTable, and then add your own function. This is not very elegant. There are two more elegant ways to register functions:
1. Registering by modifying Parser.jj
Suppose I want to implement the following two methods:
public static Integer func1(String s) {
    return s == null ? null : Integer.valueOf(s);
}

public static String func2(Integer i) {
    return i == null ? null : String.valueOf(i);
}
To register these two functions in Calcite, add the following to Parser.jj:
/*
 * Create user-defined function
 */
SqlNode extraFunction() :
{
    SqlNode node;
    SqlNode e;
    List<SqlNode> args = null;
    final Span s;
}
{
    (
        (
            <FUNC1> { s = span(); }
            <LPAREN>
            e = Expression(ExprContext.ACCEPT_NON_QUERY)
            {
                // Collect the single argument into the operand list
                args = startList(e);
            }
            <RPAREN>
            {
                node = FunctionUtil.FUNC1.createCall(s.end(this), args);
            }
        )
    |
        (
            <FUNC2> { s = span(); }
            <LPAREN>
            e = Expression(ExprContext.ACCEPT_NON_QUERY)
            {
                // Collect the single argument into the operand list
                args = startList(e);
            }
            <RPAREN>
            {
                node = FunctionUtil.FUNC2.createCall(s.end(this), args);
            }
        )
    )
    {
        return node;
    }
}
<DEFAULT, DQID, BTID> TOKEN :
...
| < FUNCTION: "FUNCTION" >
| < FUNC1: "FUNC1"> // added
| < FUNC2: "FUNC2"> // added
...
FunctionUtil is defined as follows:
public class FunctionUtil {
    public static final SqlFunction FUNC1 = new SqlFunction(
        new SqlIdentifier("FUNC1", SqlParserPos.ZERO),
        ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.INTEGER), SqlTypeTransforms.TO_NULLABLE),
        InferTypes.VARCHAR_1024,
        OperandTypes.family(SqlTypeFamily.STRING),
        Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR)),
        SqlFunctionCategory.USER_DEFINED_FUNCTION);

    public static final SqlFunction FUNC2 = new SqlFunction(
        new SqlIdentifier("FUNC2", SqlParserPos.ZERO),
        ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
        InferTypes.FIRST_KNOWN,
        OperandTypes.family(SqlTypeFamily.INTEGER),
        Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)),
        SqlFunctionCategory.USER_DEFINED_FUNCTION);
}
The final test code is as follows:
// Now test the functions
// Put the functions created above into a SqlOperatorTable
ListSqlOperatorTable listSqlOperatorTable = new ListSqlOperatorTable();
listSqlOperatorTable.add(FUNC1);
listSqlOperatorTable.add(FUNC2);
final FrameworkConfig funcConfig = Frameworks.newConfigBuilder()
    .defaultSchema(rootSchema.plus())
    .parserConfig(builder.build())
    // Chain in a listSqlOperatorTable dedicated to the added functions
    .operatorTable(ChainedSqlOperatorTable.of(listSqlOperatorTable,
        SqlStdOperatorTable.instance()))
    .build();

Planner planner2 = Frameworks.getPlanner(funcConfig);
SqlNode func1SqlNodeOrg = planner2.parse("select func1(name) from test where id > 4");
SqlNode func1SqlNode = planner2.validate(func1SqlNodeOrg);
RelRoot func1Root = planner2.rel(func1SqlNode);
System.out.println("-------- func1 test -------");
System.out.println(RelOptUtil.toString(func1Root.rel, ALL_ATTRIBUTES));

Planner planner3 = Frameworks.getPlanner(funcConfig);
SqlNode func2SqlNodeOrg = planner3.parse("select func2(id) from test where id > 4");
SqlNode func2SqlNode = planner3.validate(func2SqlNodeOrg);
RelRoot func2Root = planner3.rel(func2SqlNode);
System.out.println("-------- func2 test -------");
System.out.println(RelOptUtil.toString(func2Root.rel, ALL_ATTRIBUTES));
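2. Registering through a schema
Careful readers will notice that both approaches above require invasive changes to Calcite. The built-in approach has to shadow SqlStdOperatorTable, while modifying Parser.jj means writing your own parser logic and registering the function for the validate phase. Now consider a scenario where functions must be registered dynamically, the way Hive registers UDFs. Neither approach can do that. So how do we register functions dynamically? Through the schema, as the following example shows: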
// Main code
public static RelRoot sqlToRelNode(String sql) {
    try {
        SchemaPlus plus = ROOT_SCHEMA.plus();
        // Assumes func1 and func2 (shown earlier) are public methods of FunctionUtil
        plus.add("FUNC1", ScalarFunctionImpl.create(
            FunctionUtil.class, "func1"));
        plus.add("FUNC2", ScalarFunctionImpl.create(
            FunctionUtil.class, "func2"));
        SqlParser parser = SqlParser.create(sql, config.getParserConfig());
        SqlNode sqlNode = parser.parseStmt();
        // Mind the case sensitivity here, otherwise the table cannot be found
        Properties properties = new Properties();
        properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
            String.valueOf(config.getParserConfig().caseSensitive()));
        CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader(
            CalciteSchema.from(rootSchema(plus)),
            CalciteSchema.from(config.getDefaultSchema()).path(null),
            TYPE_FACTORY,
            new CalciteConnectionConfigImpl(properties));
        // Chain the catalog reader in to support user-defined functions
        SqlOperatorTable sqlOperatorTable = ChainedSqlOperatorTable
            .of(config.getOperatorTable(), calciteCatalogReader);
        TestSqlValidatorImpl validator = new TestSqlValidatorImpl(
            sqlOperatorTable,
            calciteCatalogReader,
            TYPE_FACTORY,
            SqlConformanceEnum.DEFAULT);
        // try to union trait set
        // addRelTraitDef has no effect on HepPlanner
        VolcanoPlanner volcanoPlanner = new VolcanoPlanner();
        SqlNode validateSqlNode = validator.validate(sqlNode);
        final RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
        RelOptCluster cluster = RelOptCluster.create(volcanoPlanner, rexBuilder);
        final SqlToRelConverter.Config sqlToRelConverterConfig
            = SqlToRelConverter.configBuilder()
                .withConfig(config.getSqlToRelConverterConfig())
                .withTrimUnusedFields(false)
                .withConvertTableAccess(false)
                .build();
        final SqlToRelConverter sqlToRelConverter =
            new SqlToRelConverter(null, validator,
                calciteCatalogReader, cluster, config.getConvertletTable(),
                sqlToRelConverterConfig);
        RelRoot root =
            sqlToRelConverter.convertQuery(validateSqlNode, false, true);
        root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
        final RelBuilder relBuilder = sqlToRelConverterConfig
            .getRelBuilderFactory().create(cluster, null);
        // change trait set of TableScan
        return root.withRel(
            RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}
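Schema registration is a bit more involved than the previous two approaches, but the benefit is that it is customizable. Its main characteristics are:
- Functions can be registered at any time and used right away
- It introduces a schema namespace: the same function can be registered in different schemas and kept isolated
The key step in schema registration is CalciteCatalogReader reading the schema. For reasons of space, I will cover CalciteCatalogReader in detail in a separate post.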
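The two properties listed above, registration at any time and per-schema isolation, can be illustrated with a small plain-Java sketch. ToyFunctionRegistry, register, and call are hypothetical names made up for this example, and the schema names db1 and db2 are placeholders. The sketch only models the idea of a function namespace per schema and says nothing about how CalciteCatalogReader is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy per-schema function registry. NOT Calcite's SchemaPlus or CalciteCatalogReader. */
final class ToyFunctionRegistry {
    // schema name -> (function name -> implementation)
    private final Map<String, Map<String, Function<Object, Object>>> bySchema = new HashMap<>();

    /** Registers (or replaces) a function in one schema; can be called at any time. */
    public void register(String schema, String name, Function<Object, Object> impl) {
        bySchema.computeIfAbsent(schema, k -> new HashMap<>()).put(name, impl);
    }

    /** Looks the function up in exactly one schema, so other schemas stay invisible. */
    public Object call(String schema, String name, Object arg) {
        Map<String, Function<Object, Object>> functions = bySchema.get(schema);
        Function<Object, Object> impl = functions == null ? null : functions.get(name);
        if (impl == null) {
            throw new IllegalArgumentException(
                "No function " + name + " in schema " + schema);
        }
        return impl.apply(arg);
    }

    public static void main(String[] args) {
        ToyFunctionRegistry registry = new ToyFunctionRegistry();
        // The same name bound to different implementations in two schemas
        registry.register("db1", "FUNC1", s -> Integer.valueOf((String) s));
        registry.register("db2", "FUNC1", s -> ((String) s).length());

        System.out.println(registry.call("db1", "FUNC1", "42")); // 42
        System.out.println(registry.call("db2", "FUNC1", "42")); // 2
    }
}
```

Because the lookup is keyed by schema first, registering FUNC1 in db2 never changes what db1 sees, which is the isolation property the list above describes.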
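3. Summary
Those are the three ways to register functions in Calcite. How do they differ?
- In terms of flexibility, the first two fall short of schema registration. Schema registration also requires no changes to Calcite's core code, which makes it suitable for beginners. More importantly, it enables function isolation, so functions can be supported at the level of individual databases.
- In terms of ease of implementation, the first approach is the simplest: you only need to modify one built-in table
Which approach is best for a given project? My suggestions:
- If you need dynamic registration or schema-level function isolation, use the third approach
- If the functions are relatively fixed and there are many of them, use the first approach
- If you need finer-grained customization and there are only a few functions, the second approach works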