Sharding-JDBC源碼解析

1. Sharding-JDBC初始化

Sharding-JDBC的初始化主要包括兩個方面:

  1. 數(shù)據(jù)源元數(shù)據(jù)信息和表元數(shù)據(jù)信息的收集。
  2. 表分庫分表策略和算法的配置信息收集。

工廠類ShardingDataSourceFactory.createDataSource()方法在創(chuàng)建Sharding-JDBC的數(shù)據(jù)源實現(xiàn)類ShardingDataSource的同時還創(chuàng)建了ShardingRule、ShardingContext兩個核心類的對象,如下相關(guān)源碼:

public final class ShardingDataSourceFactory {
    
    public static DataSource createDataSource(
            final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Map<String, Object> configMap, final Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
    }
}

public class ShardingDataSource extends AbstractDataSourceAdapter {
    
    private final ShardingContext shardingContext;
    
    public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Map<String, Object> configMap, final Properties props) throws SQLException {
        super(dataSourceMap);
        checkDataSourceType(dataSourceMap);
        if (!configMap.isEmpty()) {
            ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
        }
        shardingContext = new ShardingContext(getDataSourceMap(), shardingRule, getDatabaseType(), props);
    }
}

ShardingContext持有兩個屬性ShardingRule、ShardingMetaData,ShardingRule保存了表的分庫分表配置,這些配置包括分庫策略以及算法、分表策略以及算法,也就是說根據(jù)一個表以及這個表的列可以從ShardingRule中獲取這個表的分庫分表策略和算法。ShardingMetaData則維護了數(shù)據(jù)源和表的元數(shù)據(jù)信息,其有兩個屬性:ShardingDataSourceMetaData和ShardingTableMetaData,分表表示數(shù)據(jù)源的元數(shù)據(jù)信息和表的元數(shù)據(jù)信息,這兩個屬性在ShardingMetaData的構(gòu)造函數(shù)中被創(chuàng)建,源碼如下:

public final class ShardingMetaData {
    
    private final ShardingDataSourceMetaData dataSource;
    
    private final ShardingTableMetaData table;
    
    public ShardingMetaData(final Map<String, String> dataSourceURLs, final ShardingRule shardingRule, final DatabaseType databaseType, final ShardingExecuteEngine executeEngine, 
                            final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {
        dataSource = new ShardingDataSourceMetaData(dataSourceURLs, shardingRule, databaseType);
        table = new ShardingTableMetaData(new TableMetaDataInitializer(dataSource, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData).load(shardingRule));
    }
}

ShardingRuleConfiguration是分庫分表配置的核心和入口,它可以包含多個TableRuleConfiguration和MasterSlaveRuleConfiguration。每一組相同規(guī)則分片的表配置一個TableRuleConfiguration。一個TableRuleConfiguration表示一個表的分庫分表策略配置,其持有兩個類型為ShardingStrategyConfiguration的屬性:databaseShardingStrategyConfig和tableShardingStrategyConfig,分別表示分庫策略配置和分表策略配置。ShardingStrategyConfiguration有如下四種實現(xiàn):

  • StandardShardingStrategyConfiguration 支持精確分片和范圍分片
  • ComplexShardingStrategyConfiguration 支持復(fù)雜分表
  • HintShardingStrategyConfiguration 強制某種策略分片
  • InlineShardingStrategyConfiguration 支持表達式分片
    以上每種分片策略配置都關(guān)聯(lián)一到兩個對應(yīng)的分片算法,分片算法由接口ShardingAlgorithm表示,其抽象子類有:
  • PreciseShardingAlgorithm 精確分片算法
  • RangeShardingAlgorithm 范圍分片算法
  • HintShardingAlgorithm 強制分片算法
  • ComplexKeysShardingAlgorithm 復(fù)雜分片算法

Sharding-JDBC會使用ShardingRuleConfiguration實力化TableRule對象,源碼如下:

public class ShardingRule {
    
    private final ShardingRuleConfiguration shardingRuleConfig;
    
    private final ShardingDataSourceNames shardingDataSourceNames;
    
    private final Collection<TableRule> tableRules = new LinkedList<>();
    
    private final ShardingStrategy defaultDatabaseShardingStrategy;
    
    private final ShardingStrategy defaultTableShardingStrategy;
    
    private final KeyGenerator defaultKeyGenerator;
    
    private final Collection<MasterSlaveRule> masterSlaveRules = new LinkedList<>();
    
    public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) {
        Preconditions.checkNotNull(dataSourceNames, "Data sources cannot be null.");
        Preconditions.checkArgument(!dataSourceNames.isEmpty(), "Data sources cannot be empty.");
        this.shardingRuleConfig = shardingRuleConfig;
        shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames);
        for (TableRuleConfiguration each : shardingRuleConfig.getTableRuleConfigs()) {
            // 對于每一個TableRuleConfiguration都生成一個TableRule對象,TableRule構(gòu)造函數(shù)會排列組合表和數(shù)據(jù)源實力化DataNode集合
            tableRules.add(new TableRule(each, shardingDataSourceNames));
        }
        for (String group : shardingRuleConfig.getBindingTableGroups()) {
            List<TableRule> tableRulesForBinding = new LinkedList<>();
            for (String logicTableNameForBindingTable : StringUtil.splitWithComma(group)) {
                tableRulesForBinding.add(getTableRuleByLogicTableName(logicTableNameForBindingTable));
            }
            bindingTableRules.add(new BindingTableRule(tableRulesForBinding));
        }
        broadcastTables.addAll(shardingRuleConfig.getBroadcastTables());
        defaultDatabaseShardingStrategy = null == shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig()
                ? new NoneShardingStrategy() : ShardingStrategyFactory.newInstance(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig());
        defaultTableShardingStrategy = null == shardingRuleConfig.getDefaultTableShardingStrategyConfig()
                ? new NoneShardingStrategy() : ShardingStrategyFactory.newInstance(shardingRuleConfig.getDefaultTableShardingStrategyConfig());
        defaultKeyGenerator = null == shardingRuleConfig.getDefaultKeyGenerator() ? new DefaultKeyGenerator() : shardingRuleConfig.getDefaultKeyGenerator();
        for (MasterSlaveRuleConfiguration each : shardingRuleConfig.getMasterSlaveRuleConfigs()) {
            masterSlaveRules.add(new MasterSlaveRule(each));
        }
    }
    // 略
}

一個TableRule對象表示一個邏輯表的庫表資源,其維護一個類型為DataNode的集合屬性actualDataNodes,這個DataNode集合表示此邏輯表對應(yīng)的實際庫表的集合,例如:現(xiàn)在有兩個庫db0、db1,每個庫有三個表,邏輯表名為t_order,那么TableRule對象的屬性actualDataNodes則有6個元素:

db0 t_order0
db0 t_order1
db0 t_order2
db1 t_order0
db1 t_order1
db1 t_order2

Sharding-JDBC做路由時即是根據(jù)此集合使用相應(yīng)的算法進行實際的庫表選取的。

相關(guān)類關(guān)系圖如下:

image

2. sql的解析與路由

ShardingPreparedStatement實現(xiàn)了PreparedStatement的三個核心方法:executeQuery()、executeUpdate()、execute(),相關(guān)源碼如下:

public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final PreparedStatementRoutingEngine routingEngine;
    
    private final PreparedStatementExecutor preparedStatementExecutor;
    
    private SQLRouteResult routeResult;
    
    private ResultSet currentResultSet;
    
    // 略
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            sqlRoute();
            initPreparedStatementExecutor();
            MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(), 
                    routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), preparedStatementExecutor.executeQuery());
            result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergeEngine.merge(), this);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
    
    @Override
    public int executeUpdate() throws SQLException {
        try {
            clearPrevious();
            sqlRoute();
            initPreparedStatementExecutor();
            return preparedStatementExecutor.executeUpdate();
        } finally {
            refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
            clearBatch();
        }
    }
    
    @Override
    public boolean execute() throws SQLException {
        try {
            clearPrevious();
            sqlRoute();
            initPreparedStatementExecutor();
            return preparedStatementExecutor.execute();
        } finally {
            refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
            clearBatch();
        }
    }
    
    private void sqlRoute() {
        routeResult = routingEngine.route(new ArrayList<>(getParameters()));
    }
    
    // 略

從上面的源碼可以看到,三個核心方法都調(diào)用到了sqlRoute()方法,此方法是sql的解析與路由的入口,sqlRoute()方法直接調(diào)用PreparedStatementRoutingEngine的route()方法,此方法首先調(diào)用shardingRouter的parse()方法sql進行解析,接著調(diào)用route()方法進行路由,相關(guān)源碼如下:

public final class PreparedStatementRoutingEngine {
    
    private final String logicSQL;
    
    private final ShardingRouter shardingRouter;
    
    private final ShardingMasterSlaveRouter masterSlaveRouter;
    
    private SQLStatement sqlStatement;
    
    public PreparedStatementRoutingEngine(final String logicSQL, final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
        this.logicSQL = logicSQL;
        shardingRouter = ShardingRouterFactory.newInstance(shardingRule, shardingMetaData, databaseType, showSQL);
        masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
    }
    
    public SQLRouteResult route(final List<Object> parameters) {
        if (null == sqlStatement) {
            // 調(diào)用parse()方法解析sql
            sqlStatement = shardingRouter.parse(logicSQL, true);
        }
        // 調(diào)用route()方法路由
        return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));
    }
}

從上面源碼可以看到ShardingRouter是解析和路由的核心接口,其實現(xiàn)類為ParsingSQLRouter,ParsingSQLRouter使用四個引擎對sql進行解析、解析和重寫,這四個引擎為:

  • SQLParsingEngine
    解析sql,返回SQLStatement作為解析的結(jié)果。
  • OptimizeEngine
    對SQLStatement進行優(yōu)化,返回ShardingConditions對象。
  • RoutingEngine
    根據(jù)庫表分片配置以及ShardingConditions找到目標庫表,返回RoutingResult對象。
  • SQLRewriteEngine
    根據(jù)路由結(jié)果重寫sql。

源碼如下:

public final class ParsingSQLRouter implements ShardingRouter {
    
    private final ShardingRule shardingRule;
    
    private final ShardingMetaData shardingMetaData;
    
    private final DatabaseType databaseType;
    
    @Override
    public SQLStatement parse(final String logicSQL, final boolean useCache) {
        parsingHook.start(logicSQL);
        try {
            // 1. 調(diào)用SQLParsingEngine解析sql。
            SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable()).parse(useCache);
            parsingHook.finishSuccess();
            return result;
        } catch (final Exception ex) {
            parsingHook.finishFailure(ex);
            throw ex;
        }
    }
    
    @Override
    public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement ? getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters) : Optional.<GeneratedKey>absent();
        SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());
        // 2. 調(diào)用OptimizeEngine優(yōu)化SQLStatement。
        ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();
        if (generatedKey.isPresent()) {
            setGeneratedKeys(result, generatedKey.get());
        }
        if (sqlStatement instanceof SelectStatement && !sqlStatement.getTables().isEmpty() && !((SelectStatement) sqlStatement).getSubQueryConditions().isEmpty()) {
            mergeShardingValueForSubQuery(sqlStatement.getConditions(), shardingConditions);
        }
        // 3. 調(diào)用RoutingEngine找到目標庫表。
        RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, shardingConditions).route();
        SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
            processLimit(parameters, (SelectStatement) sqlStatement);
        }
        // 4. 調(diào)用SQLRewriteEngine重寫sql。
        SQLBuilder sqlBuilder = rewriteEngine.rewrite(routingResult.isSingleRouting());
        for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
            result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingMetaData.getDataSource())));
        }
        if (showSQL) {
            SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits());
        }
        return result;
    }
    
    // 略
}

相關(guān)類關(guān)系圖如下:

image

這里重點關(guān)注一下解析和優(yōu)化后的結(jié)果,即ShardingConditions對象,此對象和自定義分片算法有關(guān),因為在自定義分片算法的時候需要知道條件列和值,這些信息都在類ShardingConditions關(guān)聯(lián)的ShardingCondition的集合屬性ShardingValue中。如上圖所示,ShardingValue有三個子類:ListShardingValue、PreciseShardingValue、RangeShardingValue,這三個子類搭配PreciseShardingAlgorithm、RangeShardingAlgorithm、HintShardingAlgorithm、ComplexKeysShardingAlgorithm四種分片算法可以覆蓋到我們絕大多數(shù)的分片業(yè)務(wù)場景。

上面提到RoutingEngine接口,這里使用到此接口的實現(xiàn)類為StandardRoutingEngine,下面看下它是如何做路由的,如下為StandardRoutingEngin相關(guān)源碼:

public final class StandardRoutingEngine implements RoutingEngine {
    
    private final ShardingRule shardingRule;
    
    private final String logicTableName;
    
    private final ShardingConditions shardingConditions;
   
    @Override
    public RoutingResult route() {
        return generateRoutingResult(getDataNodes(shardingRule.getTableRuleByLogicTableName(logicTableName)));
    }
    
    private Collection<DataNode> getDataNodes(final TableRule tableRule) {
        // 根據(jù)ShardingStrategy的類型執(zhí)行對應(yīng)的方法
        if (isRoutingByHint(tableRule)) {
            return routeByHint(tableRule);
        }
        if (isRoutingByShardingConditions(tableRule)) {
            return routeByShardingConditions(tableRule);
        }
        return routeByMixedConditions(tableRule);
    }
    
    private Collection<DataNode> routeByHint(final TableRule tableRule) {
        return route(tableRule, getDatabaseShardingValuesFromHint(), getTableShardingValuesFromHint());
    }
    
    private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
        return shardingConditions.getShardingConditions().isEmpty() ? route(tableRule, Collections.<ShardingValue>emptyList(), Collections.<ShardingValue>emptyList())
                : routeByShardingConditionsWithCondition(tableRule);
    }
    
    private Collection<DataNode> route(final TableRule tableRule, final List<ShardingValue> databaseShardingValues, final List<ShardingValue> tableShardingValues) {
        // 1. 先找?guī)?        Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedDataSources) {
            result.addAll(routeTables(tableRule, each, tableShardingValues));
        }
        return result;
    }
    
    private Collection<DataNode> routeByMixedConditions(final TableRule tableRule) {
        return shardingConditions.getShardingConditions().isEmpty() ? routeByMixedConditionsWithHint(tableRule) : routeByMixedConditionsWithCondition(tableRule);
    }
    
    private Collection<DataNode> routeByMixedConditionsWithCondition(final TableRule tableRule) {
        Collection<DataNode> result = new LinkedList<>();
        for (ShardingCondition each : shardingConditions.getShardingConditions()) {
            Collection<DataNode> dataNodes = route(tableRule, getDatabaseShardingValues(tableRule, each), getTableShardingValues(tableRule, each));
            reviseShardingConditions(each, dataNodes);
            result.addAll(dataNodes);
        }
        return result;
    }
    
    private Collection<DataNode> routeByMixedConditionsWithHint(final TableRule tableRule) {
        if (shardingRule.getDatabaseShardingStrategy(tableRule) instanceof HintShardingStrategy) {
            return route(tableRule, getDatabaseShardingValuesFromHint(), Collections.<ShardingValue>emptyList());
        }
        return route(tableRule, Collections.<ShardingValue>emptyList(), getTableShardingValuesFromHint());
    }
    
    private Collection<String> routeDataSources(final TableRule tableRule, final List<ShardingValue> databaseShardingValues) {
        Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
        if (databaseShardingValues.isEmpty()) {
            return availableTargetDatabases;
        }
        Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));
        Preconditions.checkState(!result.isEmpty(), "no database route info");
        return result;
    }
    
    private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<ShardingValue> tableShardingValues) {
        Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
        Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));
        Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedTables) {
            result.add(new DataNode(routedDataSource, each));
        }
        return result;
    }
    
    // 略
}

可以看到route()方法是入庫,此方法首先通過ShardingRule獲取到邏輯表所對應(yīng)的TableRule對象,上面說了TableRule保存了邏輯表對應(yīng)的實際的庫表關(guān)系集合,接著根據(jù)庫和表的ShardingStrategy的類型走了三個不同的方法:routeByHint()、routeByShardingConditions()、routeByMixedConditions(),不管走哪個方法最終都會執(zhí)行到含有三個參數(shù)的route()方法,此方法先調(diào)用routeDataSources()方法路由數(shù)據(jù)源(庫),接著調(diào)用routeTables()方法路由表,路由庫表的方法也很簡單:

  1. 從TableRule中獲取可用的庫表集合。
  2. 從TableRule中獲取庫表的分片策略ShardingStrategy對象。
  3. 執(zhí)行ShardingStrategy持有的分片算法ShardingAlgorithm的doSharding()方法返回路由到的庫表。

路由的結(jié)果以RoutingResult的形式返回,接著調(diào)用SQLRewriteEngine重寫sql,因為此時sql中的表還只是邏輯表名,并不是具體的哪個表,接著生成SQLUnit,并最終以SQLRouteResult形式返回路由結(jié)果。

3. sql的執(zhí)行與結(jié)果合并

類PreparedStatementExecutor負責sql的執(zhí)行,其定義了executeQuery()、executeUpdate()、execute()方法,用于對應(yīng)ShardingPreparedStatement中相應(yīng)方法,不過在執(zhí)行這些方法之前需要先調(diào)用init()方法,init()方法根據(jù)路由的結(jié)果SQLRouteResult中的RouteUnit集合創(chuàng)建StatementExecuteUnit集合對象,并保存在AbstractStatementExecutor的屬性executeGroups中,其間會根據(jù)邏輯數(shù)據(jù)源名稱獲取真實數(shù)據(jù)源DataSource,并從數(shù)據(jù)源獲取連接Connection,進而從連接處獲取Statement。相關(guān)源碼如下:

public void init(final SQLRouteResult routeResult) throws SQLException {
    // 把創(chuàng)建出的ShardingExecuteGroup<StatementExecuteUnit>對象設(shè)置到AbstractStatementExecutor的屬性executeGroups中
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();
}

private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {
        
        @Override
        public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
            // 從ShardingConnection處獲取一定數(shù)量的連接,ShardingConnection維護了一個Map<String, DataSource>類型的屬性dataSourceMap,其中保存了數(shù)據(jù)源信息。
            return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
        }
        
        @Override
        public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
            // 從Connection處獲取PreparedStatement對象并創(chuàng)建StatementExecuteUnit對象。
            return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
        }
    });
}

@SuppressWarnings("MagicConstant")
private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
    return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
            : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}

綜上,init()方法其實是把SQLRouteResult中的RouteUnit對象轉(zhuǎn)換為ShardingExecuteGroup<StatementExecuteUnit>對象集合并從數(shù)據(jù)源獲取連接和PreparedStatement的過程。

接著類PreparedStatementExecutor的executeQuery()、executeUpdate()或者execute()方法被調(diào)用,不管哪個方法被調(diào)用,都會執(zhí)行到PreparedStatementExecutor的父類AbstractStatementExecutor中的executeCallback(SQLExecuteCallback<T> executeCallback)方法,相關(guān)源碼如下:

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        
        @Override
        protected QueryResult executeSQL(final StatementExecuteUnit statementExecuteUnit) throws SQLException {
            return getQueryResult(statementExecuteUnit);
        }
    };
    return executeCallback(executeCallback);
}

public int executeUpdate() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<Integer> executeCallback = SQLExecuteCallbackFactory.getPreparedUpdateSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
    List<Integer> results = executeCallback(executeCallback);
    return accumulate(results);
}

public boolean execute() throws SQLException {
    boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<Boolean> executeCallback = SQLExecuteCallbackFactory.getPreparedSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
    List<Boolean> result = executeCallback(executeCallback);
    if (null == result || result.isEmpty() || null == result.get(0)) {
        return false;
    }
    return result.get(0);
}

類AbstractStatementExecutor中相關(guān)源碼:

public class AbstractStatementExecutor {

    private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
    
    private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();
    
    protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
            return sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
    }
    // 略
}

從上面源碼可以看到executeCallback()方法調(diào)用了SQLExecutePrepareTemplate的executeGroup()方法,此方法調(diào)用ShardingExecuteEngine的groupExecute()執(zhí)行ShardingExecuteGroup,executeGroup()以及相關(guān)方法源碼如下

public final class ShardingExecuteEngine implements AutoCloseable {
    
    private final ShardingExecutorService shardingExecutorService;
    
    private ListeningExecutorService executorService;
    
    public ShardingExecuteEngine(final int executorSize) {
        shardingExecutorService = new ShardingExecutorService(executorSize);
        executorService = shardingExecutorService.getExecutorService();
    }
    public <I, O> List<O> groupExecute(
            final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
        ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
        // 異步執(zhí)行從第二個開始的其他ShardingExecuteGroup,如果inputGroups集合不止一個元素的話。
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
        // 同步執(zhí)行第一個ShardingExecuteGroup。
        return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }
    
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
        for (ShardingExecuteGroup<I> each : inputGroups) {
            result.add(asyncGroupExecute(each, callback));
        }
        return result;
    }
    
    private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
        final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
        return executorService.submit(new Callable<Collection<O>>() {
            
            @Override
            public Collection<O> call() throws SQLException {
                return callback.execute(inputGroup.getInputs(), false, dataMap);
            }
        });
    }
    // 略   
}

從上面源碼可以看到對于Collection<ShardingExecuteGroup<I>>集合,第一個采用同步執(zhí)行,其他的提交至ListeningExecutorService執(zhí)行器中采用異步執(zhí)行,如果inputGroups集合不止一個元素的話。

如果執(zhí)行的是executeQuery()則返回的是一個List<QueryResult>集合,此時需要調(diào)用MergeEngine的merge()方法對QueryResult集合進行合并,MergeEngine接口有兩個實現(xiàn)類:DQLMergeEngine和DALMergeEngine,這兩個實現(xiàn)類分別負責數(shù)據(jù)查詢sql的合并和數(shù)據(jù)庫管理sql的合并,如下:

public final class MergeEngineFactory {
    
    public static MergeEngine newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, 
                                          final SQLStatement sqlStatement, final ShardingTableMetaData shardingTableMetaData, final List<QueryResult> queryResults) throws SQLException {
        if (sqlStatement instanceof SelectStatement) {
            return new DQLMergeEngine(databaseType, (SelectStatement) sqlStatement, queryResults);
        } 
        if (sqlStatement instanceof DALStatement) {
            return new DALMergeEngine(shardingRule, queryResults, (DALStatement) sqlStatement, shardingTableMetaData);
        }
        throw new UnsupportedOperationException(String.format("Cannot support type '%s'", sqlStatement.getType()));
    }
}

相關(guān)類結(jié)構(gòu)關(guān)系圖:

image

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 說在前面 sql路由這里的內(nèi)容比較多,包含單表路由或者綁定表路由、多庫多表路由、笛卡爾積路由,分三部分來介紹,今天...
    天河2018閱讀 2,020評論 0 0
  • 說在前面 第三部分主要解析的是分片規(guī)則構(gòu)造的源碼實現(xiàn),這一部分邏輯實現(xiàn)比較多,所以單獨拿出來最為一次解析。 上次我...
    天河2018閱讀 1,403評論 1 1
  • 打卡1.以前人們在四月開始收獲 ,躺在高高的谷堆上面笑著 ,我穿過金黃的麥田 ,去給稻草人唱歌 ,等著落山風吹過 ...
    四維式閱讀 194評論 0 0
  • 如何排練即興講話 排練是快速準備即興講話的好方法。它會讓最后的講話大不相同,并且促使你一直圍繞要點講話。排練即興講...
    肖家菇?jīng)?/span>閱讀 563評論 0 0
  • 今天爸爸媽媽帶我和姐姐去游樂園玩,一路上好期待,終于到目的地了,我們一下車就看到了很多人,人山人海,密密麻麻...
    倪語軒閱讀 418評論 0 2

友情鏈接更多精彩內(nèi)容