深入理解 Seata:分布式事務(wù)的最佳實(shí)踐

在微服務(wù)架構(gòu)盛行的今天,分布式事務(wù)管理成為了一個(gè)必須面對(duì)的挑戰(zhàn)。傳統(tǒng)的單機(jī)事務(wù)模型在分布式環(huán)境下失效,如何保證多個(gè)服務(wù)間的數(shù)據(jù)一致性成為了開(kāi)發(fā)者的痛點(diǎn)。Apache Seata 作為一款開(kāi)源的分布式事務(wù)解決方案,為我們提供了優(yōu)雅的解決方案。本文將深入介紹 Seata 的核心原理、使用方法和最佳實(shí)踐。

一、分布式事務(wù)的挑戰(zhàn)

在微服務(wù)架構(gòu)中,一個(gè)業(yè)務(wù)操作可能涉及多個(gè)服務(wù)的調(diào)用,例如:

  1. 用戶下單 → 創(chuàng)建訂單(訂單服務(wù))

  2. 扣減庫(kù)存(庫(kù)存服務(wù))

  3. 扣減賬戶余額(賬戶服務(wù))

如果在這個(gè)過(guò)程中任何一個(gè)步驟失敗,如何保證所有操作的一致性?傳統(tǒng)的單機(jī)事務(wù)(ACID)無(wú)法解決這個(gè)問(wèn)題,因?yàn)樗鼈兛缭搅硕鄠€(gè)服務(wù)和數(shù)據(jù)庫(kù)。

二、什么是 Seata

2.1 Seata 簡(jiǎn)介

Seata 是阿里巴巴開(kāi)源的分布式事務(wù)解決方案,致力于提供高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。它支持多種事務(wù)模式,包括 AT、TCC、SAGA 和 XA,滿足不同場(chǎng)景的需求。

2.2 Seata 核心組件

Seata 定義了三個(gè)核心組件,三者協(xié)同工作實(shí)現(xiàn)分布式事務(wù)的協(xié)調(diào)與管控:

  • TC (Transaction Coordinator):事務(wù)協(xié)調(diào)器,維護(hù)全局事務(wù)的運(yùn)行狀態(tài),負(fù)責(zé)協(xié)調(diào)并驅(qū)動(dòng)全局事務(wù)的提交或回滾,是 Seata 分布式事務(wù)的核心中樞,需單獨(dú)部署為 Seata Server。

  • TM (Transaction Manager):事務(wù)管理器,定義全局事務(wù)的范圍,負(fù)責(zé)開(kāi)啟、提交或回滾全局事務(wù),嵌入在業(yè)務(wù)應(yīng)用中(如通過(guò)注解觸發(fā))。

  • RM (Resource Manager):資源管理器,管理分支事務(wù)的資源,與 TC 交互注冊(cè)分支事務(wù)并匯報(bào)分支事務(wù)的狀態(tài),嵌入在各微服務(wù)中,管理本地?cái)?shù)據(jù)庫(kù)等資源。

2.3 全局事務(wù) ID

Seata 使用全局唯一的 XID 來(lái)標(biāo)識(shí)一個(gè)全局事務(wù),XID 貫穿整個(gè)分布式事務(wù)的生命周期,用于關(guān)聯(lián)全局事務(wù)與各個(gè)分支事務(wù),確保事務(wù)追蹤的一致性。

三、Seata 事務(wù)模式

Seata 提供四種核心事務(wù)模式,可根據(jù)業(yè)務(wù)場(chǎng)景靈活選擇,覆蓋絕大多數(shù)分布式事務(wù)需求。

重要說(shuō)明:四種模式都需要使用 @GlobalTransactional 注解在發(fā)起方方法上開(kāi)啟全局事務(wù),Seata 才能協(xié)調(diào)各分支事務(wù)的提交或回滾。

數(shù)據(jù)可見(jiàn)性總結(jié)

  • AT:本地已提交,數(shù)據(jù)庫(kù)真實(shí)新數(shù)據(jù);僅應(yīng)用內(nèi)全局事務(wù)查詢隔離;外部普通事務(wù)可見(jiàn)中間臟數(shù)據(jù)(弱隔離、讀未提交級(jí)別)
  • TCC:本地提交凍結(jié)態(tài),不改真實(shí)業(yè)務(wù)數(shù)據(jù),外部可見(jiàn)凍結(jié)狀態(tài)
  • SAGA:本地提交真實(shí)數(shù)據(jù),完全對(duì)外可見(jiàn),無(wú)任何隔離
  • XA:本地不提交,數(shù)據(jù)庫(kù)未更新,全局未提交前所有外部都絕對(duì)看不見(jiàn)(強(qiáng)隔離)

模式差異總結(jié):Seata 的 4 種模式(AT、TCC、SAGA、XA)整體流程完全一樣,都是 TM → TC → RM 這套協(xié)作邏輯,區(qū)別只在 RM 怎么干活:

  • AT 模式
    提交:刪除對(duì)應(yīng)的 undo log
    回滾:根據(jù) undo log 進(jìn)行鏡像回滾
  • TCC 模式
    提交:執(zhí)行 Confirm
    回滾:執(zhí)行 Cancel
    整個(gè)流程圍繞 Try - Confirm - Cancel 三個(gè)階段完成。
  • SAGA 模式
    提交:按業(yè)務(wù)編排順序執(zhí)行正向服務(wù)
    回滾:通過(guò)補(bǔ)償服務(wù)反向執(zhí)行,實(shí)現(xiàn)最終一致性
  • XA 模式直接基于數(shù)據(jù)庫(kù)原生 XA 協(xié)議,由 RM 驅(qū)動(dòng)數(shù)據(jù)庫(kù)完成 XA 分支的提交與回滾。

但上層角色不變:

  • TM:依然是發(fā)起全局事務(wù)、管邊界;
  • TC:依然是協(xié)調(diào)、存狀態(tài)、統(tǒng)一下發(fā)提交 / 回滾;
  • RM:依然是注冊(cè)分支、執(zhí)行本地操作。

3.1 AT 模式(自動(dòng)補(bǔ)償)

適用場(chǎng)景:關(guān)系型數(shù)據(jù)庫(kù)(如 MySQL、Oracle),適用于大多數(shù)常規(guī)業(yè)務(wù)場(chǎng)景,對(duì)業(yè)務(wù)代碼侵入性極低。

核心原理

  1. 業(yè)務(wù)數(shù)據(jù)和回滾日志記錄在同一個(gè)本地事務(wù)中,確保本地操作的原子性;

  2. 提交階段:直接提交業(yè)務(wù)數(shù)據(jù),回滾日志保留用于異?;貪L;

  3. 回滾階段:根據(jù)回滾日志自動(dòng)執(zhí)行補(bǔ)償操作,恢復(fù)數(shù)據(jù)至事務(wù)前狀態(tài)。

優(yōu)勢(shì):使用簡(jiǎn)單,對(duì)業(yè)務(wù)無(wú)侵入,無(wú)需修改原有業(yè)務(wù)代碼,性能優(yōu)異。

3.2 TCC 模式(Try-Confirm-Cancel)

適用場(chǎng)景:非關(guān)系型數(shù)據(jù)庫(kù)(如 MongoDB、Redis)、特殊業(yè)務(wù)場(chǎng)景(如自定義資源管控),需要靈活控制事務(wù)流程。

核心原理

  1. Try:資源檢查和預(yù)留,確保業(yè)務(wù)操作所需資源可用,并鎖定資源;

  2. Confirm:確認(rèn)執(zhí)行業(yè)務(wù)操作,釋放鎖定的資源,完成最終數(shù)據(jù)修改;

  3. Cancel:取消執(zhí)行,釋放預(yù)留的資源,恢復(fù)數(shù)據(jù)至 Try 操作前狀態(tài)。

核心要點(diǎn):TCC 的核心就是「預(yù)留 / 凍結(jié)資源」,如果去掉這一步,它在機(jī)制上確實(shí)就和 Saga 基本一樣了。

實(shí)現(xiàn)方式

  • 每個(gè)服務(wù)實(shí)現(xiàn)三段式:每個(gè)參與分布式事務(wù)的服務(wù)都需要實(shí)現(xiàn) Try、Confirm、Cancel 三個(gè)方法
  • 業(yè)務(wù)代碼只調(diào)用 Try:應(yīng)用程序只需要調(diào)用 Try 方法進(jìn)行資源預(yù)留
  • Seata 自動(dòng)調(diào)度:Seata 根據(jù)全局事務(wù)的執(zhí)行狀態(tài),自動(dòng)調(diào)度 Confirm 或 Cancel 方法
    • 如果所有服務(wù)的 Try 都成功,Seata 會(huì)調(diào)用所有服務(wù)的 Confirm 方法
    • 如果任何一個(gè)服務(wù)的 Try 失敗,Seata 會(huì)調(diào)用所有服務(wù)的 Cancel 方法
    • 重要:只有當(dāng) Try 方法拋出異常時(shí)才會(huì)觸發(fā)回滾,返回 false 不會(huì)觸發(fā)回滾

優(yōu)勢(shì):靈活性高,可定制性強(qiáng),支持非關(guān)系型數(shù)據(jù)庫(kù),適配復(fù)雜業(yè)務(wù)場(chǎng)景。

3.3 SAGA 模式

適用場(chǎng)景:長(zhǎng)事務(wù)、業(yè)務(wù)流程復(fù)雜(如跨多個(gè)服務(wù)的鏈路操作)、容錯(cuò)要求高的場(chǎng)景。

核心原理:基于狀態(tài)機(jī)定義服務(wù)調(diào)用流程,支持正向服務(wù)和補(bǔ)償服務(wù),通過(guò)狀態(tài)流轉(zhuǎn)驅(qū)動(dòng)事務(wù)推進(jìn),若某一步失敗則執(zhí)行對(duì)應(yīng)補(bǔ)償服務(wù)回滾。

優(yōu)勢(shì):適合長(zhǎng)事務(wù),容錯(cuò)能力強(qiáng),可應(yīng)對(duì)服務(wù)間長(zhǎng)時(shí)間交互的場(chǎng)景。

3.4 XA 模式

適用場(chǎng)景:需要強(qiáng)一致性的場(chǎng)景(如金融、支付領(lǐng)域),依賴數(shù)據(jù)庫(kù)原生 XA 協(xié)議。

核心原理:基于數(shù)據(jù)庫(kù)原生 XA 協(xié)議,分為準(zhǔn)備階段(所有分支事務(wù)準(zhǔn)備就緒并匯報(bào)狀態(tài))和提交階段(TC 指令所有分支事務(wù)統(tǒng)一提交或回滾)。

?? 生產(chǎn)環(huán)境警告:Seata XA 模式能用,但生產(chǎn)環(huán)境極其不推薦,基本沒(méi)人用。

  • 并發(fā)性能差:事務(wù)鎖持有時(shí)間長(zhǎng),高并發(fā)下直接卡死;
  • MySQL 主從切換有數(shù)據(jù)不一致風(fēng)險(xiǎn):這個(gè)坑官方都沒(méi)徹底解決。

優(yōu)勢(shì):強(qiáng)一致性,可靠性高,完全遵循 ACID 特性,適配高一致性要求場(chǎng)景。

四、快速上手:環(huán)境搭建

Seata 環(huán)境搭建核心是部署 Seata Server(TC),并配置注冊(cè)中心、配置中心,最終集成業(yè)務(wù)客戶端,以下為基于 Nacos 注冊(cè)/配置中心的最簡(jiǎn)搭建流程。

4.1 步驟 1:安裝 Seata Server

下載并解壓 Seata Server:推薦使用 1.7.1 穩(wěn)定版本,下載地址可直接通過(guò) GitHub 官方鏈接獲取,執(zhí)行以下命令完成下載和解壓:

# 下載 Seata Server 1.7.1
wget https://github.com/seata/seata/releases/download/v1.7.1/seata-server-1.7.1.tar.gz

# 解壓壓縮包
tar -zxvf seata-server-1.7.1.tar.gz
# 進(jìn)入解壓后的目錄
cd seata-server-1.7.1

4.2 步驟 2:配置注冊(cè)中心

修改 Seata Server 目錄下的 registry.conf 文件,配置 Nacos 作為注冊(cè)中心,讓業(yè)務(wù)客戶端能夠發(fā)現(xiàn) TC 服務(wù):

registry {
  # 可選類型:file、nacos、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"  # Seata Server 在 Nacos 中的服務(wù)名
    serverAddr = "127.0.0.1:8848" # Nacos 服務(wù)地址(本地部署默認(rèn)地址)
    group = "SEATA_GROUP"         # 服務(wù)分組,默認(rèn) SEATA_GROUP
    namespace = ""                # Nacos 命名空間,默認(rèn)為 public
    cluster = "default"           # 集群名稱,默認(rèn) default
    username = "nacos"            # Nacos 登錄用戶名(默認(rèn) nacos)
    password = "nacos"            # Nacos 登錄密碼(默認(rèn) nacos)
  }
}

4.3 步驟 3:配置配置中心

Seata 配置需推送至 Nacos 配置中心,便于集群部署和動(dòng)態(tài)配置,步驟如下:

  1. 創(chuàng)建 config.txt 文件,配置核心參數(shù)(最簡(jiǎn)配置):
# 事務(wù)組映射配置,my_test_tx_group 為事務(wù)組名稱,default 為集群名稱
service.vgroupMapping.my_test_tx_group=default
# 存儲(chǔ)模式配置,file 表示使用文件存儲(chǔ)(開(kāi)發(fā)環(huán)境推薦),生產(chǎn)環(huán)境推薦使用 db 模式
store.mode=file
# 文件存儲(chǔ)目錄,用于存儲(chǔ)事務(wù)日志等數(shù)據(jù)
store.file.dir=file_store
  1. 下載 Nacos 配置推送腳本 nacos-config.sh,并執(zhí)行腳本將配置推送至 Nacos:
# 下載 nacos-config.sh 腳本(官方腳本地址)
wget https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh

# 執(zhí)行腳本,推送配置至 Nacos(需替換為實(shí)際 Nacos 地址和賬號(hào)密碼)
sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos

說(shuō)明:該腳本會(huì)讀取 config.txt 中的配置,自動(dòng)編碼并推送至 Nacos 配置中心,推送成功后會(huì)提示“Init nacos config finished”。

4.4 步驟 4:?jiǎn)?dòng) Seata Server

進(jìn)入 Seata Server 目錄,執(zhí)行以下命令啟動(dòng)服務(wù):

# 啟動(dòng) Seata Server(默認(rèn)端口 8091)
sh bin/seata-server.sh

啟動(dòng)成功后,可在 Nacos 控制臺(tái)的“服務(wù)列表”中看到 seata-server 服務(wù),說(shuō)明 TC 部署成功。

五、實(shí)戰(zhàn)示例:AT 模式

AT 模式是最常用的事務(wù)模式,對(duì)業(yè)務(wù)無(wú)侵入,以下以“下單-扣庫(kù)存-扣余額”場(chǎng)景為例,實(shí)現(xiàn) AT 模式分布式事務(wù)。

5.1 訂單服務(wù)(TM 發(fā)起全局事務(wù))

undo_log 表:

AT 模式需要在每個(gè)微服務(wù)的數(shù)據(jù)庫(kù)中創(chuàng)建 undo_log 表,用于存儲(chǔ)回滾日志。當(dāng)事務(wù)需要回滾時(shí),Seata 會(huì)根據(jù)這個(gè)表中的記錄自動(dòng)執(zhí)行補(bǔ)償操作。默認(rèn)情況下,Seata 使用 undo_log 作為回滾日志表名。如果需要自定義表名,可以在配置文件中進(jìn)行設(shè)置。

表結(jié)構(gòu)及字段說(shuō)明

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT, -- 主鍵ID,自增
  `branch_id` bigint(20) NOT NULL, -- 分支事務(wù)ID,由Seata Server分配給參與分布式事務(wù)的各個(gè)微服務(wù)的分支事務(wù)
  `xid` varchar(100) NOT NULL, -- 全局事務(wù)ID,貫穿整個(gè)分布式事務(wù)生命周期
  `context` varchar(128) NOT NULL, -- 上下文信息,存儲(chǔ)額外的業(yè)務(wù)相關(guān)數(shù)據(jù)
  `rollback_info` longblob NOT NULL, -- 回滾信息,存儲(chǔ)事務(wù)執(zhí)行前的數(shù)據(jù)鏡像,用于回滾操作
  `log_status` int(11) NOT NULL, -- 日志狀態(tài):0-正常,1-已刪除
  `log_created` datetime NOT NULL, -- 日志創(chuàng)建時(shí)間
  `log_modified` datetime NOT NULL, -- 日志修改時(shí)間
  PRIMARY KEY (`id`), -- 主鍵索引
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) -- 唯一索引,確保每個(gè)分支事務(wù)只有一條回滾日志
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='AT transaction mode undo table';

表的作用

  1. 存儲(chǔ)回滾數(shù)據(jù):記錄事務(wù)執(zhí)行前的數(shù)據(jù)狀態(tài),當(dāng)事務(wù)需要回滾時(shí),Seata 會(huì)根據(jù)這些數(shù)據(jù)恢復(fù)到事務(wù)前的狀態(tài)。

  2. 確保原子性:與業(yè)務(wù)操作在同一個(gè)本地事務(wù)中提交,確保回滾日志的寫入與業(yè)務(wù)操作的原子性。

  3. 支持分布式事務(wù)回滾:當(dāng)分布式事務(wù)中的任何一個(gè)分支事務(wù)失敗時(shí),TC 會(huì)指令所有分支事務(wù)執(zhí)行回滾,通過(guò) undo_log 表中的數(shù)據(jù)實(shí)現(xiàn)自動(dòng)補(bǔ)償。

  4. 事務(wù)追蹤:通過(guò) xid 和 branch_id 關(guān)聯(lián)全局事務(wù)和分支事務(wù),便于事務(wù)狀態(tài)的追蹤和管理。

  5. 數(shù)據(jù)一致性保障:確保在分布式環(huán)境下,即使部分服務(wù)失敗,也能通過(guò)回滾機(jī)制保證整個(gè)系統(tǒng)的數(shù)據(jù)一致性。

多服務(wù)共用數(shù)據(jù)庫(kù)的區(qū)分

當(dāng)多個(gè)微服務(wù)共用同一個(gè)數(shù)據(jù)庫(kù)時(shí),undo_log 表會(huì)存儲(chǔ)所有微服務(wù)的回滾日志。Seata 通過(guò)以下機(jī)制區(qū)分不同微服務(wù)的回滾日志:

  1. 全局事務(wù) XID:每個(gè)分布式事務(wù)都有一個(gè)全局唯一的 XID,所有參與該事務(wù)的微服務(wù)分支事務(wù)都會(huì)關(guān)聯(lián)到同一個(gè) XID。

  2. 分支事務(wù) Branch ID:每個(gè)微服務(wù)的分支事務(wù)都有一個(gè)唯一的 Branch ID,由 Seata Server 分配。Branch ID 與 XID 組合形成唯一標(biāo)識(shí),確保不同微服務(wù)的回滾日志不會(huì)混淆。

  3. 上下文信息 Contextcontext 字段可以存儲(chǔ)額外的上下文信息,例如微服務(wù)名稱、業(yè)務(wù)操作類型等,進(jìn)一步幫助區(qū)分不同微服務(wù)的回滾日志。

添加依賴

在 Spring Boot 項(xiàng)目中引入 Seata 客戶端依賴(需與 Seata Server 版本匹配):

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

配置文件

修改 application.yaml 配置,關(guān)聯(lián) Seata 服務(wù):

spring:
  cloud:
    seata:
      tx-service-group: my_test_tx_group # 事務(wù)組名稱,需與 config.txt 中配置一致
      registry:
        type: nacos # 注冊(cè)中心類型
        nacos:
          server-addr: 127.0.0.1:8848 # Nacos 地址
          group: SEATA_GROUP # 與 Seata Server 配置一致
          application: seata-server # Seata Server 服務(wù)名
      data-source-proxy:
        undo:
          log-table: undo_log # 自定義回滾日志表名

業(yè)務(wù)代碼

通過(guò) @GlobalTransactional 注解定義全局事務(wù)邊界,發(fā)起全局事務(wù):

重要說(shuō)明:在 Seata AT 模式下,@Transactional 注解在全局事務(wù)環(huán)境下基本等于無(wú)效。即使方法上加了該注解,每執(zhí)行一次數(shù)據(jù)庫(kù)操作依然會(huì)立即提交本地事務(wù),每條 SQL 都會(huì)產(chǎn)生獨(dú)立的提交日志并立刻提交事務(wù)。

原因就是:

  • Seata AT 模式下,本地事務(wù)默認(rèn)自動(dòng)提交,每條 SQL 立即執(zhí)行,這是設(shè)計(jì)特性;
  • @Transactional 沒(méi)有失效,只是失去了本地事務(wù)提交控制權(quán),被 Seata 代理接管;
  • @GlobalTransactional 才是分布式事務(wù)的核心,控制全局提交 / 回滾;
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("/create")
    // 定義全局事務(wù),name 為事務(wù)標(biāo)識(shí),rollbackFor 指定異?;貪L條件
    @GlobalTransactional(name = "create-order-at", rollbackFor = Exception.class)
    public String createOrder(@RequestBody OrderDTO orderDTO) {
        orderService.createOrder(orderDTO);
        return "訂單創(chuàng)建成功";
    }
}

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private InventoryFeignClient inventoryFeignClient;
    @Autowired
    private AccountFeignClient accountFeignClient;

    public void createOrder(OrderDTO orderDTO) {
        // 1. 創(chuàng)建訂單(本地分支事務(wù))
        Order order = new Order();
        order.setOrderNo(UUID.randomUUID().toString());
        order.setUserId(orderDTO.getUserId());
        order.setProductId(orderDTO.getProductId());
        order.setCount(orderDTO.getCount());
        order.setAmount(orderDTO.getAmount());
        order.setStatus(1); // 訂單狀態(tài):1-待支付
        orderMapper.insert(order);

        // 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存(遠(yuǎn)程分支事務(wù))
        InventoryDTO inventoryDTO = new InventoryDTO();
        inventoryDTO.setProductId(orderDTO.getProductId());
        inventoryDTO.setCount(orderDTO.getCount());
        inventoryFeignClient.deduct(inventoryDTO);

        // 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額(遠(yuǎn)程分支事務(wù))
        AccountDTO accountDTO = new AccountDTO();
        accountDTO.setUserId(orderDTO.getUserId());
        accountDTO.setAmount(orderDTO.getAmount());
        accountFeignClient.deduct(accountDTO);
    }
}

5.2 庫(kù)存服務(wù)(RM 參與分支事務(wù))

庫(kù)存服務(wù)無(wú)需額外配置全局事務(wù)注解,僅需實(shí)現(xiàn)本地扣減邏輯,Seata 客戶端會(huì)自動(dòng)將其注冊(cè)為分支事務(wù):

@RestController
@RequestMapping("/inventory")
public class InventoryController {

    @Autowired
    private InventoryService inventoryService;

    @PostMapping("/deduct")
    public String deduct(@RequestBody InventoryDTO inventoryDTO) {
        inventoryService.deduct(inventoryDTO.getProductId(), inventoryDTO.getCount());
        return "庫(kù)存扣減成功";
    }
}

@Service
public class InventoryService {

    @Autowired
    private InventoryMapper inventoryMapper;

    public void deduct(Long productId, Integer count) {
        // 檢查庫(kù)存是否充足
        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory.getStock()< count) {
            throw new RuntimeException("庫(kù)存不足"); // 拋出異常,觸發(fā)全局回滾
        }
        // 扣減庫(kù)存
        inventory.setStock(inventory.getStock() - count);
        inventoryMapper.updateById(inventory);
    }
}

5.3 賬戶服務(wù)(RM 參與分支事務(wù))

與庫(kù)存服務(wù)類似,僅實(shí)現(xiàn)本地扣減邏輯,異常時(shí)拋出異常觸發(fā)回滾:

@RestController
@RequestMapping("/account")
public class AccountController {

    @Autowired
    private AccountService accountService;

    @PostMapping("/deduct")
    public String deduct(@RequestBody AccountDTO accountDTO) {
        accountService.deduct(accountDTO.getUserId(), accountDTO.getAmount());
        return "賬戶扣減成功";
    }
}

@Service
public class AccountService {

    @Autowired
    private AccountMapper accountMapper;

    public void deduct(Long userId, BigDecimal amount) {
        // 檢查余額是否充足
        Account account = accountMapper.selectByUserId(userId);
        if (account.getBalance().compareTo(amount) < 0) {
            throw new RuntimeException("余額不足"); // 拋出異常,觸發(fā)全局回滾
        }
        // 扣減余額
        account.setBalance(account.getBalance().subtract(amount));
        accountMapper.updateById(account);
    }
}

說(shuō)明:AT 模式下,Seata 會(huì)自動(dòng)生成回滾日志,當(dāng)某一分支事務(wù)失?。⊕伋霎惓#琓C 會(huì)指令所有分支事務(wù)執(zhí)行回滾,恢復(fù)數(shù)據(jù)一致性。

六、TCC 模式示例

TCC 模式需要手動(dòng)實(shí)現(xiàn) Try、Confirm、Cancel 三個(gè)方法,適用于非關(guān)系型數(shù)據(jù)庫(kù)或自定義業(yè)務(wù)場(chǎng)景,以下以"下單-扣庫(kù)存-扣余額"場(chǎng)景為例,實(shí)現(xiàn) TCC 模式分布式事務(wù)。

重要說(shuō)明:每個(gè) Try 方法都必須加 @Transactional 注解,Confirm 和 Cancel 方法也最好都加。這不是為了分布式事務(wù),而是為了確保本地?cái)?shù)據(jù)庫(kù)操作的原子性。

6.1 訂單服務(wù) TCC 實(shí)現(xiàn)

訂單服務(wù) TCC 接口

@LocalTCC
public interface OrderTccService {

    // Try 階段:創(chuàng)建訂單,commitMethod 指定確認(rèn)方法,rollbackMethod 指定回滾方法
    // name 參數(shù):業(yè)務(wù)動(dòng)作名稱,建議全局唯一,用于標(biāo)識(shí)不同的 TCC 業(yè)務(wù)操作
    // commitMethod:指定 Confirm 階段的方法名
    // rollbackMethod:指定 Cancel 階段的方法名
    @TwoPhaseBusinessAction(name = "tryCreateOrder", commitMethod = "commitCreateOrder", rollbackMethod = "cancelCreateOrder")
    // @BusinessActionContextParameter:將參數(shù)傳遞到 BusinessActionContext 中,供 Confirm 和 Cancel 階段使用
    // paramName:參數(shù)在 BusinessActionContext 中的鍵名
    boolean tryCreateOrder(@BusinessActionContextParameter(paramName = "order") Order order);

    // Confirm 階段:確認(rèn)創(chuàng)建訂單
    boolean commitCreateOrder(BusinessActionContext context);

    // Cancel 階段:回滾訂單創(chuàng)建
    boolean cancelCreateOrder(BusinessActionContext context);
}

@Service
public class OrderTccServiceImpl implements OrderTccService {

    @Autowired
    private OrderMapper orderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)(rollbackFor = Exception.class)
    public boolean tryCreateOrder(Order order) {
        // Try 階段:創(chuàng)建訂單,狀態(tài)設(shè)置為待確認(rèn)
        order.setStatus(0); // 0-待確認(rèn)
        boolean result = orderMapper.insert(order) > 0;
        if (!result) {
            throw new RuntimeException("創(chuàng)建訂單失敗"); // 拋出異常觸發(fā)回滾
        }
        return true;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean commitCreateOrder(BusinessActionContext context) {
        // Confirm 階段:更新訂單狀態(tài)為待支付
        Order order = (Order) context.getActionContext("order");
        order.setStatus(1); // 1-待支付
        return orderMapper.updateById(order) > 0;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean cancelCreateOrder(BusinessActionContext context) {
        // Cancel 階段:刪除訂單
        Order order = (Order) context.getActionContext("order");
        return orderMapper.deleteById(order.getId()) > 0;
    }
}

訂單服務(wù)控制器

@RestController
@RequestMapping("/order")
public class OrderTccController {

    @Autowired
    private OrderTccService orderTccService;
    @Autowired
    private InventoryFeignClient inventoryFeignClient;
    @Autowired
    private AccountFeignClient accountFeignClient;

    @PostMapping("/create-tcc")
    @GlobalTransactional(name = "create-order-tcc", rollbackFor = Exception.class)
    public String createOrderTcc(@RequestBody OrderDTO orderDTO) {
        // 1. 創(chuàng)建訂單(TCC Try 階段)
        Order order = new Order();
        order.setOrderNo(UUID.randomUUID().toString());
        order.setUserId(orderDTO.getUserId());
        order.setProductId(orderDTO.getProductId());
        order.setCount(orderDTO.getCount());
        order.setAmount(orderDTO.getAmount());
        order.setStatus(0); // 0-待確認(rèn)
        orderTccService.tryCreateOrder(order);

        // 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存(TCC Try 階段)
        InventoryDTO inventoryDTO = new InventoryDTO();
        inventoryDTO.setProductId(orderDTO.getProductId());
        inventoryDTO.setCount(orderDTO.getCount());
        inventoryFeignClient.deductTcc(inventoryDTO);

        // 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額(TCC Try 階段)
        AccountDTO accountDTO = new AccountDTO();
        accountDTO.setUserId(orderDTO.getUserId());
        accountDTO.setAmount(orderDTO.getAmount());
        accountFeignClient.deductTcc(accountDTO);

        return "訂單創(chuàng)建成功(TCC模式)";
    }
}

6.2 庫(kù)存服務(wù) TCC 實(shí)現(xiàn)

@LocalTCC
public interface InventoryTccService {

    // Try 階段:檢查并預(yù)留庫(kù)存
    @TwoPhaseBusinessAction(name = "tryDeductInventory", commitMethod = "commitDeductInventory", rollbackMethod = "cancelDeductInventory")
    boolean tryDeductInventory(@BusinessActionContextParameter(paramName = "productId") Long productId,
                         @BusinessActionContextParameter(paramName = "count") Integer count);

    // Confirm 階段:確認(rèn)扣減庫(kù)存
    boolean commitDeductInventory(BusinessActionContext context);

    // Cancel 階段:回滾庫(kù)存預(yù)留
    boolean cancelDeductInventory(BusinessActionContext context);
}

@Service
public class InventoryTccServiceImpl implements InventoryTccService {

    @Autowired
    private InventoryMapper inventoryMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean tryDeductInventory(Long productId, Integer count) {
        // 1. 檢查庫(kù)存是否充足
        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory.getStock() < count) {
            throw new RuntimeException("庫(kù)存不足");
        }
        // 2. 預(yù)留庫(kù)存:減少可用庫(kù)存,增加預(yù)留庫(kù)存
        inventory.setStock(inventory.getStock() - count);
        inventory.setReservedStock(inventory.getReservedStock() + count);
        boolean result = inventoryMapper.updateById(inventory) > 0;
        if (!result) {
            throw new RuntimeException("扣減庫(kù)存失敗"); // 拋出異常觸發(fā)回滾
        }
        return true;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeductInventory(BusinessActionContext context) {
        // Confirm 階段:減少預(yù)留庫(kù)存(實(shí)際扣減)
        Long productId = Long.valueOf(context.getActionContext("productId").toString());
        Integer count = Integer.valueOf(context.getActionContext("count").toString());

        Inventory inventory = inventoryMapper.selectByProductId(productId);
        inventory.setReservedStock(inventory.getReservedStock() - count);
        return inventoryMapper.updateById(inventory) > 0;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean cancelDeductInventory(BusinessActionContext context) {
        // Cancel 階段:恢復(fù)可用庫(kù)存,減少預(yù)留庫(kù)存
        Long productId = Long.valueOf(context.getActionContext("productId").toString());
        Integer count = Integer.valueOf(context.getActionContext("count").toString());

        Inventory inventory = inventoryMapper.selectByProductId(productId);
        inventory.setStock(inventory.getStock() + count);
        inventory.setReservedStock(inventory.getReservedStock() - count);
        return inventoryMapper.updateById(inventory) > 0;
    }
}

@RestController
@RequestMapping("/inventory")
public class InventoryTccController {

    @Autowired
    private InventoryTccService inventoryTccService;

    @PostMapping("/deduct-tcc")
    public String deductTcc(@RequestBody InventoryDTO inventoryDTO) {
        inventoryTccService.tryDeductInventory(inventoryDTO.getProductId(), inventoryDTO.getCount());
        return "庫(kù)存扣減成功(TCC模式)";
    }
}

6.3 賬戶服務(wù) TCC 實(shí)現(xiàn)

@LocalTCC
public interface AccountTccService {

    // Try 階段:資源檢查和預(yù)留,commitMethod 指定確認(rèn)方法,rollbackMethod 指定回滾方法
    @TwoPhaseBusinessAction(name = "tryDeductBalance", commitMethod = "commitDeductBalance", rollbackMethod = "cancelDeductBalance")
    boolean tryDeductBalance(@BusinessActionContextParameter(paramName = "userId") Long userId,
                       @BusinessActionContextParameter(paramName = "amount") BigDecimal amount);

    // Confirm 階段:確認(rèn)執(zhí)行業(yè)務(wù),無(wú)需額外操作(Try 階段已完成核心邏輯)
    boolean commitDeductBalance(BusinessActionContext context);

    // Cancel 階段:回滾操作,解凍預(yù)留的資源
    boolean cancelDeductBalance(BusinessActionContext context);
}

@Service
public class AccountTccServiceImpl implements AccountTccService {

    @Autowired
    private AccountMapper accountMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean tryDeductBalance(Long userId, BigDecimal amount) {
        // 1. 檢查余額是否充足
        Account account = accountMapper.selectByUserId(userId);
        if (account.getBalance().compareTo(amount) < 0) {
            throw new RuntimeException("余額不足");
        }
        // 2. 凍結(jié)金額(預(yù)留資源):余額減少,凍結(jié)金額增加
        account.setFreezeAmount(account.getFreezeAmount().add(amount));
        account.setBalance(account.getBalance().subtract(amount));
        boolean result = accountMapper.updateById(account) > 0;
        if (!result) {
            throw new RuntimeException("凍結(jié)賬戶失敗"); // 拋出異常觸發(fā)回滾
        }
        return true; // 返回 true 表示 Try 成功
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeductBalance(BusinessActionContext context) {
        // 提交階段:無(wú)需操作,凍結(jié)金額已經(jīng)在 prepare 階段扣除,確認(rèn)后無(wú)需額外處理
        return true;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean cancelDeductBalance(BusinessActionContext context) {
        // 回滾階段:解凍金額,恢復(fù)余額
        // 從上下文獲取 Try 階段傳入的參數(shù)
        Long userId = Long.valueOf(context.getActionContext("userId").toString());
        BigDecimal amount = new BigDecimal(context.getActionContext("amount").toString());

        Account account = accountMapper.selectByUserId(userId);
        // 解凍:凍結(jié)金額減少,余額增加
        account.setFreezeAmount(account.getFreezeAmount().subtract(amount));
        account.setBalance(account.getBalance().add(amount));
        return accountMapper.updateById(account) > 0; // 返回 true 表示回滾成功
    }
}

@RestController
@RequestMapping("/account")
public class AccountTccController {

    @Autowired
    private AccountTccService accountTccService;

    @PostMapping("/deduct-tcc")
    public String deductTcc(@RequestBody AccountDTO accountDTO) {
        accountTccService.tryDeductBalance(accountDTO.getUserId(), accountDTO.getAmount());
        return "賬戶扣減成功(TCC模式)";
    }
}

說(shuō)明:TCC 模式下,TM 同樣通過(guò)@GlobalTransactional 發(fā)起全局事務(wù),TC 協(xié)調(diào)各分支的 Try、Confirm、Cancel 階段執(zhí)行,確保事務(wù)一致性。

TCC 模式的重試機(jī)制:

  • Try 階段:通常不進(jìn)行重試,因?yàn)?Try 操作可能包含業(yè)務(wù)檢查和資源預(yù)留,如果重試可能導(dǎo)致重復(fù)預(yù)留資源(如重復(fù)凍結(jié)金額)。

  • Confirm 階段:需要進(jìn)行重試,直到成功為止。因?yàn)?Confirm 操作是冪等的,多次執(zhí)行不會(huì)產(chǎn)生副作用,確保事務(wù)最終提交。

  • Cancel 階段:需要進(jìn)行重試,直到成功為止。因?yàn)?Cancel 操作也是冪等的,多次執(zhí)行不會(huì)產(chǎn)生副作用,確保事務(wù)最終回滾。

實(shí)現(xiàn)重試機(jī)制的建議:

  1. 冪等設(shè)計(jì):確保 Confirm 和 Cancel 方法是冪等的,即多次執(zhí)行產(chǎn)生相同的結(jié)果。

  2. 重試策略:不建議設(shè)置具體重試次數(shù),建議使用超時(shí)時(shí)間控制重試,而不是次數(shù)。Confirm/Cancel 應(yīng)保持無(wú)限重試,只添加一個(gè)最長(zhǎng)超時(shí)兜底即可:

# seata-server/conf/application.yml
# 全局事務(wù)超時(shí)時(shí)間配置
global:
  transactionTimeoutMills: 60000  # 全局事務(wù)超時(shí)時(shí)間(毫秒),默認(rèn)60秒
  # 不設(shè)置具體重試次數(shù),使用超時(shí)時(shí)間控制

# 服務(wù)器配置
server:
  # 提交(Confirm)最長(zhǎng)重試超時(shí)時(shí)間,毫秒
  # 1小時(shí) = 3600000ms
  maxCommitRetryTimeout: 3600000
  # 回滾(Cancel)最長(zhǎng)重試超時(shí)時(shí)間
  maxRollbackRetryTimeout: 3600000
  # 恢復(fù)機(jī)制配置
  recovery:
    # 回滾中狀態(tài)重試間隔,默認(rèn)1s
    rollbackingRetryPeriod: 1000
    # 提交中狀態(tài)重試間隔,默認(rèn)1s
    committingRetryPeriod: 1000
  1. 業(yè)務(wù)日志:在 Confirm 和 Cancel 方法中添加詳細(xì)日志,便于排查重試失敗的原因。

  2. 超時(shí)處理:設(shè)置合理的全局事務(wù)超時(shí)時(shí)間,作為重試的最終兜底,避免重試無(wú)限期進(jìn)行。

  3. 人工補(bǔ)償:當(dāng)系統(tǒng)自動(dòng)重試失敗后,需要建立人工補(bǔ)償機(jī)制:

    • 事務(wù)狀態(tài)監(jiān)控:定期掃描超時(shí)未完成的事務(wù),識(shí)別需要人工干預(yù)的事務(wù)
    • 補(bǔ)償流程:建立標(biāo)準(zhǔn)化的人工補(bǔ)償流程,包括事務(wù)狀態(tài)分析、手動(dòng)執(zhí)行 Confirm/Cancel 操作
    • 操作記錄:記錄所有人工補(bǔ)償操作,便于審計(jì)和追溯
    • 告警機(jī)制:當(dāng)出現(xiàn)大量需要人工補(bǔ)償?shù)氖聞?wù)時(shí),及時(shí)觸發(fā)告警,通知運(yùn)維人員

TCC 模式的常見(jiàn)問(wèn)題

1. 冪等問(wèn)題

問(wèn)題:由于網(wǎng)絡(luò)重試或其他原因,Confirm 或 Cancel 操作可能被多次調(diào)用,如果不處理冪等性,可能導(dǎo)致業(yè)務(wù)邏輯錯(cuò)誤(如重復(fù)扣錢、重復(fù)解凍)。

2. 空回滾問(wèn)題

問(wèn)題:當(dāng) Try 操作由于網(wǎng)絡(luò)超時(shí)等原因未執(zhí)行,但 TC 仍然會(huì)調(diào)用 Cancel 操作,導(dǎo)致 Cancel 操作處理一個(gè)不存在的分支事務(wù)(不該回滾時(shí)別亂回滾)。

3. 懸掛問(wèn)題

問(wèn)題:當(dāng) Try 操作由于網(wǎng)絡(luò)延遲等原因后于 Cancel 操作執(zhí)行,導(dǎo)致 Cancel 操作執(zhí)行后,Try 操作才開(kāi)始執(zhí)行,造成數(shù)據(jù)不一致(別回滾完了又去凍結(jié))。

TCC 模式的統(tǒng)一解決方案

為了同時(shí)解決冪等、空回滾和懸掛問(wèn)題,建議使用 TCC 事務(wù)日志表來(lái)統(tǒng)一管理事務(wù)狀態(tài),所有操作都圍繞這張表進(jìn)行狀態(tài)判斷。

TCC 事務(wù)日志表結(jié)構(gòu)

CREATE TABLE `tcc_transaction_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT, -- 主鍵ID
  `xid` varchar(100) NOT NULL, -- 全局事務(wù)ID
  `branch_id` bigint(20) NOT NULL, -- 分支事務(wù)ID
  `business_key` varchar(100) NOT NULL, -- 業(yè)務(wù)唯一標(biāo)識(shí)(如訂單號(hào))
  `status` int(11) NOT NULL, -- 事務(wù)狀態(tài):1-Try成功,2-Confirm成功,3-Cancel成功
  `create_time` datetime NOT NULL, -- 創(chuàng)建時(shí)間
  `update_time` datetime NOT NULL, -- 更新時(shí)間
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_xid_branch` (`xid`, `branch_id`), -- 確保每個(gè)分支事務(wù)只有一條記錄
  KEY `idx_business_key` (`business_key`) -- 業(yè)務(wù)鍵索引
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='TCC transaction log table';

基于上述 OrderTccServiceImpl 提供修改示例

public enum TccStatusEnum {
    /**
     * Try 已執(zhí)行
     */
    TRY_SUCCESS(1),
    /**
     * Confirm 已執(zhí)行
     */
    CONFIRM_SUCCESS(2),
    /**
     * Cancel 已執(zhí)行
     */
    CANCEL_SUCCESS(3);

    private final int code;

    TccStatusEnum(int code) {
        this.code = code;
    }

    public int getCode() {
        return code;
    }
}

@Service
public class OrderTccServiceImpl implements OrderTccService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private TccTransactionLogMapper logMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean tryCreateOrder(@BusinessActionContextParameter(paramName = "order") Order order) {
        String xid = RootContext.getXID();
        long branchId = BranchContext.getBranchId();

        // ======================== 防懸掛 ========================
        TccTransactionLog log = logMapper.selectByXidAndBranchId(xid, branchId);
        if (log != null) {
            // 已經(jīng) Cancel(空回滾)→ 絕對(duì)禁止執(zhí)行 Try
            if (log.getStatus() == TccStatusEnum.CANCEL_SUCCESS.getCode()) {
                System.out.println("防懸掛攔截:Cancel已執(zhí)行,Try拒絕執(zhí)行");
                return false;
            }
            // 已經(jīng)執(zhí)行過(guò) Try → 冪等,返回成功
            if (log.getStatus() == TccStatusEnum.TRY_SUCCESS.getCode()) {
                return true;
            }
        }

        // Try 業(yè)務(wù)邏輯
        order.setStatus(0);
        boolean result = orderMapper.insert(order) > 0;

        if (result) {
            log = new TccTransactionLog();
            log.setXid(xid);
            log.setBranchId(branchId);
            log.setBusinessKey(order.getOrderNo());
            log.setStatus(TccStatusEnum.TRY_SUCCESS.getCode());
            log.setCreateTime(new Date());
            log.setUpdateTime(new Date());
            logMapper.insert(log);
        } else {
            throw new RuntimeException("創(chuàng)建訂單失敗");
        }

        return true;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean commitCreateOrder(BusinessActionContext context) {
        String xid = context.getXid();
        long branchId = context.getBranchId();

        TccTransactionLog log = logMapper.selectByXidAndBranchId(xid, branchId);
        if (log == null) {
            System.out.println("Confirm:日志不存在");
            return false;
        }

        // 冪等:已 Confirm
        if (log.getStatus() == TccStatusEnum.CONFIRM_SUCCESS.getCode()) {
            return true;
        }
        // 只有 Try 成功才能 Confirm
        if (log.getStatus() != TccStatusEnum.TRY_SUCCESS.getCode()) {
            System.out.println("Confirm:狀態(tài)非法");
            return false;
        }

        // 執(zhí)行業(yè)務(wù)
        Order order = (Order) context.getActionContext("order");
        order.setStatus(1);
        boolean update = orderMapper.updateById(order) > 0;

        if (update) {
            log.setStatus(TccStatusEnum.CONFIRM_SUCCESS.getCode());
            log.setUpdateTime(new Date());
            logMapper.updateById(log);
        }

        return update;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean cancelCreateOrder(BusinessActionContext context) {
        String xid = context.getXid();
        long branchId = context.getBranchId();

        TccTransactionLog log = logMapper.selectByXidAndBranchId(xid, branchId);

        // ======================== 空回滾 ========================
        if (log == null) {
            log = new TccTransactionLog();
            log.setXid(xid);
            log.setBranchId(branchId);
            log.setStatus(TccStatusEnum.CANCEL_SUCCESS.getCode());
            log.setCreateTime(new Date());
            log.setUpdateTime(new Date());
            logMapper.insert(log);
            return true;
        }

        // 冪等:已 Cancel
        if (log.getStatus() == TccStatusEnum.CANCEL_SUCCESS.getCode()) {
            return true;
        }

        // 只有 Try 成功才能 Cancel
        if (log.getStatus() != TccStatusEnum.TRY_SUCCESS.getCode()) {
            System.out.println("Cancel:狀態(tài)非法");
            return false;
        }

        // 執(zhí)行業(yè)務(wù)
        Order order = (Order) context.getActionContext("order");
        boolean delete = orderMapper.deleteById(order.getId()) > 0;

        if (delete) {
            log.setStatus(TccStatusEnum.CANCEL_SUCCESS.getCode());
            log.setUpdateTime(new Date());
            logMapper.updateById(log);
        }

        return delete;
    }
}

執(zhí)行流程

  • 所有 Try 成功 → 全局提交 → 執(zhí)行所有 Confirm
  • 任意 Try 失敗 / 超時(shí) → 全局回滾 → 執(zhí)行所有 Cancel
  • Confirm 和 Cancel 絕對(duì)互斥,永遠(yuǎn)不會(huì)同時(shí)執(zhí)行
  • Confirm / Cancel 失敗,Seata 會(huì)自動(dòng)重試,直到成功

七、SAGA 模式示例

SAGA 模式適用于長(zhǎng)事務(wù)和復(fù)雜業(yè)務(wù)流程,通過(guò)狀態(tài)機(jī)定義服務(wù)調(diào)用流程,支持正向服務(wù)和補(bǔ)償服務(wù)。以下以“下單-扣庫(kù)存-扣余額”場(chǎng)景為例,實(shí)現(xiàn) SAGA 模式分布式事務(wù)。

7.1 SAGA 模式實(shí)現(xiàn)

1. 定義狀態(tài)機(jī)配置

@Configuration
public class SagaStateMachineConfig {

    @Bean
    public StateMachineFactory<String, String, Object> stateMachineFactory() {
        // 定義狀態(tài)機(jī)構(gòu)建器
        StateMachineBuilder.Builder<String, String, Object> builder = StateMachineBuilder.builder();

        // 定義狀態(tài)和轉(zhuǎn)換
        builder.configureStates()
            .withStates()
            .initial("START")
            .states(EnumSet.of("CREATE_ORDER", "DEDUCT_INVENTORY", "DEDUCT_ACCOUNT", "END"))
            .end("END")
            .end("ROLLBACK");

        // 定義轉(zhuǎn)換
        builder.configureTransitions()
            // 正向流程
            .withExternal()
            .source("START").target("CREATE_ORDER").event("CREATE_ORDER")
            .and()
            .withExternal()
            .source("CREATE_ORDER").target("DEDUCT_INVENTORY").event("DEDUCT_INVENTORY")
            .and()
            .withExternal()
            .source("DEDUCT_INVENTORY").target("DEDUCT_ACCOUNT").event("DEDUCT_ACCOUNT")
            .and()
            .withExternal()
            .source("DEDUCT_ACCOUNT").target("END").event("SUCCESS")
            // 回滾流程
            .and()
            .withExternal()
            .source("CREATE_ORDER").target("ROLLBACK").event("ROLLBACK_CREATE_ORDER")
            .and()
            .withExternal()
            .source("DEDUCT_INVENTORY").target("ROLLBACK").event("ROLLBACK_DEDUCT_INVENTORY")
            .and()
            .withExternal()
            .source("DEDUCT_ACCOUNT").target("ROLLBACK").event("ROLLBACK_DEDUCT_ACCOUNT");

        return builder.build();
    }
}

2. 業(yè)務(wù)服務(wù)實(shí)現(xiàn)

@Service
public class SagaOrderService {

    @Autowired
    private StateMachineFactory<String, String, Object> stateMachineFactory;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private InventoryFeignClient inventoryFeignClient;
    @Autowired
    private AccountFeignClient accountFeignClient;

    @GlobalTransactional
    public void createOrderWithSaga(OrderDTO orderDTO) {
        // 創(chuàng)建狀態(tài)機(jī)實(shí)例
        StateMachine<String, String, Object> stateMachine = stateMachineFactory.getStateMachine(UUID.randomUUID().toString());

        try {
            // 啟動(dòng)狀態(tài)機(jī)
            stateMachine.start();

            // 1. 執(zhí)行創(chuàng)建訂單
            Order order = new Order();
            order.setOrderNo(UUID.randomUUID().toString());
            order.setUserId(orderDTO.getUserId());
            order.setProductId(orderDTO.getProductId());
            order.setCount(orderDTO.getCount());
            order.setAmount(orderDTO.getAmount());
            order.setStatus(1); // 訂單狀態(tài):1-待支付
            orderMapper.insert(order);
            stateMachine.sendEvent(MessageBuilder.withPayload("CREATE_ORDER").build());

            // 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存
            InventoryDTO inventoryDTO = new InventoryDTO();
            inventoryDTO.setProductId(orderDTO.getProductId());
            inventoryDTO.setCount(orderDTO.getCount());
            inventoryFeignClient.deductSaga(inventoryDTO);
            stateMachine.sendEvent(MessageBuilder.withPayload("DEDUCT_INVENTORY").build());

            // 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額
            AccountDTO accountDTO = new AccountDTO();
            accountDTO.setUserId(orderDTO.getUserId());
            accountDTO.setAmount(orderDTO.getAmount());
            accountFeignClient.deductSaga(accountDTO);
            stateMachine.sendEvent(MessageBuilder.withPayload("DEDUCT_ACCOUNT").build());

            // 4. 完成事務(wù)
            stateMachine.sendEvent(MessageBuilder.withPayload("SUCCESS").build());
        } catch (Exception e) {
            // 根據(jù)當(dāng)前狀態(tài)執(zhí)行相應(yīng)的回滾
            String currentState = stateMachine.getState().getId();
            switch (currentState) {
                case "CREATE_ORDER":
                    // 回滾訂單創(chuàng)建
                    stateMachine.sendEvent(MessageBuilder.withPayload("ROLLBACK_CREATE_ORDER").build());
                    break;
                case "DEDUCT_INVENTORY":
                    // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),回滾庫(kù)存扣減
                    InventoryDTO inventoryDTO = new InventoryDTO();
                    inventoryDTO.setProductId(orderDTO.getProductId());
                    inventoryDTO.setCount(orderDTO.getCount());
                    inventoryFeignClient.rollbackSaga(inventoryDTO);
                    stateMachine.sendEvent(MessageBuilder.withPayload("ROLLBACK_DEDUCT_INVENTORY").build());
                    break;
                case "DEDUCT_ACCOUNT":
                    // 遠(yuǎn)程調(diào)用賬戶服務(wù),回滾賬戶扣減
                    AccountDTO accountDTO = new AccountDTO();
                    accountDTO.setUserId(orderDTO.getUserId());
                    accountDTO.setAmount(orderDTO.getAmount());
                    accountFeignClient.rollbackSaga(accountDTO);
                    stateMachine.sendEvent(MessageBuilder.withPayload("ROLLBACK_DEDUCT_ACCOUNT").build());
                    break;
            }
            throw e;
        } finally {
            stateMachine.stop();
        }
    }
}

@RestController
@RequestMapping("/order")
public class SagaOrderController {

    @Autowired
    private SagaOrderService sagaOrderService;

    @PostMapping("/create-saga")
    public String createOrderSaga(@RequestBody OrderDTO orderDTO) {
        sagaOrderService.createOrderWithSaga(orderDTO);
        return "訂單創(chuàng)建成功(SAGA模式)";
    }
}

3. 庫(kù)存服務(wù) SAGA 實(shí)現(xiàn)

@RestController
@RequestMapping("/inventory")
public class InventorySagaController {

    @Autowired
    private InventoryService inventoryService;

    @PostMapping("/deduct-saga")
    public String deductSaga(@RequestBody InventoryDTO inventoryDTO) {
        inventoryService.deduct(inventoryDTO.getProductId(), inventoryDTO.getCount());
        return "庫(kù)存扣減成功(SAGA模式)";
    }

    @PostMapping("/rollback-saga")
    public String rollbackSaga(@RequestBody InventoryDTO inventoryDTO) {
        // 回滾庫(kù)存扣減,恢復(fù)庫(kù)存
        Inventory inventory = inventoryService.getByProductId(inventoryDTO.getProductId());
        inventory.setStock(inventory.getStock() + inventoryDTO.getCount());
        inventoryService.update(inventory);
        return "庫(kù)存回滾成功(SAGA模式)";
    }
}

4. 賬戶服務(wù) SAGA 實(shí)現(xiàn)

@RestController
@RequestMapping("/account")
public class AccountSagaController {

    @Autowired
    private AccountService accountService;

    @PostMapping("/deduct-saga")
    public String deductSaga(@RequestBody AccountDTO accountDTO) {
        accountService.deduct(accountDTO.getUserId(), accountDTO.getAmount());
        return "賬戶扣減成功(SAGA模式)";
    }

    @PostMapping("/rollback-saga")
    public String rollbackSaga(@RequestBody AccountDTO accountDTO) {
        // 回滾賬戶扣減,恢復(fù)余額
        Account account = accountService.getByUserId(accountDTO.getUserId());
        account.setBalance(account.getBalance().add(accountDTO.getAmount()));
        accountService.update(account);
        return "賬戶回滾成功(SAGA模式)";
    }
}

八、XA 模式示例

XA 模式適用于需要強(qiáng)一致性的場(chǎng)景,依賴數(shù)據(jù)庫(kù)原生 XA 協(xié)議,提供最強(qiáng)的數(shù)據(jù)一致性保證。以下以“下單-扣庫(kù)存-扣余額”場(chǎng)景為例,實(shí)現(xiàn) XA 模式分布式事務(wù)。

8.1 XA 模式實(shí)現(xiàn)

1. 配置文件開(kāi)啟 XA 模式

# application.yaml
spring:
  cloud:
    seata:
      tx-service-group: my_test_tx_group
      registry:
        type: nacos
        nacos:
          server-addr: 127.0.0.1:8848
      data-source-proxy:
        mode: XA # 開(kāi)啟 XA 模式

2. 訂單服務(wù) XA 實(shí)現(xiàn)

@RestController
@RequestMapping("/order")
public class XAOrderController {

    @Autowired
    private XAOrderService xaOrderService;

    @PostMapping("/create-xa")
    // 定義全局事務(wù),使用 XA 模式
    @GlobalTransactional(name = "create-order-xa", rollbackFor = Exception.class)
    public String createOrderXA(@RequestBody OrderDTO orderDTO) {
        xaOrderService.createOrder(orderDTO);
        return "訂單創(chuàng)建成功(XA模式)";
    }
}

@Service
public class XAOrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private InventoryFeignClient inventoryFeignClient;
    @Autowired
    private AccountFeignClient accountFeignClient;

    public void createOrder(OrderDTO orderDTO) {
        // 1. 創(chuàng)建訂單(本地分支事務(wù) - XA)
        Order order = new Order();
        order.setOrderNo(UUID.randomUUID().toString());
        order.setUserId(orderDTO.getUserId());
        order.setProductId(orderDTO.getProductId());
        order.setCount(orderDTO.getCount());
        order.setAmount(orderDTO.getAmount());
        order.setStatus(1); // 訂單狀態(tài):1-待支付
        orderMapper.insert(order);

        // 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存(遠(yuǎn)程分支事務(wù) - XA)
        InventoryDTO inventoryDTO = new InventoryDTO();
        inventoryDTO.setProductId(orderDTO.getProductId());
        inventoryDTO.setCount(orderDTO.getCount());
        inventoryFeignClient.deductXa(inventoryDTO);

        // 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額(遠(yuǎn)程分支事務(wù) - XA)
        AccountDTO accountDTO = new AccountDTO();
        accountDTO.setUserId(orderDTO.getUserId());
        accountDTO.setAmount(orderDTO.getAmount());
        accountFeignClient.deductXa(accountDTO);
    }
}

3. 庫(kù)存服務(wù) XA 實(shí)現(xiàn)

@RestController
@RequestMapping("/inventory")
public class XAInventoryController {

    @Autowired
    private XAInventoryService inventoryService;

    @PostMapping("/deduct-xa")
    public String deductXA(@RequestBody InventoryDTO inventoryDTO) {
        inventoryService.deduct(inventoryDTO.getProductId(), inventoryDTO.getCount());
        return "庫(kù)存扣減成功(XA模式)";
    }
}

@Service
public class XAInventoryService {

    @Autowired
    private InventoryMapper inventoryMapper;

    public void deduct(Long productId, Integer count) {
        // 檢查庫(kù)存是否充足
        Inventory inventory = inventoryMapper.selectByProductId(productId);
        if (inventory.getStock() < count) {
            throw new RuntimeException("庫(kù)存不足"); // 拋出異常,觸發(fā)全局回滾
        }
        // 扣減庫(kù)存
        inventory.setStock(inventory.getStock() - count);
        inventoryMapper.updateById(inventory);
    }
}

4. 賬戶服務(wù) XA 實(shí)現(xiàn)

@RestController
@RequestMapping("/account")
public class XAAccountController {

    @Autowired
    private XAAccountService accountService;

    @PostMapping("/deduct-xa")
    public String deductXA(@RequestBody AccountDTO accountDTO) {
        accountService.deduct(accountDTO.getUserId(), accountDTO.getAmount());
        return "賬戶扣減成功(XA模式)";
    }
}

@Service
public class XAAccountService {

    @Autowired
    private AccountMapper accountMapper;

    public void deduct(Long userId, BigDecimal amount) {
        // 檢查余額是否充足
        Account account = accountMapper.selectByUserId(userId);
        if (account.getBalance().compareTo(amount) < 0) {
            throw new RuntimeException("余額不足"); // 拋出異常,觸發(fā)全局回滾
        }
        // 扣減余額
        account.setBalance(account.getBalance().subtract(amount));
        accountMapper.updateById(account);
    }
}

九、生產(chǎn)環(huán)境部署

開(kāi)發(fā)環(huán)境的單機(jī)部署無(wú)法滿足生產(chǎn)環(huán)境的高可用需求,生產(chǎn)環(huán)境需部署 Seata 集群、注冊(cè)中心集群和數(shù)據(jù)庫(kù)主從,確保服務(wù)穩(wěn)定運(yùn)行。

9.1 集群部署

生產(chǎn)環(huán)境推薦使用 DB 模式存儲(chǔ)事務(wù)數(shù)據(jù)(替代單機(jī)的 file 模式),確保集群數(shù)據(jù)一致性,修改 file.conf 配置:

store {
  mode = "db" # 存儲(chǔ)模式:db(數(shù)據(jù)庫(kù)),生產(chǎn)環(huán)境推薦
  db {
    datasource = "druid" # 數(shù)據(jù)源類型
    dbType = "mysql" # 數(shù)據(jù)庫(kù)類型
    driverClassName = "com.mysql.jdbc.Driver" # 數(shù)據(jù)庫(kù)驅(qū)動(dòng)
    url = "jdbc:mysql://127.0.0.1:3306/seata?useSSL=false" # 數(shù)據(jù)庫(kù)地址(需提前創(chuàng)建 seata 數(shù)據(jù)庫(kù))
    user = "root" # 數(shù)據(jù)庫(kù)用戶名
    password = "123456" # 數(shù)據(jù)庫(kù)密碼
  }
}

部署多個(gè) Seata Server 節(jié)點(diǎn),指定不同端口,組成集群:

# 節(jié)點(diǎn) 1:端口 8091
sh bin/seata-server.sh -h 127.0.0.1 -p 8091

# 節(jié)點(diǎn) 2:端口 8092
sh bin/seata-server.sh -h 127.0.0.1 -p 8092

# 節(jié)點(diǎn) 3:端口 8093
sh bin/seata-server.sh -h 127.0.0.1 -p 8093

所有節(jié)點(diǎn)配置相同的 Nacos 注冊(cè)中心和 DB 存儲(chǔ),Nacos 會(huì)自動(dòng)將多個(gè) Seata Server 節(jié)點(diǎn)識(shí)別為一個(gè)集群,實(shí)現(xiàn)負(fù)載均衡和故障轉(zhuǎn)移。

9.2 高可用性配置

  • Nacos 集群:部署多個(gè) Nacos 節(jié)點(diǎn),避免注冊(cè)中心單點(diǎn)故障,確保 Seata Server 和業(yè)務(wù)客戶端能夠正常注冊(cè)和發(fā)現(xiàn)。

  • Seata 集群:至少部署 2 個(gè) Seata Server 節(jié)點(diǎn),實(shí)現(xiàn)故障轉(zhuǎn)移,某一節(jié)點(diǎn)宕機(jī)后,其他節(jié)點(diǎn)可繼續(xù)提供服務(wù)。

  • 數(shù)據(jù)庫(kù)主從:配置 MySQL 主從復(fù)制,Seata 數(shù)據(jù)庫(kù)的主庫(kù)宕機(jī)后,從庫(kù)可切換為新主庫(kù),確保事務(wù)數(shù)據(jù)不丟失。

十、最佳實(shí)踐

在實(shí)際項(xiàng)目中,合理運(yùn)用 Seata 的特性,結(jié)合業(yè)務(wù)場(chǎng)景優(yōu)化設(shè)計(jì),可在保證數(shù)據(jù)一致性的同時(shí),提升系統(tǒng)性能和穩(wěn)定性。

10.1 事務(wù)邊界設(shè)計(jì)

  • 最小化事務(wù)范圍:只包含必要的業(yè)務(wù)操作,避免將無(wú)關(guān)操作納入分布式事務(wù),減少事務(wù)執(zhí)行時(shí)間。

  • 避免長(zhǎng)事務(wù):將大事務(wù)拆分為多個(gè)小事務(wù),避免事務(wù)長(zhǎng)時(shí)間占用資源,降低超時(shí)和鎖沖突風(fēng)險(xiǎn)。

  • 合理設(shè)置超時(shí)時(shí)間:根據(jù)業(yè)務(wù)執(zhí)行時(shí)間,設(shè)置合適的 globalTransactionTimeout 參數(shù),避免事務(wù)卡住。

10.2 性能優(yōu)化

  • 優(yōu)先使用 AT 模式:AT 模式對(duì)業(yè)務(wù)無(wú)侵入,性能優(yōu)于 TCC、XA 模式,適合大多數(shù)常規(guī)業(yè)務(wù)場(chǎng)景。

  • 合理設(shè)置資源鎖定時(shí)間:避免長(zhǎng)時(shí)間鎖定數(shù)據(jù)庫(kù)資源,減少鎖沖突,提升并發(fā)能力。

  • 使用異步處理:非核心操作(如日志記錄、消息通知)使用消息隊(duì)列異步處理,不納入分布式事務(wù)范圍。

10.3 監(jiān)控和告警

  • 集成 Prometheus:監(jiān)控 Seata 服務(wù)的運(yùn)行狀態(tài)(如事務(wù)成功率、超時(shí)率、節(jié)點(diǎn)負(fù)載),實(shí)時(shí)掌握系統(tǒng)情況。

  • 配置告警:設(shè)置事務(wù)失敗率、超時(shí)、節(jié)點(diǎn)宕機(jī)等告警規(guī)則,及時(shí)發(fā)現(xiàn)并處理異常。

  • 日志管理:集中管理事務(wù)日志(如全局事務(wù) XID、分支事務(wù)狀態(tài)、回滾日志),便于問(wèn)題排查。

十一、常見(jiàn)問(wèn)題與解決方案

在 Seata 使用過(guò)程中,常見(jiàn)問(wèn)題主要集中在事務(wù)超時(shí)、網(wǎng)絡(luò)異常和數(shù)據(jù)一致性,以下為具體解決方案。

11.1 事務(wù)超時(shí)

問(wèn)題:事務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng),超過(guò)默認(rèn)超時(shí)時(shí)間,導(dǎo)致全局事務(wù)回滾。

解決方案

  • 優(yōu)化業(yè)務(wù)邏輯,減少事務(wù)執(zhí)行時(shí)間(如拆分大事務(wù)、優(yōu)化數(shù)據(jù)庫(kù)查詢);

  • 合理設(shè)置 globalTransactionTimeout 參數(shù),延長(zhǎng)超時(shí)時(shí)間(需結(jié)合業(yè)務(wù)實(shí)際);

  • 將大事務(wù)拆分為多個(gè)小事務(wù),降低單個(gè)事務(wù)的執(zhí)行時(shí)間。

11.2 網(wǎng)絡(luò)異常

問(wèn)題:服務(wù)間網(wǎng)絡(luò)波動(dòng),導(dǎo)致分支事務(wù)狀態(tài)無(wú)法正常上報(bào),或 TC 指令無(wú)法正常下發(fā),造成事務(wù)狀態(tài)不一致。

解決方案

  • 配置服務(wù)調(diào)用重試機(jī)制(如 Feign 重試),應(yīng)對(duì)短暫網(wǎng)絡(luò)波動(dòng);

  • 使用可靠的網(wǎng)絡(luò)環(huán)境,避免網(wǎng)絡(luò)中斷、延遲過(guò)高;

  • 實(shí)現(xiàn)業(yè)務(wù)冪等性設(shè)計(jì),避免重試導(dǎo)致的數(shù)據(jù)重復(fù)操作。

11.3 數(shù)據(jù)一致性

問(wèn)題:極端場(chǎng)景下(如 TC 宕機(jī)、數(shù)據(jù)庫(kù)異常),出現(xiàn)最終數(shù)據(jù)不一致。

解決方案

  • 使用可靠的消息隊(duì)列(如 RocketMQ、Kafka),實(shí)現(xiàn)最終一致性補(bǔ)償;

  • 實(shí)現(xiàn)對(duì)賬機(jī)制,定期比對(duì)各服務(wù)的數(shù)據(jù),發(fā)現(xiàn)不一致時(shí)手動(dòng)修復(fù);

  • 開(kāi)啟 Seata 的日志恢復(fù)機(jī)制,TC 重啟后可恢復(fù)未完成的事務(wù)。

十二、總結(jié)

Seata 為分布式事務(wù)提供了一套完整的解決方案,支持 AT、TCC、SAGA、XA 四種事務(wù)模式,可靈活適配不同業(yè)務(wù)場(chǎng)景,解決微服務(wù)架構(gòu)下的數(shù)據(jù)一致性難題。

通過(guò)本文的介紹,你應(yīng)該已經(jīng)掌握了:

  1. Seata 的核心架構(gòu)(TC、TM、RM)和工作原理;

  2. 四種事務(wù)模式的使用場(chǎng)景和實(shí)現(xiàn)方式;

  3. Seata 環(huán)境搭建(Server 部署、注冊(cè)/配置中心配置)和代碼實(shí)現(xiàn);

  4. 生產(chǎn)環(huán)境集群部署和高可用配置;

  5. 常見(jiàn)問(wèn)題的排查和解決方案。

在實(shí)際項(xiàng)目中,應(yīng)根據(jù)業(yè)務(wù)場(chǎng)景選擇合適的事務(wù)模式,結(jié)合消息隊(duì)列、緩存、數(shù)據(jù)庫(kù)主從等技術(shù),構(gòu)建可靠、高性能的分布式系統(tǒng)。同時(shí),關(guān)注 Seata 開(kāi)源項(xiàng)目的更新,及時(shí)升級(jí)版本,享受更完善的功能和更穩(wěn)定的性能。

Seata 作為一個(gè)活躍的開(kāi)源項(xiàng)目,正在不斷完善和發(fā)展,為微服務(wù)架構(gòu)下的分布式事務(wù)管理提供更加成熟的解決方案。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容