3. sharding-jdbc源碼之路由&執(zhí)行

阿飛Javaer,轉(zhuǎn)載請注明原創(chuàng)出處,謝謝!

繼續(xù)以sharding-jdbc-example-jdbc模塊中的com.dangdang.ddframe.rdb.sharding.example.jdbc.Main為基礎(chǔ),剖析分庫分表簡單查詢SQL實現(xiàn)--printSimpleSelect(dataSource);,即如何執(zhí)行簡單的查詢SQL,接下來的分析以執(zhí)行SQL語句"SELECT o.* FROM t_order o where o.user_id=? AND o.order_id=?"為例;

單表查詢

MainprintSimpleSelect()方法調(diào)用preparedStatement.executeQuery(),即調(diào)用ShardingPreparedStatement中的executeQuery()方法,核心源碼如下:

@Override
public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        // 核心方法route(),即解析SQL如何路由執(zhí)行
        Collection<PreparedStatementUnit> preparedStatementUnits = route();
        // 根據(jù)路由信息執(zhí)行SQL
        List<ResultSet> resultSets = new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
        // 對返回的結(jié)果進(jìn)行merge合并
        result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());
    } finally {
        clearBatch();
    }
    currentResultSet = result;
    return result;
}

通過上面的源碼可知,SQL查詢兩個核心:路由和結(jié)果合并,接下來一一分析sharding-jdbc如何實現(xiàn);

單表查詢之路由

接下來分析下面這段代碼是如何取得路由信息的:

Collection<PreparedStatementUnit> preparedStatementUnits = route();

route()核心源碼如下:

private Collection<PreparedStatementUnit> route() throws SQLException {
    Collection<PreparedStatementUnit> result = new LinkedList<>();
    // 調(diào)用PreparedStatementRoutingEngine中的route()方法,route()方法調(diào)用sqlRouter.route(logicSQL, parameters, sqlStatement)
    routeResult = routingEngine.route(getParameters());
    for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
        SQLType sqlType = routeResult.getSqlStatement().getType();
        Collection<PreparedStatement> preparedStatements;
        if (SQLType.DDL == sqlType) {
            preparedStatements = generatePreparedStatementForDDL(each);
        } else {
            preparedStatements = Collections.singletonList(generatePreparedStatement(each));
        }
        routedStatements.addAll(preparedStatements);
        for (PreparedStatement preparedStatement : preparedStatements) {
            replaySetParameter(preparedStatement);
            result.add(new PreparedStatementUnit(each, preparedStatement));
        }
    }
    return result;
}

SQLRouter接口有兩個實現(xiàn)類:DatabaseHintSQLRouter和ParsingSQLRouter,由于這里沒有用hint語法強(qiáng)制執(zhí)行某個庫,所以調(diào)用ParsingSQLRouter中的route()方法:

private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
    Collection<String> tableNames = sqlStatement.getTables().getTableNames();
    RoutingEngine routingEngine;
    // 如果sql中只有一個表名,或者多個表名之間是綁定表關(guān)系,或者所有表都在默認(rèn)數(shù)據(jù)源指定的數(shù)據(jù)庫中(即不參與分庫分表的表),那么用SimpleRoutingEngine作為路由判斷引擎;
    if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
        routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
    } else {
        routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
    }
    return routingEngine.route();
}

接下來分析一下SimpleRoutingEngine和ComplexRoutingEngine;

SimpleRoutingEngine

執(zhí)行SQL:"SELECT o.* FROM t_order o where o.user_id=? AND o.order_id=?"時,由于SQL中只有一個表(1 == tableNames.size()),所以路由引擎是SimpleRoutingEngine;SimpleRoutingEngine.route()源碼如下:

@Override
public RoutingResult route() {
    // 根據(jù)邏輯表得到tableRule,邏輯表為t_order;表規(guī)則的配置為:.actualTables(Arrays.asList("t_order_0", "t_order_1")),所以有兩個實際表;
    TableRule tableRule = shardingRule.getTableRule(logicTableName);
    // 根據(jù)規(guī)則先路由數(shù)據(jù)源:即根據(jù)user_id取模路由
    Collection<String> routedDataSources = routeDataSources(tableRule);
    // routedMap保存路由到的目標(biāo)數(shù)據(jù)源和表的結(jié)果:key為數(shù)據(jù)源,value為該數(shù)據(jù)源下路由到的目標(biāo)表集合
    Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
    // 遍歷路由到的目標(biāo)數(shù)據(jù)源
    for (String each : routedDataSources) {
        // 再根據(jù)規(guī)則路由表:即根據(jù)order_id取模路由
        routedMap.put(each, routeTables(tableRule, each));
    }
    // 將得到的路由數(shù)據(jù)源和表信息封裝到RoutingResult中,RoutingResult中有個TableUnits類型屬性,TableUnits類中有個List<TableUnit> tableUnits屬性,TableUnit包含三個屬性:dataSourceName--數(shù)據(jù)源名稱,logicTableName--邏輯表名稱,actualTableName--實際表名稱,例如:TableUnit:{dataSourceName:ds_jdbc_1, logicTableName:t_order, actualTableName: t_order_1}
    return generateRoutingResult(tableRule, routedMap);
}

數(shù)據(jù)源路由詳細(xì)解讀:由于數(shù)據(jù)源的sharding策略為databaseShardingStrategy(new DatabaseShardingStrategy("user_id", new ModuloDatabaseShardingAlgorithm()));且where條件為where o.user_id=? AND o.order_id=?,即where條件中有user_id,根據(jù)取模路由策略,當(dāng)user_id為奇數(shù)時,數(shù)據(jù)源為ds_jdbc_1;當(dāng)user_id為偶數(shù)時,數(shù)據(jù)源為ds_jdbc_0;
表路由詳細(xì)解讀:表的sharding策略為tableShardingStrategy(new TableShardingStrategy("order_id", new ModuloTableShardingAlgorithm())),即where條件中有order_id,根據(jù)取模路由策略,當(dāng)order_id為奇數(shù)時,表為t_order_1;當(dāng)order_id為偶數(shù)時,表為t_order_0;
綜上所述:最終需要執(zhí)行的表數(shù)量為路由到的數(shù)據(jù)源個數(shù)路由到的實際表個數(shù)*;

實例1where o.order_id=1001 AND o.user_id=10,user_id=10所以路由得到數(shù)據(jù)源為ds_jdbc_0; order_id=1001,路由得到實際表為t_order_1;那么最終只需在ds_jdbc_0這個數(shù)據(jù)源中的t_order_1表中執(zhí)行即可;
實例2where o.order_id=1000,user_id沒有值所以路由得到所有數(shù)據(jù)源ds_jdbc_0和ds_jdbc_1; order_id=1000,路由得到實際表為t_order_0;那么最終需在ds_jdbc_0和ds_jdbc_1兩個數(shù)據(jù)源中的t_order_0表中執(zhí)行即可;
實例3where o.user_id=11,user_id=11所以路由得到數(shù)據(jù)源為ds_jdbc_1; order_id沒有值所以路由得到實際表為t_order_0和t_order_1;那么最終只需在ds_jdbc_1這個數(shù)據(jù)源中的t_order_0和t_order_1表中執(zhí)行即可;

ComplexRoutingEngine

待定... ...

單表查詢之執(zhí)行

路由完成后就決定了SQL需要在哪些數(shù)據(jù)源的哪些實際表中執(zhí)行,接下來以執(zhí)行SELECT o.* FROM t_order o where o.user_id=10為例分析下面這段Java代碼sharding-jdbc是如何執(zhí)行的:

根據(jù)前面的路由分析可知,這條SQL會路由到ds_jdbc_0這個數(shù)據(jù)源中,且在所有實際表([t_order_0, t_order_1])中執(zhí)行這個SQL;

List<ResultSet> resultSets = new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();

執(zhí)行的核心代碼在ExecutorEngine中,核心源碼如下:

public <T> List<T> executePreparedStatement(
        final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
    // preparedStatementUnits就是前面路由分析結(jié)果:執(zhí)行SQL select o.* from t_order o where o.user_id=10時,只需在ds_jdbc_0這個數(shù)據(jù)源中的t_order_0和t_order_1兩個實際表中執(zhí)行即可;
    return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}

private  <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    if (baseStatementUnits.isEmpty()) {
        return Collections.emptyList();
    }
    Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
    // 第一個任務(wù)分離出來
    BaseStatementUnit firstInput = iterator.next();
    // 除第一個任務(wù)之外的任務(wù)異步執(zhí)行
    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
    T firstOutput;
    List<T> restOutputs;
    try {
        // 第一個任務(wù)同步執(zhí)行[猜測是不是考慮到分庫分表后只需路由到一個數(shù)據(jù)源中的一個表的SQL執(zhí)行性能問題,優(yōu)化這種SQL執(zhí)行為同步執(zhí)行?分庫分表后,面向用戶的API占用了99%的請求量,而這些API對應(yīng)的SQL 99%只需要在一個數(shù)據(jù)源上的一個實際表執(zhí)行即可,例如根據(jù)訂單表根據(jù)user_id分庫分表后,查詢用戶的訂單信息這種場景]
        firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
        // 取得其他異步執(zhí)行任務(wù)的結(jié)果
        restOutputs = restFutures.get();
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    List<T> result = Lists.newLinkedList(restOutputs);
    // 將第一個任務(wù)同步執(zhí)行結(jié)果與其他任務(wù)異步執(zhí)行結(jié)果合并就是最終的結(jié)果
    result.add(0, firstOutput);
    return result;
}

異步執(zhí)行核心代碼:

private final ListeningExecutorService executorService;

public ExecutorEngine(final int executorSize) {
    // 異步執(zhí)行的線程池是通過google-guava封裝的線程池,設(shè)置了線程名稱為增加了ShardingJDBC-***,增加了shutdown hook--應(yīng)用關(guān)閉時最多等待60秒直到所有任務(wù)完成,從而實現(xiàn)優(yōu)雅停機(jī)
    executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
            executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
    MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}

private <T> ListenableFuture<List<T>> asyncExecute(
        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    // 構(gòu)造一個存放異步執(zhí)行后的結(jié)果的list
    List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
    for (final BaseStatementUnit each : baseStatementUnits) {
        // 線程池方式異步執(zhí)行所有SQL,線程池在ExecutorEngine的構(gòu)造方法中初始化;
        result.add(executorService.submit(new Callable<T>() {
            
            @Override
            public T call() throws Exception {
                return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
            }
        }));
    }
    // google-guava的方法--將所有異步執(zhí)行結(jié)果轉(zhuǎn)為list類型
    return Futures.allAsList(result);
}

同步執(zhí)行核心代碼:

    private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
        return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
    }

由同步執(zhí)行核心代碼和異步執(zhí)行核心代碼可知,最終都是調(diào)用executeInternal(),跟讀這個方法的源碼可知:最終就是在目標(biāo)數(shù)據(jù)庫表上執(zhí)行PreparedStatementexecute***()方法;且在執(zhí)行前會利用google-guava的EventBus發(fā)布BEFORE_EXECUTE的事件(執(zhí)行完成后,如果執(zhí)行成功還會發(fā)布EXECUTE_SUCCESS事件,如果執(zhí)行失敗發(fā)布EXECUTE_FAILURE事件),部分核心源碼如下:

// 發(fā)布事件
List<AbstractExecutionEvent> events = new LinkedList<>();
if (parameterSets.isEmpty()) {
    // 構(gòu)造無參SQL的事件(事件類型為BEFORE_EXECUTE)
    events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
}
for (List<Object> each : parameterSets) {
    // 構(gòu)造有參SQL的事件(事件類型為BEFORE_EXECUTE)
    events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
// 調(diào)用google-guava的EventBus.post()提交事件
for (AbstractExecutionEvent event : events) {
    EventBusInstance.getInstance().post(event);
}

try {
    // 執(zhí)行SQL
    result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
    // 如果執(zhí)行過程中拋出SQLException,即執(zhí)行SQL失敗,那么post一個EXECUTE_FAILURE類型的事件
    for (AbstractExecutionEvent each : events) {
        each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
        each.setException(Optional.of(ex));
        EventBusInstance.getInstance().post(each);
        ExecutorExceptionHandler.handleException(ex);
    }
    return null;
}
for (AbstractExecutionEvent each : events) {
    // // 如果執(zhí)行成功,那么post一個EXECUTE_SUCCESS類型的事件
    each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
    EventBusInstance.getInstance().post(each);
}

接下來需要對并行執(zhí)行后得到的結(jié)果集進(jìn)行merge,下面的sharding-jdbc源碼分析系列文章繼續(xù)對其進(jìn)行分析;

EventBus

說明:EventBus是google-guava提供的消息發(fā)布-訂閱類庫;
google-guava的EventBus正確打開姿勢:

  1. 發(fā)布事務(wù):調(diào)用EventBus的post()--sharding-jdbc中發(fā)布事務(wù):EventBusInstance.getInstance().post(each);
  2. 訂閱事務(wù):調(diào)用EventBus的register()--sharding-jdbc中注冊事務(wù):EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());

EventBusInstance源碼如下--EventBus全類名為com.google.common.eventbus.EventBus

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EventBusInstance {
    
    private static final EventBus INSTANCE = new EventBus();
    
    /**
     * Get event bus instance.
     * 
     * @return event bus instance
     */
    public static EventBus getInstance() {
        return INSTANCE;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評論 19 139
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法,內(nèi)部類的語法,繼承相關(guān)的語法,異常的語法,線程的語...
    子非魚_t_閱讀 34,623評論 18 399
  • 目錄;(一) 拆分實施策略和示例演示(二) 全局主鍵生成策略(三) 關(guān)于使用框架還是自主開發(fā)以及sharding實...
    linking12閱讀 10,568評論 1 52
  • 這是2017年閱讀的第29本書,離150本還有122本,連續(xù)每日讀書第11天。 2017年2月18日,《一口氣讀完...
    吞書獸小布閱讀 461評論 0 0
  • 偶然的一次機(jī)會,聽了一節(jié)營銷課,給心靈一次全新的啟迪。想起過往有意識或無意識的聽一些育兒課,人生感悟課,無形中積累...
    煥真中道禪舞閱讀 386評論 2 0

友情鏈接更多精彩內(nèi)容