本篇文章我們會(huì)以秒殺場(chǎng)景為例演示如何利用RocketMq實(shí)現(xiàn)分布式事務(wù)。
開始之前我們先來(lái)了解rocketmq的事務(wù)消息設(shè)計(jì)和流程

Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息
一、RocketMQ事務(wù)消息流程概要
-
事務(wù)消息發(fā)送及提交:
1. 發(fā)送消息(half消息)。此階段的消息對(duì)任何消費(fèi)端不可見,不會(huì)被發(fā)現(xiàn)和消費(fèi)。消息被存儲(chǔ)在Broker中。 2. 服務(wù)端響應(yīng)消息寫入結(jié)果。 3. half消息發(fā)送成功執(zhí)行本地事務(wù)方法 4. 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback,如果提交狀態(tài)為Commit這Broker將half消息標(biāo)記為可消費(fèi)狀態(tài), 此時(shí)消費(fèi)端就可以消費(fèi)該條消息;如果是Rollback,則Broker會(huì)將該消息刪除
如果第4步的二次確認(rèn)消息因?yàn)槭鹿拾l(fā)送失敗,比如服務(wù)掛了,停電等,broker不知道是commit還是rollback,那么rocketmq需要定時(shí)回查生產(chǎn)者,確認(rèn)消息狀態(tài)。
-
補(bǔ)償流程:
5. 對(duì)沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查” 6. Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài) 7. 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback
其中,補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況。
二、開始編碼實(shí)戰(zhàn)
在秒殺業(yè)務(wù)當(dāng)中系統(tǒng)的性能瓶頸來(lái)自于數(shù)據(jù)庫(kù),秒殺時(shí)所有的用戶會(huì)同時(shí)搶一件商品,從技術(shù)講其實(shí)就是所有請(qǐng)求競(jìng)爭(zhēng)同一條行鎖,在高并發(fā)時(shí),都在競(jìng)爭(zhēng)同一行鎖,所有請(qǐng)求串行化,會(huì)大大降低系統(tǒng)的吞吐量。
其中一種方案就是緩存庫(kù)存,異步扣減,操作內(nèi)存的速度要大大超過(guò)操作磁盤。
- 活動(dòng)發(fā)布同步庫(kù)存進(jìn)緩存
- 下單交易扣減緩存中庫(kù)存
- 發(fā)送mq異步扣減數(shù)據(jù)庫(kù)庫(kù)存
我們先來(lái)看看發(fā)送普通消息會(huì)有什么問(wèn)題
1.引入依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.15.RELEASE</version>
</parent>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.配置地址
rocketmq.name-server=192.168.1.8:9876
rocketmq.producer.group=producer_group
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=5
3.下單
@Override
@Transactional
public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {
//1.校驗(yàn)下單狀態(tài),下單的商品是否存在,用戶是否合法,購(gòu)買數(shù)量是否正確
ItemModel itemModel = itemService.getItemById(itemId);
if(itemModel == null){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在");
}
UserModel userModel = userService.getUserById(userId);
if(userModel == null){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"用戶信息不存在");
}
if(amount <= 0 || amount > 99){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"數(shù)量信息不正確");
}
//校驗(yàn)活動(dòng)信息
if(promoId != null){
//(1)校驗(yàn)對(duì)應(yīng)活動(dòng)是否存在這個(gè)適用商品
if(promoId.intValue() != itemModel.getPromoModel().getId()){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活動(dòng)信息不正確");
//(2)校驗(yàn)活動(dòng)是否正在進(jìn)行中
}else if(itemModel.getPromoModel().getStatus().intValue() != 2) {
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活動(dòng)信息還未開始");
}
}
//2.落單減庫(kù)存,從緩存中扣減
boolean result = itemService.cacheDecreaseStock(itemId, amount);
if(!result){
throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
}
//3.訂單入庫(kù)
OrderModel orderModel = new OrderModel();
orderModel.setUserId(userId);
orderModel.setItemId(itemId);
orderModel.setAmount(amount);
if(promoId != null){
orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice());
}else{
orderModel.setItemPrice(itemModel.getPrice());
}
orderModel.setPromoId(promoId);
orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount)));
//生成交易流水號(hào),訂單號(hào)
orderModel.setId(generateOrderNo());
OrderDO orderDO = convertFromOrderModel(orderModel);
orderDOMapper.insertSelective(orderDO);
//加上商品的銷量
itemService.increaseSales(itemId,amount);
//4.返回前端
return orderModel;
}
4.減庫(kù)存的邏輯
public boolean cacheDecreaseStock(Integer itemId, Integer amount) throws BusinessException {
// 扣減緩存庫(kù)存
long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue() * -1);
if(result > 0) {
// 緩存的庫(kù)存扣減成功,發(fā)送異步消息扣減庫(kù)存
try {
rocketMQTemplate.convertAndSend("seckill",
DecreaseStockMessage.builder().itemId(itemId).amount(amount).build());
} catch(MessagingException e) {
log.error("異步扣減庫(kù)存消息發(fā)送異常", e);
// 發(fā)送消息異常庫(kù)存需要回填
increaseStock(itemId, amount);
return false;
}
//更新庫(kù)存成功
return true;
} else if(result == 0) {
//打上庫(kù)存已售罄的標(biāo)識(shí)
redisTemplate.opsForValue().set("promo_item_stock_invalid_" + itemId, "true");
//更新庫(kù)存成功
return true;
} else {
// 更新庫(kù)存失敗,緩存庫(kù)存回填
increaseStock(itemId, amount);
}
return false;
}
4.消費(fèi)端代碼
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DecreaseStockMessage {
private Integer itemId;
private Integer amount;
}
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group", topic = "seckill")
public class DecreasePromotStockListener implements RocketMQListener<DecreaseStockMessage> {
@Autowired
private ItemStockDOMapper stockDOMapper;
@Override
public void onMessage(DecreaseStockMessage decreaseStockMessage) {
Integer amount = decreaseStockMessage.getAmount();
Integer itemId = decreaseStockMessage.getItemId();
stockDOMapper.decreaseStock(itemId, amount);
}
}
執(zhí)行前數(shù)據(jù)庫(kù)庫(kù)存和緩存中的庫(kù)存


執(zhí)行后數(shù)據(jù)庫(kù)庫(kù)存和緩存中的庫(kù)存


結(jié)果是成功的,貌似好像萬(wàn)無(wú)一失,可果真是如此嗎,我們來(lái)分析一下。
下單的邏輯是在異步扣減庫(kù)存后去生成訂單,那么如果下單失敗了可是庫(kù)存已經(jīng)異步扣減了,那么就造成了數(shù)據(jù)的不一致,比如用戶取消支付等
沒有辦法去回補(bǔ)庫(kù)存
這就會(huì)造成少賣的現(xiàn)象,商家會(huì)發(fā)現(xiàn)庫(kù)存莫名奇妙的減少了可是找不到對(duì)應(yīng)的訂單
本質(zhì)就是一個(gè)分布式事務(wù)的問(wèn)題,因?yàn)锧Transactional并不能保證異步扣減庫(kù)存和生成訂單同時(shí)成功或失敗。
那么這樣的話,一個(gè)改造就是將發(fā)送消息延后到創(chuàng)建訂單后,整個(gè)下單方法最后返回之前,比如這樣
// ........省略以上生成訂單等邏輯,代碼已在上面貼出
// 異步減庫(kù)存
boolean b = itemService.asyncDecreaseStock(itemId, amount);
if(!b) {
// 回補(bǔ)redis的庫(kù)存
itemService.increaseSales(itemId, amount);
}
//4.返回前端
return orderModel;
但是這樣其實(shí)也是會(huì)有問(wèn)題的,消息發(fā)送成功,整個(gè)下單順利執(zhí)行完成,但是最后spring的聲明式事務(wù)最后提交前突然發(fā)生一些事故,比如網(wǎng)絡(luò)超時(shí),服務(wù)宕機(jī)等等導(dǎo)致事務(wù)沒有提交,但是消息已經(jīng)發(fā)送成功,庫(kù)存還是會(huì)丟掉。
那么這種時(shí)候的解決辦法就是需要有一種方式可以讓事務(wù)真正提交成功后發(fā)送消息,而且消息發(fā)送失敗的情況下依然有補(bǔ)償措施。接下來(lái)我們就看看通過(guò)rocketmq怎么解決這一問(wèn)題。
三、引入RocketMq事務(wù)消息
那么當(dāng)有下單請(qǐng)求進(jìn)來(lái)時(shí),直接發(fā)送第一階段消息,如下
@Override
public boolean asyncDecreaseStockTransaction(Integer userId, Integer promoId, Integer itemId, Integer amount) {
try {
Map<String, Integer> argsMap = new HashMap<>();
argsMap.put("userId", userId);
argsMap.put("promoId", promoId);
argsMap.put("itemId", itemId);
argsMap.put("amount", amount);
String s = JSONUtil.toJsonPrettyStr(argsMap);
rocketMQTemplate.sendMessageInTransaction(
"tx_producer_group",
"seckill",
MessageBuilder.withPayload(
DecreaseStockMessage.builder().itemId(itemId).amount(amount).build())
.build(),
s);
} catch(MessagingException e) {
return false;
}
return true;
}
第一階段消息發(fā)送成功會(huì)執(zhí)行定義的本地事務(wù),在這里去調(diào)用上面貼出的下單方法,如下:
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class DecreasePromoStockTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
/**
* 發(fā)送第一階段消息成功后執(zhí)行本地事務(wù)
* 生成訂單
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("開始執(zhí)行本地事務(wù)message:{}, args:{}", message, o);
try {
Object payload = message.getPayload();
String jsonStr = (String) o;
HashMap hashMap = JSONUtil.toBean(jsonStr, HashMap.class);
Integer userId = (Integer) hashMap.get("userId");
Integer itemId = (Integer) hashMap.get("itemId");
Integer promoId = (Integer) hashMap.get("promoId");
Integer amount = (Integer) hashMap.get("amount");
OrderModel order = orderService.createOrder(userId, itemId, promoId, amount);
} catch(Exception e) {
log.error("本地事務(wù)異常", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("回查開始{}", message);
return RocketMQLocalTransactionState.COMMIT;
}
}
扣庫(kù)存的方法修改為:
public boolean cacheDecreaseStock(Integer itemId, Integer amount) throws BusinessException {
// 扣減緩存庫(kù)存
long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue() * -1);
if(result > 0) {
// 緩存的庫(kù)存扣減成功,發(fā)送異步消息扣減庫(kù)存
//asyncDecreaseStock(itemId, amount);
//更新庫(kù)存成功
return true;
} else if(result == 0) {
//打上庫(kù)存已售罄的標(biāo)識(shí)
redisTemplate.opsForValue().set("promo_item_stock_invalid_" + itemId, "true");
//更新庫(kù)存成功
return true;
} else {
// 更新庫(kù)存失敗
increaseStock(itemId, amount);
}
return false;
}
回查方法還沒有實(shí)現(xiàn),為了在回查方法中追蹤該筆交易,需要設(shè)計(jì)一張訂單日志表
CREATE TABLE `stock_log` (
`stock_log_id` varchar(64) NOT NULL,
`item_id` int NOT NULL DEFAULT '0',
`amount` int NOT NULL DEFAULT '0',
`status` int NOT NULL DEFAULT '0' COMMENT '//1表示初始狀態(tài),2表示下單扣減庫(kù)存成功,3表示下單回滾',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`stock_log_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1、只需要在發(fā)送第一階段消息之前生成一條日志,插入數(shù)據(jù)庫(kù)
2、生成訂單后,更新日志的狀態(tài)
3、將logId作為message的一個(gè)字段發(fā)送出去,在回查的方法中就可以獲取到
4、查詢log日志狀態(tài)返回對(duì)應(yīng)的狀態(tài)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//根據(jù)是否扣減庫(kù)存成功,來(lái)判斷要返回COMMIT,ROLLBACK還是繼續(xù)UNKNOWN
String jsonString = new String(msg.getBody());
Map<String,Object>map = JSON.parseObject(jsonString, Map.class);
Integer itemId = (Integer) map.get("itemId");
Integer amount = (Integer) map.get("amount");
// 從message消息中拿到日志id
String stockLogId = (String) map.get("stockLogId");
// 查詢?nèi)罩?
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
if(stockLogDO == null){
return LocalTransactionState.UNKNOW;
}
if(stockLogDO.getStatus().intValue() == 2){
return LocalTransactionState.COMMIT_MESSAGE;
}else if(stockLogDO.getStatus().intValue() == 1){
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}