分布式事務(wù)理論:分布式事務(wù)
什么是最大努力通知
最大努力通知也是一種解決分布式事務(wù)的方案,下邊是一個是充值的例子:

交互流程:
- 1、賬戶系統(tǒng)調(diào)用充值系統(tǒng)接口
- 2、充值系統(tǒng)完成支付處理向賬戶系統(tǒng)發(fā)起充值結(jié)果通知,若通知失敗,則充值系統(tǒng)按策略進行重復(fù)通知
- 3、賬戶系統(tǒng)接收到充值結(jié)果通知修改充值狀態(tài)。
- 4、賬戶系統(tǒng)未接收到通知會主動調(diào)用充值系統(tǒng)的接口查詢充值結(jié)果。
通過上邊的例子我們總結(jié)最大努力通知方案的目標:
目標:發(fā)起通知方通過一定的機制最大努力將業(yè)務(wù)處理結(jié)果通知到接收方。
具體包括:
1、有一定的消息重復(fù)通知機制。
因為接收通知方可能沒有接收到通知,此時要有一定的機制對消息重復(fù)通知。
2、消息校對機制。
如果盡最大努力也沒有通知到接收方,或者接收方消費消息后要再次消費,此時可由接收方主動向通知方查詢消息信息來滿足需求。
最大努力通知與可靠消息一致性有什么不同?
1、解決方案思想不同
可靠消息一致性:發(fā)起通知方需要保證將消息發(fā)出去,并且將消息發(fā)到接收通知方,消息的可靠性關(guān)鍵由發(fā)起通知方來保證。
最大努力通知:發(fā)起通知方盡最大的努力將業(yè)務(wù)處理結(jié)果通知為接收通知方,但是可能消息接收不到,此時需要接收通知方主動調(diào)用發(fā)起通知方的接口查詢業(yè)務(wù)處理結(jié)果,通知的可靠性關(guān)鍵在接收通知方。
2、兩者的業(yè)務(wù)應(yīng)用場景不同
可靠消息一致性:關(guān)注的是交易過程的事務(wù)一致,以異步的方式完成交易。
最大努力通知:關(guān)注的是交易后的通知事務(wù),即將交易結(jié)果可靠的通知出去。
3、技術(shù)解決方向不同
可靠消息一致性:要解決消息從發(fā)出到接收的一致性,即消息發(fā)出并且被接收到。
最大努力通知:無法保證消息從發(fā)出到接收的一致性,只提供消息接收的可靠性機制??煽繖C制是,最大努力的將消息通知給接收方,當消息無法被接收方接收時,由接收方主動查詢消息(業(yè)務(wù)處理結(jié)果)。
解決方案
通過對最大努力通知的理解,采用MQ的ack機制就可以實現(xiàn)最大努力通知。
方案1:

本方案是利用MQ的ack機制由MQ向接收通知方發(fā)送通知,流程如下:
1、發(fā)起通知方將通知發(fā)給MQ。
使用普通消息機制將通知發(fā)給MQ。
注意:如果消息沒有發(fā)出去可由接收通知方主動請求發(fā)起通知方查詢業(yè)務(wù)執(zhí)行結(jié)果(后邊會講)。2、接收通知方監(jiān)聽 MQ。
3、接收通知方接收消息,業(yè)務(wù)處理完成回應(yīng)ack。
4、接收通知方若沒有回應(yīng)ack則MQ會重復(fù)通知。
MQ會按照間隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知間隔 (如果MQ采用rocketMq,在broker中可進行配置),直到達到通知要求的時間窗口上限。5、接收通知方可通過消息校對接口來校對消息的一致性。
方案2:
本方案也是利用MQ的ack機制,與方案1不同的是應(yīng)用程序向接收通知方發(fā)送通知,如下圖:

交互流程如下:
1、發(fā)起通知方將通知發(fā)給MQ。
使用可靠消息一致方案中的事務(wù)消息保證本地事務(wù)與消息的原子性,最終將通知先發(fā)給MQ。2、通知程序監(jiān)聽 MQ,接收MQ的消息。
方案1中接收通知方直接監(jiān)聽MQ,方案2中由通知程序監(jiān)聽MQ。
通知程序若沒有回應(yīng)ack則MQ會重復(fù)通知。3、通知程序通過互聯(lián)網(wǎng)接口協(xié)議(如http、webservice)調(diào)用接收通知方案接口,完成通知。
通知程序調(diào)用接收通知方案接口成功就表示通知成功,即消費MQ消息成功,MQ將不再向通知程序投遞通知消息。4、接收通知方可通過消息校對接口來校對消息的一致性。
方案1和方案2的不同點:
1、方案1中接收通知方與MQ接口,即接收通知方案監(jiān)聽 MQ,此方案主要應(yīng)用與內(nèi)部應(yīng)用之間的通知。
2、方案2中由通知程序與MQ接口,通知程序監(jiān)聽MQ,收到MQ的消息后由通知程序通過互聯(lián)網(wǎng)接口協(xié)議調(diào)用接收通知方。此方案主要應(yīng)用于外部應(yīng)用之間的通知,例如支付寶、微信的支付結(jié)果(notify接口回調(diào))通知。
RocketMQ實現(xiàn)最大努力通知型事務(wù)
業(yè)務(wù)說明
本實例通過RocketMq中間件實現(xiàn)最大努力通知型分布式事務(wù),模擬充值過程。
本案例有賬戶系統(tǒng)和充值系統(tǒng)兩個微服務(wù),其中賬戶系統(tǒng)的數(shù)據(jù)庫是bank1數(shù)據(jù)庫,其中有張三賬戶。充值系統(tǒng)的數(shù)據(jù)庫使用bank1_pay數(shù)據(jù)庫,記錄了賬戶的充值記錄。
業(yè)務(wù)流程如下圖:

交互流程如下:
1、用戶請求充值系統(tǒng)進行充值。
2、充值系統(tǒng)完成充值將充值結(jié)果發(fā)給MQ。
3、賬戶系統(tǒng)監(jiān)聽MQ,接收充值結(jié)果通知,如果接收不到消息,MQ會重復(fù)發(fā)送通知。接收到充值結(jié)果通知賬戶系統(tǒng)增加充值金額。
4、賬戶系統(tǒng)也可以主動查詢充值系統(tǒng)的充值結(jié)果查詢接口,增加金額。
程序組成部分
本示例程序組成部分如下:
數(shù)據(jù)庫:MySQL-5.7.25
包括bank1和bank1_pay兩個數(shù)據(jù)庫。
rocketmq 服務(wù)端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE
微服務(wù)框架:
Nacos-Server:1.3.1
SpringBoot:2.2.10.RELEASE
spring-cloud-dependencies:Hoxton.SR8
spring-cloud-alibaba-dependencies:2.2.1.RELEASE
微服務(wù)及數(shù)據(jù)庫的關(guān)系 :
rocket-notifymsg-demo-bank1 銀行1,操作張三賬戶, 連接數(shù)據(jù)庫bank1
rocket-notifymsg-demo-pay 銀行2,操作充值記錄,連接數(shù)據(jù)庫bank1_pay

交互流程如下:
1、用戶請求充值系統(tǒng)進行充值。
2、充值系統(tǒng)完成充值將充值結(jié)果發(fā)給MQ。
3、賬戶系統(tǒng)監(jiān)聽MQ,接收充值結(jié)果通知,如果接收不到消息,MQ會重復(fù)發(fā)送通知。接收到充值結(jié)果通知賬戶系統(tǒng)增加充值金額。
4、賬戶系統(tǒng)也可以主動查詢充值系統(tǒng)的充值結(jié)果查詢接口,增加金額。
創(chuàng)建數(shù)據(jù)庫:
創(chuàng)建bank1庫
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
創(chuàng)建bank1_pay庫,并導(dǎo)入以下表結(jié)構(gòu):
CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_pay` (
`id` varchar(64) COLLATE utf8_bin NOT NULL,
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '賬號',
`pay_amount` double NULL DEFAULT NULL COMMENT '充值余額',
`result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值結(jié)果:success,fail',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
啟動RocketMQ
1)、下載RocketMQ服務(wù)器
下載地址:http://rocketmq.apache.org/dowloading/releases/2)、解壓并啟動
啟動nameserver:
Windows系統(tǒng):
set ROCKETMQ_HOME=[rocketmq服務(wù)端解壓路徑]
start [rocketmq服務(wù)端解壓路徑]/bin/mqnamesrv.cmd
Centos系統(tǒng):
進入rocketMQ解壓目錄下的bin文件夾
nohup sh bin/mqnamesrv &
日志目錄:{rocketMQ解壓目錄}/logs/rocketmqlogs/namesrv.log
啟動broker:
Windows系統(tǒng):
set ROCKETMQ_HOME=[rocketmq服務(wù)端解壓路徑]
start [rocketmq服務(wù)端解壓路徑]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
Centos系統(tǒng):
進入rocketMQ解壓目錄下的bin文件夾
nohup sh bin/mqbroker &
日志目錄:{rocketMQ解壓目錄}/logs/rocketmqlogs/broker.log
創(chuàng)建
rocket-notifymsg-demo-bank1:銀行1,操作張三賬戶, 連接數(shù)據(jù)庫bank1
rocket-notifymsg-demo-pay:銀行2,操作充值記錄,連接數(shù)據(jù)庫bank1_pay
引入maven依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.10.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR8</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.6</version>
<configuration>
<configurationFile>
${basedir}/src/main/resources/generator/generatorConfig.xml
</configurationFile>
<overwrite>true</overwrite>
<verbose>true</verbose>
</configuration>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>4.1.5</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
rocket-notifymsg-demo-pay
rocket-notifymsg-demo-pay實現(xiàn)如下功能:
- 1、充值接口
- 2、充值完成要通知
- 3、充值結(jié)果查詢接口
application.properties
spring.application.name=notify-msg-pay
server.port=8094
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/trade_bank1_pay?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password = mysql
rocketmq.producer.group = producer_notifymsg_pay
rocketmq.name-server = 127.0.0.1:9876
logging.level.root = info
logging.level.org.springframework.web = info
logging.level.cn.itcast.wanxintx.effortdemo = debug
Controller
@RestController
@RequestMapping("/account")
public class AccountPayController {
@Autowired
private AccountPayService accountPayService;
//充值
@GetMapping(value = "/paydo")
public AccountPay pay(AccountPay accountPay){
//生成事務(wù)編號
String txNo = UUID.randomUUID().toString();
accountPay.setId(txNo);
return accountPayService.insertAccountPay(accountPay);
}
//查詢充值結(jié)果
@GetMapping(value = "/payResult/{txNo}")
public AccountPay payresult(@PathVariable("txNo") String txNo){
return accountPayService.getAccountPay(txNo);
}
}
Service
@Service
@Slf4j
public class AccountPayService {
@Autowired
private AccountPayMapper accountPayMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 插入充值記錄
* @param accountPay
* @return
*/
public AccountPay insertAccountPay(AccountPay accountPay) {
int success = accountPayMapper.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
if(success>0){
//發(fā)送通知,使用普通消息發(fā)送通知
accountPay.setResult("success");
rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
return accountPay;
}
return null;
}
/**
* 查詢充值記錄,接收通知方調(diào)用此方法來查詢充值結(jié)果
* @param txNo
* @return
*/
public AccountPay getAccountPay(String txNo) {
AccountPay accountPay = accountPayMapper.findByIdTxNo(txNo);
return accountPay;
}
}
Mapper
public interface AccountPayMapper extends Mapper<AccountPay> {
int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Long pay_amount, @Param("result") String result);
AccountPay findByIdTxNo(@Param("txNo") String txNo);
}
<mapper namespace="com.yibo.notifypay.mapper.AccountPayMapper">
<resultMap id="BaseResultMap" type="com.yibo.notifypay.domain.entity.AccountPay">
<!--
WARNING - @mbg.generated
-->
<id column="id" jdbcType="VARCHAR" property="id" />
<result column="account_no" jdbcType="VARCHAR" property="accountNo" />
<result column="pay_amount" jdbcType="BIGINT" property="payAmount" />
<result column="result" jdbcType="VARCHAR" property="result" />
</resultMap>
<insert id="insertAccountPay">
insert into account_pay(id,account_no,pay_amount,result) values(#{id},#{accountNo},#{payAmount},#{result})
</insert>
<select id="findByIdTxNo" resultType="BaseResultMap">
select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}
</select>
</mapper>
rocket-notifymsg-demo-bank1
rocket-notifymsg-demo-bank1實現(xiàn)如下功能:
- 1、監(jiān)聽MQ,接收充值結(jié)果,根據(jù)充值結(jié)果完成賬戶金額修改。
- 2、主動查詢充值系統(tǒng),根據(jù)充值結(jié)果完成賬戶金額修改。
application.properties
spring.application.name=notify-msg-bank1
server.port=8096
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/trade_bank1?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password = mysql
rocketmq.producer.group = producer_notifymsg_pay
rocketmq.name-server = 127.0.0.1:9876
logging.level.root = info
logging.level.org.springframework.web = info
logging.level.cn.itcast.wanxintx.effortdemo = debug
Controller
@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
//主動查詢充值結(jié)果
@GetMapping(value = "/payresult/{txNo}")
public AccountPay result(@PathVariable("txNo") String txNo){
AccountPay accountPay = accountInfoService.queryPayResult(txNo);
return accountPay;
}
}
Service
@Service
@Slf4j
public class AccountInfoService {
@Autowired
private AccountInfoMapper accountInfoMapper;
@Autowired
private PayClient payClient;
/**
* 更新賬戶金額
* @param accountChange
*/
@Transactional
public void updateAccountBalance(AccountChangeEvent accountChange) {
//冪等校驗
if(accountInfoMapper.isExistTx(accountChange.getTxNo())>0){
return ;
}
int i = accountInfoMapper.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
//插入事務(wù)記錄,用于冪等控制
accountInfoMapper.addTx(accountChange.getTxNo());
}
/**
* 遠程調(diào)用查詢充值結(jié)果
* @param tx_no
* @return
*/
public AccountPay queryPayResult(String tx_no) {
//遠程調(diào)用
AccountPay payresult = payClient.payresult(tx_no);
if("success".equals(payresult.getResult())){
//更新賬戶金額
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(payresult.getAccountNo());//賬號
accountChangeEvent.setAmount(payresult.getPayAmount());//金額
accountChangeEvent.setTxNo(payresult.getId());//充值事務(wù)號
updateAccountBalance(accountChangeEvent);
}
return payresult;
}
}
Consumer
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1")
public class NotifyMsgConsumer implements RocketMQListener<AccountPay> {
@Autowired
private AccountInfoService accountInfoService;
@Override
public void onMessage(AccountPay accountPay) {
log.info("接收到消息:{}", JSON.toJSONString(accountPay));
if("success".equals(accountPay.getResult())){
//更新賬戶金額
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setTxNo(accountPay.getId());
accountInfoService.updateAccountBalance(accountChangeEvent);
}
log.info("處理消息完成:{}", JSON.toJSONString(accountPay));
}
}
Feign
@FeignClient(value = "notify-msg-pay")
public interface PayClient {
//遠程調(diào)用充值系統(tǒng)的接口查詢充值結(jié)果
@GetMapping(value = "account/payResult/{txNo}")
public AccountPay payresult(@PathVariable("txNo") String txNo);
}
Mapper
public interface AccountInfoMapper extends Mapper<AccountInfo> {
//修改賬戶金額
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Long amount);
//查詢冪等記錄,用于冪等控制
int isExistTx(String txNo);
//添加事務(wù)記錄,用于冪等控制
int addTx(String txNo);
}
<mapper namespace="com.yibo.notify.mapper.AccountInfoMapper">
<resultMap id="BaseResultMap" type="com.yibo.notify.domain.entity.AccountInfo">
<!--
WARNING - @mbg.generated
-->
<id column="id" jdbcType="BIGINT" property="id" />
<result column="account_name" jdbcType="VARCHAR" property="accountName" />
<result column="account_no" jdbcType="VARCHAR" property="accountNo" />
<result column="account_password" jdbcType="VARCHAR" property="accountPassword" />
<result column="account_balance" jdbcType="BIGINT" property="accountBalance" />
</resultMap>
<!-- 修改賬戶金額 -->
<update id="updateAccountBalance">
update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}
</update>
<!-- 查詢冪等記錄,用于冪等控制 -->
<select id="isExistTx">
select count(1) from de_duplication where tx_no = #{txNo}
</select>
<!-- 添加事務(wù)記錄,用于冪等控制 -->
<insert id="addTx">
insert into de_duplication values(#{txNo},now())
</insert>
</mapper>
總結(jié)
最大努力通知方案是分布式事務(wù)中對一致性要求最低的一種,適用于一些最終一致性時間敏感度低的業(yè)務(wù)。
最大努力通知方案需要實現(xiàn)如下功能:
- 1、消息重復(fù)通知機制。
- 2、消息校對機制。