新一代分布式事務(wù)解決方案-seata

自從分布式服務(wù)盛行江湖之后,分布式事務(wù)一直都是個熱門話題,網(wǎng)上也有很多其解決方案,如 TCC、基于可靠消息的最終一致性等等。但是大多都是偏于理論,缺乏實戰(zhàn)案例。最重要的是,這些方案都要求在應(yīng)用的業(yè)務(wù)層面把分布式事務(wù)技術(shù)約束考慮到設(shè)計中,通常每一個服務(wù)都需要設(shè)計實現(xiàn)正向和反向的冪等接口。這樣的設(shè)計約束,往往會導(dǎo)致很高的研發(fā)和維護成本。

于是乎,一個高效并且對業(yè)務(wù) 0 侵入的方案,呼之而來——seata (原名 Fescar) 阿里巴巴開源的分布式事務(wù)中間件。對于 seata 的介紹,請參考官方文檔,上面已經(jīng)介紹的很好了,而本文章要討論的就是 seata 的使用及其實現(xiàn)機制。強烈建議先看 seata 官方文檔,再看本篇文章,因為好多原理,文章中不會細聊。

seata-server

與 XA 的模型類似,seata 有 3 個組件來協(xié)調(diào)分布式事務(wù)的處理過程。


  • Tansaction Coordinator (TC): 事務(wù)協(xié)調(diào)器,維護全局事務(wù)的運行狀態(tài),負責(zé)協(xié)調(diào)并驅(qū)動全局事務(wù)的提交或回滾。
  • Transaction Manager (TM): 控制全局事務(wù)的邊界,負責(zé)開啟一個全局事務(wù),并最終發(fā)起全局提交或全局回滾的決議。
  • Resource Manager (RM): 控制分支事務(wù),負責(zé)分支注冊、狀態(tài)匯報,并接收事務(wù)協(xié)調(diào)器的指令,驅(qū)動分支(本地)事務(wù)的提交和回滾。

所以,必須先部署 TC,來協(xié)調(diào)整個事務(wù)。我們可以直接下載 seata-server 源碼:https://github.com/seata/seata,然后打開server模塊,修改其registry.conf文件,將 registry 下的 type 改為zk,默認為file,并且修改 zk 的 serverAddr為你自己的 zk 地址。
type 為注冊中心類型,它支持 file、nacos(也是阿里開源的服務(wù)注冊中心及配置中心)、zk、eureka 等,由于我們 dubbo 是用 zk 作為注冊中心的,所以這里為zk

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "zk"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:1001/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "zk.tbj.com:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }

然后啟動 Server 類:

2019-06-12 20:31:19.255 INFO [main]org.I0Itec.zkclient.ZkClient.waitForKeeperState:936 -Waiting for keeper state SyncConnected
2019-06-12 20:31:19.266 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.logStartConnect:1025 -Opening socket connection to server zk.tbj.com/192.168.85.2:2181. Will not attempt to authenticate using SASL (unknown error)
2019-06-12 20:31:19.281 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.primeConnection:879 -Socket connection established to zk.tbj.com/192.168.85.2:2181, initiating session
2019-06-12 20:31:19.296 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.onConnected:1299 -Session establishment complete on server zk.tbj.com/192.168.85.2:2181, sessionid = 0x16ad4597ef21702, negotiated timeout = 6000
2019-06-12 20:31:19.299 INFO [main-EventThread]org.I0Itec.zkclient.ZkClient.processStateChanged:713 -zookeeper state changed (SyncConnected)

如果看到以上日志,說明 seata-server 即 TC 啟動完畢。其實 seata-server 就是一個 Netty 服務(wù)器,TM、RM 與 seata-server(TC) 之間使用 Netty 進行通信的。

seata 與 springboot 集成

在講實現(xiàn)機制之前,我們先通過一個 demo 來了解下 seata 是怎么用的。我們還是緊跟時代步伐,與當(dāng)下最流行的 Spring Boot 框架集成,Rpc 框架我們采用 dubbo。其實,seata 官網(wǎng)提供了多種集成方式,如 Spring,Spring Cloud 等等。具體看 seata-samples

我們直接看springboot模塊,同樣,我需要改其registry.conf文件,將 registry 下的 type 改為zk,默認為file,并且修改 zk 的 serverAddr為你自己的 zk 地址。 另外,需要修改application.yml文件中的數(shù)據(jù)庫連接參數(shù),和 dubbo 的參數(shù),改成你自己的參數(shù),如:

server:
    port: 9999
    servlet:
        context-path: /demo
spring:
    application:
        name: seata-springboot-app
    http:
        encoding:
        charset: UTF-8
        enabled: true
        force: true
    datasource:
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC
        username: root
        password: 123456
        poolPingConnectionsNotUsedFor: 60000
        removeAbandoned: true
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        minIdle: 1
        validationQuery: SELECT 1 FROM DUAL
        initialSize: 5
        maxWait: 60000
        poolPreparedStatements: false
        filters: stat,wall
        testOnBorrow: false
        testWhileIdle: true
        minEvictableIdleTimeMillis: 300000
        timeBetweenEvictionRunsMillis: 60000
        testOnReturn: false
        maxActive: 50
        druid:
            user: admin
            password: admin

    jpa: 
        hibernate:
            ddl-auto: none
        show-sql: true
    dubbo:
        server: true
        registry: zookeeper://zk.tbj.com:2181
        provider:
            port: 20999

我們看其數(shù)據(jù)庫配置類DruidConfiguration

/**
 * The type Druid configuration.
 */
@Configuration
public class DruidConfiguration {

    @Value("${spring.datasource.druid.user}")
    private String druidUser;

    @Value("${spring.datasource.druid.password}")
    private String druidPassword;

    /**
     * Druid data source druid data source.
     *
     * @return the druid data source
     */
    @Bean(destroyMethod = "close", initMethod = "init")
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    /**
     * Data source data source.
     *
     * @param druidDataSource the druid data source
     * @return the data source
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
        return dataSourceProxy;
    }

    /**
     * 注冊一個StatViewServlet
     *
     * @return servlet registration bean
     */
    @Bean
    public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
        ServletRegistrationBean<StatViewServlet> servletRegistrationBean = new ServletRegistrationBean<StatViewServlet>(
            new StatViewServlet(), "/druid/*");

        servletRegistrationBean.addInitParameter("loginUsername", druidUser);
        servletRegistrationBean.addInitParameter("loginPassword", druidPassword);
        servletRegistrationBean.addInitParameter("resetEnable", "false");
        return servletRegistrationBean;
    }

    /**
     * 注冊一個:filterRegistrationBean
     *
     * @return filter registration bean
     */
    @Bean
    public FilterRegistrationBean<WebStatFilter> druidStatFilter() {

        FilterRegistrationBean<WebStatFilter> filterRegistrationBean = new FilterRegistrationBean<WebStatFilter>(
            new WebStatFilter());

        // 添加過濾規(guī)則.
        filterRegistrationBean.addUrlPatterns("/*");

        // 添加不需要忽略的格式信息.
        filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
        return filterRegistrationBean;
    }
}

主要的是上面兩個配置,druidDataSource()dataSource(DruidDataSource druidDataSource,后面兩個是 Druid 管理平臺監(jiān)控配置。dataSource(DruidDataSource druidDataSource是把生成的druidDataSource對象使用DataSourceProxy進行代理。

再看 seata 配置類SeataConfiguration

/**
 * The type Fescar configuration.
 */
@Configuration
public class SeataConfiguration {

    @Value("${spring.application.name}")
    private String applicationId;

    /**
     * 注冊一個StatViewServlet
     *
     * @return global transaction scanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
            "my_test_tx_group");
        return globalTransactionScanner;
    }
}

這里配置了一個GlobalTransactionScanner,用來掃描@GlobalTransaction注解。

現(xiàn)在來簡單介紹下業(yè)務(wù),我們需要進行資產(chǎn)分配,會根據(jù)指定資產(chǎn) Id 修改資產(chǎn)計劃記錄的狀態(tài),然后調(diào)用assetService.increase()的 dubbo 服務(wù),對資產(chǎn)的金額進行 + 1操作。

接下來直接看關(guān)鍵代碼AssignServiceImpl

/**
 * The type Assign service.
 */
@Service
public class AssignServiceImpl implements AssignService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AssignServiceImpl.class);

    @Autowired
    private AssignRepository assignRepository;

    @Reference(check = false)
    private io.seata.samples.springboot.service.AssetService assetService;

    @Override
    @Transactional
    @GlobalTransactional
    public AssetAssign increaseAmount(String id) {
        LOGGER.info("Assign Service Begin ... xid: " + RootContext.getXID() + "\n");
        // 此處受 @Transactional 管理,故 seata 并未考慮它的事務(wù)
        AssetAssign assetAssign = assignRepository.findById(id).get();
        assetAssign.setStatus("2");
        assignRepository.save(assetAssign);

        // remote call asset service
        assetService.increase();
        return assetAssign;
    }

}

這里同時使用了@Transactional@GlobalTransactional兩個注解,第一個注解是開啟了對increaseAmount方法操作的本地事務(wù),后者開啟了 seata 管理的全局事務(wù)。需要注意的是,由于increaseAmount方法操作被@Transactional管理,故t_asset_assign表的更新操作,seata 不會去管理它,因為它自己會進行事務(wù)的回滾或提交。于是 seata 只需管理assetService.increase()這個遠程調(diào)用的分支事務(wù)。

遠程方法assetService.increase()

/**
 * The type Asset service.
 */
@Service(interfaceClass = AssetService.class, timeout = 10000)
@Component
public class AssetServiceImpl implements AssetService {

    /**
     * The constant LOGGER.
     */
    public static final Logger LOGGER = LoggerFactory.getLogger(AssetService.class);

    /**
     * The constant ASSET_ID.
     */
    public static final String ASSET_ID = "DF001";

    @Autowired
    private AssetRepository assetRepository;

    @Override
    public int increase() {
        LOGGER.info("Asset Service Begin ... xid: " + RootContext.getXID() + "\n");
        Asset asset = assetRepository.findById(ASSET_ID).get();
        asset.setAmount(asset.getAmount().add(new BigDecimal("1")));
        assetRepository.save(asset);
        throw new RuntimeException("test exception for seata, your transaction should be rollbacked,asset=" + asset);
    }
}

這里Asset更新完之后,直接拋出RuntimeException模擬了事務(wù)回滾。

在執(zhí)行程序之前,我們先看數(shù)據(jù)庫中的數(shù)據(jù)t_asset_assign表:


t_asset表:

直接運行springboot模塊的啟動類SeataSpringbootApp,然后在瀏覽器上訪問:http://localhost:9999/demo/asset/assign,看輸出結(jié)果:

RuntimeException: test exception for seata, your transaction should be rollbacked,
asset=Asset{id='DF001', amount=2, voucherCode='e2d1c4512d554db9ae4a5f30cbc2e4b1'}

異常信息已經(jīng)顯示出來了,說明程序已經(jīng)運行到最后一行了。然后看數(shù)據(jù)庫的數(shù)據(jù)有沒有改變,t_asset_assign表:


t_asset表:

發(fā)現(xiàn)數(shù)據(jù)并沒有變化,說明分布式事務(wù)起作用了。到了這里,小伙伴們是不是很 exciting,只需要簡單的配置,seata 就讓程序像執(zhí)行本地事務(wù)一樣管理分布式事務(wù),最主要的是對業(yè)務(wù)代碼 0 侵入,這對喜歡偷懶的程序猿來說,簡直就是福音呀。

seata 實現(xiàn)機制

接下來我們就要步入本篇文章的主題了,那就是 seata 是如何實現(xiàn)分布式事務(wù)的?

我們發(fā)現(xiàn),我們就僅僅增加了seata 的@GlobalTransactional注解,就實現(xiàn)了分布式事務(wù)。其實 seata 增加了個攔截器來專門處理被@GlobalTransactional注解的方法,即GlobalTransactionalInterceptor,其分布式事務(wù)的執(zhí)行流程都在這里完成的:

/**
 * The type Global transactional interceptor.
 */
public class GlobalTransactionalInterceptor implements MethodInterceptor {

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();

    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
    private final FailureHandler failureHandler;

    /**
     * Instantiates a new Global transactional interceptor.
     *
     * @param failureHandler the failure handler
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        if (null == failureHandler) {
            failureHandler = DEFAULT_FAIL_HANDLER;
        }
        this.failureHandler = failureHandler;
    }

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }

    private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
        return globalLockTemplate.execute(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    return methodInvocation.proceed();
                } catch (Throwable e) {
                    if (e instanceof Exception) {
                        throw (Exception)e;
                    } else {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
    }

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                @Override
                public int timeout() {
                    return globalTrxAnno.timeoutMills();
                }

                @Override
                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

            }
        }
    }

    private <T extends Annotation> T getAnnotation(Method method, Class<T> clazz) {
        if (method == null) {
            return null;
        }
        return method.getAnnotation(clazz);
    }

    private String formatMethod(Method method) {
        String paramTypes = Arrays.stream(method.getParameterTypes())
                .map(Class::getName)
                .reduce((p1, p2) -> String.format("%s, %s", p1, p2))
                .orElse("");
        return method.getName() + "(" + paramTypes + ")";
    }
}

主要邏輯是在TransactionalTemplate#execute方法:

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        try {

            // 2. begin transaction
            try {
                triggerBeforeBegin();
                tx.begin(business.timeout(), business.name());
                triggerAfterBegin();
            } catch (TransactionException txe) {
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.BeginFailure);

            }
            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3. any business exception, rollback.
                try {
                    triggerBeforeRollback();
                    tx.rollback();
                    triggerAfterRollback();
                    // 3.1 Successfully rolled back
                    throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

                } catch (TransactionException txe) {
                    // 3.2 Failed to rollback
                    throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, ex);

                }

            }
            // 4. everything is fine, commit.
            try {
                triggerBeforeCommit();
                tx.commit();
                triggerAfterCommit();
            } catch (TransactionException txe) {
                // 4.1 Failed to commit
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.CommitFailure);
            }

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

大致的執(zhí)行流程為:

  1. 獲取全局事務(wù)信息:先從ThreadLocal中獲取,如果沒有則創(chuàng)建一個DefaultGlobalTransaction。
  2. 開啟全局事務(wù)tx.begin(business.timeout(), business.name()):通過DefaultTransactionManager的 begin 方法開啟全局事務(wù)。DefaultTransactionManager負責(zé) TM 與 TC 通訊,發(fā)送begin、commit、rollback指令。TC 接收到 TM 發(fā)過來的 begin 指令后,會返回一個全局唯一的 XID 給 TM。
  3. 執(zhí)行業(yè)務(wù)代碼business.execute():在每個本地事務(wù)中,會生成分支事務(wù)標識 BranchId, 然后根據(jù)業(yè)務(wù) SQL 執(zhí)行前后的鏡像,生成 undoLog,并隨著業(yè)務(wù) SQL 一起提交。
  4. 全局事務(wù)回滾tx.rollback():當(dāng)業(yè)務(wù)代碼執(zhí)行過程中拋出任何異常,都會進行全局事務(wù)的回滾操作。根據(jù) XID 和 BranchId 查找 undoLog,然后反向生成業(yè)務(wù) SQL,接著執(zhí)行該 SQL,并且刪除 undoLog 記錄。
  5. 全局事務(wù)提交tx.commit():當(dāng)業(yè)務(wù)代碼執(zhí)行正常時,則會提交全局事務(wù)。分支事務(wù)此時已經(jīng)完成提交,只需要刪除 undoLog 即可。

文章最后,我們對 TC、TM、RM之間交互流程,在官方圖片的基礎(chǔ)上,做一些總結(jié):

至此,TransactionalTemplate#execute大致執(zhí)行流程已經(jīng)講完,由于篇幅問題,我沒有深入探討,后面我會專門出一篇文章來細講 TM 的工作流程,盡請期待。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容