說到分布式事務(wù),就會談到那個經(jīng)典的”賬號轉(zhuǎn)賬”問題:2個賬號,分布處于2個不同的DB,或者說2個不同的子系統(tǒng)里面,A要扣錢,B要加錢,如何保證原子性?
一般的思路都是通過消息中間件來實現(xiàn)“最終一致性”:A系統(tǒng)扣錢,然后發(fā)條消息給中間件,B系統(tǒng)接收此消息,進(jìn)行加錢。
但這里面有個問題:A是先update DB,后發(fā)送消息呢? 還是先發(fā)送消息,后update DB?
假設(shè)先update DB成功,發(fā)送消息網(wǎng)絡(luò)失敗,重發(fā)又失敗,怎么辦?
假設(shè)先發(fā)送消息成功,update DB失敗。消息已經(jīng)發(fā)出去了,又不能撤回,怎么辦?
所以,這里下個結(jié)論: 只要發(fā)送消息和update DB這2個操作不是原子的,無論誰先誰后,都是有問題的。
那這個問題怎么解決呢??
錯誤的方案0
有人可能想到了,我可以把“發(fā)送消息”這個網(wǎng)絡(luò)調(diào)用和update DB放在同1個事務(wù)里面,如果發(fā)送消息失敗,update DB自動回滾。這樣不就保證2個操作的原子性了嗎?
這個方案看似正確,其實是錯誤的,原因有2:
(1)網(wǎng)絡(luò)的2將軍問題:發(fā)送消息失敗,發(fā)送方并不知道是消息中間件真的沒有收到消息呢?還是消息已經(jīng)收到了,只是返回response的時候失敗了?
如果是已經(jīng)收到消息了,而發(fā)送端認(rèn)為沒有收到,執(zhí)行update db的回滾操作。則會導(dǎo)致A賬號的錢沒有扣,B賬號的錢卻加了。
(2)把網(wǎng)絡(luò)調(diào)用放在DB事務(wù)里面,可能會因為網(wǎng)絡(luò)的延時,導(dǎo)致DB長事務(wù)。嚴(yán)重的,會block整個DB。這個風(fēng)險很大。
基于以上分析,我們知道,這個方案其實是錯誤的!
方案1–業(yè)務(wù)方自己實現(xiàn)
假設(shè)消息中間件沒有提供“事務(wù)消息”功能,比如你用的是Kafka。那如何解決這個問題呢?
解決方案如下:
(1)Producer端準(zhǔn)備1張消息表,把update DB和insert message這2個操作,放在一個DB事務(wù)里面。
(2)準(zhǔn)備一個后臺程序,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。允許消息重復(fù),但消息不會丟,順序也不會打亂。
(3)Consumer端準(zhǔn)備一個判重表。處理過的消息,記在判重表里面。實現(xiàn)業(yè)務(wù)的冪等。但這里又涉及一個原子性問題:如果保證消息消費 + insert message到判重表這2個操作的原子性?
消費成功,但insert判重表失敗,怎么辦?關(guān)于這個,在Kafka的源碼分析系列,第1篇, exactly once問題的時候,有過討論。
通過上面3步,我們基本就解決了這里update db和發(fā)送網(wǎng)絡(luò)消息這2個操作的原子性問題。
但這個方案的一個缺點就是:需要設(shè)計DB消息表,同時還需要一個后臺任務(wù),不斷掃描本地消息。導(dǎo)致消息的處理和業(yè)務(wù)邏輯耦合額外增加業(yè)務(wù)方的負(fù)擔(dān)。
方案2 – RocketMQ 事務(wù)消息
為了能解決該問題,同時又不和業(yè)務(wù)耦合,RocketMQ提出了“事務(wù)消息”的概念。
具體來說,就是把消息的發(fā)送分成了2個階段:Prepare階段和確認(rèn)階段。
具體來說,上面的2個步驟,被分解成3個步驟:
(1) 發(fā)送Prepared消息
(2) update DB
(3) 根據(jù)update DB結(jié)果成功或失敗,Confirm或者取消Prepared消息。
可能有人會問了,前2步執(zhí)行成功了,最后1步失敗了怎么辦?這里就涉及到了RocketMQ的關(guān)鍵點:RocketMQ會定期(默認(rèn)是1分鐘)掃描所有的Prepared消息,詢問發(fā)送方,到底是要確認(rèn)這條消息發(fā)出去?還是取消此條消息?
具體代碼實現(xiàn)如下:
也就是定義了一個checkListener,RocketMQ會回調(diào)此Listener,從而實現(xiàn)上面所說的方案。
// 也就是上文所說的,當(dāng)RocketMQ發(fā)現(xiàn)`Prepared消息`時,會根據(jù)這個Listener實現(xiàn)的策略來決斷事務(wù)
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構(gòu)造事務(wù)消息的生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設(shè)置事務(wù)決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務(wù)的處理邏輯,相當(dāng)于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構(gòu)造MSG,省略構(gòu)造參數(shù)
Message msg = new Message(......);
// 發(fā)送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
public TransactionSendResult sendMessageInTransaction(.....) {
// 邏輯代碼,非實際代碼
// 1.發(fā)送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息發(fā)送成功,處理與消息關(guān)聯(lián)的本地事務(wù)單元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.結(jié)束事務(wù)
this.endTransaction(sendResult, localTransactionState, localException);
}
總結(jié):對比方案2和方案1,RocketMQ最大的改變,其實就是把“掃描消息表”這個事情,不讓業(yè)務(wù)方做,而是消息中間件幫著做了。
至于消息表,其實還是沒有省掉。因為消息中間件要詢問發(fā)送方,事物是否執(zhí)行成功,還是需要一個“變相的本地消息表”,記錄事物執(zhí)行狀態(tài)。
人工介入
可能有人又要說了,無論方案1,還是方案2,發(fā)送端把消息成功放入了隊列,但消費端消費失敗怎么辦?
消費失敗了,重試,還一直失敗怎么辦?是不是要自動回滾整個流程?
答案是人工介入。從工程實踐角度講,這種整個流程自動回滾的代價是非常巨大的,不但實現(xiàn)復(fù)雜,還會引入新的問題。比如自動回滾失敗,又怎么處理?
對應(yīng)這種極低概率的case,采取人工處理,會比實現(xiàn)一個高復(fù)雜的自動化回滾系統(tǒng),更加可靠,也更加簡單。