分布式事務(wù)之解決方案(可靠消息最終一致性)

5. 分布式事務(wù)解決方案之可靠消息最終一致性

5.1. 什么是可靠消息最終一致性事務(wù)

可靠消息最終一致性方案是指當(dāng)事務(wù)發(fā)起執(zhí)行完全本地事務(wù)后并發(fā)出一條消息,事務(wù)參與方(消息消費者)一定能夠接收消息并處理事務(wù)成功,此方案強調(diào)的是只要消息發(fā)給事務(wù)參與方最終事務(wù)要達到一致。
此方案是利用消息中間件完成,如下圖:
事務(wù)發(fā)起方(消息生產(chǎn)方)將消息發(fā)給消息中間件,事務(wù)參與方從消息中間件接收消息,事務(wù)發(fā)起方和消息中間件之間,事務(wù)參與方(消息消費方)和消息中間件之間都是通過網(wǎng)絡(luò)通信,由于網(wǎng)絡(luò)通信的不確定性導(dǎo)致分布式事務(wù)問題。

在這里插入圖片描述

因此可靠消息最終一致性方案要解決以下幾個問題 :
1、本地事務(wù)與消息發(fā)送的原子性問題
本地事務(wù)與消息發(fā)送的原子性問題即 :事務(wù)發(fā)起方在本地事務(wù)執(zhí)行成功后消息必須發(fā)出去,否則就丟棄消息。即實現(xiàn)本地事務(wù)和消息發(fā)送的原子性,要么都成功,要么都失敗。本地事務(wù)與消息發(fā)送的原子性問題是實現(xiàn)可靠消息最終一致性方案的關(guān)鍵問題。
先來嘗試下這種操作,先發(fā)送消息,再操作數(shù)據(jù)庫 :

begin transaction;
        // 1.發(fā)送MQ
        // 2.數(shù)據(jù)庫操作
commit transation;      

這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因為可能發(fā)送消息成功,數(shù)據(jù)庫操作失敗。
你立馬想到第二種方案,先進行數(shù)據(jù)庫操作,再發(fā)送消息 :

begin transaction;
        // 1.數(shù)據(jù)庫操作
        // 2.發(fā)送MQ
commit transation;      

這種情況下貌似沒有問題,如果發(fā)送MQ消息失敗,就會拋出異常,導(dǎo)致數(shù)據(jù)庫事務(wù)回滾。但如果是超時異常,數(shù)據(jù)庫回滾,但MQ其實已經(jīng)正常發(fā)送來,同樣會導(dǎo)致不一致。
2、事務(wù)參與方接收消息的可靠性
事務(wù)參與方必須能夠從消息隊列接收到消息,如果接收消息失敗可以重復(fù)接收消息。
3、消息重復(fù)消費的問題
由于網(wǎng)絡(luò)2的存在,若某一個消費節(jié)點超時但是消費成功,此時消息中間件會重復(fù)投遞此消息,就導(dǎo)致來消息的重復(fù)消費。要解決消息重復(fù)消費的問題就要實現(xiàn)事務(wù)參與方的方法冪等性。

5.2. 解決方案

5.2.1. 本地消息表方案

本地消息表這個方案最初是eBay提出的,此方案的核心是通過本地事務(wù)保證數(shù)據(jù)業(yè)務(wù)操作和消息的一致性,然后通過定時任務(wù)將消息發(fā)送至消息中間件,待確認消息發(fā)送給消費方成功再將消息刪除。
下面以注冊送積分為例來說明 :
下例共有兩個微服務(wù)交互,用戶服務(wù)和積分服務(wù),用戶服務(wù)負責(zé)添加用戶,積分服務(wù)負責(zé)增加積分。

在這里插入圖片描述

交互流程如下 :
1、用戶注冊
用戶服務(wù)在本地事務(wù)新增用戶和增加“積分消息日志”。(用戶表和消息表通過本地事務(wù)保證一致)
下表是偽代碼

begin transaction;
        // 1.新增用戶
        // 2.存儲積分消息日志
commit transation;      

這種情況下,本地數(shù)據(jù)庫操作與存儲積分消息日志處于同一事務(wù)中,本地數(shù)據(jù)庫操作與記錄消息日志操作具備原子性。
2、定時任務(wù)掃描日志
如何保證將消息發(fā)送給消息隊列呢?
經(jīng)過第一步消息已經(jīng)寫到消息日志表中,可以啟動獨立的線程,定時對消息日志表中的消息進行掃描并發(fā)送至消息中間件,在消息中間件反饋發(fā)送成功后刪除該消息日志,否則等待定時任務(wù)下一周期重試。
3、消費消息
如何保證消費者一定能消費到消息呢?
這里可以使用MQ的ack(即消息確認)機制,消費者監(jiān)聽MQ,如果消費者接收到消息并且業(yè)務(wù)處理完成后向MQ發(fā)送ack(即消息確認),此時說明消費者正常消費消息完成,MQ將不再向消費者推送消息,否則消費者會不斷重試向消費者來發(fā)送消息。
積分服務(wù)接收到“增加積分”消息,開始增加積分,積分增加成功后消息中間件回應(yīng)ack,否則消息中間件將重復(fù)投遞此消息。
由于消息會重復(fù)投遞,積分服務(wù)的“增加積分”功能需要實現(xiàn)冪等性。

5.2.2. RocketMQ事務(wù)消息方案

RocketMQ是一個來自阿里巴巴的分布式消息中間件,于2012年開源,并在2017年正式成為Apache頂級項目。據(jù)了解,包括阿里云上的消息產(chǎn)品以及收購的子公司在內(nèi),阿里集團的消息產(chǎn)品全線都運行在RocketMQ之上,并且最近幾年的雙十一大促中,RocketMQ都有搶眼表現(xiàn)。Apache RocketMQ 4.3之后的版本正式支持事務(wù)消息,為分布式事務(wù)實現(xiàn)提供來便利性支持。
RocketMQ事務(wù)消息設(shè)計則主要是為了解決Producer端的消息發(fā)送與本地事務(wù)執(zhí)行的原子性問題,RocketMQ的設(shè)計中broker與producer端的雙向通信能力,使得broker天生可以作為一個事務(wù)協(xié)調(diào)者存在;而RocketMQ本身提供的存儲機制為事務(wù)消息提供了持久化能力;RocketMQ的高可用機制以及可靠消息設(shè)計則為事務(wù)消息在系統(tǒng)發(fā)生異常時依然能夠保證達成事務(wù)的最終一致性。
在RocketMQ 4.3后實現(xiàn)了完整的事務(wù)消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內(nèi)部,解決Producer端的消息發(fā)送與本地事務(wù)執(zhí)行的原子性問題。

在這里插入圖片描述

執(zhí)行流程如下 :
為方便理解我們還以注冊送積分的例子來描述整個流程。
Producer即MQ發(fā)送方,本例中是用戶服務(wù),負責(zé)新增用戶。MQ訂閱方即消息消費方,本例中是積分服務(wù),負責(zé)新增積分。
1、Producer發(fā)送事務(wù)消息
Producer(MQ發(fā)送方)發(fā)送事務(wù)消息至MQ Server,MQ Server將消息狀態(tài)標(biāo)記為Prepared(預(yù)覽狀態(tài)),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。
2、MQ Server回應(yīng)消息發(fā)送成功
MQ Server接收到Producer發(fā)送給的消息則回應(yīng)發(fā)送成功表示MQ已接收到消息。
3、Producer執(zhí)行本地事務(wù)
Producer端執(zhí)行業(yè)務(wù)代碼邏輯,通過本地數(shù)據(jù)庫事務(wù)控制。
本例中,Producer執(zhí)行添加用戶操作。
4、消息投遞
若Producer本地事務(wù)執(zhí)行成功則自動向MQ Server發(fā)送commit消息,MQ Server接收到commit消息后將“增加積分消息”狀態(tài)標(biāo)記為可消費,此時MQ訂閱方(積分服務(wù))即正常消費消息;
若Producer 本地事務(wù)執(zhí)行失敗則自動向MQ Server發(fā)送rollback消息,MQ Server接收到rollback消息后將刪除“增加積分消息”。
MQ訂閱方(積分服務(wù))消費消息,消費成功則向MQ回應(yīng)ack,否則將重復(fù)接收消息。這里ack默認自動回應(yīng),即程序執(zhí)行正常則自動回應(yīng)ack。
5、事務(wù)回查
如果執(zhí)行Producer端本地事務(wù)過程中,執(zhí)行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他Producer來獲取事務(wù)執(zhí)行狀態(tài),這個過程叫事務(wù)回查。MQ Server會根據(jù)事務(wù)回查結(jié)果來決定是否投遞消息。
以上主干流程已由RocketMQ實現(xiàn),對用戶則來說,用戶需要分別實現(xiàn)本地事務(wù)執(zhí)行以及本地事務(wù)回查方法,因此只需關(guān)注本地事務(wù)的執(zhí)行狀態(tài)即可。
RocketMQ提供RocketMQLocalTransactionListener接口 :

public interface RocketMQLocalTransactionListener {
    /**
    發(fā)送prepare消息成功此方法被回調(diào),該方法用于執(zhí)行本地事務(wù)
    @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
    @param arg 調(diào)用send方法時傳遞的參數(shù),當(dāng)send時候若有額外的參數(shù)可以傳遞到send方法中,這里能獲取到
    @return 返回事務(wù)狀態(tài),COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調(diào)
    */
    RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
    /**
    @param msg 通過獲取transactionId來判斷這條消息的本地事務(wù)執(zhí)行狀態(tài)
    @return 返回事務(wù)狀態(tài),COMMIT :提交 ROLLBACK :回滾 UNKNOW :回調(diào)
    */
    RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
  • 發(fā)送事務(wù)消息 :
    以下是RocketMQ提供用于發(fā)送事務(wù)消息的API :
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 設(shè)置TransactionListener實現(xiàn)
producer.setTransactionListener(transactionListener);
// 發(fā)送事務(wù)消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

5.3. RocketMQ實現(xiàn)可靠消息最終一致性事務(wù)

5.3.1. 業(yè)務(wù)說明

本實例通過RocketMQ中間件實現(xiàn)可靠消息最終一致性分布式事務(wù),模擬兩個賬戶的轉(zhuǎn)賬交易過程。
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務(wù)。交易過程是,張三給李四轉(zhuǎn)賬指定金額。
上述交易步驟,張三扣減金額與給bank2發(fā)轉(zhuǎn)賬消息,兩個操作必須是一個整體性的事務(wù)。


在這里插入圖片描述

5.3.2.程序組成部分

本示例程序組成部分如下: 數(shù)據(jù)庫:MySQL-5.7.25
包括bank1和bank2兩個數(shù)據(jù)庫。
JDK:64位 jdk1.8.0_201
rocketmq 服務(wù)端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務(wù)框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE 微服務(wù)及數(shù)據(jù)庫的關(guān)系 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操作張三賬戶, 連接數(shù)據(jù)庫bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操作李四賬戶,連接數(shù)據(jù)庫bank2
本示例程序技術(shù)架構(gòu)如下 :


在這里插入圖片描述

交互流程如下 :
1、Bank1向MQ Server發(fā)送轉(zhuǎn)賬消息;
2、Bank1執(zhí)行本地事務(wù),扣減金額;
3、Bank2接收消息,執(zhí)行本地事務(wù),添加金額。

5.3.3. 創(chuàng)建數(shù)據(jù)庫

創(chuàng)建bank1庫,并導(dǎo)入以下表結(jié)構(gòu)和數(shù)據(jù)(包含張三賬戶)

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);

創(chuàng)建bank2庫,并導(dǎo)入以下表結(jié)構(gòu)和數(shù)據(jù)(包含李四賬戶)

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的賬戶', '2', NULL, 0);

在bank1、bank2數(shù)據(jù)庫中新增de_duplication,交易記錄表(去重表),用于交易冪等控制。

 DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

5.3.4. 啟動RocketMQ

(1)下載RocketMQ服務(wù)器
下載地址 :http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin- release.zip
(2)解壓并啟動
啟動nameserver :

 set ROCKETMQ_HOME=[rocketmq服務(wù)端解壓路徑] 
 start [rocketmq服務(wù)端解壓路徑]/bin/mqnamesrv.cmd

啟動broker:

set ROCKETMQ_HOME=[rocketmq服務(wù)端解壓路徑]
start [rocketmq服務(wù)端解壓路徑]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true

3.3.5 工程概述

(1)父工程maven依賴說明
在父工程中指定來SpringBoot和SpringCloud版本

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring‐boot‐dependencies</artifactId>
    <version>2.1.3.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency> 
    <groupId>org.springframework.cloud</groupId> 
    <artifactId>spring‐cloud‐dependencies</artifactId>
    <version>Greenwich.RELEASE</version>
    <type>pom</type>
<scope>import</scope> </dependency>

在dtx-txmsg-demo父工程中指定來rocketmq-spring-boot-starter的版本

 <dependency>
    <groupId>org.apache.rocketmq</groupId> 
    <artifactId>rocketmq‐spring‐boot‐starter</artifactId> 
    <version>2.0.2</version>
</dependency>

(2)配置rocketMQ
application-local.properties中配置rocketMQ nameServer地址及生產(chǎn)組 :

 rocketmq.producer.group = producer_bank2 
 rocketmq.name‐server = 127.0.0.1:9876

3.3.6 dtx-txmsg-demo-bank1

dtx-txmsg-demo-bank1實現(xiàn)如下功能:
1、張三扣減金額,提交本地事務(wù)。
2、向MQ發(fā)送轉(zhuǎn)賬消息。
2)Dao

@Mapper
@Component
public interface AccountInfoDao {
    @Update("update account_info set account_balance=account_balance+#{amount} where account_no=# {accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    @Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo);
    @Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo);
}

3)AccountInfoService

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private AccountInfoDao accountInfoDao;
    /**
    * 更新帳號余額‐發(fā)送消息
    * producer向MQ Server發(fā)送消息 *
    * @param accountChangeEvent */
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
    //構(gòu)建消息體
    JSONObject jsonObject = new JSONObject(); jsonObject.put("accountChange",accountChangeEvent); Message<String> message =
    MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult sendResult =
    rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null);
    log.info("send transcation message body={},result= {}",message.getPayload(),sendResult.getSendStatus());
    }   
    /**
    * 更新帳號余額‐本地事務(wù)
    * producer發(fā)送消息完成后接收到MQ Server的回應(yīng)即開始執(zhí)行本地事務(wù) *
    * @param accountChangeEvent */
    @Transactional
    @Override
    public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
    log.info("開始更新本地事務(wù),事務(wù)號:{}",accountChangeEvent.getTxNo()); accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * ‐1);
        } 
    }
    //為冪等作準(zhǔn)備 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() == 2){
    throw new RuntimeException("bank1更新本地事務(wù)時拋出異常"); log.info("結(jié)束更新本地事務(wù),事務(wù)號:{}",accountChangeEvent.getTxNo());
        }
}

4)RocketMQLocalTransactionListener
編寫RocketMQLocalTransactionListener接口實現(xiàn)類,實現(xiàn)執(zhí)行本地事務(wù)和事務(wù)回查兩個方法。

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;
    //消息發(fā)送成功回調(diào)此方法,此方法執(zhí)行本地事務(wù)
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
    //解析消息內(nèi)容 
    try {
        String jsonString = new String((byte[]) message.getPayload()); 
        JSONObject jsonObject = JSONObject.parseObject(jsonString); AccountChangeEvent accountChangeEvent =
        JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class); //扣除金額
        accountInfoService.doUpdateAccountBalance(accountChangeEvent);
        return RocketMQLocalTransactionState.COMMIT; 
    } catch (Exception e) {
    log.error("executeLocalTransaction 事務(wù)執(zhí)行失敗",e); e.printStackTrace();
    return RocketMQLocalTransactionState.ROLLBACK;
        } 
    }
    //此方法檢查事務(wù)執(zhí)行狀態(tài)
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        RocketMQLocalTransactionState state;
        final JSONObject jsonObject = JSON.parseObject(new String((byte[]) message.getPayload()));
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
        //事務(wù)id
        String txNo = accountChangeEvent.getTxNo();
        int isexistTx = accountInfoDao.isExistTx(txNo);
        log.info("回查事務(wù),事務(wù)號: {} 結(jié)果: {}", accountChangeEvent.getTxNo(),isexistTx); if(isexistTx>0){
        state= RocketMQLocalTransactionState.COMMIT; 
        }else{
        state= RocketMQLocalTransactionState.UNKNOWN; 
        }
        return state;
        } 
}

4)Controller

@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
@GetMapping(value = "/transfer")
public String transfer(@RequestParam("accountNo")String accountNo,@RequestParam("amount") Double amount){
    String tx_no = UUID.randomUUID().toString();
    AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
    accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
    return "轉(zhuǎn)賬成功"; 
    }
}

3.3.7 dtx-txmsg-demo-bank2

dtx-txmsg-demo-bank2需要實現(xiàn)如下功能 :
1、監(jiān)聽MQ,接收消息。
2、接收到消息增加賬戶金額。
1)Service
注意為避免消息重復(fù)發(fā)送,這里需要實現(xiàn)冪等。

 @Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;
    /**
    * 消費消息,更新本地事務(wù),添加金額 * @param accountChangeEvent
    */
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("bank2更新本地賬號,賬號:{},金額: {}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //冪等校驗
        int existTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());           if(existTx<=0){
        //執(zhí)行更新 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
            }
        } 
    }
    //添加事務(wù)記錄
    accountInfoDao.addTx(accountChangeEvent.getTxNo()); 
    log.info("更新本地事務(wù)執(zhí)行成功,本次事務(wù)號: {}", accountChangeEvent.getTxNo());
    }else{
    log.info("更新本地事務(wù)執(zhí)行失敗,本次事務(wù)號: {}", accountChangeEvent.getTxNo());
            }
        } 
}

2)MQ監(jiān)聽類

@Component
@RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2") 
@Slf4j
public class TxmsgConsumer implements RocketMQListener<String> {
    @Autowired
    AccountInfoService accountInfoService;
    @Override
    public void onMessage(String s) {
        log.info("開始消費消息:{}",s);
        //解析消息為對象
        final JSONObject jsonObject = JSON.parseObject(s); AccountChangeEvent accountChangeEvent =
        JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
        //調(diào)用service增加賬號金額 accountChangeEvent.setAccountNo("2"); accountInfoService.addAccountInfoBalance(accountChangeEvent);
        } 
}

5.3.8 測試場景

  • bank1本地事務(wù)失敗,則bank1不發(fā)送轉(zhuǎn)賬消息。
  • bank2接收轉(zhuǎn)賬消息失敗,會進行重試發(fā)送消息。
  • bank2多次消費同一個消息,實現(xiàn)冪等。

5.4 小結(jié)

可靠消息最終一致性就是保證消息從生產(chǎn)方經(jīng)過消息中間件傳遞到消費方的一致性,本案例使用了RocketMQ作為消息中間件,RocketMQ主要解決了兩個功能 :
1、本地事務(wù)與消息發(fā)送的原子性問題。
2、事務(wù)參與方接收消息的可靠性。
可靠消息最終一致性事務(wù)適合執(zhí)行周期長且實時性要求不高的場景。引入消息機制后,同步的事務(wù)操作變?yōu)榛谙?zhí)行的異步操作,避免了分布式事務(wù)中的同步阻塞操作的影響,并實現(xiàn)了兩個服務(wù)的解耦。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容