微服務(wù)中的分布式事務(wù)問(wèn)題
在單體應(yīng)用中,3個(gè)子模塊(庫(kù)存stock,訂單order,賬戶(hù)account),數(shù)據(jù)的一致性是由數(shù)據(jù)庫(kù)保證。

但是當(dāng)場(chǎng)景是微服務(wù)的場(chǎng)景時(shí)

庫(kù)存,訂單,賬戶(hù)只能保證各自的數(shù)據(jù)一致性,無(wú)法保證全局的數(shù)據(jù)一致性。
分布式事務(wù)seata
分布式事務(wù)的發(fā)展過(guò)程中,有一些著名的理論基礎(chǔ)如二階段提交協(xié)議(2pc),協(xié)議把分布式事務(wù)的過(guò)程分為2個(gè)階段進(jìn)行。
基于2pc的分布式事務(wù)模型有XA、AT、TCC、SAGA,他們的共同點(diǎn)都是基于2pc協(xié)議,同時(shí)各自有各自的特點(diǎn)和適用場(chǎng)景。
那么seata是什么,他跟2pc以及XA、AT、TCC、SAGA的關(guān)系?
Seata 是一款開(kāi)源的分布式事務(wù)解決方案,致力于在微服務(wù)架構(gòu)下提供高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。本質(zhì)上,seata是打包實(shí)現(xiàn)了 XA、AT、TCC、SAGA模型,同時(shí)支持Dubbo、Spring Cloud、Sofa-RPC、Motan 和 gRPC 等RPC框架的便捷接入,高可用等。
也就是2pc以及XA、AT、TCC、SAGA偏協(xié)議和理論。
seata基于協(xié)議和理論給出了具體實(shí)用的解決方案。
其中AT模式的方便易用,代碼無(wú)侵入的特點(diǎn)被廣泛使用。
seata解決方案圖示:

從圖中看出,seata整個(gè)分布式事務(wù)分為3個(gè)角色(RM,TM,TC)。
RM,TM是內(nèi)嵌于微服務(wù)中(或者說(shuō)內(nèi)嵌于客戶(hù)端中),TC是獨(dú)立部署的一個(gè)服務(wù)(相當(dāng)于服務(wù)端)。
TC (Transaction Coordinator) - 事務(wù)協(xié)調(diào)者
維護(hù)全局和分支事務(wù)的狀態(tài),驅(qū)動(dòng)全局事務(wù)提交或回滾。TM (Transaction Manager) - 事務(wù)管理器
定義全局事務(wù)的范圍:開(kāi)始全局事務(wù)、提交或回滾全局事務(wù)。RM (Resource Manager) - 資源管理器
管理分支事務(wù)處理的資源,與TC交談以注冊(cè)分支事務(wù)和報(bào)告分支事務(wù)的狀態(tài),并驅(qū)動(dòng)分支事務(wù)提交或回滾。
seata的-全局事務(wù)是由很多分支事務(wù)組成,一般來(lái)說(shuō),分支事務(wù)就是本地事務(wù)

seata的快速使用
用例
用戶(hù)購(gòu)買(mǎi)商品的業(yè)務(wù)邏輯。整個(gè)業(yè)務(wù)邏輯由3個(gè)微服務(wù)提供支持:
倉(cāng)儲(chǔ)服務(wù):對(duì)給定的商品扣除倉(cāng)儲(chǔ)數(shù)量。
訂單服務(wù):根據(jù)采購(gòu)需求創(chuàng)建訂單。
帳戶(hù)服務(wù):從用戶(hù)帳戶(hù)中扣除余額。
架構(gòu)圖

倉(cāng)儲(chǔ)服務(wù)
public interface StorageService {
/**
* 扣除存儲(chǔ)數(shù)量
*/
void deduct(String commodityCode, int count);
}
訂單服務(wù)
public interface OrderService {
/**
* 創(chuàng)建訂單
*/
Order create(String userId, String commodityCode, int orderCount);
}
帳戶(hù)服務(wù)
public interface AccountService {
/**
* 從用戶(hù)賬戶(hù)中借出
*/
void debit(String userId, int money);
}
主要業(yè)務(wù)邏輯
public class BusinessServiceImpl implements BusinessService {
private StorageService storageService;
private OrderService orderService;
/**
* 采購(gòu)
*/
public void purchase(String userId, String commodityCode, int orderCount) {
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
}
}
public class OrderServiceImpl implements OrderService {
private OrderDAO orderDAO;
private AccountService accountService;
public Order create(String userId, String commodityCode, int orderCount) {
int orderMoney = calculate(commodityCode, orderCount);
accountService.debit(userId, orderMoney);
Order order = new Order();
order.userId = userId;
order.commodityCode = commodityCode;
order.count = orderCount;
order.money = orderMoney;
// INSERT INTO orders ...
return orderDAO.insert(order);
}
}
seata 的分布式交易解決方案

我們只需要使用一個(gè) @GlobalTransactional 注解在業(yè)務(wù)方法上:
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
......
}
dubbo中的seata
Business發(fā)起全局事務(wù),那么Storage,Order,Account是如何感知的,或者說(shuō)他們是如何串在一起的。
這里涉及一個(gè)xid的概念,xid是分布式全局事務(wù)的唯一id,所以只要Storage,Order,Account綁定的都是同一個(gè)xid就能保證他們是在同一個(gè)全局事務(wù)中,所以只要保證xid在鏈路上的傳遞就可以了。
在dubbo中,xid的傳遞是通過(guò)Fitter實(shí)現(xiàn)的。
/**
* The type Transaction propagation filter.
*/
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID(); // 獲取當(dāng)前事務(wù) XID
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 獲取 RPC 調(diào)用傳遞過(guò)來(lái)的 XID
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
}
boolean bind = false;
if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
if (rpcXid != null) { // Provider:把 RPC 調(diào)用傳遞來(lái)的 XID 綁定到當(dāng)前運(yùn)行時(shí)
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + rpcXid + "] to RootContext");
}
}
}
try {
return invoker.invoke(invocation); // 業(yè)務(wù)方法的調(diào)用
} finally {
if (bind) { // Provider:調(diào)用完成后,對(duì) XID 的清理
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) { // 調(diào)用過(guò)程有新的事務(wù)上下文開(kāi)啟,則不能清除
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
}
}