如何用RocketMQ實(shí)現(xiàn)分布式事務(wù)

本篇文章我們會(huì)以秒殺場(chǎng)景為例演示如何利用RocketMq實(shí)現(xiàn)分布式事務(wù)。
開始之前我們先來(lái)了解rocketmq的事務(wù)消息設(shè)計(jì)和流程


流程

Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息

一、RocketMQ事務(wù)消息流程概要


  • 事務(wù)消息發(fā)送及提交:

    1. 發(fā)送消息(half消息)。此階段的消息對(duì)任何消費(fèi)端不可見,不會(huì)被發(fā)現(xiàn)和消費(fèi)。消息被存儲(chǔ)在Broker中。
    2. 服務(wù)端響應(yīng)消息寫入結(jié)果。
    3. half消息發(fā)送成功執(zhí)行本地事務(wù)方法
    4. 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback,如果提交狀態(tài)為Commit這Broker將half消息標(biāo)記為可消費(fèi)狀態(tài),
       此時(shí)消費(fèi)端就可以消費(fèi)該條消息;如果是Rollback,則Broker會(huì)將該消息刪除
    

如果第4步的二次確認(rèn)消息因?yàn)槭鹿拾l(fā)送失敗,比如服務(wù)掛了,停電等,broker不知道是commit還是rollback,那么rocketmq需要定時(shí)回查生產(chǎn)者,確認(rèn)消息狀態(tài)。

  • 補(bǔ)償流程:

    5. 對(duì)沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”
    6. Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)
    7. 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback
    

其中,補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況。

二、開始編碼實(shí)戰(zhàn)


在秒殺業(yè)務(wù)當(dāng)中系統(tǒng)的性能瓶頸來(lái)自于數(shù)據(jù)庫(kù),秒殺時(shí)所有的用戶會(huì)同時(shí)搶一件商品,從技術(shù)講其實(shí)就是所有請(qǐng)求競(jìng)爭(zhēng)同一條行鎖,在高并發(fā)時(shí),都在競(jìng)爭(zhēng)同一行鎖,所有請(qǐng)求串行化,會(huì)大大降低系統(tǒng)的吞吐量。

其中一種方案就是緩存庫(kù)存,異步扣減,操作內(nèi)存的速度要大大超過(guò)操作磁盤。

  1. 活動(dòng)發(fā)布同步庫(kù)存進(jìn)緩存
  2. 下單交易扣減緩存中庫(kù)存
  3. 發(fā)送mq異步扣減數(shù)據(jù)庫(kù)庫(kù)存

我們先來(lái)看看發(fā)送普通消息會(huì)有什么問(wèn)題
1.引入依賴

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.15.RELEASE</version>
  </parent>
 <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

2.配置地址
rocketmq.name-server=192.168.1.8:9876
rocketmq.producer.group=producer_group

spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=5
3.下單

 @Override
    @Transactional
    public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {
        //1.校驗(yàn)下單狀態(tài),下單的商品是否存在,用戶是否合法,購(gòu)買數(shù)量是否正確
        ItemModel itemModel = itemService.getItemById(itemId);
        if(itemModel == null){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在");
        }

        UserModel userModel = userService.getUserById(userId);
        if(userModel == null){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"用戶信息不存在");
        }
        if(amount <= 0 || amount > 99){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"數(shù)量信息不正確");
        }

        //校驗(yàn)活動(dòng)信息
        if(promoId != null){
            //(1)校驗(yàn)對(duì)應(yīng)活動(dòng)是否存在這個(gè)適用商品
            if(promoId.intValue() != itemModel.getPromoModel().getId()){
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活動(dòng)信息不正確");
                //(2)校驗(yàn)活動(dòng)是否正在進(jìn)行中
            }else if(itemModel.getPromoModel().getStatus().intValue() != 2) {
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活動(dòng)信息還未開始");
            }
        }

        //2.落單減庫(kù)存,從緩存中扣減
        boolean result = itemService.cacheDecreaseStock(itemId, amount);
        if(!result){
            throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
        }

        //3.訂單入庫(kù)
        OrderModel orderModel = new OrderModel();
        orderModel.setUserId(userId);
        orderModel.setItemId(itemId);
        orderModel.setAmount(amount);
        if(promoId != null){
            orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice());
        }else{
            orderModel.setItemPrice(itemModel.getPrice());
        }
        orderModel.setPromoId(promoId);
        orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount)));

        //生成交易流水號(hào),訂單號(hào)
        orderModel.setId(generateOrderNo());
        OrderDO orderDO = convertFromOrderModel(orderModel);
        orderDOMapper.insertSelective(orderDO);

        //加上商品的銷量
        itemService.increaseSales(itemId,amount);
        //4.返回前端
        return orderModel;
    }

4.減庫(kù)存的邏輯

 public boolean cacheDecreaseStock(Integer itemId, Integer amount) throws BusinessException {
        // 扣減緩存庫(kù)存
        long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue() * -1);
        if(result > 0) {
            // 緩存的庫(kù)存扣減成功,發(fā)送異步消息扣減庫(kù)存
            try {
                rocketMQTemplate.convertAndSend("seckill",
                        DecreaseStockMessage.builder().itemId(itemId).amount(amount).build());
            } catch(MessagingException e) {
                log.error("異步扣減庫(kù)存消息發(fā)送異常", e);
                // 發(fā)送消息異常庫(kù)存需要回填
                increaseStock(itemId, amount);
                return false;
            }
            //更新庫(kù)存成功
            return true;
        } else if(result == 0) {
            //打上庫(kù)存已售罄的標(biāo)識(shí)
            redisTemplate.opsForValue().set("promo_item_stock_invalid_" + itemId, "true");

            //更新庫(kù)存成功
            return true;
        } else {
            // 更新庫(kù)存失敗,緩存庫(kù)存回填
            increaseStock(itemId, amount);
        }
        return false;
    }

4.消費(fèi)端代碼

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DecreaseStockMessage {

    private Integer itemId;
    private Integer amount;
}


@Component
@RocketMQMessageListener(consumerGroup = "consumer_group", topic = "seckill")
public class DecreasePromotStockListener implements RocketMQListener<DecreaseStockMessage> {

    @Autowired
    private ItemStockDOMapper stockDOMapper;

    @Override
    public void onMessage(DecreaseStockMessage decreaseStockMessage) {
        Integer amount = decreaseStockMessage.getAmount();
        Integer itemId = decreaseStockMessage.getItemId();
        stockDOMapper.decreaseStock(itemId, amount);
    }
}

執(zhí)行前數(shù)據(jù)庫(kù)庫(kù)存和緩存中的庫(kù)存


數(shù)據(jù)庫(kù)庫(kù)存
圖片.png

執(zhí)行后數(shù)據(jù)庫(kù)庫(kù)存和緩存中的庫(kù)存



圖片.png

結(jié)果是成功的,貌似好像萬(wàn)無(wú)一失,可果真是如此嗎,我們來(lái)分析一下。

下單的邏輯是在異步扣減庫(kù)存后去生成訂單,那么如果下單失敗了可是庫(kù)存已經(jīng)異步扣減了,那么就造成了數(shù)據(jù)的不一致,比如用戶取消支付等
沒有辦法去回補(bǔ)庫(kù)存

這就會(huì)造成少賣的現(xiàn)象,商家會(huì)發(fā)現(xiàn)庫(kù)存莫名奇妙的減少了可是找不到對(duì)應(yīng)的訂單
本質(zhì)就是一個(gè)分布式事務(wù)的問(wèn)題,因?yàn)锧Transactional并不能保證異步扣減庫(kù)存和生成訂單同時(shí)成功或失敗。

那么這樣的話,一個(gè)改造就是將發(fā)送消息延后到創(chuàng)建訂單后,整個(gè)下單方法最后返回之前,比如這樣


// ........省略以上生成訂單等邏輯,代碼已在上面貼出

       // 異步減庫(kù)存
        boolean b = itemService.asyncDecreaseStock(itemId, amount);
        if(!b) {
            // 回補(bǔ)redis的庫(kù)存
            itemService.increaseSales(itemId, amount);
        }
        //4.返回前端
        return orderModel;

但是這樣其實(shí)也是會(huì)有問(wèn)題的,消息發(fā)送成功,整個(gè)下單順利執(zhí)行完成,但是最后spring的聲明式事務(wù)最后提交前突然發(fā)生一些事故,比如網(wǎng)絡(luò)超時(shí),服務(wù)宕機(jī)等等導(dǎo)致事務(wù)沒有提交,但是消息已經(jīng)發(fā)送成功,庫(kù)存還是會(huì)丟掉。

那么這種時(shí)候的解決辦法就是需要有一種方式可以讓事務(wù)真正提交成功后發(fā)送消息,而且消息發(fā)送失敗的情況下依然有補(bǔ)償措施。接下來(lái)我們就看看通過(guò)rocketmq怎么解決這一問(wèn)題。

三、引入RocketMq事務(wù)消息

那么當(dāng)有下單請(qǐng)求進(jìn)來(lái)時(shí),直接發(fā)送第一階段消息,如下

@Override
    public boolean asyncDecreaseStockTransaction(Integer userId, Integer promoId, Integer itemId, Integer amount) {
        try {
            Map<String, Integer> argsMap = new HashMap<>();
            argsMap.put("userId", userId);
            argsMap.put("promoId", promoId);
            argsMap.put("itemId", itemId);
            argsMap.put("amount", amount);

            String s = JSONUtil.toJsonPrettyStr(argsMap);

            rocketMQTemplate.sendMessageInTransaction(
                    "tx_producer_group",
                    "seckill",
                    MessageBuilder.withPayload(
                        DecreaseStockMessage.builder().itemId(itemId).amount(amount).build())
                        .build(),
                    s);
        } catch(MessagingException e) {
            return false;
        }
        return true;
    }

第一階段消息發(fā)送成功會(huì)執(zhí)行定義的本地事務(wù),在這里去調(diào)用上面貼出的下單方法,如下:

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class DecreasePromoStockTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;


    /**
     * 發(fā)送第一階段消息成功后執(zhí)行本地事務(wù)
     * 生成訂單
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("開始執(zhí)行本地事務(wù)message:{}, args:{}", message, o);
        try {
            Object payload = message.getPayload();
            String jsonStr = (String) o;
            HashMap hashMap = JSONUtil.toBean(jsonStr, HashMap.class);
            Integer userId = (Integer) hashMap.get("userId");
            Integer itemId = (Integer) hashMap.get("itemId");
            Integer promoId = (Integer) hashMap.get("promoId");
            Integer amount = (Integer) hashMap.get("amount");

            OrderModel order = orderService.createOrder(userId, itemId, promoId, amount);
        } catch(Exception e) {
            log.error("本地事務(wù)異常", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("回查開始{}", message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

扣庫(kù)存的方法修改為:

public boolean cacheDecreaseStock(Integer itemId, Integer amount) throws BusinessException {
        // 扣減緩存庫(kù)存
        long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue() * -1);
        if(result > 0) {
            // 緩存的庫(kù)存扣減成功,發(fā)送異步消息扣減庫(kù)存
            //asyncDecreaseStock(itemId, amount);
            //更新庫(kù)存成功
            return true;
        } else if(result == 0) {
            //打上庫(kù)存已售罄的標(biāo)識(shí)
            redisTemplate.opsForValue().set("promo_item_stock_invalid_" + itemId, "true");
            //更新庫(kù)存成功
            return true;
        } else {
            // 更新庫(kù)存失敗
            increaseStock(itemId, amount);
        }

        return false;
    }

回查方法還沒有實(shí)現(xiàn),為了在回查方法中追蹤該筆交易,需要設(shè)計(jì)一張訂單日志表

CREATE TABLE `stock_log` (
  `stock_log_id` varchar(64) NOT NULL,
  `item_id` int NOT NULL DEFAULT '0',
  `amount` int NOT NULL DEFAULT '0',
  `status` int NOT NULL DEFAULT '0' COMMENT '//1表示初始狀態(tài),2表示下單扣減庫(kù)存成功,3表示下單回滾',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`stock_log_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

1、只需要在發(fā)送第一階段消息之前生成一條日志,插入數(shù)據(jù)庫(kù)
2、生成訂單后,更新日志的狀態(tài)
3、將logId作為message的一個(gè)字段發(fā)送出去,在回查的方法中就可以獲取到
4、查詢log日志狀態(tài)返回對(duì)應(yīng)的狀態(tài)

 @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //根據(jù)是否扣減庫(kù)存成功,來(lái)判斷要返回COMMIT,ROLLBACK還是繼續(xù)UNKNOWN
                String jsonString  = new String(msg.getBody());
                Map<String,Object>map = JSON.parseObject(jsonString, Map.class);
                Integer itemId = (Integer) map.get("itemId");
                Integer amount = (Integer) map.get("amount");
        // 從message消息中拿到日志id
                String stockLogId = (String) map.get("stockLogId");
        // 查詢?nèi)罩?               
StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
                if(stockLogDO == null){
                    return LocalTransactionState.UNKNOW;
                }
                if(stockLogDO.getStatus().intValue() == 2){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if(stockLogDO.getStatus().intValue() == 1){
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容