在微服務(wù)架構(gòu)盛行的今天,分布式事務(wù)管理成為了一個(gè)必須面對(duì)的挑戰(zhàn)。傳統(tǒng)的單機(jī)事務(wù)模型在分布式環(huán)境下失效,如何保證多個(gè)服務(wù)間的數(shù)據(jù)一致性成為了開(kāi)發(fā)者的痛點(diǎn)。Apache Seata 作為一款開(kāi)源的分布式事務(wù)解決方案,為我們提供了優(yōu)雅的解決方案。本文將深入介紹 Seata 的核心原理、使用方法和最佳實(shí)踐。
一、分布式事務(wù)的挑戰(zhàn)
在微服務(wù)架構(gòu)中,一個(gè)業(yè)務(wù)操作可能涉及多個(gè)服務(wù)的調(diào)用,例如:
用戶下單 → 創(chuàng)建訂單(訂單服務(wù))
扣減庫(kù)存(庫(kù)存服務(wù))
扣減賬戶余額(賬戶服務(wù))
如果在這個(gè)過(guò)程中任何一個(gè)步驟失敗,如何保證所有操作的一致性?傳統(tǒng)的單機(jī)事務(wù)(ACID)無(wú)法解決這個(gè)問(wèn)題,因?yàn)樗鼈兛缭搅硕鄠€(gè)服務(wù)和數(shù)據(jù)庫(kù)。
二、什么是 Seata
2.1 Seata 簡(jiǎn)介
Seata 是阿里巴巴開(kāi)源的分布式事務(wù)解決方案,致力于提供高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。它支持多種事務(wù)模式,包括 AT、TCC、SAGA 和 XA,滿足不同場(chǎng)景的需求。
2.2 Seata 核心組件
Seata 定義了三個(gè)核心組件,三者協(xié)同工作實(shí)現(xiàn)分布式事務(wù)的協(xié)調(diào)與管控:
TC (Transaction Coordinator):事務(wù)協(xié)調(diào)器,維護(hù)全局事務(wù)的運(yùn)行狀態(tài),負(fù)責(zé)協(xié)調(diào)并驅(qū)動(dòng)全局事務(wù)的提交或回滾,是 Seata 分布式事務(wù)的核心中樞,需單獨(dú)部署為 Seata Server。
TM (Transaction Manager):事務(wù)管理器,定義全局事務(wù)的范圍,負(fù)責(zé)開(kāi)啟、提交或回滾全局事務(wù),嵌入在業(yè)務(wù)應(yīng)用中(如通過(guò)注解觸發(fā))。
RM (Resource Manager):資源管理器,管理分支事務(wù)的資源,與 TC 交互注冊(cè)分支事務(wù)并匯報(bào)分支事務(wù)的狀態(tài),嵌入在各微服務(wù)中,管理本地?cái)?shù)據(jù)庫(kù)等資源。
2.3 全局事務(wù) ID
Seata 使用全局唯一的 XID 來(lái)標(biāo)識(shí)一個(gè)全局事務(wù),XID 貫穿整個(gè)分布式事務(wù)的生命周期,用于關(guān)聯(lián)全局事務(wù)與各個(gè)分支事務(wù),確保事務(wù)追蹤的一致性。
三、Seata 事務(wù)模式
Seata 提供四種核心事務(wù)模式,可根據(jù)業(yè)務(wù)場(chǎng)景靈活選擇,覆蓋絕大多數(shù)分布式事務(wù)需求。
重要說(shuō)明:四種模式都需要使用 @GlobalTransactional 注解在發(fā)起方方法上開(kāi)啟全局事務(wù),Seata 才能協(xié)調(diào)各分支事務(wù)的提交或回滾。
數(shù)據(jù)可見(jiàn)性總結(jié):
- AT:本地已提交,數(shù)據(jù)庫(kù)真實(shí)新數(shù)據(jù);僅應(yīng)用內(nèi)全局事務(wù)查詢隔離;外部普通事務(wù)可見(jiàn)中間臟數(shù)據(jù)(弱隔離、讀未提交級(jí)別)
- TCC:本地提交凍結(jié)態(tài),不改真實(shí)業(yè)務(wù)數(shù)據(jù),外部可見(jiàn)凍結(jié)狀態(tài)
- SAGA:本地提交真實(shí)數(shù)據(jù),完全對(duì)外可見(jiàn),無(wú)任何隔離
- XA:本地不提交,數(shù)據(jù)庫(kù)未更新,全局未提交前所有外部都絕對(duì)看不見(jiàn)(強(qiáng)隔離)
模式差異總結(jié):Seata 的 4 種模式(AT、TCC、SAGA、XA)整體流程完全一樣,都是 TM → TC → RM 這套協(xié)作邏輯,區(qū)別只在 RM 怎么干活:
- AT 模式
提交:刪除對(duì)應(yīng)的 undo log
回滾:根據(jù) undo log 進(jìn)行鏡像回滾 - TCC 模式
提交:執(zhí)行 Confirm
回滾:執(zhí)行 Cancel
整個(gè)流程圍繞 Try - Confirm - Cancel 三個(gè)階段完成。 - SAGA 模式
提交:按業(yè)務(wù)編排順序執(zhí)行正向服務(wù)
回滾:通過(guò)補(bǔ)償服務(wù)反向執(zhí)行,實(shí)現(xiàn)最終一致性 - XA 模式直接基于數(shù)據(jù)庫(kù)原生 XA 協(xié)議,由 RM 驅(qū)動(dòng)數(shù)據(jù)庫(kù)完成 XA 分支的提交與回滾。
但上層角色不變:
- TM:依然是發(fā)起全局事務(wù)、管邊界;
- TC:依然是協(xié)調(diào)、存狀態(tài)、統(tǒng)一下發(fā)提交 / 回滾;
- RM:依然是注冊(cè)分支、執(zhí)行本地操作。
3.1 AT 模式(自動(dòng)補(bǔ)償)
適用場(chǎng)景:關(guān)系型數(shù)據(jù)庫(kù)(如 MySQL、Oracle),適用于大多數(shù)常規(guī)業(yè)務(wù)場(chǎng)景,對(duì)業(yè)務(wù)代碼侵入性極低。
核心原理:
業(yè)務(wù)數(shù)據(jù)和回滾日志記錄在同一個(gè)本地事務(wù)中,確保本地操作的原子性;
提交階段:直接提交業(yè)務(wù)數(shù)據(jù),回滾日志保留用于異?;貪L;
回滾階段:根據(jù)回滾日志自動(dòng)執(zhí)行補(bǔ)償操作,恢復(fù)數(shù)據(jù)至事務(wù)前狀態(tài)。
優(yōu)勢(shì):使用簡(jiǎn)單,對(duì)業(yè)務(wù)無(wú)侵入,無(wú)需修改原有業(yè)務(wù)代碼,性能優(yōu)異。
3.2 TCC 模式(Try-Confirm-Cancel)
適用場(chǎng)景:非關(guān)系型數(shù)據(jù)庫(kù)(如 MongoDB、Redis)、特殊業(yè)務(wù)場(chǎng)景(如自定義資源管控),需要靈活控制事務(wù)流程。
核心原理:
Try:資源檢查和預(yù)留,確保業(yè)務(wù)操作所需資源可用,并鎖定資源;
Confirm:確認(rèn)執(zhí)行業(yè)務(wù)操作,釋放鎖定的資源,完成最終數(shù)據(jù)修改;
Cancel:取消執(zhí)行,釋放預(yù)留的資源,恢復(fù)數(shù)據(jù)至 Try 操作前狀態(tài)。
核心要點(diǎn):TCC 的核心就是「預(yù)留 / 凍結(jié)資源」,如果去掉這一步,它在機(jī)制上確實(shí)就和 Saga 基本一樣了。
實(shí)現(xiàn)方式:
- 每個(gè)服務(wù)實(shí)現(xiàn)三段式:每個(gè)參與分布式事務(wù)的服務(wù)都需要實(shí)現(xiàn) Try、Confirm、Cancel 三個(gè)方法
- 業(yè)務(wù)代碼只調(diào)用 Try:應(yīng)用程序只需要調(diào)用 Try 方法進(jìn)行資源預(yù)留
-
Seata 自動(dòng)調(diào)度:Seata 根據(jù)全局事務(wù)的執(zhí)行狀態(tài),自動(dòng)調(diào)度 Confirm 或 Cancel 方法
- 如果所有服務(wù)的 Try 都成功,Seata 會(huì)調(diào)用所有服務(wù)的 Confirm 方法
- 如果任何一個(gè)服務(wù)的 Try 失敗,Seata 會(huì)調(diào)用所有服務(wù)的 Cancel 方法
- 重要:只有當(dāng) Try 方法拋出異常時(shí)才會(huì)觸發(fā)回滾,返回 false 不會(huì)觸發(fā)回滾
優(yōu)勢(shì):靈活性高,可定制性強(qiáng),支持非關(guān)系型數(shù)據(jù)庫(kù),適配復(fù)雜業(yè)務(wù)場(chǎng)景。
3.3 SAGA 模式
適用場(chǎng)景:長(zhǎng)事務(wù)、業(yè)務(wù)流程復(fù)雜(如跨多個(gè)服務(wù)的鏈路操作)、容錯(cuò)要求高的場(chǎng)景。
核心原理:基于狀態(tài)機(jī)定義服務(wù)調(diào)用流程,支持正向服務(wù)和補(bǔ)償服務(wù),通過(guò)狀態(tài)流轉(zhuǎn)驅(qū)動(dòng)事務(wù)推進(jìn),若某一步失敗則執(zhí)行對(duì)應(yīng)補(bǔ)償服務(wù)回滾。
優(yōu)勢(shì):適合長(zhǎng)事務(wù),容錯(cuò)能力強(qiáng),可應(yīng)對(duì)服務(wù)間長(zhǎng)時(shí)間交互的場(chǎng)景。
3.4 XA 模式
適用場(chǎng)景:需要強(qiáng)一致性的場(chǎng)景(如金融、支付領(lǐng)域),依賴數(shù)據(jù)庫(kù)原生 XA 協(xié)議。
核心原理:基于數(shù)據(jù)庫(kù)原生 XA 協(xié)議,分為準(zhǔn)備階段(所有分支事務(wù)準(zhǔn)備就緒并匯報(bào)狀態(tài))和提交階段(TC 指令所有分支事務(wù)統(tǒng)一提交或回滾)。
?? 生產(chǎn)環(huán)境警告:Seata XA 模式能用,但生產(chǎn)環(huán)境極其不推薦,基本沒(méi)人用。
- 并發(fā)性能差:事務(wù)鎖持有時(shí)間長(zhǎng),高并發(fā)下直接卡死;
- MySQL 主從切換有數(shù)據(jù)不一致風(fēng)險(xiǎn):這個(gè)坑官方都沒(méi)徹底解決。
優(yōu)勢(shì):強(qiáng)一致性,可靠性高,完全遵循 ACID 特性,適配高一致性要求場(chǎng)景。
四、快速上手:環(huán)境搭建
Seata 環(huán)境搭建核心是部署 Seata Server(TC),并配置注冊(cè)中心、配置中心,最終集成業(yè)務(wù)客戶端,以下為基于 Nacos 注冊(cè)/配置中心的最簡(jiǎn)搭建流程。
4.1 步驟 1:安裝 Seata Server
下載并解壓 Seata Server:推薦使用 1.7.1 穩(wěn)定版本,下載地址可直接通過(guò) GitHub 官方鏈接獲取,執(zhí)行以下命令完成下載和解壓:
# 下載 Seata Server 1.7.1
wget https://github.com/seata/seata/releases/download/v1.7.1/seata-server-1.7.1.tar.gz
# 解壓壓縮包
tar -zxvf seata-server-1.7.1.tar.gz
# 進(jìn)入解壓后的目錄
cd seata-server-1.7.1
4.2 步驟 2:配置注冊(cè)中心
修改 Seata Server 目錄下的 registry.conf 文件,配置 Nacos 作為注冊(cè)中心,讓業(yè)務(wù)客戶端能夠發(fā)現(xiàn) TC 服務(wù):
registry {
# 可選類型:file、nacos、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server" # Seata Server 在 Nacos 中的服務(wù)名
serverAddr = "127.0.0.1:8848" # Nacos 服務(wù)地址(本地部署默認(rèn)地址)
group = "SEATA_GROUP" # 服務(wù)分組,默認(rèn) SEATA_GROUP
namespace = "" # Nacos 命名空間,默認(rèn)為 public
cluster = "default" # 集群名稱,默認(rèn) default
username = "nacos" # Nacos 登錄用戶名(默認(rèn) nacos)
password = "nacos" # Nacos 登錄密碼(默認(rèn) nacos)
}
}
4.3 步驟 3:配置配置中心
Seata 配置需推送至 Nacos 配置中心,便于集群部署和動(dòng)態(tài)配置,步驟如下:
- 創(chuàng)建
config.txt文件,配置核心參數(shù)(最簡(jiǎn)配置):
# 事務(wù)組映射配置,my_test_tx_group 為事務(wù)組名稱,default 為集群名稱
service.vgroupMapping.my_test_tx_group=default
# 存儲(chǔ)模式配置,file 表示使用文件存儲(chǔ)(開(kāi)發(fā)環(huán)境推薦),生產(chǎn)環(huán)境推薦使用 db 模式
store.mode=file
# 文件存儲(chǔ)目錄,用于存儲(chǔ)事務(wù)日志等數(shù)據(jù)
store.file.dir=file_store
- 下載 Nacos 配置推送腳本
nacos-config.sh,并執(zhí)行腳本將配置推送至 Nacos:
# 下載 nacos-config.sh 腳本(官方腳本地址)
wget https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh
# 執(zhí)行腳本,推送配置至 Nacos(需替換為實(shí)際 Nacos 地址和賬號(hào)密碼)
sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos
說(shuō)明:該腳本會(huì)讀取 config.txt 中的配置,自動(dòng)編碼并推送至 Nacos 配置中心,推送成功后會(huì)提示“Init nacos config finished”。
4.4 步驟 4:?jiǎn)?dòng) Seata Server
進(jìn)入 Seata Server 目錄,執(zhí)行以下命令啟動(dòng)服務(wù):
# 啟動(dòng) Seata Server(默認(rèn)端口 8091)
sh bin/seata-server.sh
啟動(dòng)成功后,可在 Nacos 控制臺(tái)的“服務(wù)列表”中看到 seata-server 服務(wù),說(shuō)明 TC 部署成功。
五、實(shí)戰(zhàn)示例:AT 模式
AT 模式是最常用的事務(wù)模式,對(duì)業(yè)務(wù)無(wú)侵入,以下以“下單-扣庫(kù)存-扣余額”場(chǎng)景為例,實(shí)現(xiàn) AT 模式分布式事務(wù)。
5.1 訂單服務(wù)(TM 發(fā)起全局事務(wù))
undo_log 表:
AT 模式需要在每個(gè)微服務(wù)的數(shù)據(jù)庫(kù)中創(chuàng)建 undo_log 表,用于存儲(chǔ)回滾日志。當(dāng)事務(wù)需要回滾時(shí),Seata 會(huì)根據(jù)這個(gè)表中的記錄自動(dòng)執(zhí)行補(bǔ)償操作。默認(rèn)情況下,Seata 使用 undo_log 作為回滾日志表名。如果需要自定義表名,可以在配置文件中進(jìn)行設(shè)置。
表結(jié)構(gòu)及字段說(shuō)明:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT, -- 主鍵ID,自增
`branch_id` bigint(20) NOT NULL, -- 分支事務(wù)ID,由Seata Server分配給參與分布式事務(wù)的各個(gè)微服務(wù)的分支事務(wù)
`xid` varchar(100) NOT NULL, -- 全局事務(wù)ID,貫穿整個(gè)分布式事務(wù)生命周期
`context` varchar(128) NOT NULL, -- 上下文信息,存儲(chǔ)額外的業(yè)務(wù)相關(guān)數(shù)據(jù)
`rollback_info` longblob NOT NULL, -- 回滾信息,存儲(chǔ)事務(wù)執(zhí)行前的數(shù)據(jù)鏡像,用于回滾操作
`log_status` int(11) NOT NULL, -- 日志狀態(tài):0-正常,1-已刪除
`log_created` datetime NOT NULL, -- 日志創(chuàng)建時(shí)間
`log_modified` datetime NOT NULL, -- 日志修改時(shí)間
PRIMARY KEY (`id`), -- 主鍵索引
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) -- 唯一索引,確保每個(gè)分支事務(wù)只有一條回滾日志
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='AT transaction mode undo table';
表的作用:
存儲(chǔ)回滾數(shù)據(jù):記錄事務(wù)執(zhí)行前的數(shù)據(jù)狀態(tài),當(dāng)事務(wù)需要回滾時(shí),Seata 會(huì)根據(jù)這些數(shù)據(jù)恢復(fù)到事務(wù)前的狀態(tài)。
確保原子性:與業(yè)務(wù)操作在同一個(gè)本地事務(wù)中提交,確保回滾日志的寫入與業(yè)務(wù)操作的原子性。
支持分布式事務(wù)回滾:當(dāng)分布式事務(wù)中的任何一個(gè)分支事務(wù)失敗時(shí),TC 會(huì)指令所有分支事務(wù)執(zhí)行回滾,通過(guò) undo_log 表中的數(shù)據(jù)實(shí)現(xiàn)自動(dòng)補(bǔ)償。
事務(wù)追蹤:通過(guò) xid 和 branch_id 關(guān)聯(lián)全局事務(wù)和分支事務(wù),便于事務(wù)狀態(tài)的追蹤和管理。
數(shù)據(jù)一致性保障:確保在分布式環(huán)境下,即使部分服務(wù)失敗,也能通過(guò)回滾機(jī)制保證整個(gè)系統(tǒng)的數(shù)據(jù)一致性。
多服務(wù)共用數(shù)據(jù)庫(kù)的區(qū)分:
當(dāng)多個(gè)微服務(wù)共用同一個(gè)數(shù)據(jù)庫(kù)時(shí),undo_log 表會(huì)存儲(chǔ)所有微服務(wù)的回滾日志。Seata 通過(guò)以下機(jī)制區(qū)分不同微服務(wù)的回滾日志:
全局事務(wù) XID:每個(gè)分布式事務(wù)都有一個(gè)全局唯一的 XID,所有參與該事務(wù)的微服務(wù)分支事務(wù)都會(huì)關(guān)聯(lián)到同一個(gè) XID。
分支事務(wù) Branch ID:每個(gè)微服務(wù)的分支事務(wù)都有一個(gè)唯一的 Branch ID,由 Seata Server 分配。Branch ID 與 XID 組合形成唯一標(biāo)識(shí),確保不同微服務(wù)的回滾日志不會(huì)混淆。
上下文信息 Context:
context字段可以存儲(chǔ)額外的上下文信息,例如微服務(wù)名稱、業(yè)務(wù)操作類型等,進(jìn)一步幫助區(qū)分不同微服務(wù)的回滾日志。
添加依賴
在 Spring Boot 項(xiàng)目中引入 Seata 客戶端依賴(需與 Seata Server 版本匹配):
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
配置文件
修改 application.yaml 配置,關(guān)聯(lián) Seata 服務(wù):
spring:
cloud:
seata:
tx-service-group: my_test_tx_group # 事務(wù)組名稱,需與 config.txt 中配置一致
registry:
type: nacos # 注冊(cè)中心類型
nacos:
server-addr: 127.0.0.1:8848 # Nacos 地址
group: SEATA_GROUP # 與 Seata Server 配置一致
application: seata-server # Seata Server 服務(wù)名
data-source-proxy:
undo:
log-table: undo_log # 自定義回滾日志表名
業(yè)務(wù)代碼
通過(guò) @GlobalTransactional 注解定義全局事務(wù)邊界,發(fā)起全局事務(wù):
重要說(shuō)明:在 Seata AT 模式下,@Transactional 注解在全局事務(wù)環(huán)境下基本等于無(wú)效。即使方法上加了該注解,每執(zhí)行一次數(shù)據(jù)庫(kù)操作依然會(huì)立即提交本地事務(wù),每條 SQL 都會(huì)產(chǎn)生獨(dú)立的提交日志并立刻提交事務(wù)。
原因就是:
- Seata AT 模式下,本地事務(wù)默認(rèn)自動(dòng)提交,每條 SQL 立即執(zhí)行,這是設(shè)計(jì)特性;
- @Transactional 沒(méi)有失效,只是失去了本地事務(wù)提交控制權(quán),被 Seata 代理接管;
- @GlobalTransactional 才是分布式事務(wù)的核心,控制全局提交 / 回滾;
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/create")
// 定義全局事務(wù),name 為事務(wù)標(biāo)識(shí),rollbackFor 指定異?;貪L條件
@GlobalTransactional(name = "create-order-at", rollbackFor = Exception.class)
public String createOrder(@RequestBody OrderDTO orderDTO) {
orderService.createOrder(orderDTO);
return "訂單創(chuàng)建成功";
}
}
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryFeignClient inventoryFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
public void createOrder(OrderDTO orderDTO) {
// 1. 創(chuàng)建訂單(本地分支事務(wù))
Order order = new Order();
order.setOrderNo(UUID.randomUUID().toString());
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setCount(orderDTO.getCount());
order.setAmount(orderDTO.getAmount());
order.setStatus(1); // 訂單狀態(tài):1-待支付
orderMapper.insert(order);
// 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存(遠(yuǎn)程分支事務(wù))
InventoryDTO inventoryDTO = new InventoryDTO();
inventoryDTO.setProductId(orderDTO.getProductId());
inventoryDTO.setCount(orderDTO.getCount());
inventoryFeignClient.deduct(inventoryDTO);
// 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額(遠(yuǎn)程分支事務(wù))
AccountDTO accountDTO = new AccountDTO();
accountDTO.setUserId(orderDTO.getUserId());
accountDTO.setAmount(orderDTO.getAmount());
accountFeignClient.deduct(accountDTO);
}
}
5.2 庫(kù)存服務(wù)(RM 參與分支事務(wù))
庫(kù)存服務(wù)無(wú)需額外配置全局事務(wù)注解,僅需實(shí)現(xiàn)本地扣減邏輯,Seata 客戶端會(huì)自動(dòng)將其注冊(cè)為分支事務(wù):
@RestController
@RequestMapping("/inventory")
public class InventoryController {
@Autowired
private InventoryService inventoryService;
@PostMapping("/deduct")
public String deduct(@RequestBody InventoryDTO inventoryDTO) {
inventoryService.deduct(inventoryDTO.getProductId(), inventoryDTO.getCount());
return "庫(kù)存扣減成功";
}
}
@Service
public class InventoryService {
@Autowired
private InventoryMapper inventoryMapper;
public void deduct(Long productId, Integer count) {
// 檢查庫(kù)存是否充足
Inventory inventory = inventoryMapper.selectByProductId(productId);
if (inventory.getStock()< count) {
throw new RuntimeException("庫(kù)存不足"); // 拋出異常,觸發(fā)全局回滾
}
// 扣減庫(kù)存
inventory.setStock(inventory.getStock() - count);
inventoryMapper.updateById(inventory);
}
}
5.3 賬戶服務(wù)(RM 參與分支事務(wù))
與庫(kù)存服務(wù)類似,僅實(shí)現(xiàn)本地扣減邏輯,異常時(shí)拋出異常觸發(fā)回滾:
@RestController
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@PostMapping("/deduct")
public String deduct(@RequestBody AccountDTO accountDTO) {
accountService.deduct(accountDTO.getUserId(), accountDTO.getAmount());
return "賬戶扣減成功";
}
}
@Service
public class AccountService {
@Autowired
private AccountMapper accountMapper;
public void deduct(Long userId, BigDecimal amount) {
// 檢查余額是否充足
Account account = accountMapper.selectByUserId(userId);
if (account.getBalance().compareTo(amount) < 0) {
throw new RuntimeException("余額不足"); // 拋出異常,觸發(fā)全局回滾
}
// 扣減余額
account.setBalance(account.getBalance().subtract(amount));
accountMapper.updateById(account);
}
}
說(shuō)明:AT 模式下,Seata 會(huì)自動(dòng)生成回滾日志,當(dāng)某一分支事務(wù)失?。⊕伋霎惓#琓C 會(huì)指令所有分支事務(wù)執(zhí)行回滾,恢復(fù)數(shù)據(jù)一致性。
六、TCC 模式示例
TCC 模式需要手動(dòng)實(shí)現(xiàn) Try、Confirm、Cancel 三個(gè)方法,適用于非關(guān)系型數(shù)據(jù)庫(kù)或自定義業(yè)務(wù)場(chǎng)景,以下以"下單-扣庫(kù)存-扣余額"場(chǎng)景為例,實(shí)現(xiàn) TCC 模式分布式事務(wù)。
重要說(shuō)明:每個(gè) Try 方法都必須加 @Transactional 注解,Confirm 和 Cancel 方法也最好都加。這不是為了分布式事務(wù),而是為了確保本地?cái)?shù)據(jù)庫(kù)操作的原子性。
6.1 訂單服務(wù) TCC 實(shí)現(xiàn)
訂單服務(wù) TCC 接口
@LocalTCC
public interface OrderTccService {
// Try 階段:創(chuàng)建訂單,commitMethod 指定確認(rèn)方法,rollbackMethod 指定回滾方法
// name 參數(shù):業(yè)務(wù)動(dòng)作名稱,建議全局唯一,用于標(biāo)識(shí)不同的 TCC 業(yè)務(wù)操作
// commitMethod:指定 Confirm 階段的方法名
// rollbackMethod:指定 Cancel 階段的方法名
@TwoPhaseBusinessAction(name = "tryCreateOrder", commitMethod = "commitCreateOrder", rollbackMethod = "cancelCreateOrder")
// @BusinessActionContextParameter:將參數(shù)傳遞到 BusinessActionContext 中,供 Confirm 和 Cancel 階段使用
// paramName:參數(shù)在 BusinessActionContext 中的鍵名
boolean tryCreateOrder(@BusinessActionContextParameter(paramName = "order") Order order);
// Confirm 階段:確認(rèn)創(chuàng)建訂單
boolean commitCreateOrder(BusinessActionContext context);
// Cancel 階段:回滾訂單創(chuàng)建
boolean cancelCreateOrder(BusinessActionContext context);
}
@Service
public class OrderTccServiceImpl implements OrderTccService {
@Autowired
private OrderMapper orderMapper;
@Override
@Transactional(rollbackFor = Exception.class)(rollbackFor = Exception.class)
public boolean tryCreateOrder(Order order) {
// Try 階段:創(chuàng)建訂單,狀態(tài)設(shè)置為待確認(rèn)
order.setStatus(0); // 0-待確認(rèn)
boolean result = orderMapper.insert(order) > 0;
if (!result) {
throw new RuntimeException("創(chuàng)建訂單失敗"); // 拋出異常觸發(fā)回滾
}
return true;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean commitCreateOrder(BusinessActionContext context) {
// Confirm 階段:更新訂單狀態(tài)為待支付
Order order = (Order) context.getActionContext("order");
order.setStatus(1); // 1-待支付
return orderMapper.updateById(order) > 0;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean cancelCreateOrder(BusinessActionContext context) {
// Cancel 階段:刪除訂單
Order order = (Order) context.getActionContext("order");
return orderMapper.deleteById(order.getId()) > 0;
}
}
訂單服務(wù)控制器
@RestController
@RequestMapping("/order")
public class OrderTccController {
@Autowired
private OrderTccService orderTccService;
@Autowired
private InventoryFeignClient inventoryFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
@PostMapping("/create-tcc")
@GlobalTransactional(name = "create-order-tcc", rollbackFor = Exception.class)
public String createOrderTcc(@RequestBody OrderDTO orderDTO) {
// 1. 創(chuàng)建訂單(TCC Try 階段)
Order order = new Order();
order.setOrderNo(UUID.randomUUID().toString());
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setCount(orderDTO.getCount());
order.setAmount(orderDTO.getAmount());
order.setStatus(0); // 0-待確認(rèn)
orderTccService.tryCreateOrder(order);
// 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存(TCC Try 階段)
InventoryDTO inventoryDTO = new InventoryDTO();
inventoryDTO.setProductId(orderDTO.getProductId());
inventoryDTO.setCount(orderDTO.getCount());
inventoryFeignClient.deductTcc(inventoryDTO);
// 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額(TCC Try 階段)
AccountDTO accountDTO = new AccountDTO();
accountDTO.setUserId(orderDTO.getUserId());
accountDTO.setAmount(orderDTO.getAmount());
accountFeignClient.deductTcc(accountDTO);
return "訂單創(chuàng)建成功(TCC模式)";
}
}
6.2 庫(kù)存服務(wù) TCC 實(shí)現(xiàn)
@LocalTCC
public interface InventoryTccService {
// Try 階段:檢查并預(yù)留庫(kù)存
@TwoPhaseBusinessAction(name = "tryDeductInventory", commitMethod = "commitDeductInventory", rollbackMethod = "cancelDeductInventory")
boolean tryDeductInventory(@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);
// Confirm 階段:確認(rèn)扣減庫(kù)存
boolean commitDeductInventory(BusinessActionContext context);
// Cancel 階段:回滾庫(kù)存預(yù)留
boolean cancelDeductInventory(BusinessActionContext context);
}
@Service
public class InventoryTccServiceImpl implements InventoryTccService {
@Autowired
private InventoryMapper inventoryMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean tryDeductInventory(Long productId, Integer count) {
// 1. 檢查庫(kù)存是否充足
Inventory inventory = inventoryMapper.selectByProductId(productId);
if (inventory.getStock() < count) {
throw new RuntimeException("庫(kù)存不足");
}
// 2. 預(yù)留庫(kù)存:減少可用庫(kù)存,增加預(yù)留庫(kù)存
inventory.setStock(inventory.getStock() - count);
inventory.setReservedStock(inventory.getReservedStock() + count);
boolean result = inventoryMapper.updateById(inventory) > 0;
if (!result) {
throw new RuntimeException("扣減庫(kù)存失敗"); // 拋出異常觸發(fā)回滾
}
return true;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean commitDeductInventory(BusinessActionContext context) {
// Confirm 階段:減少預(yù)留庫(kù)存(實(shí)際扣減)
Long productId = Long.valueOf(context.getActionContext("productId").toString());
Integer count = Integer.valueOf(context.getActionContext("count").toString());
Inventory inventory = inventoryMapper.selectByProductId(productId);
inventory.setReservedStock(inventory.getReservedStock() - count);
return inventoryMapper.updateById(inventory) > 0;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean cancelDeductInventory(BusinessActionContext context) {
// Cancel 階段:恢復(fù)可用庫(kù)存,減少預(yù)留庫(kù)存
Long productId = Long.valueOf(context.getActionContext("productId").toString());
Integer count = Integer.valueOf(context.getActionContext("count").toString());
Inventory inventory = inventoryMapper.selectByProductId(productId);
inventory.setStock(inventory.getStock() + count);
inventory.setReservedStock(inventory.getReservedStock() - count);
return inventoryMapper.updateById(inventory) > 0;
}
}
@RestController
@RequestMapping("/inventory")
public class InventoryTccController {
@Autowired
private InventoryTccService inventoryTccService;
@PostMapping("/deduct-tcc")
public String deductTcc(@RequestBody InventoryDTO inventoryDTO) {
inventoryTccService.tryDeductInventory(inventoryDTO.getProductId(), inventoryDTO.getCount());
return "庫(kù)存扣減成功(TCC模式)";
}
}
6.3 賬戶服務(wù) TCC 實(shí)現(xiàn)
@LocalTCC
public interface AccountTccService {
// Try 階段:資源檢查和預(yù)留,commitMethod 指定確認(rèn)方法,rollbackMethod 指定回滾方法
@TwoPhaseBusinessAction(name = "tryDeductBalance", commitMethod = "commitDeductBalance", rollbackMethod = "cancelDeductBalance")
boolean tryDeductBalance(@BusinessActionContextParameter(paramName = "userId") Long userId,
@BusinessActionContextParameter(paramName = "amount") BigDecimal amount);
// Confirm 階段:確認(rèn)執(zhí)行業(yè)務(wù),無(wú)需額外操作(Try 階段已完成核心邏輯)
boolean commitDeductBalance(BusinessActionContext context);
// Cancel 階段:回滾操作,解凍預(yù)留的資源
boolean cancelDeductBalance(BusinessActionContext context);
}
@Service
public class AccountTccServiceImpl implements AccountTccService {
@Autowired
private AccountMapper accountMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean tryDeductBalance(Long userId, BigDecimal amount) {
// 1. 檢查余額是否充足
Account account = accountMapper.selectByUserId(userId);
if (account.getBalance().compareTo(amount) < 0) {
throw new RuntimeException("余額不足");
}
// 2. 凍結(jié)金額(預(yù)留資源):余額減少,凍結(jié)金額增加
account.setFreezeAmount(account.getFreezeAmount().add(amount));
account.setBalance(account.getBalance().subtract(amount));
boolean result = accountMapper.updateById(account) > 0;
if (!result) {
throw new RuntimeException("凍結(jié)賬戶失敗"); // 拋出異常觸發(fā)回滾
}
return true; // 返回 true 表示 Try 成功
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean commitDeductBalance(BusinessActionContext context) {
// 提交階段:無(wú)需操作,凍結(jié)金額已經(jīng)在 prepare 階段扣除,確認(rèn)后無(wú)需額外處理
return true;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean cancelDeductBalance(BusinessActionContext context) {
// 回滾階段:解凍金額,恢復(fù)余額
// 從上下文獲取 Try 階段傳入的參數(shù)
Long userId = Long.valueOf(context.getActionContext("userId").toString());
BigDecimal amount = new BigDecimal(context.getActionContext("amount").toString());
Account account = accountMapper.selectByUserId(userId);
// 解凍:凍結(jié)金額減少,余額增加
account.setFreezeAmount(account.getFreezeAmount().subtract(amount));
account.setBalance(account.getBalance().add(amount));
return accountMapper.updateById(account) > 0; // 返回 true 表示回滾成功
}
}
@RestController
@RequestMapping("/account")
public class AccountTccController {
@Autowired
private AccountTccService accountTccService;
@PostMapping("/deduct-tcc")
public String deductTcc(@RequestBody AccountDTO accountDTO) {
accountTccService.tryDeductBalance(accountDTO.getUserId(), accountDTO.getAmount());
return "賬戶扣減成功(TCC模式)";
}
}
說(shuō)明:TCC 模式下,TM 同樣通過(guò)@GlobalTransactional 發(fā)起全局事務(wù),TC 協(xié)調(diào)各分支的 Try、Confirm、Cancel 階段執(zhí)行,確保事務(wù)一致性。
TCC 模式的重試機(jī)制:
Try 階段:通常不進(jìn)行重試,因?yàn)?Try 操作可能包含業(yè)務(wù)檢查和資源預(yù)留,如果重試可能導(dǎo)致重復(fù)預(yù)留資源(如重復(fù)凍結(jié)金額)。
Confirm 階段:需要進(jìn)行重試,直到成功為止。因?yàn)?Confirm 操作是冪等的,多次執(zhí)行不會(huì)產(chǎn)生副作用,確保事務(wù)最終提交。
Cancel 階段:需要進(jìn)行重試,直到成功為止。因?yàn)?Cancel 操作也是冪等的,多次執(zhí)行不會(huì)產(chǎn)生副作用,確保事務(wù)最終回滾。
實(shí)現(xiàn)重試機(jī)制的建議:
冪等設(shè)計(jì):確保 Confirm 和 Cancel 方法是冪等的,即多次執(zhí)行產(chǎn)生相同的結(jié)果。
重試策略:不建議設(shè)置具體重試次數(shù),建議使用超時(shí)時(shí)間控制重試,而不是次數(shù)。Confirm/Cancel 應(yīng)保持無(wú)限重試,只添加一個(gè)最長(zhǎng)超時(shí)兜底即可:
# seata-server/conf/application.yml
# 全局事務(wù)超時(shí)時(shí)間配置
global:
transactionTimeoutMills: 60000 # 全局事務(wù)超時(shí)時(shí)間(毫秒),默認(rèn)60秒
# 不設(shè)置具體重試次數(shù),使用超時(shí)時(shí)間控制
# 服務(wù)器配置
server:
# 提交(Confirm)最長(zhǎng)重試超時(shí)時(shí)間,毫秒
# 1小時(shí) = 3600000ms
maxCommitRetryTimeout: 3600000
# 回滾(Cancel)最長(zhǎng)重試超時(shí)時(shí)間
maxRollbackRetryTimeout: 3600000
# 恢復(fù)機(jī)制配置
recovery:
# 回滾中狀態(tài)重試間隔,默認(rèn)1s
rollbackingRetryPeriod: 1000
# 提交中狀態(tài)重試間隔,默認(rèn)1s
committingRetryPeriod: 1000
業(yè)務(wù)日志:在 Confirm 和 Cancel 方法中添加詳細(xì)日志,便于排查重試失敗的原因。
超時(shí)處理:設(shè)置合理的全局事務(wù)超時(shí)時(shí)間,作為重試的最終兜底,避免重試無(wú)限期進(jìn)行。
-
人工補(bǔ)償:當(dāng)系統(tǒng)自動(dòng)重試失敗后,需要建立人工補(bǔ)償機(jī)制:
- 事務(wù)狀態(tài)監(jiān)控:定期掃描超時(shí)未完成的事務(wù),識(shí)別需要人工干預(yù)的事務(wù)
- 補(bǔ)償流程:建立標(biāo)準(zhǔn)化的人工補(bǔ)償流程,包括事務(wù)狀態(tài)分析、手動(dòng)執(zhí)行 Confirm/Cancel 操作
- 操作記錄:記錄所有人工補(bǔ)償操作,便于審計(jì)和追溯
- 告警機(jī)制:當(dāng)出現(xiàn)大量需要人工補(bǔ)償?shù)氖聞?wù)時(shí),及時(shí)觸發(fā)告警,通知運(yùn)維人員
TCC 模式的常見(jiàn)問(wèn)題
1. 冪等問(wèn)題
問(wèn)題:由于網(wǎng)絡(luò)重試或其他原因,Confirm 或 Cancel 操作可能被多次調(diào)用,如果不處理冪等性,可能導(dǎo)致業(yè)務(wù)邏輯錯(cuò)誤(如重復(fù)扣錢、重復(fù)解凍)。
2. 空回滾問(wèn)題
問(wèn)題:當(dāng) Try 操作由于網(wǎng)絡(luò)超時(shí)等原因未執(zhí)行,但 TC 仍然會(huì)調(diào)用 Cancel 操作,導(dǎo)致 Cancel 操作處理一個(gè)不存在的分支事務(wù)(不該回滾時(shí)別亂回滾)。
3. 懸掛問(wèn)題
問(wèn)題:當(dāng) Try 操作由于網(wǎng)絡(luò)延遲等原因后于 Cancel 操作執(zhí)行,導(dǎo)致 Cancel 操作執(zhí)行后,Try 操作才開(kāi)始執(zhí)行,造成數(shù)據(jù)不一致(別回滾完了又去凍結(jié))。
TCC 模式的統(tǒng)一解決方案
為了同時(shí)解決冪等、空回滾和懸掛問(wèn)題,建議使用 TCC 事務(wù)日志表來(lái)統(tǒng)一管理事務(wù)狀態(tài),所有操作都圍繞這張表進(jìn)行狀態(tài)判斷。
TCC 事務(wù)日志表結(jié)構(gòu):
CREATE TABLE `tcc_transaction_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT, -- 主鍵ID
`xid` varchar(100) NOT NULL, -- 全局事務(wù)ID
`branch_id` bigint(20) NOT NULL, -- 分支事務(wù)ID
`business_key` varchar(100) NOT NULL, -- 業(yè)務(wù)唯一標(biāo)識(shí)(如訂單號(hào))
`status` int(11) NOT NULL, -- 事務(wù)狀態(tài):1-Try成功,2-Confirm成功,3-Cancel成功
`create_time` datetime NOT NULL, -- 創(chuàng)建時(shí)間
`update_time` datetime NOT NULL, -- 更新時(shí)間
PRIMARY KEY (`id`),
UNIQUE KEY `uk_xid_branch` (`xid`, `branch_id`), -- 確保每個(gè)分支事務(wù)只有一條記錄
KEY `idx_business_key` (`business_key`) -- 業(yè)務(wù)鍵索引
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='TCC transaction log table';
基于上述 OrderTccServiceImpl 提供修改示例:
public enum TccStatusEnum {
/**
* Try 已執(zhí)行
*/
TRY_SUCCESS(1),
/**
* Confirm 已執(zhí)行
*/
CONFIRM_SUCCESS(2),
/**
* Cancel 已執(zhí)行
*/
CANCEL_SUCCESS(3);
private final int code;
TccStatusEnum(int code) {
this.code = code;
}
public int getCode() {
return code;
}
}
@Service
public class OrderTccServiceImpl implements OrderTccService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private TccTransactionLogMapper logMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean tryCreateOrder(@BusinessActionContextParameter(paramName = "order") Order order) {
String xid = RootContext.getXID();
long branchId = BranchContext.getBranchId();
// ======================== 防懸掛 ========================
TccTransactionLog log = logMapper.selectByXidAndBranchId(xid, branchId);
if (log != null) {
// 已經(jīng) Cancel(空回滾)→ 絕對(duì)禁止執(zhí)行 Try
if (log.getStatus() == TccStatusEnum.CANCEL_SUCCESS.getCode()) {
System.out.println("防懸掛攔截:Cancel已執(zhí)行,Try拒絕執(zhí)行");
return false;
}
// 已經(jīng)執(zhí)行過(guò) Try → 冪等,返回成功
if (log.getStatus() == TccStatusEnum.TRY_SUCCESS.getCode()) {
return true;
}
}
// Try 業(yè)務(wù)邏輯
order.setStatus(0);
boolean result = orderMapper.insert(order) > 0;
if (result) {
log = new TccTransactionLog();
log.setXid(xid);
log.setBranchId(branchId);
log.setBusinessKey(order.getOrderNo());
log.setStatus(TccStatusEnum.TRY_SUCCESS.getCode());
log.setCreateTime(new Date());
log.setUpdateTime(new Date());
logMapper.insert(log);
} else {
throw new RuntimeException("創(chuàng)建訂單失敗");
}
return true;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean commitCreateOrder(BusinessActionContext context) {
String xid = context.getXid();
long branchId = context.getBranchId();
TccTransactionLog log = logMapper.selectByXidAndBranchId(xid, branchId);
if (log == null) {
System.out.println("Confirm:日志不存在");
return false;
}
// 冪等:已 Confirm
if (log.getStatus() == TccStatusEnum.CONFIRM_SUCCESS.getCode()) {
return true;
}
// 只有 Try 成功才能 Confirm
if (log.getStatus() != TccStatusEnum.TRY_SUCCESS.getCode()) {
System.out.println("Confirm:狀態(tài)非法");
return false;
}
// 執(zhí)行業(yè)務(wù)
Order order = (Order) context.getActionContext("order");
order.setStatus(1);
boolean update = orderMapper.updateById(order) > 0;
if (update) {
log.setStatus(TccStatusEnum.CONFIRM_SUCCESS.getCode());
log.setUpdateTime(new Date());
logMapper.updateById(log);
}
return update;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean cancelCreateOrder(BusinessActionContext context) {
String xid = context.getXid();
long branchId = context.getBranchId();
TccTransactionLog log = logMapper.selectByXidAndBranchId(xid, branchId);
// ======================== 空回滾 ========================
if (log == null) {
log = new TccTransactionLog();
log.setXid(xid);
log.setBranchId(branchId);
log.setStatus(TccStatusEnum.CANCEL_SUCCESS.getCode());
log.setCreateTime(new Date());
log.setUpdateTime(new Date());
logMapper.insert(log);
return true;
}
// 冪等:已 Cancel
if (log.getStatus() == TccStatusEnum.CANCEL_SUCCESS.getCode()) {
return true;
}
// 只有 Try 成功才能 Cancel
if (log.getStatus() != TccStatusEnum.TRY_SUCCESS.getCode()) {
System.out.println("Cancel:狀態(tài)非法");
return false;
}
// 執(zhí)行業(yè)務(wù)
Order order = (Order) context.getActionContext("order");
boolean delete = orderMapper.deleteById(order.getId()) > 0;
if (delete) {
log.setStatus(TccStatusEnum.CANCEL_SUCCESS.getCode());
log.setUpdateTime(new Date());
logMapper.updateById(log);
}
return delete;
}
}
執(zhí)行流程:
- 所有 Try 成功 → 全局提交 → 執(zhí)行所有 Confirm
- 任意 Try 失敗 / 超時(shí) → 全局回滾 → 執(zhí)行所有 Cancel
- Confirm 和 Cancel 絕對(duì)互斥,永遠(yuǎn)不會(huì)同時(shí)執(zhí)行
- Confirm / Cancel 失敗,Seata 會(huì)自動(dòng)重試,直到成功
七、SAGA 模式示例
SAGA 模式適用于長(zhǎng)事務(wù)和復(fù)雜業(yè)務(wù)流程,通過(guò)狀態(tài)機(jī)定義服務(wù)調(diào)用流程,支持正向服務(wù)和補(bǔ)償服務(wù)。以下以“下單-扣庫(kù)存-扣余額”場(chǎng)景為例,實(shí)現(xiàn) SAGA 模式分布式事務(wù)。
7.1 SAGA 模式實(shí)現(xiàn)
1. 定義狀態(tài)機(jī)配置
@Configuration
public class SagaStateMachineConfig {
@Bean
public StateMachineFactory<String, String, Object> stateMachineFactory() {
// 定義狀態(tài)機(jī)構(gòu)建器
StateMachineBuilder.Builder<String, String, Object> builder = StateMachineBuilder.builder();
// 定義狀態(tài)和轉(zhuǎn)換
builder.configureStates()
.withStates()
.initial("START")
.states(EnumSet.of("CREATE_ORDER", "DEDUCT_INVENTORY", "DEDUCT_ACCOUNT", "END"))
.end("END")
.end("ROLLBACK");
// 定義轉(zhuǎn)換
builder.configureTransitions()
// 正向流程
.withExternal()
.source("START").target("CREATE_ORDER").event("CREATE_ORDER")
.and()
.withExternal()
.source("CREATE_ORDER").target("DEDUCT_INVENTORY").event("DEDUCT_INVENTORY")
.and()
.withExternal()
.source("DEDUCT_INVENTORY").target("DEDUCT_ACCOUNT").event("DEDUCT_ACCOUNT")
.and()
.withExternal()
.source("DEDUCT_ACCOUNT").target("END").event("SUCCESS")
// 回滾流程
.and()
.withExternal()
.source("CREATE_ORDER").target("ROLLBACK").event("ROLLBACK_CREATE_ORDER")
.and()
.withExternal()
.source("DEDUCT_INVENTORY").target("ROLLBACK").event("ROLLBACK_DEDUCT_INVENTORY")
.and()
.withExternal()
.source("DEDUCT_ACCOUNT").target("ROLLBACK").event("ROLLBACK_DEDUCT_ACCOUNT");
return builder.build();
}
}
2. 業(yè)務(wù)服務(wù)實(shí)現(xiàn)
@Service
public class SagaOrderService {
@Autowired
private StateMachineFactory<String, String, Object> stateMachineFactory;
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryFeignClient inventoryFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
@GlobalTransactional
public void createOrderWithSaga(OrderDTO orderDTO) {
// 創(chuàng)建狀態(tài)機(jī)實(shí)例
StateMachine<String, String, Object> stateMachine = stateMachineFactory.getStateMachine(UUID.randomUUID().toString());
try {
// 啟動(dòng)狀態(tài)機(jī)
stateMachine.start();
// 1. 執(zhí)行創(chuàng)建訂單
Order order = new Order();
order.setOrderNo(UUID.randomUUID().toString());
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setCount(orderDTO.getCount());
order.setAmount(orderDTO.getAmount());
order.setStatus(1); // 訂單狀態(tài):1-待支付
orderMapper.insert(order);
stateMachine.sendEvent(MessageBuilder.withPayload("CREATE_ORDER").build());
// 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存
InventoryDTO inventoryDTO = new InventoryDTO();
inventoryDTO.setProductId(orderDTO.getProductId());
inventoryDTO.setCount(orderDTO.getCount());
inventoryFeignClient.deductSaga(inventoryDTO);
stateMachine.sendEvent(MessageBuilder.withPayload("DEDUCT_INVENTORY").build());
// 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額
AccountDTO accountDTO = new AccountDTO();
accountDTO.setUserId(orderDTO.getUserId());
accountDTO.setAmount(orderDTO.getAmount());
accountFeignClient.deductSaga(accountDTO);
stateMachine.sendEvent(MessageBuilder.withPayload("DEDUCT_ACCOUNT").build());
// 4. 完成事務(wù)
stateMachine.sendEvent(MessageBuilder.withPayload("SUCCESS").build());
} catch (Exception e) {
// 根據(jù)當(dāng)前狀態(tài)執(zhí)行相應(yīng)的回滾
String currentState = stateMachine.getState().getId();
switch (currentState) {
case "CREATE_ORDER":
// 回滾訂單創(chuàng)建
stateMachine.sendEvent(MessageBuilder.withPayload("ROLLBACK_CREATE_ORDER").build());
break;
case "DEDUCT_INVENTORY":
// 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),回滾庫(kù)存扣減
InventoryDTO inventoryDTO = new InventoryDTO();
inventoryDTO.setProductId(orderDTO.getProductId());
inventoryDTO.setCount(orderDTO.getCount());
inventoryFeignClient.rollbackSaga(inventoryDTO);
stateMachine.sendEvent(MessageBuilder.withPayload("ROLLBACK_DEDUCT_INVENTORY").build());
break;
case "DEDUCT_ACCOUNT":
// 遠(yuǎn)程調(diào)用賬戶服務(wù),回滾賬戶扣減
AccountDTO accountDTO = new AccountDTO();
accountDTO.setUserId(orderDTO.getUserId());
accountDTO.setAmount(orderDTO.getAmount());
accountFeignClient.rollbackSaga(accountDTO);
stateMachine.sendEvent(MessageBuilder.withPayload("ROLLBACK_DEDUCT_ACCOUNT").build());
break;
}
throw e;
} finally {
stateMachine.stop();
}
}
}
@RestController
@RequestMapping("/order")
public class SagaOrderController {
@Autowired
private SagaOrderService sagaOrderService;
@PostMapping("/create-saga")
public String createOrderSaga(@RequestBody OrderDTO orderDTO) {
sagaOrderService.createOrderWithSaga(orderDTO);
return "訂單創(chuàng)建成功(SAGA模式)";
}
}
3. 庫(kù)存服務(wù) SAGA 實(shí)現(xiàn)
@RestController
@RequestMapping("/inventory")
public class InventorySagaController {
@Autowired
private InventoryService inventoryService;
@PostMapping("/deduct-saga")
public String deductSaga(@RequestBody InventoryDTO inventoryDTO) {
inventoryService.deduct(inventoryDTO.getProductId(), inventoryDTO.getCount());
return "庫(kù)存扣減成功(SAGA模式)";
}
@PostMapping("/rollback-saga")
public String rollbackSaga(@RequestBody InventoryDTO inventoryDTO) {
// 回滾庫(kù)存扣減,恢復(fù)庫(kù)存
Inventory inventory = inventoryService.getByProductId(inventoryDTO.getProductId());
inventory.setStock(inventory.getStock() + inventoryDTO.getCount());
inventoryService.update(inventory);
return "庫(kù)存回滾成功(SAGA模式)";
}
}
4. 賬戶服務(wù) SAGA 實(shí)現(xiàn)
@RestController
@RequestMapping("/account")
public class AccountSagaController {
@Autowired
private AccountService accountService;
@PostMapping("/deduct-saga")
public String deductSaga(@RequestBody AccountDTO accountDTO) {
accountService.deduct(accountDTO.getUserId(), accountDTO.getAmount());
return "賬戶扣減成功(SAGA模式)";
}
@PostMapping("/rollback-saga")
public String rollbackSaga(@RequestBody AccountDTO accountDTO) {
// 回滾賬戶扣減,恢復(fù)余額
Account account = accountService.getByUserId(accountDTO.getUserId());
account.setBalance(account.getBalance().add(accountDTO.getAmount()));
accountService.update(account);
return "賬戶回滾成功(SAGA模式)";
}
}
八、XA 模式示例
XA 模式適用于需要強(qiáng)一致性的場(chǎng)景,依賴數(shù)據(jù)庫(kù)原生 XA 協(xié)議,提供最強(qiáng)的數(shù)據(jù)一致性保證。以下以“下單-扣庫(kù)存-扣余額”場(chǎng)景為例,實(shí)現(xiàn) XA 模式分布式事務(wù)。
8.1 XA 模式實(shí)現(xiàn)
1. 配置文件開(kāi)啟 XA 模式
# application.yaml
spring:
cloud:
seata:
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
data-source-proxy:
mode: XA # 開(kāi)啟 XA 模式
2. 訂單服務(wù) XA 實(shí)現(xiàn)
@RestController
@RequestMapping("/order")
public class XAOrderController {
@Autowired
private XAOrderService xaOrderService;
@PostMapping("/create-xa")
// 定義全局事務(wù),使用 XA 模式
@GlobalTransactional(name = "create-order-xa", rollbackFor = Exception.class)
public String createOrderXA(@RequestBody OrderDTO orderDTO) {
xaOrderService.createOrder(orderDTO);
return "訂單創(chuàng)建成功(XA模式)";
}
}
@Service
public class XAOrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryFeignClient inventoryFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
public void createOrder(OrderDTO orderDTO) {
// 1. 創(chuàng)建訂單(本地分支事務(wù) - XA)
Order order = new Order();
order.setOrderNo(UUID.randomUUID().toString());
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setCount(orderDTO.getCount());
order.setAmount(orderDTO.getAmount());
order.setStatus(1); // 訂單狀態(tài):1-待支付
orderMapper.insert(order);
// 2. 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存(遠(yuǎn)程分支事務(wù) - XA)
InventoryDTO inventoryDTO = new InventoryDTO();
inventoryDTO.setProductId(orderDTO.getProductId());
inventoryDTO.setCount(orderDTO.getCount());
inventoryFeignClient.deductXa(inventoryDTO);
// 3. 遠(yuǎn)程調(diào)用賬戶服務(wù),扣減余額(遠(yuǎn)程分支事務(wù) - XA)
AccountDTO accountDTO = new AccountDTO();
accountDTO.setUserId(orderDTO.getUserId());
accountDTO.setAmount(orderDTO.getAmount());
accountFeignClient.deductXa(accountDTO);
}
}
3. 庫(kù)存服務(wù) XA 實(shí)現(xiàn)
@RestController
@RequestMapping("/inventory")
public class XAInventoryController {
@Autowired
private XAInventoryService inventoryService;
@PostMapping("/deduct-xa")
public String deductXA(@RequestBody InventoryDTO inventoryDTO) {
inventoryService.deduct(inventoryDTO.getProductId(), inventoryDTO.getCount());
return "庫(kù)存扣減成功(XA模式)";
}
}
@Service
public class XAInventoryService {
@Autowired
private InventoryMapper inventoryMapper;
public void deduct(Long productId, Integer count) {
// 檢查庫(kù)存是否充足
Inventory inventory = inventoryMapper.selectByProductId(productId);
if (inventory.getStock() < count) {
throw new RuntimeException("庫(kù)存不足"); // 拋出異常,觸發(fā)全局回滾
}
// 扣減庫(kù)存
inventory.setStock(inventory.getStock() - count);
inventoryMapper.updateById(inventory);
}
}
4. 賬戶服務(wù) XA 實(shí)現(xiàn)
@RestController
@RequestMapping("/account")
public class XAAccountController {
@Autowired
private XAAccountService accountService;
@PostMapping("/deduct-xa")
public String deductXA(@RequestBody AccountDTO accountDTO) {
accountService.deduct(accountDTO.getUserId(), accountDTO.getAmount());
return "賬戶扣減成功(XA模式)";
}
}
@Service
public class XAAccountService {
@Autowired
private AccountMapper accountMapper;
public void deduct(Long userId, BigDecimal amount) {
// 檢查余額是否充足
Account account = accountMapper.selectByUserId(userId);
if (account.getBalance().compareTo(amount) < 0) {
throw new RuntimeException("余額不足"); // 拋出異常,觸發(fā)全局回滾
}
// 扣減余額
account.setBalance(account.getBalance().subtract(amount));
accountMapper.updateById(account);
}
}
九、生產(chǎn)環(huán)境部署
開(kāi)發(fā)環(huán)境的單機(jī)部署無(wú)法滿足生產(chǎn)環(huán)境的高可用需求,生產(chǎn)環(huán)境需部署 Seata 集群、注冊(cè)中心集群和數(shù)據(jù)庫(kù)主從,確保服務(wù)穩(wěn)定運(yùn)行。
9.1 集群部署
生產(chǎn)環(huán)境推薦使用 DB 模式存儲(chǔ)事務(wù)數(shù)據(jù)(替代單機(jī)的 file 模式),確保集群數(shù)據(jù)一致性,修改 file.conf 配置:
store {
mode = "db" # 存儲(chǔ)模式:db(數(shù)據(jù)庫(kù)),生產(chǎn)環(huán)境推薦
db {
datasource = "druid" # 數(shù)據(jù)源類型
dbType = "mysql" # 數(shù)據(jù)庫(kù)類型
driverClassName = "com.mysql.jdbc.Driver" # 數(shù)據(jù)庫(kù)驅(qū)動(dòng)
url = "jdbc:mysql://127.0.0.1:3306/seata?useSSL=false" # 數(shù)據(jù)庫(kù)地址(需提前創(chuàng)建 seata 數(shù)據(jù)庫(kù))
user = "root" # 數(shù)據(jù)庫(kù)用戶名
password = "123456" # 數(shù)據(jù)庫(kù)密碼
}
}
部署多個(gè) Seata Server 節(jié)點(diǎn),指定不同端口,組成集群:
# 節(jié)點(diǎn) 1:端口 8091
sh bin/seata-server.sh -h 127.0.0.1 -p 8091
# 節(jié)點(diǎn) 2:端口 8092
sh bin/seata-server.sh -h 127.0.0.1 -p 8092
# 節(jié)點(diǎn) 3:端口 8093
sh bin/seata-server.sh -h 127.0.0.1 -p 8093
所有節(jié)點(diǎn)配置相同的 Nacos 注冊(cè)中心和 DB 存儲(chǔ),Nacos 會(huì)自動(dòng)將多個(gè) Seata Server 節(jié)點(diǎn)識(shí)別為一個(gè)集群,實(shí)現(xiàn)負(fù)載均衡和故障轉(zhuǎn)移。
9.2 高可用性配置
Nacos 集群:部署多個(gè) Nacos 節(jié)點(diǎn),避免注冊(cè)中心單點(diǎn)故障,確保 Seata Server 和業(yè)務(wù)客戶端能夠正常注冊(cè)和發(fā)現(xiàn)。
Seata 集群:至少部署 2 個(gè) Seata Server 節(jié)點(diǎn),實(shí)現(xiàn)故障轉(zhuǎn)移,某一節(jié)點(diǎn)宕機(jī)后,其他節(jié)點(diǎn)可繼續(xù)提供服務(wù)。
數(shù)據(jù)庫(kù)主從:配置 MySQL 主從復(fù)制,Seata 數(shù)據(jù)庫(kù)的主庫(kù)宕機(jī)后,從庫(kù)可切換為新主庫(kù),確保事務(wù)數(shù)據(jù)不丟失。
十、最佳實(shí)踐
在實(shí)際項(xiàng)目中,合理運(yùn)用 Seata 的特性,結(jié)合業(yè)務(wù)場(chǎng)景優(yōu)化設(shè)計(jì),可在保證數(shù)據(jù)一致性的同時(shí),提升系統(tǒng)性能和穩(wěn)定性。
10.1 事務(wù)邊界設(shè)計(jì)
最小化事務(wù)范圍:只包含必要的業(yè)務(wù)操作,避免將無(wú)關(guān)操作納入分布式事務(wù),減少事務(wù)執(zhí)行時(shí)間。
避免長(zhǎng)事務(wù):將大事務(wù)拆分為多個(gè)小事務(wù),避免事務(wù)長(zhǎng)時(shí)間占用資源,降低超時(shí)和鎖沖突風(fēng)險(xiǎn)。
合理設(shè)置超時(shí)時(shí)間:根據(jù)業(yè)務(wù)執(zhí)行時(shí)間,設(shè)置合適的
globalTransactionTimeout參數(shù),避免事務(wù)卡住。
10.2 性能優(yōu)化
優(yōu)先使用 AT 模式:AT 模式對(duì)業(yè)務(wù)無(wú)侵入,性能優(yōu)于 TCC、XA 模式,適合大多數(shù)常規(guī)業(yè)務(wù)場(chǎng)景。
合理設(shè)置資源鎖定時(shí)間:避免長(zhǎng)時(shí)間鎖定數(shù)據(jù)庫(kù)資源,減少鎖沖突,提升并發(fā)能力。
使用異步處理:非核心操作(如日志記錄、消息通知)使用消息隊(duì)列異步處理,不納入分布式事務(wù)范圍。
10.3 監(jiān)控和告警
集成 Prometheus:監(jiān)控 Seata 服務(wù)的運(yùn)行狀態(tài)(如事務(wù)成功率、超時(shí)率、節(jié)點(diǎn)負(fù)載),實(shí)時(shí)掌握系統(tǒng)情況。
配置告警:設(shè)置事務(wù)失敗率、超時(shí)、節(jié)點(diǎn)宕機(jī)等告警規(guī)則,及時(shí)發(fā)現(xiàn)并處理異常。
日志管理:集中管理事務(wù)日志(如全局事務(wù) XID、分支事務(wù)狀態(tài)、回滾日志),便于問(wèn)題排查。
十一、常見(jiàn)問(wèn)題與解決方案
在 Seata 使用過(guò)程中,常見(jiàn)問(wèn)題主要集中在事務(wù)超時(shí)、網(wǎng)絡(luò)異常和數(shù)據(jù)一致性,以下為具體解決方案。
11.1 事務(wù)超時(shí)
問(wèn)題:事務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng),超過(guò)默認(rèn)超時(shí)時(shí)間,導(dǎo)致全局事務(wù)回滾。
解決方案:
優(yōu)化業(yè)務(wù)邏輯,減少事務(wù)執(zhí)行時(shí)間(如拆分大事務(wù)、優(yōu)化數(shù)據(jù)庫(kù)查詢);
合理設(shè)置
globalTransactionTimeout參數(shù),延長(zhǎng)超時(shí)時(shí)間(需結(jié)合業(yè)務(wù)實(shí)際);將大事務(wù)拆分為多個(gè)小事務(wù),降低單個(gè)事務(wù)的執(zhí)行時(shí)間。
11.2 網(wǎng)絡(luò)異常
問(wèn)題:服務(wù)間網(wǎng)絡(luò)波動(dòng),導(dǎo)致分支事務(wù)狀態(tài)無(wú)法正常上報(bào),或 TC 指令無(wú)法正常下發(fā),造成事務(wù)狀態(tài)不一致。
解決方案:
配置服務(wù)調(diào)用重試機(jī)制(如 Feign 重試),應(yīng)對(duì)短暫網(wǎng)絡(luò)波動(dòng);
使用可靠的網(wǎng)絡(luò)環(huán)境,避免網(wǎng)絡(luò)中斷、延遲過(guò)高;
實(shí)現(xiàn)業(yè)務(wù)冪等性設(shè)計(jì),避免重試導(dǎo)致的數(shù)據(jù)重復(fù)操作。
11.3 數(shù)據(jù)一致性
問(wèn)題:極端場(chǎng)景下(如 TC 宕機(jī)、數(shù)據(jù)庫(kù)異常),出現(xiàn)最終數(shù)據(jù)不一致。
解決方案:
使用可靠的消息隊(duì)列(如 RocketMQ、Kafka),實(shí)現(xiàn)最終一致性補(bǔ)償;
實(shí)現(xiàn)對(duì)賬機(jī)制,定期比對(duì)各服務(wù)的數(shù)據(jù),發(fā)現(xiàn)不一致時(shí)手動(dòng)修復(fù);
開(kāi)啟 Seata 的日志恢復(fù)機(jī)制,TC 重啟后可恢復(fù)未完成的事務(wù)。
十二、總結(jié)
Seata 為分布式事務(wù)提供了一套完整的解決方案,支持 AT、TCC、SAGA、XA 四種事務(wù)模式,可靈活適配不同業(yè)務(wù)場(chǎng)景,解決微服務(wù)架構(gòu)下的數(shù)據(jù)一致性難題。
通過(guò)本文的介紹,你應(yīng)該已經(jīng)掌握了:
Seata 的核心架構(gòu)(TC、TM、RM)和工作原理;
四種事務(wù)模式的使用場(chǎng)景和實(shí)現(xiàn)方式;
Seata 環(huán)境搭建(Server 部署、注冊(cè)/配置中心配置)和代碼實(shí)現(xiàn);
生產(chǎn)環(huán)境集群部署和高可用配置;
常見(jiàn)問(wèn)題的排查和解決方案。
在實(shí)際項(xiàng)目中,應(yīng)根據(jù)業(yè)務(wù)場(chǎng)景選擇合適的事務(wù)模式,結(jié)合消息隊(duì)列、緩存、數(shù)據(jù)庫(kù)主從等技術(shù),構(gòu)建可靠、高性能的分布式系統(tǒng)。同時(shí),關(guān)注 Seata 開(kāi)源項(xiàng)目的更新,及時(shí)升級(jí)版本,享受更完善的功能和更穩(wěn)定的性能。
Seata 作為一個(gè)活躍的開(kāi)源項(xiàng)目,正在不斷完善和發(fā)展,為微服務(wù)架構(gòu)下的分布式事務(wù)管理提供更加成熟的解決方案。