1. Preface
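We previously wrote a custom MyBatis plugin. MyBatis's plugin mechanism is actually quite simple: plugins wrap the Executor when sqlSessionFactory.openSession() is called, and wrap the StatementHandler, ParameterHandler and ResultSetHandler each time one of them is created (see Configuration and InterceptorChain in the MyBatis source for details). Since the mechanism is this convenient, we can use it to implement database and table sharding.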
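Under the hood, a MyBatis plugin is a JDK dynamic proxy: Plugin.wrap() returns a java.lang.reflect.Proxy whose InvocationHandler hands matching calls to the interceptor (this is also why the interceptor later unwraps "h.target"). The sketch below models that idea with plain JDK classes only. SqlPreparer and PluginSketch are hypothetical names, and the real Plugin additionally matches each call against the @Intercepts signatures.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a MyBatis component such as StatementHandler.
interface SqlPreparer {
    String prepare(String sql);
}

// Models the idea behind MyBatis's Plugin class: an InvocationHandler that keeps the
// real target and lets interception logic run before delegating to it.
final class PluginSketch implements InvocationHandler {
    private final Object target;                 // the wrapped (real) object
    private final UnaryOperator<String> rewrite; // stands in for Interceptor.intercept

    private PluginSketch(Object target, UnaryOperator<String> rewrite) {
        this.target = target;
        this.rewrite = rewrite;
    }

    // Analogous to Plugin.wrap(target, interceptor): returns a JDK dynamic proxy.
    static SqlPreparer wrap(SqlPreparer target, UnaryOperator<String> rewrite) {
        return (SqlPreparer) Proxy.newProxyInstance(
                SqlPreparer.class.getClassLoader(),
                new Class<?>[]{SqlPreparer.class},
                new PluginSketch(target, rewrite));
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if ("prepare".equals(method.getName())) {
            args[0] = rewrite.apply((String) args[0]); // "intercept": rewrite the SQL argument
        }
        return method.invoke(target, args);          // "proceed": call the real target
    }

    public static void main(String[] args) {
        SqlPreparer real = sql -> sql; // pretend the real component just returns the SQL it prepares
        SqlPreparer wrapped = wrap(real, sql -> sql.replace("t_user", "t_user_3"));
        System.out.println(wrapped.prepare("select * from t_user")); // select * from t_user_3
    }
}
```

Callers only ever see the SqlPreparer interface, which is what lets MyBatis slip plugins in transparently.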
2. Design
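When MyBatis is integrated with Spring for sharding, choosing the database is not done in MyBatis itself: multiple data sources are configured in Spring, and Spring routes between them (in the implementation below, via an AbstractRoutingDataSource). Table sharding is implemented by adding a table-sharding plugin to MyBatis. In this design, the plugin intercepts the Executor's update() and query() methods and computes the table-number suffix from the user ID passed in. It then combines the table name with the suffix to form the physical table name, parses and rewrites the SQL, and finally obtains a statement that can be executed directly against the current database. (The implementation in section 3 intercepts StatementHandler.prepare() instead, where the final SQL is available from the BoundSql, and the plugin itself stores the routing key that Spring later reads.)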
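Sharding rules:
Database index of a user = user ID (or another key) % number of databases
Table index of a user = user ID / number of databases % number of tables per database (don't worry too much about why it is computed this way; I have seen many variants, including some that simply use id % number of tables. What matters is that every key maps to exactly one table.)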
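The text does not explain the division by the database count, but a quick count shows one effect of it. With keys routed to database key % 2, every even key lands in database 0; if the table index were key % 10, database 0 would only ever use even-numbered tables. More generally, key % tableCount leaves tables empty whenever the table count shares a factor with the database count. The sketch below counts how many of the physical tables each formula reaches for keys 0 to 999; class and method names are illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Compares two ways of deriving the table index once the database index is key % databaseCount.
class ShardingRuleDemo {
    // Database index: key % databaseCount
    static int databaseIndex(long key, int databaseCount) {
        return (int) (key % databaseCount);
    }

    // Rule from the text: key / databaseCount % tablesPerDatabase
    static int tableIndex(long key, int databaseCount, int tablesPerDatabase) {
        return (int) (key / databaseCount % tablesPerDatabase);
    }

    // Simpler variant: key % tablesPerDatabase
    static int naiveTableIndex(long key, int tablesPerDatabase) {
        return (int) (key % tablesPerDatabase);
    }

    // Counts the distinct (database, table) pairs that keys 0..keyCount-1 actually hit.
    static int usedShards(int keyCount, int dbCount, int tableCount, boolean naive) {
        Set<String> used = new HashSet<>();
        for (long key = 0; key < keyCount; key++) {
            int db = databaseIndex(key, dbCount);
            int table = naive ? naiveTableIndex(key, tableCount) : tableIndex(key, dbCount, tableCount);
            used.add(db + "_" + table);
        }
        return used.size();
    }

    public static void main(String[] args) {
        // 2 databases x 10 tables each = 20 physical tables
        System.out.println(usedShards(1000, 2, 10, false)); // 20
        System.out.println(usedShards(1000, 2, 10, true));  // 10
    }
}
```

With 2 databases and 5 tables per database (no common factor), both formulas reach all 10 tables, which is why the simpler variant also appears in practice.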
3. Implementation
First, define the database-sharding and table-sharding strategy interfaces, then implement a concrete strategy for each.
package cn.blogxin.sharding.plugin.strategy.database;
/**
* Database-sharding strategy
*
*/
public interface ShardingDataBaseStrategy {
/**
* Computes the index of the database that holds the given key
*
* @param sharingDataBaseCount number of databases
* @param shardingKey sharding key
* @return database index
*/
Integer calculate(int sharingDataBaseCount, String shardingKey);
}
Default database-sharding implementation:
package cn.blogxin.sharding.plugin.strategy.database;
/**
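* Default database-sharding strategy: database index = |shardingKey.hashCode() % database count|
* For example, with 2 databases:
* keys with an even hash code go to database 0,
* keys with an odd hash code go to database 1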
*
*/
public class DefaultShardingDataBaseStrategy implements ShardingDataBaseStrategy {
@Override
public Integer calculate(int sharingDataBaseCount, String shardingKey) {
return Math.abs(shardingKey.hashCode() % sharingDataBaseCount);
}
}
Table-sharding interface:
package cn.blogxin.sharding.plugin.strategy;
import cn.blogxin.sharding.plugin.Sharding;
/**
* Table-sharding strategy
*
*/
public interface ShardingTableStrategy {
String UNDERLINE = "_";
/**
* Returns the physical table name, including the shard suffix
*
* @param sharding Sharding metadata
* @param shardingKey sharding key
* @return physical table name with the shard suffix
*/
String getTargetTableName(Sharding sharding, String shardingKey);
/**
* Computes the table suffix
*
* @param sharding Sharding metadata
* @param shardingCount number of databases
* @param shardingKey sharding key
* @return table suffix
*/
Integer calculateTableSuffix(Sharding sharding, Integer shardingCount, String shardingKey);
}
Table-sharding base class:
package cn.blogxin.sharding.plugin.strategy;
import cn.blogxin.sharding.plugin.Sharding;
import cn.blogxin.sharding.plugin.ShardingContext;
import cn.blogxin.sharding.plugin.bean.ShardingDataSourceInfo;
import com.google.common.collect.Maps;
import java.util.Map;
/**
* Table-sharding strategy that also selects the database; when database sharding is used, table strategies must extend this class
*
*/
public abstract class AbstractShardingStrategyWithDataBase implements ShardingTableStrategy {
private static Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap = Maps.newHashMap();
public static void setShardingDataSourceInfoMap(Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap) {
AbstractShardingStrategyWithDataBase.shardingDataSourceInfoMap = shardingDataSourceInfoMap;
}
// Not ideal: database selection and table selection are not separated here
@Override
public String getTargetTableName(Sharding sharding, String shardingKey) {
// Determine the database
ShardingDataSourceInfo shardingDataSourceInfo = shardingDataSourceInfoMap.get(sharding.databaseName());
// Without a database-sharding config for this name, treat the data as living in a single database
int databaseCount = 1;
if (shardingDataSourceInfo != null) {
databaseCount = shardingDataSourceInfo.getShardingCount();
int databaseNum = shardingDataSourceInfo.getShardingDataBaseStrategy().calculate(databaseCount, shardingKey);
// Store the target data-source key (database name + master/slave key + index) in the context
ShardingContext.setShardingDatabase(sharding.databaseName() + ShardingContext.getMasterSalve() + databaseNum);
}
// Determine the table
Integer tableSuffix = calculateTableSuffix(sharding, databaseCount, shardingKey);
return getTableName(sharding.tableName(), tableSuffix);
}
private String getTableName(String tableName, Integer tableSuffix) {
return tableName + UNDERLINE + tableSuffix;
}
}
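The strategies above call sharding.databaseName(), sharding.tableName() and sharding.count(), and the interceptor later reads sharding() and strategy() after looking the annotation up on the mapper class named in the mapped-statement id. The @Sharding definition itself is not shown in the article, so the following is a hypothetical reconstruction inferred only from those calls: the attribute types, the default on sharding(), and the OrderMapper example are all assumptions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical reconstruction of @Sharding, inferred only from the attributes the article's code reads.
@Retention(RetentionPolicy.RUNTIME) // must be visible at runtime for getDeclaredAnnotation()
@Target(ElementType.TYPE)           // read from the mapper interface, so it targets types
@interface Sharding {
    boolean sharding() default true; // assumed on/off switch for this mapper
    String databaseName();           // logical database name (a key of the sharding config)
    String tableName();              // logical table name to replace in the SQL
    int count();                     // number of tables per database
    String strategy();               // fully qualified ShardingTableStrategy class name
}

// Hypothetical mapper interface carrying the annotation.
@Sharding(databaseName = "order", tableName = "t_order", count = 10,
        strategy = "cn.blogxin.sharding.plugin.strategy.HashShardingStrategyWithDataBase")
interface OrderMapper {
    Object selectByUserId(String userId);
}

final class ShardingLookupDemo {
    // Same lookup idea as the interceptor: drop the method name from the mapped-statement id,
    // load the mapper class, and read its @Sharding annotation (null if absent).
    static Sharding lookup(String statementId) {
        String className = statementId.substring(0, statementId.lastIndexOf('.'));
        try {
            return Class.forName(className).getDeclaredAnnotation(Sharding.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Mapper class not found: " + className, e);
        }
    }

    public static void main(String[] args) {
        Sharding sharding = lookup(OrderMapper.class.getName() + ".selectByUserId");
        // prints: order/t_order, tables per database: 10
        System.out.println(sharding.databaseName() + "/" + sharding.tableName()
                + ", tables per database: " + sharding.count());
    }
}
```

RetentionPolicy.RUNTIME is the one non-negotiable part: with the default CLASS retention, getDeclaredAnnotation() would return null and no statement would ever be sharded.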
Hash-based table-sharding implementation:
package cn.blogxin.sharding.plugin.strategy;
import cn.blogxin.sharding.plugin.Sharding;
/**
* Hash table-sharding strategy: key / databaseCount % tableCount (using the key's hash code)
*
*/
public class HashShardingStrategyWithDataBase extends AbstractShardingStrategyWithDataBase {
@Override
public Integer calculateTableSuffix(Sharding sharding, Integer shardingCount, String shardingKey) {
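// Apply abs() to the remainder: Math.abs(Integer.MIN_VALUE) is still negative, so abs() on the raw hash could yield a negative suffix
return Math.abs(shardingKey.hashCode() / shardingCount % sharding.count());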
}
}
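Where Math.abs is applied matters when turning a hash code into a suffix: Math.abs(Integer.MIN_VALUE) overflows and returns Integer.MIN_VALUE, and a String's hashCode() can be any int. Applying abs() to the raw hash can therefore yield a negative suffix, while applying it to the final remainder cannot. A minimal comparison (class and method names are illustrative):

```java
// Compares two placements of Math.abs when turning a hash code into a table suffix.
class AbsPitfallDemo {
    // abs() applied to the raw hash first, then divide and take the remainder
    static int absFirst(int hash, int databaseCount, int tableCount) {
        return Math.abs(hash) / databaseCount % tableCount;
    }

    // abs() applied last: hash / databaseCount % tableCount lies in (-tableCount, tableCount),
    // so its absolute value always falls in [0, tableCount)
    static int absLast(int hash, int databaseCount, int tableCount) {
        return Math.abs(hash / databaseCount % tableCount);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;              // a legal hash code value
        System.out.println(Math.abs(hash));        // -2147483648: abs overflows
        System.out.println(absFirst(hash, 2, 10)); // -4, so the table name would end in "_-4"
        System.out.println(absLast(hash, 2, 10));  // 4
    }
}
```

Math.floorMod is another common way to get a non-negative remainder, but it maps negative hashes to different suffixes than abs(), so whichever form is chosen has to stay fixed once data has been written.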
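Next, set up Spring's dynamic (routing) data source: each map key is a data-source key (database name + master/slave key + shard index) and each value is the corresponding data source. Later, when the custom MyBatis interceptor handles a query, it stores the data-source key in the context; the routing data source then uses that key to pick the matching data source, obtain a connection and execute the SQL.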
package cn.blogxin.sharding.plugin;
import cn.blogxin.sharding.plugin.bean.Database;
import cn.blogxin.sharding.plugin.bean.ShardingDataSourceInfo;
import cn.blogxin.sharding.plugin.strategy.AbstractShardingStrategyWithDataBase;
import cn.blogxin.sharding.plugin.strategy.database.ShardingDataBaseStrategy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.collections.MapUtils;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Set;
/**
* Loads the database/table sharding plugin
*
*/
@Configuration
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
@ConditionalOnProperty(name = "sharding.databases", havingValue = "enable")
@EnableConfigurationProperties(ShardingProperties.class)
public class ShardingDataSourceConfiguration {
@Resource
private ShardingProperties shardingProperties;
private DataSource shardingDataSource() {
Map<String, Database> databases = shardingProperties.getDatabases();
Preconditions.checkArgument(!CollectionUtils.isEmpty(databases), "No database sharding configuration found");
Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap = Maps.newHashMap();
Map<Object, Object> targetDataSources = Maps.newHashMap();
DataSource dataSource = null;
for (Map.Entry<String, Database> entry : databases.entrySet()) {
String dataBaseName = entry.getKey(); // The database name comes from the map key in the config file, which is easy to overlook; a dedicated property would be clearer
Database database = entry.getValue();
ShardingDataSourceInfo shardingDataSourceInfo = new ShardingDataSourceInfo();
shardingDataSourceInfo.setShardingCount(database.getShardingCount());
shardingDataSourceInfo.setShardingDataBaseStrategy(createShardingDataBaseStrategy(database.getShardingStrategy()));
shardingDataSourceInfoMap.put(dataBaseName, shardingDataSourceInfo);
// Data sources of this database, keyed by master/slave key and then by shard index
Set<Map.Entry<String, Map<Integer, DataSourceProperties>>> entries = database.getDataSource().entrySet();
for (Map.Entry<String, Map<Integer, DataSourceProperties>> masterSlave : entries) {
String masterSlaveKey = masterSlave.getKey();
Map<Integer, DataSourceProperties> masterSlaveValue = masterSlave.getValue();
for (Map.Entry<Integer, DataSourceProperties> propertiesEntry : masterSlaveValue.entrySet()) {
// Data-source key: database name + master/slave key + shard index
String shardingDataBaseKey = dataBaseName + masterSlaveKey + propertiesEntry.getKey();
dataSource = createDataSource(propertiesEntry.getValue(), HikariDataSource.class);
// Register the data source under that key
targetDataSources.put(shardingDataBaseKey, dataSource);
}
}
}
Preconditions.checkArgument(MapUtils.isNotEmpty(targetDataSources), "No database configuration found");
Preconditions.checkNotNull(dataSource, "No database configuration found");
AbstractShardingStrategyWithDataBase.setShardingDataSourceInfoMap(shardingDataSourceInfoMap);
ShardingDataSource shardingDataSource = new ShardingDataSource();
shardingDataSource.setTargetDataSources(targetDataSources);
/**
* Used when LazyConnectionDataSourceProxy is initialized: it fetches a real connection to read the actual database's auto-commit setting and isolation level. It is also the fallback target when no routing key is set
*/
shardingDataSource.setDefaultTargetDataSource(dataSource);
shardingDataSource.setLenientFallback(false);
shardingDataSource.afterPropertiesSet();
return shardingDataSource;
}
@Bean
public DataSource dataSource() {
LazyConnectionDataSourceProxy dataSourceProxy = new LazyConnectionDataSourceProxy();
dataSourceProxy.setTargetDataSource(shardingDataSource());
return dataSourceProxy;
}
@SuppressWarnings("unchecked")
private <T> T createDataSource(DataSourceProperties properties,
Class<? extends DataSource> type) {
return (T) properties.initializeDataSourceBuilder().type(type).build();
}
private ShardingDataBaseStrategy createShardingDataBaseStrategy(String shardingDataBaseStrategyClassName) {
try {
return (ShardingDataBaseStrategy) Class.forName(shardingDataBaseStrategyClassName).getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed to initialize ShardingDataBaseStrategy. ShardingDataBaseStrategy=" + shardingDataBaseStrategyClassName, e);
}
}
/**
* Routes to a DataSource based on the sharding context
*
* @author kris
*/
public static class ShardingDataSource extends AbstractRoutingDataSource {
/**
* ShardingContext.getShardingDatabase() returns database name + master/slave key + shard index
*
* This key decides which data source a connection is taken from. The overall flow: this configuration is loaded into
* Spring first; when a user's SQL is intercepted by the MyBatis interceptor, the interceptor sets the database key and
* returns. When a connection is actually needed afterwards, this method reads that key to decide which database to
* connect to, and the SQL is finally executed.
*
* In other words, Spring defines this hook and already calls it inside its own flow; we only implement it to supply
* the real behavior, much like the template method pattern.
* @return lookup key of the target data source
*/
@Override
protected Object determineCurrentLookupKey() {
return ShardingContext.getShardingDatabase();
}
}
}
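The moving parts of this configuration can be modeled in plain Java to see why the routing key may be set so late. The sketch below is hypothetical: a ThreadLocal stands in for ShardingContext (whose source the article does not show), route() mimics the lookup AbstractRoutingDataSource performs with lenientFallback set to false, and lazyConnection() mimics LazyConnectionDataSourceProxy by resolving the target only on the first prepareStatement call. SimpleConnection, "master" as the master/slave key, and the db0/db1 targets are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for java.sql.Connection.
interface SimpleConnection {
    String prepareStatement(String sql);
}

// Plain-Java model of the data-source setup (all names are hypothetical):
// CURRENT_KEY plays ShardingContext, route() plays the routing lookup,
// lazyConnection() plays LazyConnectionDataSourceProxy.
final class RoutingSketch {
    static final ThreadLocal<String> CURRENT_KEY = new ThreadLocal<>();

    private final Map<String, String> targets; // data-source key -> target name
    private final String defaultTarget;         // used when no key is set

    RoutingSketch(Map<String, String> targets, String defaultTarget) {
        this.targets = new HashMap<>(targets);
        this.defaultTarget = defaultTarget;
    }

    // Same key layout as the configuration: database name + master/slave key + shard index
    static String dataSourceKey(String databaseName, String masterSlaveKey, int index) {
        return databaseName + masterSlaveKey + index;
    }

    // No key -> default target; unknown key -> error (like lenientFallback = false)
    String route() {
        String key = CURRENT_KEY.get();
        if (key == null) {
            return defaultTarget;
        }
        String target = targets.get(key);
        if (target == null) {
            throw new IllegalStateException("No data source for key " + key);
        }
        return target;
    }

    // Hands out a placeholder; the target is resolved on the first prepareStatement call
    SimpleConnection lazyConnection() {
        return new SimpleConnection() {
            private String target; // resolved at most once

            @Override
            public String prepareStatement(String sql) {
                if (target == null) {
                    target = route(); // routing happens here, not when the connection was handed out
                }
                return "[" + target + "] " + sql;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put(dataSourceKey("order", "master", 0), "db0");
        targets.put(dataSourceKey("order", "master", 1), "db1");
        RoutingSketch routing = new RoutingSketch(targets, "db0");

        SimpleConnection conn = routing.lazyConnection();    // obtained before any key exists
        CURRENT_KEY.set(dataSourceKey("order", "master", 1)); // what the interceptor does in prepare()
        try {
            System.out.println(conn.prepareStatement("select * from t_order_3")); // [db1] select * from t_order_3
        } finally {
            CURRENT_KEY.remove(); // don't leak the key to the next task on this thread
        }
    }
}
```

Clearing the key in a finally block is a precaution of this sketch; whether the real ShardingContext is cleared after each statement is not shown in the article.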
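Finally, define the MyBatis interceptor. Its main job is to rewrite the SQL, replacing the logical table name with the physical table to operate on, and to set the data-source key so that the matching connection is selected when the SQL executes. Note that prepare() is called after MyBatis already holds a Connection; routing still works because the data source is wrapped in LazyConnectionDataSourceProxy, which only fetches the physical connection when the statement is actually created, after the key has been set.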
package cn.blogxin.sharding.plugin.interceptor;
import cn.blogxin.sharding.plugin.Sharding;
import cn.blogxin.sharding.plugin.ShardingContext;
import cn.blogxin.sharding.plugin.ShardingTableConfiguration;
import cn.blogxin.sharding.plugin.strategy.DefaultShardingStrategyWithDataBase;
import cn.blogxin.sharding.plugin.strategy.ShardingTableStrategy;
import com.google.common.collect.Maps;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;
/**
* MyBatis table-sharding plugin
*
* @see ShardingTableConfiguration
*/
@Intercepts({@Signature(method = "prepare", type = StatementHandler.class, args = {Connection.class, Integer.class})})
public class ShardingInterceptor implements Interceptor {
private final static Logger logger = LoggerFactory.getLogger(ShardingInterceptor.class);
private static final String DELEGATE_BOUND_SQL_SQL = "delegate.boundSql.sql";
private static final String DELEGATE_MAPPED_STATEMENT_ID = "delegate.mappedStatement.id";
private static final String DELEGATE_PARAMETER_HANDLER_PARAMETER_OBJECT = "delegate.parameterHandler.parameterObject";
private static final String PARAM_1 = "param1";
private static final String POINT = ".";
private static final ShardingTableStrategy DEFAULT_SHARDING_STRATEGY = new DefaultShardingStrategyWithDataBase();
private static final Map<String, ShardingTableStrategy> SHARDING_STRATEGY_MAP = Maps.newConcurrentMap();
@Override
public Object intercept(Invocation invocation) throws Throwable {
StatementHandler statementHandler = (StatementHandler) realTarget(invocation.getTarget());
MetaObject metaObject = SystemMetaObject.forObject(statementHandler);
String id = (String) metaObject.getValue(DELEGATE_MAPPED_STATEMENT_ID);
String className = id.substring(0, id.lastIndexOf(POINT));
Sharding sharding = Class.forName(className).getDeclaredAnnotation(Sharding.class);
if (sharding != null && sharding.sharding()) {
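String sql = (String) metaObject.getValue(DELEGATE_BOUND_SQL_SQL);
// Replace only whole-word occurrences of the logical table name, so a user_id column is not rewritten when the table is user
String tablePattern = "\\b" + java.util.regex.Pattern.quote(sharding.tableName()) + "\\b";
sql = sql.replaceAll(tablePattern, java.util.regex.Matcher.quoteReplacement(getTargetTableName(metaObject, sharding)));
metaObject.setValue(DELEGATE_BOUND_SQL_SQL, sql);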
}
return invocation.proceed();
}
private String getTargetTableName(MetaObject metaObject, Sharding sharding) throws Exception {
String shardingKey = getShardingKey(metaObject);
String targetTableName;
if (!StringUtils.isEmpty(shardingKey)) {
targetTableName = getShardingStrategy(sharding).getTargetTableName(sharding, shardingKey);
} else if (!StringUtils.isEmpty(ShardingContext.getShardingTable())) {
// No key in the parameters: fall back to the table set explicitly in ShardingContext
targetTableName = DEFAULT_SHARDING_STRATEGY.getTargetTableName(sharding, ShardingContext.getShardingTable());
} else {
throw new RuntimeException("No sharding information found. shardingKey=" + shardingKey + ", ShardingContext=" + ShardingContext.getShardingTable());
}
return targetTableName;
}
private ShardingTableStrategy getShardingStrategy(Sharding sharding) throws Exception {
String strategyClassName = sharding.strategy();
ShardingTableStrategy shardingStrategy = SHARDING_STRATEGY_MAP.get(strategyClassName);
if (shardingStrategy == null) {
ShardingTableStrategy strategy = (ShardingTableStrategy) Class.forName(strategyClassName).getDeclaredConstructor().newInstance();
SHARDING_STRATEGY_MAP.putIfAbsent(strategyClassName, strategy);
shardingStrategy = SHARDING_STRATEGY_MAP.get(strategyClassName);
}
return shardingStrategy;
}
/**
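* Uses the first mapper parameter as the sharding key by default: the parameter itself when it is a String, or "param1" when MyBatis wraps the parameters in a map
* @param metaObject meta object wrapping the StatementHandler
* @return the sharding key, or null if the first parameter is not a String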
*/
private String getShardingKey(MetaObject metaObject) {
String shardingKey = null;
Object parameterObject = metaObject.getValue(DELEGATE_PARAMETER_HANDLER_PARAMETER_OBJECT);
if (parameterObject instanceof String) {
shardingKey = (String) parameterObject;
} else if (parameterObject instanceof Map) {
Map<String, Object> parameterMap = (Map<String, Object>) parameterObject;
Object param1 = parameterMap.get(PARAM_1);
if (param1 instanceof String) {
shardingKey = (String) param1;
}
}
return shardingKey;
}
@Override
public Object plugin(Object target) {
if (target instanceof StatementHandler) {
return Plugin.wrap(target, this);
}
return target;
}
@Override
public void setProperties(Properties properties) {
}
private Object realTarget(Object target) {
if (Proxy.isProxyClass(target.getClass())) {
MetaObject metaObject = SystemMetaObject.forObject(target);
return realTarget(metaObject.getValue("h.target"));
}
return target;
}
}
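The interceptor's table-name substitution works on the SQL text, so it is worth checking what String.replaceAll does with it: the first argument is a regular expression, and every substring match is replaced, so a logical table named user also rewrites a user_id column. A safer variant, sketched below, quotes the name with Pattern.quote, requires `\b` word boundaries on both sides, and escapes the replacement with Matcher.quoteReplacement. It is still a heuristic (a string literal containing the table name would also be rewritten); parsing the SQL, as the design section describes, is more robust. Class and method names are illustrative.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Compares two ways of swapping a logical table name for a physical one in SQL text.
final class TableNameRewriteDemo {
    // Direct replaceAll: the table name is treated as a regex and every substring match is replaced
    static String naive(String sql, String table, String target) {
        return sql.replaceAll(table, target);
    }

    // Whole-word variant: quote the name, require word boundaries, and escape the replacement
    static String wholeWord(String sql, String table, String target) {
        String pattern = "\\b" + Pattern.quote(table) + "\\b";
        return sql.replaceAll(pattern, Matcher.quoteReplacement(target));
    }

    public static void main(String[] args) {
        String sql = "select user_id, name from user where user_id = ?";
        // select user_3_id, name from user_3 where user_3_id = ?
        System.out.println(naive(sql, "user", "user_3"));
        // select user_id, name from user_3 where user_id = ?
        System.out.println(wholeWord(sql, "user", "user_3"));
    }
}
```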
4. Drawbacks
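So far we have only considered how to shard; have you thought about migrating the data after sharding? One strategy: new data goes straight into the sharded databases and tables, while old data keeps using the original logic. A job then copies the old data into the new shards, and once all old data has been migrated, the forced routing logic in the original code can be removed.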
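The most annoying part: when your data volume explodes, you have to add more databases and tables, but since routing is hash-based, the existing data has to be migrated all over again. Isn't that maddening?! So plan capacity thoroughly up front, and afterwards simply archive old data instead of resharding over and over. Even with consistent hashing you still have to migrate data; it only changes how much.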
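A quick count makes that migration cost concrete. Assuming plain key % databaseCount routing and using the integers 0 to 11999 as stand-ins for hash values, the sketch below counts how many keys change database when growing from 2 databases to 3, and from 2 to 4. The class name is illustrative.

```java
// Counts keys whose database index changes when the number of databases grows.
class ReshardingDemo {
    // Keys 0..keyCount-1 routed by key % count; a key must move if its index differs
    static int movedKeys(int keyCount, int oldCount, int newCount) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (key % oldCount != key % newCount) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(movedKeys(12000, 2, 3)); // 8000
        System.out.println(movedKeys(12000, 2, 4)); // 6000
    }
}
```

Growing from 2 to 3 moves two thirds of the keys; doubling to 4 moves half, and every key on old database i ends up on database i or i + 2, so each old database only splits into two instead of being reshuffled across all of them.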
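Also, in production, unless you are very confident, stick with sharding-jdbc (now Apache ShardingSphere-JDBC). Besides the basic single-row insert, delete and query by key, there are batch inserts, batch queries, time-range queries and more, and supporting all of them is a substantial amount of work.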