ShardingSphere-JDBC SQL execution process
Execution process
- Call the getConnection method of ShardingSphereDataSource.
- From the STATES map in DriverStateContext, fetch the OKDriverState that was registered there earlier.
- Build a ShardingSphereConnection through the getConnection method of OKDriverState.
- Call createStatement on the ShardingSphereConnection to build a ShardingSphereStatement (prepareStatement builds a ShardingSpherePreparedStatement in the same way).
- In the ShardingSpherePreparedStatement constructor, several fields are initialized:
  - jdbcExecutor: in essence a wrapper around the executorEngine that ShardingSphereDataSource created and that is kept in metaDataContexts.
  - ShardingSphereSQLParserEngine: the SQL parser engine, created for the database type in use.
  - BatchPreparedStatementExecutor: wraps the SQL units to execute (JDBCExecutionUnit).
- Call the execute method of jdbcExecutor, which wraps SQL execution in tasks that the thread pool inside executorEngine schedules and runs.
- Call the get method of ListenableFuture to obtain the SQL execution results.
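From the caller's side, the steps above are triggered by ordinary JDBC calls. The following is a minimal sketch using only java.sql and javax.sql; the recording stub is hypothetical and merely stands in for ShardingSphereDataSource so that the call order can be observed without the library.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

final class JdbcEntryPointSketch {

    // Caller-side view of the steps: getConnection -> createStatement -> execute.
    static boolean runDdl(final DataSource dataSource, final String sql) throws SQLException {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            return statement.execute(sql);
        }
    }

    // Hypothetical stub: a proxy that records each call and hands back the next object in the chain.
    private static <T> T stub(final Class<T> type, final List<String> calls, final Object next) {
        Object proxy = Proxy.newProxyInstance(JdbcEntryPointSketch.class.getClassLoader(), new Class<?>[] {type},
            (self, method, args) -> {
                calls.add(type.getSimpleName() + "." + method.getName());
                if (boolean.class == method.getReturnType()) {
                    return false;
                }
                return method.getReturnType().isInstance(next) ? next : null;
            });
        return type.cast(proxy);
    }

    // Runs the flow against the stub and returns the recorded call order.
    static List<String> recordedCalls() {
        List<String> calls = new ArrayList<>();
        Statement statement = stub(Statement.class, calls, null);
        Connection connection = stub(Connection.class, calls, statement);
        DataSource dataSource = stub(DataSource.class, calls, connection);
        try {
            runDdl(dataSource, "CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL)");
        } catch (final SQLException ex) {
            throw new IllegalStateException(ex);
        }
        return calls;
    }

    public static void main(final String[] args) {
        recordedCalls().forEach(System.out::println);
    }
}
```

With a real ShardingSphereDataSource in place of the stub, the same three calls would enter the classes traced in the rest of this post.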
Method walkthrough
In the example from the example module, the following SQL is executed:
CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, address_id BIGINT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))
The call chain is as follows:
// ShardingSphereDataSource
public Connection getConnection() {
    return DriverStateContext.getConnection(schemaName, getDataSourceMap(), contextManager, TransactionTypeHolder.get());
}

// DriverStateContext
private static final Map<String, DriverState> STATES;

static {
    // TODO add singleton cache with TypedSPI init
    ShardingSphereServiceLoader.register(DriverState.class);
    Collection<DriverState> driverStates = ShardingSphereServiceLoader.getSingletonServiceInstances(DriverState.class);
    STATES = new HashMap<>();
    for (DriverState each : driverStates) {
        STATES.put(each.getType(), each);
    }
}

public static Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
    return STATES.get(contextManager.getMetaDataContexts().getStateContext().getCurrentState()).getConnection(schemaName, dataSourceMap, contextManager, transactionType);
}

public final class OKDriverState implements DriverState {

    @Override
    public Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
        return new ShardingSphereConnection(schemaName, dataSourceMap, contextManager, TransactionTypeHolder.get());
    }

    @Override
    public String getType() {
        return "OK";
    }
}
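The STATES field in DriverStateContext is filled by the static initializer shown above, which loads every DriverState implementation through ShardingSphereServiceLoader and keys each one by its getType() value. The initializer runs once, when the class is first loaded. In this walkthrough the current state is "OK", so getConnection is delegated to OKDriverState, which builds a ShardingSphereConnection.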
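The lookup described above is a registry keyed by state name. The sketch below models it in plain Java under simplifying assumptions: the State interface, both implementations, and the "BLOCKED" state are hypothetical stand-ins, a String replaces the real Connection, and the implementations are listed by hand instead of being discovered through ShardingSphereServiceLoader.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DriverStateRegistrySketch {

    // Hypothetical stand-in for DriverState: each state names itself and produces a "connection".
    interface State {

        String getType();

        String getConnection(String schemaName);
    }

    static final class OkState implements State {

        @Override
        public String getType() {
            return "OK";
        }

        @Override
        public String getConnection(final String schemaName) {
            return "connection to " + schemaName;
        }
    }

    // Hypothetical second state, included only to show that the key selects the behaviour.
    static final class BlockedState implements State {

        @Override
        public String getType() {
            return "BLOCKED";
        }

        @Override
        public String getConnection(final String schemaName) {
            throw new UnsupportedOperationException("no connections while BLOCKED");
        }
    }

    private static final Map<String, State> STATES = new HashMap<>();

    static {
        // Runs once, when the class is initialized; every state registers under its own type name.
        for (State each : List.of(new OkState(), new BlockedState())) {
            STATES.put(each.getType(), each);
        }
    }

    // Delegates to whichever state is registered under the current state name.
    static String getConnection(final String currentState, final String schemaName) {
        return STATES.get(currentState).getConnection(schemaName);
    }

    public static void main(final String[] args) {
        System.out.println(getConnection("OK", "logic_db"));
    }
}
```

Keeping the dispatch in a map means a new state only needs a new implementation class; the lookup code itself does not change.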
// ShardingSphereConnection
public PreparedStatement prepareStatement(final String sql) throws SQLException {
    return new ShardingSpherePreparedStatement(this, sql);
}

public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
    this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}

private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                        final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
    if (Strings.isNullOrEmpty(sql)) {
        throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
    }
    this.connection = connection;
    metaDataContexts = connection.getContextManager().getMetaDataContexts();
    this.sql = sql;
    statements = new ArrayList<>();
    parameterSets = new ArrayList<>();
    ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(
            DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType()));
    sqlStatement = sqlParserEngine.parse(sql, true);
    parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
    statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
    JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
    driverJDBCExecutor = new DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
    rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
    // TODO Consider FederateRawExecutor
    federateExecutor = new FederateJDBCExecutor(connection.getSchemaName(), metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getProps(), jdbcExecutor);
    batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getSchemaName());
    kernelProcessor = new KernelProcessor();
}
Calling prepareStatement on ShardingSphereConnection constructs a ShardingSpherePreparedStatement. Its constructor rejects an empty SQL string, parses the SQL once, and initializes the fields that the later SQL execution relies on.
// ExecutorEngine
public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
                              final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
    if (executionGroupContext.getInputGroups().isEmpty()) {
        return Collections.emptyList();
    }
    return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback)
            : parallelExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
}
- executionGroupContext: the SQL to execute, wrapped as JDBCExecutionUnit instances.
- callback: the JDBCExecutorCallback that ShardingSphereStatement creates through createExecuteCallback.
private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
    ExecutionGroup<I> firstInputs = executionGroups.next();
    Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(executionGroups, callback);
    return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) {
    Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
    while (executionGroups.hasNext()) {
        result.add(asyncExecute(executionGroups.next(), callback));
    }
    return result;
}

private <I, O> ListenableFuture<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) {
    Map<String, Object> dataMap = ExecutorDataMap.getValue();
    return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, dataMap));
}

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
    List<O> result = new LinkedList<>(firstResults);
    for (ListenableFuture<Collection<O>> each : restFutures) {
        try {
            result.addAll(each.get());
        } catch (final InterruptedException | ExecutionException ex) {
            return throwException(ex);
        }
    }
    return result;
}
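- The first execution group runs synchronously on the calling thread; every remaining group is wrapped as a task and submitted to the thread pool.
- The pending result of each submitted task is collected as a future.
- Calling get on each future yields the SQL execution results, which are appended to the results of the first group.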
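The three steps above can be sketched with the JDK's own concurrency types. This is a simplified model, not the library's code: it uses java.util.concurrent.Future where the source uses Guava's ListenableFuture, a plain Function as the callback, and strings in place of JDBCExecutionUnit. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ParallelExecuteSketch {

    static <I, O> List<O> parallelExecute(final ExecutorService executor, final Iterator<List<I>> groups,
                                          final Function<List<I>, Collection<O>> callback) throws InterruptedException, ExecutionException {
        if (!groups.hasNext()) {
            return new ArrayList<>();
        }
        List<I> firstGroup = groups.next();
        // Submit every remaining group to the pool before doing any work on this thread.
        List<Future<Collection<O>>> restFutures = new ArrayList<>();
        while (groups.hasNext()) {
            List<I> group = groups.next();
            Callable<Collection<O>> task = () -> callback.apply(group);
            restFutures.add(executor.submit(task));
        }
        // Run the first group on the calling thread while the others are in flight.
        List<O> result = new ArrayList<>(callback.apply(firstGroup));
        // Block on each future in submission order and merge its results.
        for (Future<Collection<O>> each : restFutures) {
            result.addAll(each.get());
        }
        return result;
    }

    // Two groups of hypothetical execution units, one group per data source.
    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<List<String>> groups = List.of(List.of("ds_0.t_order_0", "ds_0.t_order_1"), List.of("ds_1.t_order_0"));
            return parallelExecute(executor, groups.iterator(), group -> {
                List<String> done = new ArrayList<>();
                for (String unit : group) {
                    done.add("executed on " + unit);
                }
                return done;
            });
        } catch (final InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(final String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because the futures are read in submission order, the merged list keeps the group order even though the groups may finish in any order.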
private <T> List<T> doExecute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final Collection<RouteUnit> routeUnits,
                              final JDBCExecutorCallback<T> callback, final SQLStatement sqlStatement) throws SQLException {
    List<T> results = jdbcExecutor.execute(executionGroupContext, callback);
    refreshMetadata(sqlStatement, routeUnits);
    return results;
}
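- jdbcExecutor.execute is where the SQL is actually executed. JDBCExecutor holds the executorEngine that ShardingSphereDataSource constructed, so in essence it is executorEngine that runs the SQL.
- In refreshMetadata, sqlStatement carries the metadata found in the SQL statement (table columns, constraints, and so on), and routeUnits carries the routing information (data source plus sharded table).
- Finally, as many results are returned as there are routeUnits, which amounts to executing the SQL once for each route unit.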
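The last point, one result per route unit, can be illustrated with a small model. Everything in it is an assumption made for illustration: RouteUnit here is a hypothetical two-field class, the table names are invented, and the plain string replacement only hints at the rewrite step, which in the real library is far more involved.

```java
import java.util.ArrayList;
import java.util.List;

final class RouteUnitResultsSketch {

    // Hypothetical stand-in for a route unit: a data source name plus the sharded table inside it.
    static final class RouteUnit {

        final String dataSourceName;

        final String actualTable;

        RouteUnit(final String dataSourceName, final String actualTable) {
            this.dataSourceName = dataSourceName;
            this.actualTable = actualTable;
        }
    }

    // Produces exactly one result per route unit, in the same order as the route units.
    static List<String> executeOnEach(final String logicSql, final String logicTable, final List<RouteUnit> routeUnits) {
        List<String> results = new ArrayList<>();
        for (RouteUnit each : routeUnits) {
            // Naive illustration only: swap the logic table name for the actual one.
            String actualSql = logicSql.replace(logicTable, each.actualTable);
            results.add(each.dataSourceName + ": " + actualSql);
        }
        return results;
    }

    static List<String> demo() {
        List<RouteUnit> routeUnits = List.of(new RouteUnit("ds_0", "t_order_0"), new RouteUnit("ds_1", "t_order_1"));
        return executeOnEach("CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT)", "t_order", routeUnits);
    }

    public static void main(final String[] args) {
        demo().forEach(System.out::println);
    }
}
```

For a DDL statement such as the CREATE TABLE above, this is why a single logical statement ends up creating one physical table per route unit.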