分布式事務解決方案-RocketMQ實現(xiàn)可靠消息最終一致性

1.什么是可靠消息最終一致性事務

? 可靠消息最終一致性方案是指當事務發(fā)起方執(zhí)行完成本地事務后并發(fā)出一條消息,事務參與方(消息消費者)一定能夠接收消息并處理事務成功,此方案強調(diào)的是只要消息發(fā)給事務參與方最終事務要達到一致。

? 此方案是利用消息中間件完成,如下圖:

? 事務發(fā)起方(消息生產(chǎn)方)將消息發(fā)給消息中間件,事務參與方從消息中間件接收消息,事務發(fā)起方和消息中間件之間,事務參與方(消息消費方)和消息中間件之間都是通過網(wǎng)絡通信,由于網(wǎng)絡通信的不確定性會導致分布式事務問題。

因此可靠消息最終一致性方案要解決以下幾個問題:

1).本地事務與消息發(fā)送的原子性問題

? 本地事務與消息發(fā)送的原子性問題即:事務發(fā)起方在本地事務執(zhí)行成功后消息必須發(fā)出去,否則就丟棄消息。即實現(xiàn)本地事務和消息發(fā)送的原子性,要么都成功,要么都失敗。本地事務與消息發(fā)送的原子性問題是實現(xiàn)可靠消息最終一致性方案的關鍵問題。

先來嘗試下這種操作,先發(fā)送消息,再操作數(shù)據(jù)庫:

begin transaction;

//1.發(fā)送MQ

//2.數(shù)據(jù)庫操作

commit transation;

這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因為可能發(fā)送消息成功,數(shù)據(jù)庫操作失敗。

你立馬想到第二種方案,先進行數(shù)據(jù)庫操作,再發(fā)送消息:

begin transaction;

//1.數(shù)據(jù)庫操作

//2.發(fā)送MQ

commit transation;

? 這種情況下貌似沒有問題,如果發(fā)送MQ消息失敗,就會拋出異常,導致數(shù)據(jù)庫事務回滾。但如果是超時異常,數(shù)據(jù)庫回滾,但MQ其實已經(jīng)正常發(fā)送了,同樣會導致不一致。

2)事務參與方接收消息的可靠性

事務參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復接收消息。

3)消息重復消費的問題

? 由于網(wǎng)絡2的存在,若某一個消費節(jié)點超時但是消費成功,此時消息中間件會重復投遞此消息,就導致了消息的重復消費。

? 要解決消息重復消費的問題就要實現(xiàn)事務參與方的方法冪等性。

2.解決方案

? 上節(jié)討論了可靠消息最終一致性事務方案需要解決的問題,本節(jié)討論具體的解決方案。

2.1.本地消息表方案

? 本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證數(shù)據(jù)業(yè)務操作和消息的一致性,然后通過定時任務將消息發(fā)送至消息中間件,待確認消息發(fā)送給消費方成功再將消息刪除。

下面以注冊送積分為例來說明:

下例共有兩個微服務交互,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分。

交互流程如下:

1)用戶注冊

? 用戶服務在本地事務新增用戶和增加 ”積分消息日志“。(用戶表和消息表通過本地事務保證一致)

下邊是偽代碼

begin transaction;

//1.新增用戶

//2.存儲積分消息日志

commit transation;

這種情況下,本地數(shù)據(jù)庫操作與存儲積分消息日志處于同一個事務中,本地數(shù)據(jù)庫操作與記錄消息日志操作具備原子性。

2)定時任務掃描日志

? 如何保證將消息發(fā)送給消息隊列呢?

? 經(jīng)過第一步消息已經(jīng)寫到消息日志表中,可以啟動獨立的線程,定時對消息日志表中的消息進行掃描并發(fā)送至消息中間件,在消息中間件反饋發(fā)送成功后刪除該消息日志,否則等待定時任務下一周期重試。

3)消費消息

? 如何保證消費者一定能消費到消息呢?

? 這里可以使用MQ的ack(即消息確認)機制,消費者監(jiān)聽MQ,如果消費者接收到消息并且業(yè)務處理完成后向MQ發(fā)送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發(fā)送消息。

? 積分服務接收到”增加積分“消息,開始增加積分,積分增加成功后向消息中間件回應ack,否則消息中間件將重復投遞此消息。

? 由于消息會重復投遞,積分服務的”增加積分“功能需要實現(xiàn)冪等性。

2.2.RocketMQ事務消息方案

? RocketMQ 是一個來自阿里巴巴的分布式消息中間件,于 2012 年開源,并在 2017 年正式成為 Apache 頂級項目。據(jù)了解,包括阿里云上的消息產(chǎn)品以及收購的子公司在內(nèi),阿里集團的消息產(chǎn)品全線都運行在 RocketMQ 之上,并且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現(xiàn)。Apache RocketMQ 4.3之后的版本正式支持事務消息,為分布式事務實現(xiàn)提供了便利性支持。

? RocketMQ 事務消息設計則主要是為了解決 Producer 端的消息發(fā)送與本地事務執(zhí)行的原子性問題,RocketMQ 的設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協(xié)調(diào)者存在;而 RocketMQ 本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系統(tǒng)發(fā)生異常時依然能夠保證達成事務的最終一致性。

? 在RocketMQ 4.3后實現(xiàn)了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內(nèi)部,解決 Producer 端的消息發(fā)送與本地事務執(zhí)行的原子性問題。

執(zhí)行流程如下:

為方便理解我們還以注冊送積分的例子來描述 整個流程。

Producer 即MQ發(fā)送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。

1、Producer 發(fā)送事務消息

? Producer (MQ發(fā)送方)發(fā)送事務消息至MQ Server,MQ Server將消息狀態(tài)標記為Prepared(預備狀態(tài)),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。

? 本例中,Producer 發(fā)送 ”增加積分消息“ 到MQ Server。

2、MQ Server回應消息發(fā)送成功

? MQ Server接收到Producer 發(fā)送給的消息則回應發(fā)送成功表示MQ已接收到消息。

3、Producer 執(zhí)行本地事務

? Producer 端執(zhí)行業(yè)務代碼邏輯,通過本地數(shù)據(jù)庫事務控制。

? 本例中,Producer 執(zhí)行添加用戶操作。

4、消息投遞

? 若Producer 本地事務執(zhí)行成功則自動向MQServer發(fā)送commit消息,MQ Server接收到commit消息后將”增加積分消息“ 狀態(tài)標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;

? 若Producer 本地事務執(zhí)行失敗則自動向MQServer發(fā)送rollback消息,MQ Server接收到rollback消息后 將刪除”增加積分消息“ 。

? MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里ack默認自動回應,即程序執(zhí)行正常則自動回應ack。

5、事務回查

? 如果執(zhí)行Producer端本地事務過程中,執(zhí)行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執(zhí)行狀態(tài),這個過程叫事務回查。MQ Server會根據(jù)事務回查結果來決定是否投遞消息。

以上主干流程已由RocketMQ實現(xiàn),對用戶側(cè)來說,用戶需要分別實現(xiàn)本地事務執(zhí)行以及本地事務回查方法,因此只需關注本地事務的執(zhí)行狀態(tài)即可。

RoacketMQ提供RocketMQLocalTransactionListener接口:

public interface RocketMQLocalTransactionListener {

? /**

? - 發(fā)送prepare消息成功此方法被回調(diào),該方法用于執(zhí)行本地事務

? - @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id

? - @param arg 調(diào)用send方法時傳遞的參數(shù),當send時候若有額外的參數(shù)可以傳遞到send方法中,這里能獲取到

? - @return 返回事務狀態(tài),COMMIT:提交? ROLLBACK:回滾? UNKNOW:回調(diào)

? ? */

? ? ? RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);

? /**

? - @param msg 通過獲取transactionId來判斷這條消息的本地事務執(zhí)行狀態(tài)

? - @return 返回事務狀態(tài),COMMIT:提交? ROLLBACK:回滾? UNKNOW:回調(diào)

? ? */

? ? ? RocketMQLocalTransactionState checkLocalTransaction(Message msg);

? }

發(fā)送事務消息:

以下是RocketMQ提供用于發(fā)送事務消息的API:

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

//設置TransactionListener實現(xiàn)

producer.setTransactionListener(transactionListener);

//發(fā)送事務消息

SendResult sendResult = producer.sendMessageInTransaction(msg, null);

3 RocketMQ實現(xiàn)可靠消息案例

本實例通過RocketMQ中間件實現(xiàn)可靠消息最終一致性分布式事務,模擬兩個賬戶的轉(zhuǎn)賬交易過程。

案例內(nèi)容參見:http://www.pbteach.com/post/java_distribut/subject_dtx-04/

本案例使用了RocketMQ作為消息中間件,RocketMQ主要解決了兩個功能:

? 1、本地事務與消息發(fā)送的原子性問題。

? 2、事務參與方接收消息的可靠性。

4 總結

? 可靠消息最終一致性事務適合執(zhí)行周期長且實時性要求不高的場景。引入消息機制后,同步的事務操作變?yōu)榛谙?zhí)行的異步操作, 避免了分布式事務中的同步阻塞操作的影響,并實現(xiàn)了兩個服務的解耦。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容