可靠消息與分布式事務(wù)
1 概述
前面的文章https://zhuanlan.zhihu.com/p/92866118講述了Seata對分布式事務(wù)的支持,當(dāng)生產(chǎn)環(huán)境中沒有seata的部署時(shí),我們?nèi)绾瓮ㄟ^可靠消息例如RocketMQ處理分布式事務(wù)。
2 分布式事務(wù)的困難
分布式事務(wù),全局事務(wù)就是一個(gè)分布式系統(tǒng),我們都知道,事務(wù)具有ACID特性,但是對于分布式環(huán)境,ACID是不能完整的得到保障的。
在分布式系統(tǒng)中,有個(gè)著名的CAP定理,指的是在一個(gè)分布式系統(tǒng)中,一致性(Consistency)、可用性(Availability)、分區(qū)容錯(cuò)性(Partition tolerance)。CAP 原則指的是,這三個(gè)要素最多只能同時(shí)實(shí)現(xiàn)兩點(diǎn),不可能三者兼顧。
但是在實(shí)際的分布式環(huán)境中,P是一定會(huì)存在的,所以在分布式環(huán)境中。當(dāng)P出現(xiàn)時(shí),我們需要在C和A之間做取舍,要和選擇CP,要么選擇AP。
有沒有CA系統(tǒng)呢?有,傳統(tǒng)的關(guān)系型數(shù)據(jù)庫就是CA系統(tǒng),因?yàn)樗粫?huì)出現(xiàn)分區(qū)的情況,單庫提供的ACID保障,它不是分布式的。常用的存儲(chǔ)系統(tǒng)在CAP中的取舍如下圖所示:

由此可知,分布式事務(wù)同樣也要在C和A之間做取舍,因此分布式事務(wù)難做,在發(fā)生P時(shí),我們需要放棄C和A中的一個(gè)。
3 事務(wù)消息的分布式事務(wù)
在使用可靠消息做分布式事務(wù)時(shí),我們需要保證本地事務(wù)與消息的發(fā)送能有原子性,即本地事務(wù)成功則消息發(fā)送成功,本地事務(wù)失敗則消息不發(fā)送或消息回滾,而RocketMQ的事務(wù)消息支持兩階段提交和事務(wù)回查,能保證消息的發(fā)送與本地事務(wù)之間的原子性。
我們先看看RocketMQ官網(wǎng)的示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
TransactionListener的實(shí)現(xiàn)如下:
import ...
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
TransactionListener的定義如下:

其中包含兩個(gè)方法:
· executeLocalTransaction方法會(huì)在發(fā)送消息后調(diào)用,用于執(zhí)行本地事務(wù),如果本地事務(wù)執(zhí)行成功,rocketmq再提交消息
· checkLocalTransaction用于對本地事務(wù)做檢查,rocketmq依賴此方法做補(bǔ)償,補(bǔ)償由rocketmq的broker主動(dòng)發(fā)起,因?yàn)橄⒌腸ommit或者rollback可能會(huì)失敗,因此需要這種補(bǔ)償機(jī)制保證消息的commit或者rollback執(zhí)行成功
而當(dāng)一個(gè)事務(wù)消息commit成功后,消費(fèi)者才能消費(fèi)到此消息,RocketMQ通過事務(wù)消息的方式提供了分布式事務(wù)的支持。
RocketMQ事務(wù)消息的分布式事務(wù)方案示意圖如下:

Producer的本地事務(wù)和消息的發(fā)送是具有原子性的,因此當(dāng)本地事務(wù)成功后,consumer一定會(huì)接收到消息。
但是通過RocketMQ的事務(wù)消息實(shí)現(xiàn)的分布式事務(wù)是有局限性的,它默認(rèn)了當(dāng)本地事務(wù)執(zhí)行成功時(shí),consumer收到消息后的業(yè)務(wù)處理一定能執(zhí)行成功,一旦consumer無法完成事務(wù),全局事務(wù)將無法回滾。舉個(gè)例子,對于庫存扣減類需求,下單后需要扣減庫存,而扣減庫存在獨(dú)立的服務(wù)中,此時(shí)如果單純的使用事務(wù)消息無法滿足,因?yàn)閹齑鏋?后,扣減會(huì)失敗,而此時(shí)全局事務(wù)無法回滾。
對于事務(wù)消息實(shí)現(xiàn)的分布式事務(wù),它在ACID特性上的保障非常弱:
· 原子性:部分場景下能保證,但是對于庫存扣減類需求,不適合單純的使用事務(wù)消息處理分布式事務(wù),因?yàn)闆]法回滾
· 一致性:不能保證,只能得到最終一致
· 隔離性:無保證,多個(gè)事務(wù)之間不隔離
· 持久性:能保證
可靠消息實(shí)現(xiàn)的分布式事務(wù),實(shí)際上是在發(fā)生P時(shí),選擇了A放棄了C,它是BASE原理的應(yīng)用,BASE是Basically Available(基本可用)、Soft state(軟狀態(tài))和Eventually consistent(最終一致性)的簡寫。BASE是對CAP中一致性和可用性權(quán)衡的結(jié)果,核心思想是即使無法做到強(qiáng)一致性,但每個(gè)應(yīng)用都可以根據(jù)自身的業(yè)務(wù)特點(diǎn),采用適當(dāng)?shù)姆绞絹硎沟孟到y(tǒng)達(dá)到最終一致性。
不僅僅是事務(wù)消息,TCC和Saga模式也是BASE理論的應(yīng)用,特征了一定的C。
4 利用事務(wù)消息做回滾
前文講述了使用事務(wù)消息完成分布式事務(wù)的限制,且無法回滾。而我們使用服務(wù)接口調(diào)用時(shí),在缺少TCC/Saga等分布式事務(wù)解決方案的情況下,無法得到事務(wù)的保證,無事務(wù)的情況下整個(gè)過程的調(diào)用方式如下:

這個(gè)過程缺少事務(wù)的保證,當(dāng)系統(tǒng)A需要回滾時(shí),系統(tǒng)B無法完成回滾,因?yàn)榻涌谡{(diào)用也可能失敗,即使系統(tǒng)B提供回滾接口也無法完全保證回滾成功。
我們可以通過事務(wù)消息來做回滾,這樣做雖然有些別扭,但相比前文描述的用事務(wù)消息做分布式事務(wù)而言,一致性雖然沒有完全得到保障,但它保障了在非回滾的場景下的一致性,整個(gè)過程如下:

為什么說這樣做有點(diǎn)別扭,就是因?yàn)檫@里將事務(wù)消息用于回滾,系統(tǒng)A提交事務(wù)后,需要將事務(wù)消息回滾,這樣做的目的是防止系統(tǒng)B消費(fèi)到事務(wù)消息。
如果系統(tǒng)A出錯(cuò),需要回滾時(shí),整個(gè)過程如下:

當(dāng)系統(tǒng)A出現(xiàn)異?;貪L后,需要提交事務(wù)消息使得系統(tǒng)B可消費(fèi)到事務(wù)消息從而回滾系統(tǒng)B的事務(wù)。
這樣做的優(yōu)點(diǎn):
· 實(shí)際上類似于saga模式的分布式事務(wù),拿MQ當(dāng)事務(wù)的協(xié)調(diào)者,能用于庫存扣減類場景,場景限制少
· 當(dāng)沒有發(fā)生回滾時(shí),能保證一致性,發(fā)生回滾時(shí),能達(dá)到最終一致
· 無額外的依賴,在沒有seata這類分布式事務(wù)解決方案的環(huán)境下可行
缺點(diǎn):
· 使用上非常別扭,要將事務(wù)消息的提交與回滾與本地事務(wù)反過來,本地事務(wù)成功則消息事務(wù)回滾,本地事務(wù)回滾則提交消息事務(wù)