MyBatis Plugins: Database and Table Sharding

1. Preface

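Previously we wrote a custom MyBatis plugin. The plugin mechanism itself is quite simple: MyBatis wraps the Executor with the configured plugins when sqlSessionFactory.openSession is called (other interceptable objects, such as StatementHandler, are wrapped when they are created; see the source code for details). Since the mechanism is this convenient, we can use it to implement database and table sharding.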

2. Design

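In a sharding scenario where MyBatis is integrated with Spring, choosing the target database is not done directly in MyBatis. Instead, multiple data sources are configured in Spring and the choice is made on the Spring side. Table sharding is done by adding a sharding plugin to MyBatis. In the general design, the plugin intercepts the Executor update() and query() methods and computes the table suffix from the user ID passed in by the caller; the implementation in section 3 intercepts StatementHandler.prepare() instead, and routes between databases through a Spring AbstractRoutingDataSource. The plugin then combines the table name with the suffix to form the physical table name, rewrites the SQL statement, and ends up with SQL that can be executed directly in the current database shard.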

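The sharding rules are:
Database ID for a user = user ID (or another key) % number of databases
Table ID for a user = user ID / number of databases % number of tables per database (there is no need to dwell on why it is computed this way; I have seen many variants, including plain id % number of tables. Any rule works as long as it maps each key to exactly one table.)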
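
The two rules above can be worked through with concrete numbers. The sketch below is illustrative only: the class and method names are hypothetical, and it assumes non-negative numeric user IDs. One reason for dividing by the number of databases first is that every ID in a given database shares the same remainder, so when the table count and the database count have a common factor, plain id % tableCount would only ever reach some of the tables in that database.

```java
// Illustrative only: ShardingRuleExample and its methods are hypothetical names.
final class ShardingRuleExample {

    /** Database index = userId % databaseCount (assumes userId >= 0). */
    static int databaseIndex(long userId, int databaseCount) {
        return (int) (userId % databaseCount);
    }

    /** Table index = userId / databaseCount % tablesPerDatabase (assumes userId >= 0). */
    static int tableIndex(long userId, int databaseCount, int tablesPerDatabase) {
        return (int) (userId / databaseCount % tablesPerDatabase);
    }

    public static void main(String[] args) {
        long userId = 1_234_567L;
        int databaseCount = 4;
        int tablesPerDatabase = 8;

        // 1234567 % 4 = 3
        int db = databaseIndex(userId, databaseCount);
        // 1234567 / 4 = 308641, and 308641 % 8 = 1
        int table = tableIndex(userId, databaseCount, tablesPerDatabase);

        System.out.println("db=" + db + ", table=" + table); // db=3, table=1
    }
}
```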

3. Implementation

First define the database sharding strategy and table sharding strategy interfaces, then implement a concrete strategy for each.

package cn.blogxin.sharding.plugin.strategy.database;

/**
 * Database sharding strategy
 *
 */
public interface ShardingDataBaseStrategy {

    /**
     * Calculates the index of the target database shard
     *
     * @param sharingDataBaseCount    number of database shards
     * @param shardingKey             sharding key
     * @return index of the database shard
     */
    Integer calculate(int sharingDataBaseCount, String shardingKey);
}

Default database sharding implementation:

package cn.blogxin.sharding.plugin.strategy.database;

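/**
 * Default database sharding strategy: hash of the sharding key modulo the number of databases.
 * For example, with 2 databases:
 * keys with an even hash go to database 0, keys with an odd hash go to database 1
 *
 */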
public class DefaultShardingDataBaseStrategy implements ShardingDataBaseStrategy {

    @Override
    public Integer calculate(int sharingDataBaseCount, String shardingKey) {
        return Math.abs(shardingKey.hashCode() % sharingDataBaseCount);
    }
}

Table sharding interface:

package cn.blogxin.sharding.plugin.strategy;

import cn.blogxin.sharding.plugin.Sharding;

/**
 * Table sharding strategy
 *
 */
public interface ShardingTableStrategy {

    String UNDERLINE = "_";

    /**
     * Returns the physical table name, including the shard suffix
     *
     * @param sharding    Sharding metadata
     * @param shardingKey database/table sharding key
     * @return physical table name with the shard suffix
     */
    String getTargetTableName(Sharding sharding, String shardingKey);

    /**
     * Calculates the table suffix
     *
     * @param sharding      Sharding metadata
     * @param shardingCount number of databases
     * @param shardingKey   database/table sharding key
     * @return the table suffix
     */
    Integer calculateTableSuffix(Sharding sharding, Integer shardingCount, String shardingKey);
}

Table sharding base class:

package cn.blogxin.sharding.plugin.strategy;

import cn.blogxin.sharding.plugin.Sharding;
import cn.blogxin.sharding.plugin.ShardingContext;
import cn.blogxin.sharding.plugin.bean.ShardingDataSourceInfo;
import com.google.common.collect.Maps;

import java.util.Map;

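/**
 * Table sharding strategy that is aware of database sharding. When the database sharding
 * plugin is in use, table sharding strategies must extend this class.
 *
 */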
public abstract class AbstractShardingStrategyWithDataBase implements ShardingTableStrategy {

    private static Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap = Maps.newHashMap();

    public static void setShardingDataSourceInfoMap(Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap) {
        AbstractShardingStrategyWithDataBase.shardingDataSourceInfoMap = shardingDataSourceInfoMap;
    }

    // Not ideal: database routing and table routing are not cleanly separated here
    @Override
    public String getTargetTableName(Sharding sharding, String shardingKey) {
        // Resolve the database
        ShardingDataSourceInfo shardingDataSourceInfo = shardingDataSourceInfoMap.get(sharding.databaseName());
        if (shardingDataSourceInfo == null) {
            // Fail fast: without this configuration neither the database nor the table can be resolved
            throw new IllegalStateException("No sharding configuration for database: " + sharding.databaseName());
        }
        int databaseNum = shardingDataSourceInfo.getShardingDataBaseStrategy().calculate(shardingDataSourceInfo.getShardingCount(), shardingKey);

        // Store the data source key in the context
        ShardingContext.setShardingDatabase(sharding.databaseName() + ShardingContext.getMasterSalve() + databaseNum);

        // Resolve the table name
        Integer tableSuffix = calculateTableSuffix(sharding, shardingDataSourceInfo.getShardingCount(), shardingKey);
        return getTableName(sharding.tableName(), tableSuffix);
    }

    private String getTableName(String tableName, Integer shardingKey) {
        return tableName + UNDERLINE + shardingKey;
    }

}

Default table sharding implementation:

package cn.blogxin.sharding.plugin.strategy;

import cn.blogxin.sharding.plugin.Sharding;

/**
 * Hash table sharding strategy: key / databaseCount % tableCount
 *
 */
public class HashShardingStrategyWithDataBase extends AbstractShardingStrategyWithDataBase {

    @Override
    public Integer calculateTableSuffix(Sharding sharding, Integer shardingCount, String shardingKey) {
        // Widen to long before Math.abs: Math.abs(Integer.MIN_VALUE) is still negative as an int
        long hash = Math.abs((long) shardingKey.hashCode());
        return (int) (hash / shardingCount % sharding.count());
    }

}
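
Any strategy that applies Math.abs to an int hash has one edge case worth knowing: Math.abs(Integer.MIN_VALUE) overflows and stays negative, which would yield a negative table suffix. The sketch below shows the effect and one way around it (widening to long first). The class and method names are hypothetical and are not part of the plugin.

```java
// Illustrative only: SafeHashExample and its methods are hypothetical names.
final class SafeHashExample {

    /** Naive variant: breaks for Integer.MIN_VALUE because Math.abs overflows. */
    static int naiveSuffix(int hash, int databaseCount, int tableCount) {
        return Math.abs(hash) / databaseCount % tableCount;
    }

    /** Safe variant: widen to long before taking the absolute value. */
    static int safeSuffix(int hash, int databaseCount, int tableCount) {
        long nonNegative = Math.abs((long) hash);
        return (int) (nonNegative / databaseCount % tableCount);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;

        System.out.println(Math.abs(hash) < 0);          // true
        System.out.println(naiveSuffix(hash, 2, 10));    // -4
        System.out.println(safeSuffix(hash, 2, 10));     // 4

        // For every other hash value both variants agree
        System.out.println(naiveSuffix(-7, 2, 10) == safeSuffix(-7, 2, 10)); // true
    }
}
```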

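Next, set up Spring's dynamic data source. Each key is the database name, the master/slave marker and the shard index concatenated together, and each value is the corresponding data source. Later, when the custom MyBatis interceptor handles a statement, it stores the data source key in the context; that key is then used to look up the data source, obtain a connection and execute the SQL.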

package cn.blogxin.sharding.plugin;

import cn.blogxin.sharding.plugin.bean.Database;
import cn.blogxin.sharding.plugin.bean.ShardingDataSourceInfo;
import cn.blogxin.sharding.plugin.strategy.AbstractShardingStrategyWithDataBase;
import cn.blogxin.sharding.plugin.strategy.database.ShardingDataBaseStrategy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.collections.MapUtils;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Set;

/**
 * Loads the database/table sharding plugin
 *
 */
@Configuration
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
@ConditionalOnProperty(name = "sharding.databases", havingValue = "enable")
@EnableConfigurationProperties(ShardingProperties.class)
public class ShardingDataSourceConfiguration {

    @Resource
    private ShardingProperties shardingProperties;

    private DataSource shardingDataSource() {
        Map<String, Database> databases = shardingProperties.getDatabases();
        Preconditions.checkArgument(!CollectionUtils.isEmpty(databases), "No database sharding configuration found");

        Map<String, ShardingDataSourceInfo> shardingDataSourceInfoMap = Maps.newHashMap();
        Map<Object, Object> targetDataSources = Maps.newHashMap();
        DataSource dataSource = null;

        for (Map.Entry<String, Database> entry : databases.entrySet()) {
            String dataBaseName = entry.getKey(); // the database name comes from the key of the configuration map, which is easy to overlook
            Database database = entry.getValue();

            ShardingDataSourceInfo shardingDataSourceInfo = new ShardingDataSourceInfo();
            shardingDataSourceInfo.setShardingCount(database.getShardingCount());
            shardingDataSourceInfo.setShardingDataBaseStrategy(createShardingDataBaseStrategy(database.getShardingStrategy()));
            shardingDataSourceInfoMap.put(dataBaseName, shardingDataSourceInfo);

            // Data sources belonging to this database
            Set<Map.Entry<String, Map<Integer, DataSourceProperties>>> entries = database.getDataSource().entrySet();
            for (Map.Entry<String, Map<Integer, DataSourceProperties>> masterSlave : entries) {
                String masterSlaveKey = masterSlave.getKey();
                Map<Integer, DataSourceProperties> masterSlaveValue = masterSlave.getValue();
                for (Map.Entry<Integer, DataSourceProperties> propertiesEntry : masterSlaveValue.entrySet()) {
                    // Build the data source key
                    String shardingDataBaseKey = dataBaseName + masterSlaveKey + propertiesEntry.getKey();
                    dataSource = createDataSource(propertiesEntry.getValue(), HikariDataSource.class);

                    // Register the data source
                    targetDataSources.put(shardingDataBaseKey, dataSource);
                }
            }
        }

        Preconditions.checkArgument(MapUtils.isNotEmpty(targetDataSources), "No database configuration found");
        Preconditions.checkNotNull(dataSource, "No database configuration found");

        AbstractShardingStrategyWithDataBase.setShardingDataSourceInfoMap(shardingDataSourceInfoMap);

        ShardingDataSource shardingDataSource = new ShardingDataSource();
        shardingDataSource.setTargetDataSources(targetDataSources);
        /**
         * Used when creating the LazyConnectionDataSourceProxy to obtain a real database connection, from which the
         * actual auto-commit setting and isolation level are read
         */
        shardingDataSource.setDefaultTargetDataSource(dataSource);
        shardingDataSource.setLenientFallback(false);
        shardingDataSource.afterPropertiesSet();
        return shardingDataSource;
    }

    @Bean
    public DataSource dataSource() {
        LazyConnectionDataSourceProxy dataSourceProxy = new LazyConnectionDataSourceProxy();
        dataSourceProxy.setTargetDataSource(shardingDataSource());
        return dataSourceProxy;
    }

    @SuppressWarnings("unchecked")
    private <T> T createDataSource(DataSourceProperties properties,
                                   Class<? extends DataSource> type) {
        return (T) properties.initializeDataSourceBuilder().type(type).build();
    }

    private ShardingDataBaseStrategy createShardingDataBaseStrategy(String shardingDataBaseStrategyClassName) {
        try {
            return (ShardingDataBaseStrategy) Class.forName(shardingDataBaseStrategyClassName).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            // Keep the cause so that the underlying reflection failure stays visible
            throw new RuntimeException("Failed to initialize ShardingDataBaseStrategy. ShardingDataBaseStrategy=" + shardingDataBaseStrategyClassName, e);
        }
    }

    /**
     * Routes to a DataSource based on the sharding context
     *
     * @author kris
     */
    public static class ShardingDataSource extends AbstractRoutingDataSource {

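        /**
         * ShardingContext.getShardingDatabase() is the database name + master/slave marker + shard index
         *
         * This key decides which data source is used to obtain the connection. The overall flow: this class is
         * loaded and registered with Spring first. When a user runs SQL and the MyBatis interceptor intercepts it,
         * the interceptor sets the database key and then finishes. Execution later reaches this method, which reads
         * the key to decide which database to connect to, and finally the SQL is executed.
         *
         * In other words, Spring defines the hook and already calls it within its own flow; we only need to
         * implement the hook to get the actual behavior, much like the template method pattern.
         *
         * @return the lookup key of the current data source
         */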
        @Override
        protected Object determineCurrentLookupKey() {
            return ShardingContext.getShardingDatabase();
        }
    }
}
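
The ShardingContext class used above is not shown in this article. A minimal sketch of what such a holder could look like follows, assuming a ThreadLocal per value. The method names mirror the calls made in the code above (including the getMasterSalve spelling); the class name, the "master" default and the clear() method are assumptions made for illustration.

```java
// Hypothetical sketch: the article's real ShardingContext is not shown.
final class ShardingContextSketch {

    // Data source key chosen for the current thread, e.g. "ordermaster1"
    private static final ThreadLocal<String> SHARDING_DATABASE = new ThreadLocal<>();

    // Master/slave marker; defaulting to "master" is an assumption
    private static final ThreadLocal<String> MASTER_SLAVE = ThreadLocal.withInitial(() -> "master");

    private ShardingContextSketch() {
    }

    static void setShardingDatabase(String key) {
        SHARDING_DATABASE.set(key);
    }

    static String getShardingDatabase() {
        return SHARDING_DATABASE.get();
    }

    static String getMasterSalve() {
        return MASTER_SLAVE.get();
    }

    /** Clears the per-thread state; pooled threads would otherwise keep a stale key. */
    static void clear() {
        SHARDING_DATABASE.remove();
        MASTER_SLAVE.remove();
    }

    public static void main(String[] args) {
        // Same key layout as the configuration class: database name + master/slave marker + shard index
        setShardingDatabase("order" + getMasterSalve() + 1);
        System.out.println(getShardingDatabase()); // ordermaster1

        clear();
        System.out.println(getShardingDatabase()); // null
    }
}
```

Because the routing data source reads the key on the same thread that runs the statement, a thread-local holder is a natural fit, but it also means the value should be cleared once the request is finished.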

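Finally, define the MyBatis interceptor. Its main job is to rewrite the SQL statement, replacing the logical table name with the physical table to operate on. It also sets the data source key so that the matching connection is chosen when the SQL is executed.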

package cn.blogxin.sharding.plugin.interceptor;

import cn.blogxin.sharding.plugin.Sharding;
import cn.blogxin.sharding.plugin.ShardingContext;
import cn.blogxin.sharding.plugin.ShardingTableConfiguration;
import cn.blogxin.sharding.plugin.strategy.DefaultShardingStrategyWithDataBase;
import cn.blogxin.sharding.plugin.strategy.ShardingTableStrategy;
import com.google.common.collect.Maps;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;

/**
 * MyBatis table sharding plugin
 *
 * @see ShardingTableConfiguration
 */
@Intercepts({@Signature(method = "prepare", type = StatementHandler.class, args = {Connection.class, Integer.class})})
public class ShardingInterceptor implements Interceptor {

    private final static Logger logger = LoggerFactory.getLogger(ShardingInterceptor.class);

    private static final String DELEGATE_BOUND_SQL_SQL = "delegate.boundSql.sql";
    private static final String DELEGATE_MAPPED_STATEMENT_ID = "delegate.mappedStatement.id";
    private static final String DELEGATE_PARAMETER_HANDLER_PARAMETER_OBJECT = "delegate.parameterHandler.parameterObject";
    private static final String PARAM_1 = "param1";
    private static final String POINT = ".";

    private static final ShardingTableStrategy DEFAULT_SHARDING_STRATEGY = new DefaultShardingStrategyWithDataBase();
    private static final Map<String, ShardingTableStrategy> SHARDING_STRATEGY_MAP = Maps.newConcurrentMap();

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = (StatementHandler) realTarget(invocation.getTarget());
        MetaObject metaObject = SystemMetaObject.forObject(statementHandler);

        String id = (String) metaObject.getValue(DELEGATE_MAPPED_STATEMENT_ID);
        String className = id.substring(0, id.lastIndexOf(POINT));
        Sharding sharding = Class.forName(className).getDeclaredAnnotation(Sharding.class);
        if (sharding != null && sharding.sharding()) {
            String sql = (String) metaObject.getValue(DELEGATE_BOUND_SQL_SQL);
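            // Quote the table name and the replacement so neither is interpreted as regex syntax, and match on
            // word boundaries so identifiers that merely contain the table name are left alone
            sql = sql.replaceAll("\\b" + java.util.regex.Pattern.quote(sharding.tableName()) + "\\b",
                    java.util.regex.Matcher.quoteReplacement(getTargetTableName(metaObject, sharding)));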
            metaObject.setValue(DELEGATE_BOUND_SQL_SQL, sql);
        }
        return invocation.proceed();
    }

    private String getTargetTableName(MetaObject metaObject, Sharding sharding) throws Exception {
        String shardingKey = getShardingKey(metaObject);
        String targetTableName;
        if (!StringUtils.isEmpty(shardingKey)) {
            targetTableName = getShardingStrategy(sharding).getTargetTableName(sharding, shardingKey);
        } else if (!StringUtils.isEmpty(ShardingContext.getShardingTable())) {
            targetTableName = DEFAULT_SHARDING_STRATEGY.getTargetTableName(sharding, ShardingContext.getShardingTable());
        } else {
            throw new RuntimeException("No table sharding information found. shardingKey=" + shardingKey + ",ShardingContext=" + ShardingContext.getShardingTable());
        }
        return targetTableName;
    }

    private ShardingTableStrategy getShardingStrategy(Sharding sharding) throws Exception {
        String strategyClassName = sharding.strategy();
        ShardingTableStrategy shardingStrategy = SHARDING_STRATEGY_MAP.get(strategyClassName);
        if (shardingStrategy == null) {
            ShardingTableStrategy strategy = (ShardingTableStrategy) Class.forName(strategyClassName).getDeclaredConstructor().newInstance();
            SHARDING_STRATEGY_MAP.putIfAbsent(strategyClassName, strategy);
            shardingStrategy = SHARDING_STRATEGY_MAP.get(strategyClassName);
        }
        return shardingStrategy;
    }

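    /**
     * Uses the first parameter as the sharding key by default
     *
     * @param metaObject MetaObject wrapping the StatementHandler
     * @return the sharding key, or null if the first parameter is not a String
     */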
    private String getShardingKey(MetaObject metaObject) {
        String shardingKey = null;
        Object parameterObject = metaObject.getValue(DELEGATE_PARAMETER_HANDLER_PARAMETER_OBJECT);
        if (parameterObject instanceof String) {
            shardingKey = (String) parameterObject;
        } else if (parameterObject instanceof Map) {
            Map<String, Object> parameterMap = (Map<String, Object>) parameterObject;
            Object param1 = parameterMap.get(PARAM_1);
            if (param1 instanceof String) {
                shardingKey = (String) param1;
            }
        }
        return shardingKey;
    }

    @Override
    public Object plugin(Object target) {
        if (target instanceof StatementHandler) {
            return Plugin.wrap(target, this);
        }
        return target;
    }

    @Override
    public void setProperties(Properties properties) {

    }

    private Object realTarget(Object target) {
        if (Proxy.isProxyClass(target.getClass())) {
            MetaObject metaObject = SystemMetaObject.forObject(target);
            return realTarget(metaObject.getValue("h.target"));
        }
        return target;
    }
}
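
The interceptor reads a @Sharding annotation from the mapper interface, but the annotation itself is not shown in this article. The sketch below is a guess at its shape, inferred only from the calls made above (tableName(), databaseName(), count(), sharding(), strategy()); the defaults, the OrderMapper interface and the table and database names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical reconstruction: attribute names come from how the interceptor uses them,
// the default values are assumptions.
@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@Target(ElementType.TYPE)
@interface Sharding {
    String tableName();                 // logical table name that appears in the mapper SQL
    String databaseName() default "";   // key into the database sharding configuration
    int count();                        // number of tables per database
    boolean sharding() default true;    // switch for enabling or disabling the rewrite
    String strategy() default "cn.blogxin.sharding.plugin.strategy.HashShardingStrategyWithDataBase";
}

// Hypothetical mapper: the first parameter is a String, so the interceptor can use it as the sharding key.
@Sharding(tableName = "t_order", databaseName = "order", count = 10)
interface OrderMapper {
    String selectOrderNoByUserId(String userId);
}

final class ShardingAnnotationExample {
    public static void main(String[] args) {
        // Same lookup the interceptor performs on the mapper class
        Sharding sharding = OrderMapper.class.getDeclaredAnnotation(Sharding.class);
        System.out.println(sharding.tableName() + "_" + 3); // t_order_3
        System.out.println(sharding.count());               // 10
    }
}
```

Note that getShardingKey only accepts a String as the first parameter. With a mapper method whose first argument is, say, a Long, the interceptor as written would find no key and fall back to ShardingContext.getShardingTable().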

4. Drawbacks

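So far we have only considered sharding itself. Have you thought about migrating the data afterwards? One strategy is to route new data straight to the sharded databases and tables while old data keeps following the original logic, then run a job that copies the old data into the new layout. Once all the old data has been migrated, the special-case logic for old data can be removed from the code.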

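The most annoying part is growth. When the data volume explodes you have to add more databases and tables, but because routing is hash-based, the existing data has to be migrated again, which is painful. So it is usually best to plan capacity once up front and simply archive old data later rather than reshuffling repeatedly. Even with consistent hashing you still have to migrate data; the only difference is how much.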
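
To get a feel for how much data moves, the sketch below counts how many keys change database when the database count changes under plain modulo routing. It is a simplified model (sequential integer keys, database routing only), and the class and method names are hypothetical.

```java
// Illustrative only: ReshardingCostExample and its method are hypothetical names.
final class ReshardingCostExample {

    /** Fraction of keys in [0, sampleSize) whose database index changes. */
    static double movedFraction(int oldCount, int newCount, int sampleSize) {
        int moved = 0;
        for (int id = 0; id < sampleSize; id++) {
            if (id % oldCount != id % newCount) {
                moved++;
            }
        }
        return (double) moved / sampleSize;
    }

    public static void main(String[] args) {
        // Going from 4 to 5 databases: only ids with id % 20 in {0, 1, 2, 3} stay put
        System.out.println(movedFraction(4, 5, 1_000_000)); // 0.8

        // Doubling from 4 to 8 databases: ids with id % 8 < 4 stay put
        System.out.println(movedFraction(4, 8, 1_000_000)); // 0.5
    }
}
```

In this model, adding a single database relocates about 80% of the keys, while doubling relocates half of them, which is one reason capacity is often grown by doubling.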

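Finally, unless you are fully confident, use sharding-jdbc in production. Beyond basic single-row inserts, deletes and queries by key, there are batch inserts, batch queries, time-range queries and more, and supporting all of them is a substantial amount of work.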

5. References

http://blogxin.cn/2019/06/01/sharding-db/
