1.flink sql 解析
方法1:直接創(chuàng)建 flink sql parser 解析多行 sql
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parser</artifactId>
<version>1.12.3</version>
<dependency>
SqlParser sqlParser = SqlParser.create(sql, SqlParser.config()
.withParserFactory(FlinkSqlParserImpl.FACTORY)
.withQuoting(Quoting.BACK_TICK)
.withUnquotedCasing(Casing.TO_LOWER)
.withQuotedCasing(Casing.UNCHANGED)
.withConformance(FlinkSqlConformance.DEFAULT)
);
SqlNodeList sqlNodeList = sqlParser.parseStmtList();
List<SqlNode> sqlNodes = sqlNodeList.getList();
for (SqlNode sqlNode : sqlNodes) {
switch (sqlNode.getKind()) {
case INSERT:
SqlNodeList targetColumnNodes = ((SqlInsert) sqlNode).getTargetColumnList();
// todo
break;
case SELECT:
SqlNodeList nodes = ((SqlSelect) sqlNode).getSelectList();
// todo
break;
case XXX:
// todo
default:
// ignore
}
}
方法2:通過(guò) table env 獲取的 flink sql parser 解析單行 sql
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.12.3</version>
</dependency>
//1. 先創(chuàng)建ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tableEnv = StreamTableEnvironment.create(env, settings);
//2. 從這個(gè)TableEnv中獲取ParserImpl
Parser parserImpl = ((TableEnvironmentImpl) tableEnv).getParser();
// parse the sql
SqlNode sqlNode = parserImpl.parse(statement);
// todo
if(sqlNode.getKind() == SELECT){
// ...
}
2.flink sql 校驗(yàn)
參考 flink sql 單條 sql 校驗(yàn)和上下文校驗(yàn)源碼
// 單條 sql 校驗(yàn)
// ParserImpl parse
@Override
public List<Operation> parse(String statement) {
//...省略
// 單條 sql 校驗(yàn)
Operation operation =
SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
// 上下文校驗(yàn)
// 1.TableEnvironmentImpl executeSql
@Override
public TableResult executeSql(String statement) {
List<Operation> operations = parser.parse(statement);
if (operations.size() != 1) {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
return executeOperation(operations.get(0));
}
// 2.TableEnvironmentImpl executeOperation
// 最關(guān)鍵是catalogManager,用于管理table,例如創(chuàng)建、刪除等
private TableResult executeOperation(Operation operation) {
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {
CreateTableOperation createTableOperation = (CreateTableOperation) operation;
if (createTableOperation.isTemporary()) {
catalogManager.createTemporaryTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
if (dropTableOperation.isTemporary()) {
catalogManager.dropTemporaryTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
} else {
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
}
...省略
}
問(wèn)題1:如何獲取 Planner 和 CatalogManager,進(jìn)而利用 SqlToOperationConverter 校驗(yàn)單個(gè) sqlNode?
問(wèn)題2:如何進(jìn)行 sql 上下文校驗(yàn)?
// 通過(guò) table env 獲取 Planner 和 CatalogManager
CatalogManager catalogManager = ((TableEnvironmentImpl) tableEnv).getCatalogManager();
StreamPlanner planner = (StreamPlanner) ((TableEnvironmentImpl) tableEnv).getPlanner();
//創(chuàng)建 flink planner 實(shí)例
FlinkPlannerImpl flinkPlanner = planner.createFlinkPlanner();
// 校驗(yàn)單條 sqlNode
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, sqlNode).get()
// 上下文校驗(yàn)
// 參考 TableEnvironmentImpl executeOperation
if (operation instanceof ModifyOperation) {
//...省略
} else if (operation instanceof CreateTableOperation) {
CreateTableOperation createTableOperation = (CreateTableOperation) operation;
if (createTableOperation.isTemporary()) {
catalogManager.createTemporaryTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
}
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
if (dropTableOperation.isTemporary()) {
catalogManager.dropTemporaryTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
} else {
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
}
}
//...省略