ShardingSphere-jdbc sql執(zhí)行過程

ShardingSphere-jdbc sql執(zhí)行過程

執(zhí)行過程

  1. 調(diào)用ShardingSphereDataSource的getConnection方法
  2. 在DriverStateContext中的STATE容器中拿到之前在構(gòu)造ShardingSphereDataSource時已存入的OKDriverState
  3. 通過OKDriverState的getConnection方法構(gòu)造ShardingSphereConnection
  4. 通過調(diào)用ShardingSphereConnection中的createStatement方法,構(gòu)造ShardingSphereStatement
  5. 在ShardingSpherePreparedStatement的構(gòu)造方法中,完成多個變量的初始化
    5.1 jdbcExecutor,本質(zhì)是ShardingSphereDataSource中生成的executorEngine,存放在metaDataContexts
    5.2 ShardingSphereSQLParserEngine,sql解析引擎,根據(jù)不同的數(shù)據(jù)庫類型
    5.3 BatchPreparedStatementExecutor,封裝需要執(zhí)行的sql單元JDBCExecutionUnit
  6. 調(diào)用jdbcExecutor中的execute方法將sql的執(zhí)行封裝成任務(wù)由executorEngine中的線程池調(diào)度執(zhí)行
  7. 調(diào)用ListenableFuture的get方法獲取sql的執(zhí)行結(jié)果

方法解析

example模塊例子中,調(diào)用以下sql

CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, address_id BIGINT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))

調(diào)用鏈如下:

    public Connection getConnection() {
        return DriverStateContext.getConnection(schemaName, getDataSourceMap(), contextManager, TransactionTypeHolder.get());
    }
    private static final Map<String, DriverState> STATES;
    
    static {
        // TODO add singleton cache with TypedSPI init
        ShardingSphereServiceLoader.register(DriverState.class);
        Collection<DriverState> driverStates = ShardingSphereServiceLoader.getSingletonServiceInstances(DriverState.class);
        STATES = new HashMap<>();
        for (DriverState each : driverStates) {
            STATES.put(each.getType(), each);
        }
    }

    public static Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
        return STATES.get(contextManager.getMetaDataContexts().getStateContext().getCurrentState()).getConnection(schemaName, dataSourceMap, contextManager, transactionType);
    }
public final class OKDriverState implements DriverState {
    
    @Override
    public Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
        return new ShardingSphereConnection(schemaName, dataSourceMap, contextManager, TransactionTypeHolder.get());
    }
    
    @Override
    public String getType() {
        return "OK";
    }
}

DriverStateContext中的成員變量STATES,是在TypedSPI調(diào)用init的時候初始化,也就是在生成ShardingSphereDataSource的過程中,這里實際是從OKDriverState中g(shù)etConnection,構(gòu)建一個ShardingSphereConnection

    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new ShardingSpherePreparedStatement(this, sql);
    }
    public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }
    private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        metaDataContexts = connection.getContextManager().getMetaDataContexts();
        this.sql = sql;
        statements = new ArrayList<>();
        parameterSets = new ArrayList<>();
        ShardingSphereSQLParserEngine sqlParserEngine = new ShardingSphereSQLParserEngine(
                DatabaseTypeRegistry.getTrunkDatabaseTypeName(metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType()));
        sqlStatement = sqlParserEngine.parse(sql, true);
        parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
        statementOption = returnGeneratedKeys ? new StatementOption(true) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
        JDBCExecutor jdbcExecutor = new JDBCExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
        driverJDBCExecutor = new DriverJDBCExecutor(connection.getSchemaName(), metaDataContexts, jdbcExecutor);
        rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
        // TODO Consider FederateRawExecutor
        federateExecutor = new FederateJDBCExecutor(connection.getSchemaName(), metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getProps(), jdbcExecutor);
        batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getSchemaName());
        kernelProcessor = new KernelProcessor();
    }

通過調(diào)用ShardingSphereConnectionprepareStatement方法,構(gòu)造一個ShardingSpherePreparedStatement,在ShardingSpherePreparedStatement的構(gòu)造方法中初始化各種成員變量,與后續(xù)sql的執(zhí)行有關(guān)

    public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
                                  final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
        if (executionGroupContext.getInputGroups().isEmpty()) {
            return Collections.emptyList();
        }
        return serial ? serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback)
                : parallelExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
    }
  • executionGroupContext--把需要執(zhí)行的sql封裝成JDBCExecutionUnit
  • callback--ShardingSphereStatement里通過createExecuteCallback生成的JDBCExecutorCallback
    private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
        ExecutionGroup<I> firstInputs = executionGroups.next();
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(executionGroups, callback);
        return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
        while (executionGroups.hasNext()) {
            result.add(asyncExecute(executionGroups.next(), callback));
        }
        return result;
    }
    private <I, O> ListenableFuture<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) {
        Map<String, Object> dataMap = ExecutorDataMap.getValue();
        return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, dataMap));
    }
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
        List<O> result = new LinkedList<>(firstResults);
        for (ListenableFuture<Collection<O>> each : restFutures) {
            try {
                result.addAll(each.get());
            } catch (final InterruptedException | ExecutionException ex) {
                return throwException(ex);
            }
        }
        return result;
    }
  1. 把執(zhí)行sql封裝成線程池任務(wù)丟入線程池中調(diào)度執(zhí)行
  2. 把執(zhí)行sql任務(wù)的調(diào)用結(jié)果封裝成future收集起來
  3. 通過future,.get獲取sql執(zhí)行結(jié)果
    private <T> List<T> doExecute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final Collection<RouteUnit> routeUnits,
                                  final JDBCExecutorCallback<T> callback, final SQLStatement sqlStatement) throws SQLException {
        List<T> results = jdbcExecutor.execute(executionGroupContext, callback);
        refreshMetadata(sqlStatement, routeUnits);
        return results;
    }
  • jdbcExecutor.execute方法是真正執(zhí)行sql的地方,在JDBCExecutor中存放了由ShardingSphereDataSource構(gòu)造的executorEngine,所以本質(zhì)上是由executorEngine來執(zhí)行sql
  • refreshMetadata方法中,sqlStatement存放了sql語句中一些元數(shù)據(jù)信息(表字段,約束等),routeUnits存放了路由信息(數(shù)據(jù)源+字表)
  • 最后根據(jù)routeUnits的數(shù)量,返回相同數(shù)量的results,相當(dāng)于在每個routeUnits中都執(zhí)行一遍sql
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容