可靠消息與分布式事務(wù)

可靠消息與分布式事務(wù)

1 概述

前面的文章https://zhuanlan.zhihu.com/p/92866118講述了Seata對分布式事務(wù)的支持,當(dāng)生產(chǎn)環(huán)境中沒有seata的部署時(shí),我們?nèi)绾瓮ㄟ^可靠消息例如RocketMQ處理分布式事務(wù)。

2 分布式事務(wù)的困難

分布式事務(wù),全局事務(wù)就是一個(gè)分布式系統(tǒng),我們都知道,事務(wù)具有ACID特性,但是對于分布式環(huán)境,ACID是不能完整的得到保障的。

在分布式系統(tǒng)中,有個(gè)著名的CAP定理,指的是在一個(gè)分布式系統(tǒng)中,一致性(Consistency)、可用性(Availability)、分區(qū)容錯(cuò)性(Partition tolerance)。CAP 原則指的是,這三個(gè)要素最多只能同時(shí)實(shí)現(xiàn)兩點(diǎn),不可能三者兼顧。

但是在實(shí)際的分布式環(huán)境中,P是一定會(huì)存在的,所以在分布式環(huán)境中。當(dāng)P出現(xiàn)時(shí),我們需要在C和A之間做取舍,要和選擇CP,要么選擇AP。

有沒有CA系統(tǒng)呢?有,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫就是CA系統(tǒng),因?yàn)樗粫?huì)出現(xiàn)分區(qū)的情況,單庫提供的ACID保障,它不是分布式的。常用的存儲(chǔ)系統(tǒng)在CAP中的取舍如下圖所示:

image.png

由此可知,分布式事務(wù)同樣也要在C和A之間做取舍,因此分布式事務(wù)難做,在發(fā)生P時(shí),我們需要放棄C和A中的一個(gè)。

3 事務(wù)消息的分布式事務(wù)

在使用可靠消息做分布式事務(wù)時(shí),我們需要保證本地事務(wù)與消息的發(fā)送能有原子性,即本地事務(wù)成功則消息發(fā)送成功,本地事務(wù)失敗則消息不發(fā)送或消息回滾,而RocketMQ的事務(wù)消息支持兩階段提交和事務(wù)回查,能保證消息的發(fā)送與本地事務(wù)之間的原子性。

我們先看看RocketMQ官網(wǎng)的示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

TransactionListener的實(shí)現(xiàn)如下:

import ...
 
   public class TransactionListenerImpl implements TransactionListener {
       private AtomicInteger transactionIndex = new AtomicInteger(0);
 
       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
 
       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }
 
       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }

TransactionListener的定義如下:

image.png

其中包含兩個(gè)方法:

· executeLocalTransaction方法會(huì)在發(fā)送消息后調(diào)用,用于執(zhí)行本地事務(wù),如果本地事務(wù)執(zhí)行成功,rocketmq再提交消息

· checkLocalTransaction用于對本地事務(wù)做檢查,rocketmq依賴此方法做補(bǔ)償,補(bǔ)償由rocketmq的broker主動(dòng)發(fā)起,因?yàn)橄⒌腸ommit或者rollback可能會(huì)失敗,因此需要這種補(bǔ)償機(jī)制保證消息的commit或者rollback執(zhí)行成功

而當(dāng)一個(gè)事務(wù)消息commit成功后,消費(fèi)者才能消費(fèi)到此消息,RocketMQ通過事務(wù)消息的方式提供了分布式事務(wù)的支持。

RocketMQ事務(wù)消息的分布式事務(wù)方案示意圖如下:

image.png

Producer的本地事務(wù)和消息的發(fā)送是具有原子性的,因此當(dāng)本地事務(wù)成功后,consumer一定會(huì)接收到消息。

但是通過RocketMQ的事務(wù)消息實(shí)現(xiàn)的分布式事務(wù)是有局限性的,它默認(rèn)了當(dāng)本地事務(wù)執(zhí)行成功時(shí),consumer收到消息后的業(yè)務(wù)處理一定能執(zhí)行成功,一旦consumer無法完成事務(wù),全局事務(wù)將無法回滾。舉個(gè)例子,對于庫存扣減類需求,下單后需要扣減庫存,而扣減庫存在獨(dú)立的服務(wù)中,此時(shí)如果單純的使用事務(wù)消息無法滿足,因?yàn)閹齑鏋?后,扣減會(huì)失敗,而此時(shí)全局事務(wù)無法回滾。

對于事務(wù)消息實(shí)現(xiàn)的分布式事務(wù),它在ACID特性上的保障非常弱:

· 原子性:部分場景下能保證,但是對于庫存扣減類需求,不適合單純的使用事務(wù)消息處理分布式事務(wù),因?yàn)闆]法回滾

· 一致性:不能保證,只能得到最終一致

· 隔離性:無保證,多個(gè)事務(wù)之間不隔離

· 持久性:能保證

可靠消息實(shí)現(xiàn)的分布式事務(wù),實(shí)際上是在發(fā)生P時(shí),選擇了A放棄了C,它是BASE原理的應(yīng)用,BASE是Basically Available(基本可用)、Soft state(軟狀態(tài))和Eventually consistent(最終一致性)的簡寫。BASE是對CAP中一致性和可用性權(quán)衡的結(jié)果,核心思想是即使無法做到強(qiáng)一致性,但每個(gè)應(yīng)用都可以根據(jù)自身的業(yè)務(wù)特點(diǎn),采用適當(dāng)?shù)姆绞絹硎沟孟到y(tǒng)達(dá)到最終一致性。

不僅僅是事務(wù)消息,TCC和Saga模式也是BASE理論的應(yīng)用,特征了一定的C。

4 利用事務(wù)消息做回滾

前文講述了使用事務(wù)消息完成分布式事務(wù)的限制,且無法回滾。而我們使用服務(wù)接口調(diào)用時(shí),在缺少TCC/Saga等分布式事務(wù)解決方案的情況下,無法得到事務(wù)的保證,無事務(wù)的情況下整個(gè)過程的調(diào)用方式如下:

image.png

這個(gè)過程缺少事務(wù)的保證,當(dāng)系統(tǒng)A需要回滾時(shí),系統(tǒng)B無法完成回滾,因?yàn)榻涌谡{(diào)用也可能失敗,即使系統(tǒng)B提供回滾接口也無法完全保證回滾成功。

我們可以通過事務(wù)消息來做回滾,這樣做雖然有些別扭,但相比前文描述的用事務(wù)消息做分布式事務(wù)而言,一致性雖然沒有完全得到保障,但它保障了在非回滾的場景下的一致性,整個(gè)過程如下:

image.png

為什么說這樣做有點(diǎn)別扭,就是因?yàn)檫@里將事務(wù)消息用于回滾,系統(tǒng)A提交事務(wù)后,需要將事務(wù)消息回滾,這樣做的目的是防止系統(tǒng)B消費(fèi)到事務(wù)消息。

如果系統(tǒng)A出錯(cuò),需要回滾時(shí),整個(gè)過程如下:

image.png

當(dāng)系統(tǒng)A出現(xiàn)異?;貪L后,需要提交事務(wù)消息使得系統(tǒng)B可消費(fèi)到事務(wù)消息從而回滾系統(tǒng)B的事務(wù)。

這樣做的優(yōu)點(diǎn):

· 實(shí)際上類似于saga模式的分布式事務(wù),拿MQ當(dāng)事務(wù)的協(xié)調(diào)者,能用于庫存扣減類場景,場景限制少

· 當(dāng)沒有發(fā)生回滾時(shí),能保證一致性,發(fā)生回滾時(shí),能達(dá)到最終一致

· 無額外的依賴,在沒有seata這類分布式事務(wù)解決方案的環(huán)境下可行

缺點(diǎn):

· 使用上非常別扭,要將事務(wù)消息的提交與回滾與本地事務(wù)反過來,本地事務(wù)成功則消息事務(wù)回滾,本地事務(wù)回滾則提交消息事務(wù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容