SpringBoot整合atomikos實(shí)現(xiàn)跨庫(kù)事務(wù)

背景

框架之前完成了多數(shù)據(jù)源的動(dòng)態(tài)切換及事務(wù)的處理,想更近一步提供一個(gè)簡(jiǎn)單的跨庫(kù)事務(wù)處理功能,經(jīng)過(guò)網(wǎng)上的搜索調(diào)研,大致有XA事務(wù)/SEGA事務(wù)/TCC事務(wù)等方案,因?yàn)闃I(yè)務(wù)主要涉及政府及企業(yè)且并發(fā)量不大,所以采用XA事務(wù),雖然性能有所損失,但是可以保證數(shù)據(jù)的強(qiáng)一致性

方案設(shè)計(jì)

針對(duì)注冊(cè)的數(shù)據(jù)源拷貝一份用于XA事務(wù),使得本地事務(wù)和XA全局事務(wù)相互獨(dú)立可選擇的使用

[圖片上傳失敗...(image-137275-1654008832292)]

Maven配置

引入atomikos第三方組件

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

注冊(cè)XA數(shù)據(jù)源

使用Druid連接池,需要使用DruidXADataSource數(shù)據(jù)源對(duì)象,再使用AtomikosDataSourceBean進(jìn)行包裝

注冊(cè)數(shù)據(jù)源時(shí)針對(duì)同一個(gè)連接注冊(cè)兩份,一份正常數(shù)據(jù)源,一份用于XA事務(wù)的數(shù)據(jù)源,數(shù)據(jù)源標(biāo)識(shí)區(qū)分并關(guān)聯(lián)

因?yàn)閟pring默認(rèn)注冊(cè)了XA事務(wù)管理器后,所有事務(wù)操作不再走本地事務(wù),我們通過(guò)切換不同的數(shù)據(jù)源決定走本地事務(wù)還是XA事務(wù)

 //主數(shù)據(jù)源xa模式
    @Bean
    @Qualifier("masterXADataSource")
    public DataSource masterXADataSource() {
        DruidXADataSource datasource = new DruidXADataSource();
        if(driverClassName.equals("com.mysql.cj.jdbc.Driver")){
            if(!dbUrl.contains("useOldAliasMetadataBehavior")){
                dbUrl += "&useOldAliasMetadataBehavior=true";
            }
            if(!dbUrl.contains("useAffectedRows")){
                dbUrl += "&useAffectedRows=true";
            }
        }
        datasource.setUrl(this.dbUrl);
        datasource.setUsername(username);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        //configuration
        datasource.setInitialSize(1);
        datasource.setMinIdle(3);
        datasource.setMaxActive(20);
        datasource.setMaxWait(60000);
        datasource.setTimeBetweenEvictionRunsMillis(60000);
        datasource.setMinEvictableIdleTimeMillis(60000);
        datasource.setValidationQuery("select 'x'");
        datasource.setTestWhileIdle(true);
        datasource.setTestOnBorrow(false);
        datasource.setTestOnReturn(false);
        datasource.setPoolPreparedStatements(true);
        datasource.setMaxPoolPreparedStatementPerConnectionSize(20);
        datasource.setLogAbandoned(false); //移除泄露連接發(fā)生是是否記錄日志
        try {
            datasource.setFilters("stat,slf4j");
        } catch (SQLException e) {
            logger.error("druid configuration initialization filter", e);
        }
        datasource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");//connectionProperties);

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setUniqueResourceName("master-xa");
        atomikosDataSourceBean.setXaDataSource(datasource);
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setMaxPoolSize(20);
        atomikosDataSourceBean.setTestQuery("select 1");
        return atomikosDataSourceBean;
    }

注冊(cè)XA事務(wù)管理器

使用spring內(nèi)置的JtaTransactionManager事務(wù)管理器對(duì)象,設(shè)置AllowCustomIsolationLevels為true,否則指定自定義事務(wù)隔離級(jí)別會(huì)報(bào)錯(cuò)

    //xa模式全局事務(wù)管理器
    @Bean(name = "jtaTransactionManager")
    public PlatformTransactionManager transactionManager() throws Throwable {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        JtaTransactionManager jtaTransactionManager =  new JtaTransactionManager(userTransaction, userTransactionManager);
        jtaTransactionManager.setAllowCustomIsolationLevels(true);
        return jtaTransactionManager;
    }

定義XA事務(wù)切面

自定義注解@GlobalTransactional并定義對(duì)應(yīng)切面,使用指定注解時(shí)在ThreadLocal變量值進(jìn)行標(biāo)識(shí),組合

@Transactional注解指定XA事務(wù)管理器,在切換數(shù)據(jù)原時(shí)判斷當(dāng)前是否在XA事物中,從而切換不同的數(shù)據(jù)源

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Transactional(rollbackFor = Exception.class,isolation = Isolation.READ_UNCOMMITTED,transactionManager = "jtaTransactionManager")
public @interface GlobalTransactional {
}
@Aspect
@Component
@Order(value = 99)
public class GlobalTransitionAspect {
    private static Logger logger = LoggerFactory.getLogger(GlobalTransitionAspect.class);
    @Autowired
    private DynamicDataSource dynamicDataSource;

    /**
     * 切面點(diǎn) 指定注解
     */
    @Pointcut("@annotation(com.code2roc.fastkernel.datasource.GlobalTransactional) " +
            "|| @within(com.code2roc.fastkernel.datasource.GlobalTransactional)")
    public void dataSourcePointCut() {

    }

    /**
     * 攔截方法指定為 dataSourcePointCut
     */
    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        GlobalTransactional methodAnnotation = method.getAnnotation(GlobalTransactional.class);
        if (methodAnnotation != null) {
            DataSourceContextHolder.tagGlobal();
            logger.info("標(biāo)記全局事務(wù)");
        }
        try {
            return point.proceed();
        } finally {
            logger.info("清除全局事務(wù)");
            DataSourceContextHolder.clearGlobal();
        }
    }
}
public class DataSourceContextHolder {
    private static Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);
    // 對(duì)當(dāng)前線程的操作-線程安全的
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
    private static final ThreadLocal<String> contextGlobalHolder = new ThreadLocal<String>();

    // 調(diào)用此方法,切換數(shù)據(jù)源
    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
        log.debug("已切換到數(shù)據(jù)源:{}", dataSource);
    }

    // 獲取數(shù)據(jù)源
    public static String getDataSource() {
        String value = contextHolder.get();
        if (StringUtil.isEmpty(value)) {
            value = "master";
        }
        if (!StringUtil.isEmpty(getGlobal())) {
            value = value + "-xa";
        }
        return value;
    }

    // 刪除數(shù)據(jù)源
    public static void clearDataSource() {
        contextHolder.remove();
        log.debug("已切換到主數(shù)據(jù)源");
    }

    //====================全局事務(wù)標(biāo)記================
    public static void tagGlobal() {
        contextGlobalHolder.set("1");
    }

    public static String getGlobal() {
        String value = contextGlobalHolder.get();
        return value;
    }

    public static void clearGlobal() {
        contextGlobalHolder.remove();
    }
    //===================================================
}

配置XA事務(wù)日志

通過(guò)在resource文件夾下創(chuàng)建transactions.properties文件可以指定XA事務(wù)日志的存儲(chǔ)路徑

com.atomikos.icatch.log_base_dir= tempfiles/transition/
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容