分布式事務(wù)&seata

微服務(wù)中的分布式事務(wù)問(wèn)題

在單體應(yīng)用中,3個(gè)子模塊(庫(kù)存stock,訂單order,賬戶(hù)account),數(shù)據(jù)的一致性是由數(shù)據(jù)庫(kù)保證。

image.png

但是當(dāng)場(chǎng)景是微服務(wù)的場(chǎng)景時(shí)
image.png

庫(kù)存,訂單,賬戶(hù)只能保證各自的數(shù)據(jù)一致性,無(wú)法保證全局的數(shù)據(jù)一致性。

分布式事務(wù)seata

分布式事務(wù)的發(fā)展過(guò)程中,有一些著名的理論基礎(chǔ)如二階段提交協(xié)議(2pc),協(xié)議把分布式事務(wù)的過(guò)程分為2個(gè)階段進(jìn)行。
基于2pc的分布式事務(wù)模型有XA、AT、TCC、SAGA,他們的共同點(diǎn)都是基于2pc協(xié)議,同時(shí)各自有各自的特點(diǎn)和適用場(chǎng)景。
那么seata是什么,他跟2pc以及XA、AT、TCC、SAGA的關(guān)系?
Seata 是一款開(kāi)源的分布式事務(wù)解決方案,致力于在微服務(wù)架構(gòu)下提供高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。本質(zhì)上,seata是打包實(shí)現(xiàn)了 XA、AT、TCC、SAGA模型,同時(shí)支持Dubbo、Spring Cloud、Sofa-RPC、Motan 和 gRPC 等RPC框架的便捷接入,高可用等。
也就是2pc以及XA、AT、TCC、SAGA偏協(xié)議和理論。
seata基于協(xié)議和理論給出了具體實(shí)用的解決方案。
其中AT模式的方便易用,代碼無(wú)侵入的特點(diǎn)被廣泛使用。

seata解決方案圖示:


image.png

從圖中看出,seata整個(gè)分布式事務(wù)分為3個(gè)角色(RM,TM,TC)。
RM,TM是內(nèi)嵌于微服務(wù)中(或者說(shuō)內(nèi)嵌于客戶(hù)端中),TC是獨(dú)立部署的一個(gè)服務(wù)(相當(dāng)于服務(wù)端)。

  • TC (Transaction Coordinator) - 事務(wù)協(xié)調(diào)者
    維護(hù)全局和分支事務(wù)的狀態(tài),驅(qū)動(dòng)全局事務(wù)提交或回滾。

  • TM (Transaction Manager) - 事務(wù)管理器
    定義全局事務(wù)的范圍:開(kāi)始全局事務(wù)、提交或回滾全局事務(wù)。

  • RM (Resource Manager) - 資源管理器
    管理分支事務(wù)處理的資源,與TC交談以注冊(cè)分支事務(wù)和報(bào)告分支事務(wù)的狀態(tài),并驅(qū)動(dòng)分支事務(wù)提交或回滾。

seata的-全局事務(wù)是由很多分支事務(wù)組成,一般來(lái)說(shuō),分支事務(wù)就是本地事務(wù)


image.png

seata的快速使用

用例

用戶(hù)購(gòu)買(mǎi)商品的業(yè)務(wù)邏輯。整個(gè)業(yè)務(wù)邏輯由3個(gè)微服務(wù)提供支持:

倉(cāng)儲(chǔ)服務(wù):對(duì)給定的商品扣除倉(cāng)儲(chǔ)數(shù)量。
訂單服務(wù):根據(jù)采購(gòu)需求創(chuàng)建訂單。
帳戶(hù)服務(wù):從用戶(hù)帳戶(hù)中扣除余額。

架構(gòu)圖
image.png
倉(cāng)儲(chǔ)服務(wù)
public interface StorageService {

    /**
     * 扣除存儲(chǔ)數(shù)量
     */
    void deduct(String commodityCode, int count);
}
訂單服務(wù)
public interface OrderService {

    /**
     * 創(chuàng)建訂單
     */
    Order create(String userId, String commodityCode, int orderCount);
}
帳戶(hù)服務(wù)
public interface AccountService {

    /**
     * 從用戶(hù)賬戶(hù)中借出
     */
    void debit(String userId, int money);
}
主要業(yè)務(wù)邏輯
public class BusinessServiceImpl implements BusinessService {

    private StorageService storageService;

    private OrderService orderService;

    /**
     * 采購(gòu)
     */
    public void purchase(String userId, String commodityCode, int orderCount) {

        storageService.deduct(commodityCode, orderCount);

        orderService.create(userId, commodityCode, orderCount);
    }
}
public class OrderServiceImpl implements OrderService {

    private OrderDAO orderDAO;

    private AccountService accountService;

    public Order create(String userId, String commodityCode, int orderCount) {

        int orderMoney = calculate(commodityCode, orderCount);

        accountService.debit(userId, orderMoney);

        Order order = new Order();
        order.userId = userId;
        order.commodityCode = commodityCode;
        order.count = orderCount;
        order.money = orderMoney;

        // INSERT INTO orders ...
        return orderDAO.insert(order);
    }
}
seata 的分布式交易解決方案
image.png

我們只需要使用一個(gè) @GlobalTransactional 注解在業(yè)務(wù)方法上:


    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        ......
    }
dubbo中的seata

Business發(fā)起全局事務(wù),那么Storage,Order,Account是如何感知的,或者說(shuō)他們是如何串在一起的。
這里涉及一個(gè)xid的概念,xid是分布式全局事務(wù)的唯一id,所以只要Storage,Order,Account綁定的都是同一個(gè)xid就能保證他們是在同一個(gè)全局事務(wù)中,所以只要保證xid在鏈路上的傳遞就可以了。
在dubbo中,xid的傳遞是通過(guò)Fitter實(shí)現(xiàn)的。

/**
 * The type Transaction propagation filter.
 */
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID(); // 獲取當(dāng)前事務(wù) XID
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 獲取 RPC 調(diào)用傳遞過(guò)來(lái)的 XID
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) { // Provider:把 RPC 調(diào)用傳遞來(lái)的 XID 綁定到當(dāng)前運(yùn)行時(shí)
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation); // 業(yè)務(wù)方法的調(diào)用

        } finally {
            if (bind) { // Provider:調(diào)用完成后,對(duì) XID 的清理
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) { // 調(diào)用過(guò)程有新的事務(wù)上下文開(kāi)啟,則不能清除
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}
參考

seata github
seata官方文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容