Mybatis-Plus集成Sharding-JDBC與Flyway實(shí)現(xiàn)多租戶分庫分表

背景

公司產(chǎn)品部收到了一些重要客戶的需求,他們希望能夠依賴獨(dú)立的數(shù)據(jù)庫存儲來支持他們的業(yè)務(wù)數(shù)據(jù)。與此同時(shí),仍有許多中小客戶,可以繼續(xù)使用公共庫以滿足其需求。技術(shù)實(shí)現(xiàn)方面,此前持久層框架使用的Mybatis-plus,部分業(yè)務(wù)場景使用到了Sharding-JDBC用于分表,另外,我們的數(shù)據(jù)庫版本控制工具使用的是Flyway。

方案說明

這里將方案進(jìn)行簡要說明,配置統(tǒng)一通過Nacos管理(有需要的可以自行定義租戶配置頁面)。

  • 1.首先多數(shù)據(jù)源管理使用Mybatis-Plus官方推薦的dynamic-datasource-spring-boot-starter組件,需要注意的是構(gòu)建動態(tài)多數(shù)據(jù)源時(shí),我們要把Sharding-JDBC數(shù)據(jù)源也納入管理。因?yàn)槲覀兊膸炖锩娈吘怪挥胁糠直碛玫搅薙harding-JDBC,這樣可以復(fù)用數(shù)據(jù)源。


    file
  • 2.其次,租戶與數(shù)據(jù)源之間在Nacos建立關(guān)系配置,確保根據(jù)租戶ID能夠路由到唯一的租戶數(shù)據(jù)源。我們需要自定義Sharding分片策略和多數(shù)據(jù)源切換邏輯,根據(jù)http請求傳入的租戶ID,設(shè)置正確的數(shù)據(jù)源。


    file
  • 3.動態(tài)數(shù)據(jù)源與Sharding數(shù)據(jù)源配置做為公共配置在Nacos維護(hù),在業(yè)務(wù)服務(wù)啟動時(shí),讀取公共配置初始化多數(shù)據(jù)源,并添加對公共多數(shù)據(jù)源配置的監(jiān)聽。當(dāng)配置變更時(shí),重新構(gòu)造Sharding數(shù)據(jù)源,并并更新動態(tài)多數(shù)據(jù)源。另外數(shù)據(jù)庫腳本通過自定義flyway配置執(zhí)行。


    file

技術(shù)實(shí)現(xiàn)

前提

需要在Nacos提前維護(hù)租戶與數(shù)據(jù)源關(guān)系配置。

不使用Sharding-JDBC場景

  • 1.引入相關(guān)組件依賴。
        <dependency>
            <groupId>com.alibaba.nacos</groupId>
            <artifactId>nacos-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.flywaydb</groupId>
            <artifactId>flyway-core</artifactId>
            <version>7.15.0</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>
  • 2.關(guān)閉Flyway自動配置和配置多數(shù)據(jù)源。
spring:
  flyway:
    #關(guān)閉flyway自動配置,自定義實(shí)現(xiàn)
    enabled: false
  datasource:
    dynamic:
      #默認(rèn)數(shù)據(jù)源
      primary: ds0
      datasource:
        ds0:
          type: com.alibaba.druid.pool.DruidDataSource
          driverClassName: org.postgresql.Driver
          url: jdbc:postgresql://127.0.0.1:5432/ds0
          username: ds0
          password: ds0123
        ds1:
          type: com.alibaba.druid.pool.DruidDataSource
          driverClassName: org.postgresql.Driver
          url: jdbc:postgresql://127.0.0.1:5432/ds1
          username: ds1
          password: ds1123
  • 3.自定義實(shí)現(xiàn)Flyway配置類,對應(yīng)的flyway腳本目錄結(jié)構(gòu)見下圖,主庫和租戶庫SQL腳本獨(dú)立維護(hù)。
Java
@Slf4j
@Configuration
@EnableTransactionManagement
public class FlywayConfig {
    @Value("${spring.application.name}")
    private String appName;
    @Autowired
    private DataSource dataSource;

    @Bean
    public void migrate() {
        log.info("flyway開始逐數(shù)據(jù)源執(zhí)行腳本");
        DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
        Map<String, DataSource> dataSources = ds.getDataSources();
        dataSources.forEach((k, v) -> {
            if (!"sharding".equals(k)) {
                            // Flyway相關(guān)參數(shù)建議通過配置管理,以下代碼僅供參考
                Flyway flyway = Flyway.configure()
                        .dataSource(v)
                        .table("t_" + k + "_" + appName + "_version")
                        .baselineOnMigrate(true)
                        .outOfOrder(true)
                        .baselineVersion("1.0.0")
                        .baselineDescription(k + "初始化")
                        .locations(CommonConstant.SQL_BASE_LOCATION + (CommonConstant.DEFAULT_DS_NAME.equals(k) ? CommonConstant.MASTER_DB : CommonConstant.TENANT_DB))
                        .load();
                flyway.migrate();
                log.info("flyway在 {} 數(shù)據(jù)源執(zhí)行腳本成功", k);
            }
        });
    }
}
file
  • 4.自定義實(shí)現(xiàn)數(shù)據(jù)源切換Filter類。
@Slf4j
@Component
@WebFilter(filterName = "dynamicDatasourceFilter", urlPatterns = {"/*"})
public class DynamicDatasourceFilter implements Filter {
    // 構(gòu)建演示用租戶與數(shù)據(jù)源關(guān)系配置
        private static Map<String, String> tenantDsMap = new HashMap<>();
    static {
        tenantDsMap.put("tenant123", "ds0");
        tenantDsMap.put("tenant456", "ds0");
                tenantDsMap.put("tenant789", "ds1");
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        // 從請求頭獲取租戶ID
        String tenantId = httpRequest.getHeader(CommonConstant.TENANT_HEADER);
        try {
            // 設(shè)置數(shù)據(jù)源
            if (tenantDsMap.get(tenantId) == null) {
                // 如果根據(jù)租戶ID未找到租戶數(shù)據(jù)源配置,默認(rèn)走主庫
                DynamicDataSourceContextHolder.push(CommonConstant.DEFAULT_DS_NAME);
            } else {
                //注意,如果是分片表,那么需要在分片表Service類或方法上加@DS("sharding")注解,最終由sharding的庫分片策略決定SQL在哪個(gè)庫執(zhí)行。而這里的設(shè)置將會被@DS注解配置覆蓋
                DynamicDataSourceContextHolder.push(tenantDsMap.get(tenantId));
            }
            // 執(zhí)行
            chain.doFilter(request, response);
        } catch (Exception e) {
            log.error("切換數(shù)據(jù)源失敗,tenantId={},請求接口uri={},異常原因:{}", tenantId, httpRequest.getRequestURI(), ExceptionUtils.getStackTrace(e));
        } finally {
            // 清空當(dāng)前線程數(shù)據(jù)源
            DynamicDataSourceContextHolder.poll();
        }
    }
file

使用Sharding-JDBC

如果微服務(wù)還需要使用Sharding分片,那么還需要引入sharding-jdbc組件依賴,并配置sharding數(shù)據(jù)源和分片規(guī)則。如果是多個(gè)服務(wù)共用數(shù)據(jù)庫,那么建議將Sharding數(shù)據(jù)源配置做為公共配置在Nacos管理,而Sharding分片規(guī)則則做為服務(wù)個(gè)性化配置單獨(dú)維護(hù)(分片規(guī)則基本不需要動態(tài)變更),這樣當(dāng)有新租戶需要申請開通獨(dú)立租戶庫的時(shí)候,直接變更Sharding數(shù)據(jù)源公共配置,服務(wù)在監(jiān)聽到公共配置變更后,即可重新構(gòu)建新的Sharding數(shù)據(jù)源實(shí)例和動態(tài)數(shù)據(jù)源更新,無需重啟服務(wù)。

  • 1.引入sharding-jdbc組件依賴。
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>4.1.1</version>
        </dependency>
  • 2.配置Sharding數(shù)據(jù)源和分片規(guī)則。
# sharding數(shù)據(jù)源配置
dataSources:
  ds0: !!com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/ds0
    username: ds0
    password: ds0123
  ds1: !!com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/ds1
    username: ds1
    password: ds1123
  ds2: !!com.alibaba.druid.pool.DruidDataSource
    driverClassName: org.postgresql.Driver
    url: jdbc:postgresql://127.0.0.1:5432/ds2
    username: ds2
    password: ds2123
# sharding分片規(guī)則配置
shardingRule:
  tables:
    t_order:
      actualDataNodes: ds$->{0..2}.t_order$->{0..1}
      tableStrategy:
        inline:
          shardingColumn: order_no
          algorithmExpression: t_order$->{order_no.toBigInteger() % 2}
  defaultDataSourceName: ds0
  # 默認(rèn)庫分片策略
  defaultDatabaseStrategy:
    standard:
      shardingColumn: tenant_id
            # 自定義精確分片策略
      preciseAlgorithmClassName: cn.xtstu.demo.config.CustomDataSourcePreciseShardingAlgorithm
    #hint:
            # 
    #  algorithmClassName: cn.xtstu.demo.config.CustomHintShardingAlgorithm
  defaultTableStrategy:
    none:
props:
  sql.show: true
  • 3.自定義精確分片策略。
public class CustomDataSourcePreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> {

    // 構(gòu)建演示用租戶與數(shù)據(jù)源關(guān)系配置
        private static Map<String, String> tenantDsMap = new HashMap<>();
    static {
        tenantDsMap.put("tenant123", "ds0");
        tenantDsMap.put("tenant456", "ds0");
                tenantDsMap.put("tenant789", "ds1");
    }
        
    @Override
    public String doSharding(Collection<String> dataSourceNames, PreciseShardingValue<String> shardingValue) {
            // 庫分片策略配置的分片鍵是字段tenant_id,根據(jù)分片鍵查詢配置的數(shù)據(jù)源
            String dsName = tenantDsMap.get(shardingValue.getValue());
        // 如果如前文所屬,Sharding子數(shù)據(jù)源key與dynamic數(shù)據(jù)源key保持一致的話,這里直接返回就行了
                return dsName;
        // TODO 需要處理未匹配到數(shù)據(jù)源的情況
    }
}
  • 4.自定義Hint分片策略(可選),適用于分片鍵與SQL無關(guān)的場景。
public class CustomHintShardingAlgorithm implements HintShardingAlgorithm<Integer> {

    // 構(gòu)建演示用租戶與數(shù)據(jù)源關(guān)系配置
        private static Map<String, String> tenantDsMap = new HashMap<>();
    static {
        tenantDsMap.put("tenant123", "ds0");
        tenantDsMap.put("tenant456", "ds0");
                tenantDsMap.put("tenant789", "ds1");
    }
        
    @Override
    public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Integer> hintShardingValue) {
        Collection<String> result = new ArrayList<>();
        // 從請求頭取到當(dāng)前租戶ID
                HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        result.add(tenantDsMap.get(request.getHeader("tenantId")));
                // TODO  需要處理未匹配到數(shù)據(jù)源的情況
        return result;
    }
}
  • 5.自定義動態(tài)數(shù)據(jù)源配置(核心就是將sharding數(shù)據(jù)源及其子數(shù)據(jù)源添加到動態(tài)數(shù)據(jù)源一起管理)。
@Slf4j
@Configuration
public class CustomDynamicDataSourceConfig {
    @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
    private String dataId;
    @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
    private String group;
    @Resource
    private DynamicDataSourceProperties properties;
    @Resource
    private NacosHelper nacosHelper;

    /**
     * 啟動時(shí)通過查詢Nacos上sharding數(shù)據(jù)源及分片規(guī)則yaml配置初始化sharding-jdbc數(shù)據(jù)源
     *
     * @return
     */
    @Bean
    public ShardingDataSource shardingDataSource() {
        ConfigService configService = nacosHelper.getConfigService();
        if (configService == null) {
            log.error("連接nacos失敗");
        }
        String configInfo = null;
        try {
            configInfo = configService.getConfig(dataId, group, 5000);
        } catch (NacosException e) {
            log.error("獲取{}配置失敗,異常原因:{}", dataId, ExceptionUtils.getStackTrace(e));
        }
        if (StringUtils.isBlank(configInfo)) {
            log.error("{}配置為空,啟動失敗", dataId);
            throw new NullPointerException(dataId + "配置為空");
        }
        try {
            // 通過工廠類和yaml配置創(chuàng)建Sharding數(shù)據(jù)源
            return (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            log.error("創(chuàng)建sharding-jdbc數(shù)據(jù)源異常:{}", ExceptionUtils.getStackTrace(e));
            throw new NullPointerException("sharding-jdbc數(shù)據(jù)源為空");
        }
    }

    /**
     * 將動態(tài)數(shù)據(jù)源設(shè)置為首選的
     * 當(dāng)spring存在多個(gè)數(shù)據(jù)源時(shí), 自動注入的是首選的對象
     * 設(shè)置為主要的數(shù)據(jù)源之后,就可以支持shardingJdbc原生的配置方式了
     */
    @Primary
    @Bean
    public DataSource dataSource() {
        DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
        dataSource.setPrimary(properties.getPrimary());
        dataSource.setStrict(properties.getStrict());
        dataSource.setStrategy(properties.getStrategy());
        dataSource.setP6spy(properties.getP6spy());
        dataSource.setSeata(properties.getSeata());
        return dataSource;
    }

    /**
     * 初始化動態(tài)數(shù)據(jù)源
     *
     * @return
     */
    @Bean
    public DynamicDataSourceProvider dynamicDataSourceProvider(ShardingDataSource shardingDataSource) {
        return new AbstractDataSourceProvider() {
            @Override
            public Map<String, DataSource> loadDataSources() {
                Map<String, DataSource> dataSourceMap = new HashMap<>();
                // 將sharding數(shù)據(jù)源整體添加到動態(tài)數(shù)據(jù)源里
                dataSourceMap.put(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
                // 同時(shí)把sharding內(nèi)部管理的子數(shù)據(jù)源也添加到動態(tài)數(shù)據(jù)源里
                Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
                dataSourceMap.putAll(shardingInnerDataSources);
                return dataSourceMap;
            }
        };
    }
}
  • 6.最后給出一份通過監(jiān)聽Nacos配置變更動態(tài)更新數(shù)據(jù)源的示例代碼。注意:這份示例代碼中只給出了Sharding配置變更時(shí)的處理邏輯,如果是dynamic數(shù)據(jù)源配置的話,有需要的可以參考著自行實(shí)現(xiàn)。
@Slf4j
@Configuration
public class NacosShardingConfigListener {
    @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
    private String dataId;
    @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
    private String group;
    @Value("${spring.application.name}")
    private String appName;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private NacosHelper nacosHelper;

    @PostConstruct
    public void shardingConfigListener() throws Exception {
        ConfigService configService = nacosHelper.getConfigService();
        if (configService == null) {
            return;
        }
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                log.info("configInfo:\n{}", configInfo);
                if (StringUtils.isBlank(configInfo)) {
                    log.warn("sharding-jdbc配置為空,不會刷新數(shù)據(jù)源");
                    return;
                }
                try {
                    if (StringUtils.isNotBlank(configInfo)) {
                        // 通過yaml配置創(chuàng)建sharding數(shù)據(jù)源(注意:如果分片規(guī)則是獨(dú)立配置文件,那么需要提前合并數(shù)據(jù)源和分片規(guī)則配置)
                        ShardingDataSource shardingDataSource = (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
                        Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
                        DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
                        // 遍歷sharding子數(shù)據(jù)源
                        for (String poolName : shardingInnerDataSources.keySet()) {
                            // TODO 這里還有個(gè)細(xì)節(jié),如果yaml配置刪減了數(shù)據(jù)源,對應(yīng)數(shù)據(jù)源應(yīng)該要從ds中remove掉,且主數(shù)據(jù)源不能被remove。另外其實(shí)只有新增的數(shù)據(jù)源才需要執(zhí)行flyway腳本
                            // 將sharding子數(shù)據(jù)源逐個(gè)添加到動態(tài)數(shù)據(jù)源
                            ds.addDataSource(poolName, shardingInnerDataSources.get(poolName));
                            // 通過代碼完成數(shù)據(jù)源Flyway配置,并執(zhí)行遷移操作
                                                        Flyway flyway = Flyway.configure()
                                    .dataSource(dataSource)
                                    .table("t_" + poolName + "_" + appName + "_version")
                                    .baselineOnMigrate(true)
                                    .outOfOrder(true)
                                    .baselineVersion("1.0.0")
                                    .baselineDescription(poolName + "初始化")
                                    .locations(CommonConstant.SQL_BASE_LOCATION + CommonConstant.TENANT_DB)
                                    .load();
                            flyway.migrate();
                        }
                        // 將sharding數(shù)據(jù)源自身也添加到動態(tài)數(shù)據(jù)源
                        ds.addDataSource(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
                        log.info("動態(tài)數(shù)據(jù)源刷新完成,現(xiàn)有數(shù)據(jù)源:{}", JSONUtil.toJsonStr(ds.getDataSources().keySet()));
                    }
                } catch (Exception e) {
                    log.error("創(chuàng)建sharding-jdbc數(shù)據(jù)源異常:{}", ExceptionUtils.getStackTrace(e));
                }
            }
        });
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容