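By 阿飛Javaer. Please credit the original source when reposting. Thank you!
Continuing with com.dangdang.ddframe.rdb.sharding.example.jdbc.Main in the sharding-jdbc-example-jdbc module, this article analyzes how a simple sharded query is implemented, that is, printSimpleSelect(dataSource): how a simple query SQL is executed. The analysis below uses the SQL statement "SELECT o.* FROM t_order o where o.user_id=? AND o.order_id=?" as the example.
Single-table query
The printSimpleSelect() method in Main calls preparedStatement.executeQuery(), which is the executeQuery() method of ShardingPreparedStatement. Its core source is as follows: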
@Override
public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        // Core method route(): parses the SQL and decides where it is routed for execution
        Collection<PreparedStatementUnit> preparedStatementUnits = route();
        // Execute the SQL according to the routing information
        List<ResultSet> resultSets = new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
        // Merge the returned result sets
        result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());
    } finally {
        clearBatch();
    }
    currentResultSet = result;
    return result;
}
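As the source above shows, a SQL query has two core parts: routing and result merging. The following sections analyze how sharding-jdbc implements them.
Single-table query: routing
First, let's analyze how the following line obtains the routing information: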
Collection<PreparedStatementUnit> preparedStatementUnits = route();
The core source of route() is as follows:
private Collection<PreparedStatementUnit> route() throws SQLException {
    Collection<PreparedStatementUnit> result = new LinkedList<>();
    // Calls route() in PreparedStatementRoutingEngine, which in turn calls sqlRouter.route(logicSQL, parameters, sqlStatement)
    routeResult = routingEngine.route(getParameters());
    for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
        SQLType sqlType = routeResult.getSqlStatement().getType();
        Collection<PreparedStatement> preparedStatements;
        if (SQLType.DDL == sqlType) {
            preparedStatements = generatePreparedStatementForDDL(each);
        } else {
            preparedStatements = Collections.singletonList(generatePreparedStatement(each));
        }
        routedStatements.addAll(preparedStatements);
        for (PreparedStatement preparedStatement : preparedStatements) {
            replaySetParameter(preparedStatement);
            result.add(new PreparedStatementUnit(each, preparedStatement));
        }
    }
    return result;
}
The SQLRouter interface has two implementations: DatabaseHintSQLRouter and ParsingSQLRouter. Since no hint is used here to force execution on a specific database, the route() method of ParsingSQLRouter is called:
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
    Collection<String> tableNames = sqlStatement.getTables().getTableNames();
    RoutingEngine routingEngine;
    // Use SimpleRoutingEngine when the SQL contains only one table name, when all of its tables are binding tables, or when all of its tables are in the database of the default data source (that is, tables that are not sharded); otherwise use ComplexRoutingEngine
    if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
        routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
    } else {
        routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
    }
    return routingEngine.route();
}
Next, let's look at SimpleRoutingEngine and ComplexRoutingEngine.
SimpleRoutingEngine
When executing the SQL "SELECT o.* FROM t_order o where o.user_id=? AND o.order_id=?", the SQL contains only one table (1 == tableNames.size()), so the routing engine is SimpleRoutingEngine. The source of SimpleRoutingEngine.route() is as follows:
@Override
public RoutingResult route() {
    // Get the tableRule for the logic table, here t_order; the table rule is configured with .actualTables(Arrays.asList("t_order_0", "t_order_1")), so there are two actual tables
    TableRule tableRule = shardingRule.getTableRule(logicTableName);
    // Route the data sources first according to the rule, i.e. by user_id modulo
    Collection<String> routedDataSources = routeDataSources(tableRule);
    // routedMap holds the routed target data sources and tables: the key is the data source, the value is the collection of target tables routed within that data source
    Map<String, Collection<String>> routedMap = new LinkedHashMap<>(routedDataSources.size());
    // Iterate over the routed target data sources
    for (String each : routedDataSources) {
        // Then route the tables according to the rule, i.e. by order_id modulo
        routedMap.put(each, routeTables(tableRule, each));
    }
    // Wrap the routed data sources and tables in a RoutingResult. RoutingResult has a property of type TableUnits, which holds a List<TableUnit> tableUnits. A TableUnit has three properties: dataSourceName (data source name), logicTableName (logic table name) and actualTableName (actual table name), for example: TableUnit:{dataSourceName:ds_jdbc_1, logicTableName:t_order, actualTableName: t_order_1}
    return generateRoutingResult(tableRule, routedMap);
}
Data source routing in detail: the data source sharding strategy is databaseShardingStrategy(new DatabaseShardingStrategy("user_id", new ModuloDatabaseShardingAlgorithm())); and the where condition is where o.user_id=? AND o.order_id=?, so it contains user_id. Under the modulo routing strategy, an odd user_id routes to data source ds_jdbc_1 and an even user_id routes to ds_jdbc_0.
Table routing in detail: the table sharding strategy is tableShardingStrategy(new TableShardingStrategy("order_id", new ModuloTableShardingAlgorithm())), and the where condition contains order_id. Under the modulo routing strategy, an odd order_id routes to table t_order_1 and an even order_id routes to t_order_0.
In summary: the number of actual tables the SQL finally executes against is (number of routed data sources) * (number of routed actual tables).
Example 1: where o.order_id=1001 AND o.user_id=10. user_id=10 routes to data source ds_jdbc_0, and order_id=1001 routes to actual table t_order_1, so the SQL only needs to run against table t_order_1 in data source ds_jdbc_0.
Example 2: where o.order_id=1000. user_id has no value, so routing yields all data sources, ds_jdbc_0 and ds_jdbc_1. order_id=1000 routes to actual table t_order_0, so the SQL needs to run against table t_order_0 in both ds_jdbc_0 and ds_jdbc_1.
Example 3: where o.user_id=11. user_id=11 routes to data source ds_jdbc_1. order_id has no value, so routing yields both actual tables, t_order_0 and t_order_1, and the SQL only needs to run against t_order_0 and t_order_1 in data source ds_jdbc_1.
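The three examples above can be reproduced with a small standalone sketch. It is not sharding-jdbc code: the class ModuloRoutingSketch and its shard() and route() helpers are hypothetical names, and the sketch simply assumes that a modulo algorithm selects the target whose index equals the sharding value modulo the number of targets, and that a missing sharding value selects every target.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ModuloRoutingSketch {

    // Assumed configuration mirroring the article's example: two data sources, two actual tables.
    static final List<String> DATA_SOURCES = Arrays.asList("ds_jdbc_0", "ds_jdbc_1");
    static final List<String> ACTUAL_TABLES = Arrays.asList("t_order_0", "t_order_1");

    /** Picks the target at index (value mod size), or every target when no sharding value is given. */
    static List<String> shard(List<String> targets, Long shardingValue) {
        if (shardingValue == null) {
            return targets;
        }
        int index = (int) Math.floorMod(shardingValue.longValue(), (long) targets.size());
        return Collections.singletonList(targets.get(index));
    }

    /** Returns "dataSource.table" for every routed combination (data sources times tables). */
    static List<String> route(Long userId, Long orderId) {
        List<String> result = new ArrayList<>();
        for (String dataSource : shard(DATA_SOURCES, userId)) {
            for (String table : shard(ACTUAL_TABLES, orderId)) {
                result.add(dataSource + "." + table);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(route(10L, 1001L)); // [ds_jdbc_0.t_order_1]
        System.out.println(route(null, 1000L)); // [ds_jdbc_0.t_order_0, ds_jdbc_1.t_order_0]
        System.out.println(route(11L, null)); // [ds_jdbc_1.t_order_0, ds_jdbc_1.t_order_1]
    }
}
```

The nested loop is where the "data sources times tables" count comes from: each routed data source is paired with each routed actual table.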
ComplexRoutingEngine
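To be determined... ...
Single-table query: execution
Once routing is complete, it is known which actual tables in which data sources the SQL must run against. Next, taking SELECT o.* FROM t_order o where o.user_id=10 as the example, let's analyze how sharding-jdbc executes the Java code below.
From the routing analysis above, this SQL is routed to the data source ds_jdbc_0 and is executed against all of its actual tables ([t_order_0, t_order_1]):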
List<ResultSet> resultSets = new PreparedStatementExecutor(
        getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
The core execution code is in ExecutorEngine. Its core source is as follows:
public <T> List<T> executePreparedStatement(
        final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
    // preparedStatementUnits is the result of the routing analysis above: for the SQL select o.* from t_order o where o.user_id=10, it only needs to run against the two actual tables t_order_0 and t_order_1 in data source ds_jdbc_0
    return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}

private <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    if (baseStatementUnits.isEmpty()) {
        return Collections.emptyList();
    }
    Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
    // Split off the first task
    BaseStatementUnit firstInput = iterator.next();
    // Every task except the first is executed asynchronously
    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
    T firstOutput;
    List<T> restOutputs;
    try {
        // The first task is executed synchronously. [My guess: is this an optimization for SQL that, after sharding, routes to a single table in a single data source, so that such SQL runs synchronously? After sharding, user-facing APIs account for 99% of requests, and 99% of the SQL behind those APIs only needs to run against one actual table in one data source, for example querying a user's orders when the order table is sharded by user_id.]
        firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
        // Collect the results of the other, asynchronously executed tasks
        restOutputs = restFutures.get();
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    List<T> result = Lists.newLinkedList(restOutputs);
    // The final result is the synchronous result of the first task followed by the asynchronous results of the others
    result.add(0, firstOutput);
    return result;
}
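The "first task on the calling thread, the rest on a pool" pattern in execute() can be tried in isolation. The sketch below is a simplified illustration rather than the library's implementation: it swaps Guava's ListenableFuture for plain JDK Future objects, and the class FirstSyncRestAsyncSketch and its execute() helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FirstSyncRestAsyncSketch {

    /** Runs the first task on the calling thread and the rest on the pool, keeping input order. */
    static <T> List<T> execute(ExecutorService pool, List<Callable<T>> tasks) {
        if (tasks.isEmpty()) {
            return Collections.emptyList();
        }
        Iterator<Callable<T>> iterator = tasks.iterator();
        Callable<T> first = iterator.next();
        // Submit everything except the first task before doing any work locally.
        List<Future<T>> restFutures = new ArrayList<>();
        while (iterator.hasNext()) {
            restFutures.add(pool.submit(iterator.next()));
        }
        List<T> result = new ArrayList<>(tasks.size());
        try {
            // The first task runs synchronously, so a single-unit query never leaves the caller's thread.
            result.add(first.call());
            // Then wait for the asynchronous tasks, in submission order.
            for (Future<T> each : restFutures) {
                result.add(each.get());
            }
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = Arrays.<Callable<String>>asList(
                    () -> "ds_jdbc_0.t_order_0",
                    () -> "ds_jdbc_0.t_order_1");
            System.out.println(execute(pool, tasks)); // [ds_jdbc_0.t_order_0, ds_jdbc_0.t_order_1]
        } finally {
            pool.shutdown();
        }
    }
}
```

With a single task the pool is never touched, which matches the author's guess that the synchronous path favors queries routed to exactly one table.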
Core code for asynchronous execution:
private final ListeningExecutorService executorService;

public ExecutorEngine(final int executorSize) {
    // The thread pool used for asynchronous execution is wrapped by google-guava. Its threads are daemons named ShardingJDBC-***, and a shutdown hook is added so that on application shutdown it waits up to 60 seconds for all tasks to finish, giving a graceful shutdown
    executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
            executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
    MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}

private <T> ListenableFuture<List<T>> asyncExecute(
        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    // Build a list to hold the futures of the asynchronous executions
    List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
    for (final BaseStatementUnit each : baseStatementUnits) {
        // Execute every SQL asynchronously on the thread pool, which is initialized in the ExecutorEngine constructor
        result.add(executorService.submit(new Callable<T>() {
            @Override
            public T call() throws Exception {
                return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
            }
        }));
    }
    // google-guava method: combines all asynchronous results into a single future holding a list
    return Futures.allAsList(result);
}
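To see what the constructor's thread pool settings amount to, here is a rough JDK-only approximation. It is a sketch under assumptions, not the Guava code itself: the class NamedDaemonPoolSketch and its helpers are hypothetical, a hand-written ThreadFactory stands in for ThreadFactoryBuilder, and a plain Runtime shutdown hook stands in for MoreExecutors.addDelayedShutdownHook.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedDaemonPoolSketch {

    /** Builds a fixed-size pool whose threads are daemons named ShardingJDBC-0, ShardingJDBC-1, ... */
    static ExecutorService newPool(int size) {
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "ShardingJDBC-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        final ExecutorService pool = new ThreadPoolExecutor(
                size, size, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
        // On JVM exit, stop accepting work and wait up to 60 seconds for submitted tasks to finish.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            pool.shutdown();
            try {
                pool.awaitTermination(60, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }));
        return pool;
    }

    /** Runs a task on the pool and reports the name of the worker thread that executed it. */
    static String workerThreadName(ExecutorService pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = newPool(2);
        System.out.println(workerThreadName(pool)); // ShardingJDBC-0
    }
}
```

Because the workers are daemon threads, they would not keep the JVM alive on their own; the shutdown hook is what gives in-flight tasks a chance to complete.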
Core code for synchronous execution:
private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
    return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
}
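Both the synchronous and the asynchronous code end up calling executeInternal(). Reading through its source shows that it ultimately calls the execute***() method of PreparedStatement against the target database table. Before executing, it publishes a BEFORE_EXECUTE event through google-guava's EventBus (after execution, it publishes an EXECUTE_SUCCESS event if execution succeeded, or an EXECUTE_FAILURE event if it failed). Part of the core source is as follows: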
// Publish events
List<AbstractExecutionEvent> events = new LinkedList<>();
if (parameterSets.isEmpty()) {
    // Build the event for SQL without parameters (event type BEFORE_EXECUTE)
    events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
}
for (List<Object> each : parameterSets) {
    // Build the event for SQL with parameters (event type BEFORE_EXECUTE)
    events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
// Call google-guava's EventBus.post() to publish the events
for (AbstractExecutionEvent event : events) {
    EventBusInstance.getInstance().post(event);
}
try {
    // Execute the SQL
    result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
    // If a SQLException is thrown during execution, i.e. the SQL failed, post an EXECUTE_FAILURE event
    for (AbstractExecutionEvent each : events) {
        each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
        each.setException(Optional.of(ex));
        EventBusInstance.getInstance().post(each);
        ExecutorExceptionHandler.handleException(ex);
    }
    return null;
}
for (AbstractExecutionEvent each : events) {
    // If execution succeeded, post an EXECUTE_SUCCESS event
    each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
    EventBusInstance.getInstance().post(each);
}
Next, the result sets obtained from parallel execution need to be merged; a later article in this sharding-jdbc source analysis series covers that.
EventBus
Note: EventBus is the publish-subscribe library provided by google-guava.
How to use google-guava's EventBus:
- Publish an event: call post() on the EventBus. In sharding-jdbc: EventBusInstance.getInstance().post(each);
- Subscribe to events: call register() on the EventBus. In sharding-jdbc: EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
The source of EventBusInstance is as follows (the fully qualified name of EventBus is com.google.common.eventbus.EventBus):
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EventBusInstance {

    private static final EventBus INSTANCE = new EventBus();

    /**
     * Get event bus instance.
     *
     * @return event bus instance
     */
    public static EventBus getInstance() {
        return INSTANCE;
    }
}
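The event lifecycle around SQL execution (BEFORE_EXECUTE, then EXECUTE_SUCCESS or EXECUTE_FAILURE) can be illustrated without Guava on the classpath. The sketch below is only an approximation: SimpleBus is a hypothetical stand-in that calls listeners synchronously on post(), whereas Guava's EventBus dispatches to methods annotated with @Subscribe on registered objects. The names ExecutionEventSketch and executeWithEvents are likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ExecutionEventSketch {

    enum EventType { BEFORE_EXECUTE, EXECUTE_SUCCESS, EXECUTE_FAILURE }

    /** Hypothetical stand-in for a shared event bus: listeners are invoked synchronously on post(). */
    static final class SimpleBus {
        private final List<Consumer<EventType>> listeners = new CopyOnWriteArrayList<>();

        void register(Consumer<EventType> listener) {
            listeners.add(listener);
        }

        void post(EventType event) {
            for (Consumer<EventType> each : listeners) {
                each.accept(event);
            }
        }
    }

    /** Posts BEFORE_EXECUTE, runs the task, then posts EXECUTE_SUCCESS or EXECUTE_FAILURE. */
    static <T> T executeWithEvents(SimpleBus bus, Callable<T> task) {
        bus.post(EventType.BEFORE_EXECUTE);
        T result;
        try {
            result = task.call();
        } catch (Exception ex) {
            // Mirrors the excerpt above: a failure is reported as an event and null is returned.
            bus.post(EventType.EXECUTE_FAILURE);
            return null;
        }
        bus.post(EventType.EXECUTE_SUCCESS);
        return result;
    }

    public static void main(String[] args) {
        SimpleBus bus = new SimpleBus();
        List<EventType> seen = new ArrayList<>();
        bus.register(seen::add);
        executeWithEvents(bus, () -> "ok");
        executeWithEvents(bus, () -> { throw new IllegalStateException("boom"); });
        // [BEFORE_EXECUTE, EXECUTE_SUCCESS, BEFORE_EXECUTE, EXECUTE_FAILURE]
        System.out.println(seen);
    }
}
```

A listener registered this way sees every execution twice, once before and once after, which is the hook a subscriber such as BestEffortsDeliveryListener would rely on.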