基于RabbitMQ的分布式事務(wù)最終一致性解決方案

1. 分布式事務(wù)

所謂事務(wù),通俗一點(diǎn)講就是一系列操作要么同時(shí)成功,要么同時(shí)失敗。而分布式事務(wù)就是這一系列的操作在不同的節(jié)點(diǎn)上,那要如何保證事務(wù)的ACID特性呢。

  • 原子性(atomicity)。一個(gè)事務(wù)是一個(gè)不可分割的工作單位,事務(wù)中包括的操作要么都成功,要么都失敗。

  • 一致性(consistency)。事務(wù)必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性與原子性是密切相關(guān)的。

  • 隔離性(isolation)。一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)并發(fā)的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。

  • 持久性(durability)。持久性也稱永久性(permanence),指一個(gè)事務(wù)一旦提交或回滾,數(shù)據(jù)庫(kù)會(huì)對(duì)數(shù)據(jù)持久化的保存。

2. 最終一致性方案

首先,什么叫一致性?一致性指系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻具有同樣的值,所有節(jié)點(diǎn)訪問(wèn)同一份最新的數(shù)據(jù)副本。那么,什么又叫最終一致性呢。在此之前,先給大家介紹一下BASE理論。

  1. BA(Basically Available):基本可用。在分布式系統(tǒng)出現(xiàn)故障的時(shí)候,允許犧牲部分非核心功能的可用性,常用的手段是訪問(wèn)部分功能時(shí)進(jìn)入降級(jí)頁(yè)面,來(lái)保障核心業(yè)務(wù)的可用性。

  2. S(Soft state):軟狀態(tài)。允許系統(tǒng)中的數(shù)據(jù)存在中間狀態(tài),并且認(rèn)為該狀態(tài)是不影響系統(tǒng)的整體可用性的,即允許系統(tǒng)在不同節(jié)點(diǎn)上的數(shù)據(jù)備份短暫性的不一致。

  3. E(Eventually consistent):最終一致性。所謂最終一致性,就是數(shù)據(jù)不可能永久的處于軟狀態(tài),在一定的時(shí)間期限內(nèi),所有節(jié)點(diǎn)的數(shù)據(jù)備份應(yīng)當(dāng)是一致的,即數(shù)據(jù)延時(shí)一段時(shí)間后達(dá)到一致性。至于這個(gè)時(shí)間期限,取決于各種因素,包括業(yè)務(wù)需求、網(wǎng)絡(luò)延時(shí)、系統(tǒng)負(fù)載、存儲(chǔ)選型,數(shù)據(jù)復(fù)制方案設(shè)計(jì)等因素。

3. 可靠消息

所謂的可靠消息,即發(fā)布端消息不丟失,可靠抵達(dá)隊(duì)列,消費(fèi)端可靠接收。以RabbitMQ為例,消息的投遞消費(fèi)過(guò)程如下:

1.發(fā)布端確認(rèn)

  1. 如果使用標(biāo)準(zhǔn)的AMQP協(xié)議,保證消息不丟失的唯一方法就是使用事務(wù),使通道具有事務(wù)性,對(duì)每一條消息的發(fā)布和提交都是事務(wù)性的。在這種情況下,事務(wù)是不必要的重量級(jí),并將吞吐量降低了250倍,為了解決這個(gè)問(wèn)題,就引入了確認(rèn)機(jī)制。

    • confirmCallback確認(rèn)模式

      開(kāi)啟發(fā)布者確認(rèn)

      spring:
          rabbitmq: 
              publisher-confirm-type: correlated
      

      自定義RabbitTemplate

      @Autowired
      RabbitTemplate rabbitTemplate;
      
      @PostConstruct
      public void initRabbitTemplate(){
      
          rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
              /**
                   *
                   * @param correlationData 當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)
                   * @param b 消息是否成功收到
                   * @param s 失敗的原因
                   */
              @Override
              public void confirm(CorrelationData correlationData, boolean b, String s) {
                  System.out.println("當(dāng)前消息【"+correlationData+"】==》服務(wù)端是否收到:"+b+"==》失敗的原因【"+s+"】");
              }
          });
      }
      
    • returnCallback未投遞到隊(duì)列退回模式

      開(kāi)啟消息抵達(dá)隊(duì)列確認(rèn)

      spring: 
          rabbitmq: 
              publisher-returns: true # 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
              template:   #只有抵達(dá)隊(duì)列,以異步發(fā)送優(yōu)先回調(diào)returnCallback
                mandatory: true
      

      設(shè)置消息抵達(dá)隊(duì)列回調(diào)

      @PostConstruct
      public void initRabbitTemplate(){
      
          rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
              /**
                   * 只要消息沒(méi)有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗回調(diào)
                   * @param message 投遞失敗的消息詳細(xì)信息
                   * @param i 回復(fù)狀態(tài)碼
                   * @param s 回復(fù)的文本內(nèi)容
                   * @param s1 當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)交換機(jī)
                   * @param s2 當(dāng)時(shí)這個(gè)消息用哪個(gè)路由鍵
                   */
              @Override
              public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                  System.out.println("失敗信息【"+message+"】==》狀態(tài)碼【"+i+"】==》文本內(nèi)容【"+s+"】==》交換機(jī)【"+s1+"】==》路由鍵【"+s2+"】");
              }
          });
      }
      

2. 消費(fèi)端確認(rèn)

ack機(jī)制

默認(rèn)是自動(dòng)確認(rèn)的,只要消息收到,客戶端會(huì)自動(dòng)確認(rèn),服務(wù)端就會(huì)移除這個(gè)問(wèn)題

  1. 問(wèn)題:假如收到很多消息,自動(dòng)回復(fù)給服務(wù)器ack,如果一個(gè)消息處理成功,宕機(jī)了。發(fā)生消息丟失

  2. 消費(fèi)者手動(dòng)確認(rèn)模式:只要沒(méi)有明確告訴MQ,消息被接收,沒(méi)有Ack,消息就一直是unacked狀態(tài),即使服務(wù)器宕機(jī),消息也不會(huì)丟失,會(huì)重新變?yōu)镽eady狀態(tài)。

    1. 開(kāi)啟手動(dòng)確認(rèn)模式

      spring: 
        raabbitmq: 
              listener:
                simple:
                  acknowledge-mode: manual
      
    2. 手動(dòng)簽收

      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      try {
          // 簽收 long deliveryTag, boolean mulitiple(是否批量模式)
          channel.basicAck(deliveryTag,false);
      } catch (IOException e) {
          e.printStackTrace();
      }
      
      // 拒簽 long deliveryTag, boolean mulitiple(是否批量模式), boolean requeue(是否重新入隊(duì))
      channel.basicNack(deliveryTag,false,false);
      

4. 分布式事務(wù)案例

在電商背景下,以訂單和庫(kù)存系統(tǒng)之間的分布式事務(wù)為例,來(lái)介紹分布式事務(wù)基于消息隊(duì)列的最終一致性方案。下單和扣減庫(kù)存操作要么同時(shí)成功,要么同時(shí)失敗,是事務(wù)的。如果只是本地事務(wù)的話,操作同一數(shù)據(jù)庫(kù),依賴數(shù)據(jù)庫(kù)本身的事務(wù)特性,就可以完成。但是,對(duì)于分布式系統(tǒng)而言,訂單系統(tǒng)和庫(kù)存系統(tǒng)是操作不同的數(shù)據(jù)庫(kù)的,那要如何實(shí)現(xiàn)這樣的分布式事務(wù)。

1. 業(yè)務(wù)流程及問(wèn)題

  1. 一般的業(yè)務(wù)流程是:下單成功后,遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存。偽代碼如下

    public void saveOrder(){
        // 創(chuàng)建訂單
        Order order = createOrder();
        // 保存訂單
        orderService.save(order);
        // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù)
        wareFeignService.sub(order);
    }
    
  2. 訂單系統(tǒng)和庫(kù)存系統(tǒng)本地是滿足事務(wù)的。即訂單服務(wù)發(fā)生異常,訂單回滾;庫(kù)存服務(wù)發(fā)生異常,庫(kù)存是會(huì)回滾的。

  3. 由于訂單服務(wù)是以內(nèi)嵌的方式遠(yuǎn)程調(diào)用庫(kù)存服務(wù)的,也就是說(shuō),庫(kù)存服務(wù)發(fā)生異常,訂單服務(wù)感知到遠(yuǎn)程調(diào)用異常,從而訂單會(huì)回滾的,對(duì)于業(yè)務(wù)來(lái)說(shuō),這是沒(méi)有問(wèn)題的。

  4. 如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之前發(fā)生異常,訂單會(huì)回滾,并且也不會(huì)調(diào)用庫(kù)存服務(wù)來(lái)扣減庫(kù)存,這也是沒(méi)有問(wèn)題的。

  5. 如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之后,并且遠(yuǎn)程扣減庫(kù)存操作成功后,發(fā)生異常,則訂單會(huì)回滾,但是遠(yuǎn)程庫(kù)存服務(wù)是無(wú)法回滾的。這就導(dǎo)致了數(shù)據(jù)的不一致性。

2. 基于消息隊(duì)列的解決方案分析

我們使用RabbitMQ來(lái)實(shí)現(xiàn)分布式事務(wù)的最終一致性。

1. 業(yè)務(wù)分析

  1. 由于用戶下單和支付并不是同時(shí)進(jìn)行,一般都是下單成功后,30min內(nèi)可以支付。那我們來(lái)思考這樣一個(gè)問(wèn)題,如果我們?cè)谙聠纬晒涂蹨p庫(kù)存的話,會(huì)不會(huì)有什么問(wèn)題。

    • 惡意刷單。下單后不支付,導(dǎo)致其他人無(wú)法下單。
  2. 那如果支付成功后再扣減庫(kù)存呢?

    • 在支付訂單時(shí),會(huì)出現(xiàn)庫(kù)存不足,支付失敗。
  3. 綜合這兩種情況考慮,我們?cè)谙聠纬晒笙孺i定庫(kù)存,支付成功再去扣減庫(kù)存,如果超時(shí)未支付,則解鎖庫(kù)存。

2. 業(yè)務(wù)流程

  1. 下單成功,鎖定庫(kù)存

  2. 訂單支付超時(shí),則需要自動(dòng)關(guān)閉訂單。

  3. 訂單關(guān)閉,庫(kù)存需要解鎖。

3. 定時(shí)任務(wù)

如何確保下單后,30min內(nèi)保留訂單。最先想到的方法應(yīng)該時(shí)定時(shí)任務(wù)。

  1. 定時(shí)任務(wù)使用的是系統(tǒng)時(shí)間,我們無(wú)法為每一個(gè)訂單都生成一定時(shí)任務(wù)。

  2. 我想大家應(yīng)該都發(fā)現(xiàn)了使用定時(shí)任務(wù)會(huì)帶來(lái)的問(wèn)題,那就是每一個(gè)訂單的保留時(shí)間并不是一致的30min,訂單保留的時(shí)間區(qū)間為(0min,60min)。即在定時(shí)任務(wù)即將到來(lái)前完成下單,和定時(shí)任務(wù)剛結(jié)束完成下單。

所以使用定時(shí)任務(wù)來(lái)完成這個(gè)操作是不可行。

4. RabbitMQ延時(shí)隊(duì)列

利用消息的存活時(shí)間和死信來(lái)完成延時(shí)任務(wù)。

1. 消息的存活時(shí)間(TTL:Time To Live)

  1. RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。

  2. 對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。

  3. 如果隊(duì)列和消息都設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息的死亡時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。

  4. 單個(gè)消息的TTL,才是實(shí)現(xiàn)延遲任務(wù)的關(guān)鍵??梢酝ㄟ^(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。

2. 死信交換機(jī)(DLX:Dead Letter Exchanges)

  1. 一個(gè)消息如果滿足如下條件,就會(huì)進(jìn)入死信路由(不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)多個(gè)隊(duì)列)

    • 一個(gè)消息被消費(fèi)者拒收,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。

    • 消息的TTL到了,消息過(guò)期了。

    • 隊(duì)列長(zhǎng)度限制滿了,排在前面的消息會(huì)被丟棄或者扔到死信路由上。

  2. 在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去

  3. 先控制消息在一段時(shí)間后變成死信,然后控制變成死信的消息被路由到某個(gè)指定的交換機(jī)。二者結(jié)合就可以實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列。

在下單成功后,發(fā)送一條消息到死信隊(duì)列,經(jīng)過(guò)一段時(shí)間(TTL),死信路由到訂單釋放隊(duì)列,訂單服務(wù)監(jiān)聽(tīng)到消息釋放放訂單。

在這里插入圖片描述

5. 分布式解決方案

1. 對(duì)以下情況,庫(kù)存需要解鎖。

  1. 首先之前提到的,訂單服務(wù)在遠(yuǎn)程調(diào)用成功后,發(fā)生異常,導(dǎo)致訂單回滾,庫(kù)存也需要解鎖。

  2. 訂單延時(shí)取消,或者主動(dòng)取消訂單,都需要解鎖庫(kù)存。

  3. 訂單服務(wù)下單成功后,訂單服務(wù)宕機(jī),超過(guò)訂單支付時(shí)間,仍然無(wú)法恢復(fù),導(dǎo)致無(wú)法發(fā)送消息通知庫(kù)存服務(wù)解鎖庫(kù)存,故需要自動(dòng)解鎖庫(kù)存。

2. 解決方案

針對(duì)以上情形,庫(kù)存解鎖的方案。

  1. 第一種情況,可以利用庫(kù)存自動(dòng)解鎖來(lái)解決。庫(kù)存鎖定時(shí)發(fā)送消息到延時(shí)隊(duì)列,經(jīng)過(guò)TTL后,成為死信路由到庫(kù)存解鎖隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)到消息后,解鎖庫(kù)存。

    • 不能在訂單未支付時(shí)就解鎖庫(kù)存,所以庫(kù)存自動(dòng)解鎖的延遲時(shí)間應(yīng)該大于訂單延時(shí)取消的時(shí)間。
  2. 對(duì)于第二種情況,主動(dòng)取消或者延時(shí)取消,都可以通過(guò)庫(kù)存的自動(dòng)解鎖來(lái)完成庫(kù)存的解鎖。

  3. 自動(dòng)解鎖時(shí),需要判斷訂單的狀態(tài),只有為取消狀態(tài)的訂單才可以解鎖庫(kù)存。但是這樣仍然會(huì)存在問(wèn)題。

    • 訂單服務(wù)卡頓,導(dǎo)致訂單狀態(tài)消息一直改不了,而庫(kù)存消息先到期,查詢訂單狀態(tài)為新建狀態(tài),不解鎖庫(kù)存,并刪除消息,導(dǎo)致庫(kù)永遠(yuǎn)無(wú)法解鎖。

    • 解決:訂單超時(shí)取消的同時(shí),發(fā)送訂單取消的消息到隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)該消息,則解鎖庫(kù)存。

    • 為了防止重復(fù)解鎖,需要滿足冪等性。

3. 代碼實(shí)現(xiàn)

使用消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的流程圖如下:


在這里插入圖片描述

1. 訂單服務(wù)

  1. 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class OrderRabbitMQConfig {

   /**
    * 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
    * @return
    */
   @Bean
   public MessageConverter messageConverter(){
       return new Jackson2JsonMessageConverter();
   }
   @Bean
   public Exchange orderEventExchange(){
       return new TopicExchange("order-event-exchange",true,false);
   }

   @Bean
   public Queue orderReleaseOrderQueue(){
       return new Queue("order.release.order.queue",true,false,false);
   }

   @Bean
   public Queue orderDelayQueue(){
       HashMap<String, Object> args = new HashMap<>();
       args.put("x-dead-letter-exchange","order-event-exchange");
       args.put("x-dead-letter-routing-key","order.release.order");
       args.put("x-message-ttl",60000);
       return new Queue("order.delay.queue",true,false,false,args);
   }

   @Bean
   public Binding stockReleaseBinding(){
       return new Binding("order.release.order.queue",
               Binding.DestinationType.QUEUE,
               "order-event-exchange",
               "order.release.order",
               null);
   }

   @Bean
   public Binding stockLockedBinding(){
       return new Binding("order.delay.queue",
               Binding.DestinationType.QUEUE,
               "order-event-exchange",
               "order.create.order",
               null);
   }
   /**
    * 訂單釋放直接和庫(kù)存釋放進(jìn)行綁定
    */
   @Bean
   public Binding orderReleaseOtherBinding(){
       return new Binding("stock.release.stock.queue",
               Binding.DestinationType.QUEUE,
               "order-event-exchange",
               "order.release.other.#",
               null);
   }

}
  1. 訂單創(chuàng)建偽代碼
@Transactional
public Order createOrder(){
  // 創(chuàng)建訂單
  Order order = createOrder();
  // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),鎖定庫(kù)存
  R r = wareFeignService.orderLockStock(wareSkuLockVo);
  if (r.getCode() == 0) {
    // 鎖定成功,發(fā)送訂單創(chuàng)建消息到延時(shí)隊(duì)列
    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
  } else {
     // 遠(yuǎn)程調(diào)用失敗,拋出異常
     throw new Exception();
  }
  reture order;
}
  1. 訂單服務(wù)監(jiān)聽(tīng)訂單釋放信息,關(guān)閉訂單
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {

   @Autowired
   OmsOrderService orderService;

   @RabbitHandler
   public void listener(OmsOrderEntity orderEntity, Channel channel, Message message) throws IOException {
       System.out.println("收到過(guò)期訂單,準(zhǔn)備關(guān)單:"+orderEntity.getBizOrderId());
       try{
           orderService.closeOrder(orderEntity.getBizOrderId());
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
       }catch (Exception e){
           channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
       }
   }
}
  1. 關(guān)閉訂單,發(fā)送解鎖庫(kù)存消息。偽代碼如下
public void closeOrder(String bizOrderId) {
  // 查詢當(dāng)前訂單是否付款
  OmsOrderEntity order = getOne(new QueryWrapper<OmsOrderEntity>().eq("biz_order_id", bizOrderId));
  if (order != null) {
    // 判斷訂單狀態(tài),為新建狀態(tài)才取消
    if (order.getOrderStatus() == OrderStatusConstant.CREATE.getCode()){
      //過(guò)期未支付,取消訂單,設(shè)置訂單狀態(tài)為取消狀態(tài)
      updateOrder.setOrderStatus(OrderStatusConstant.CANCEL.getCode());
      // 發(fā)送MQ
      rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
      }
    }
}

2. 庫(kù)存服務(wù)

  1. 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class MyRabbitConfig {

    /**
     * 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public Exchange stockEventExchange(){
        return new TopicExchange("stock-event-exchange",true,true);
    }

    @Bean
    public Queue stockReleaseStockQueue(){
        return new Queue("stock.release.stock.queue",true,false,false);
    }

    @Bean
    public Queue stockDelayQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange","stock-event-exchange");
        args.put("x-dead-letter-routing-key","stock.release");
        args.put("x-message-ttl",120000);
        return new Queue("stock.delay.queue",true,false,false,args);
    }

    @Bean
    public Binding stockReleaseBinding(){
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }

    @Bean
    public Binding stockLockedBinding(){
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }
}

  1. 庫(kù)存鎖定,偽代碼如下
@Transactional
public void orderLockStock() {
  // 保存工作單
  taskService.save(task);
  // 為每件商品鎖定庫(kù)存
  for (OrderItemVo orderItem : orderItems) {
    //判斷庫(kù)存是否足夠
    Long count = skuWareDao.lockStock(orderItem);
    if (count == 1){
        // 鎖定成功
        // 保存工作單詳情
        taskDetailService.save(taskDetail);
        // 自動(dòng)解鎖,發(fā)送工作單詳情,防止回滾后找不到數(shù)據(jù)
        // 鎖定庫(kù)存,發(fā)送消息到延時(shí)隊(duì)列
        rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", taskTo);
    } else {
        // 鎖定失敗,拋出異常
        throw new Exception();
    }
  }
}
  1. 監(jiān)聽(tīng)?zhēng)齑娼怄i信息
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {

  @Autowired
  WmsSkuWareService wareSkuService;

  /**
   * 庫(kù)存自動(dòng)解鎖
   *只要解鎖庫(kù)存的消息的失敗,需要告訴MQ解鎖失敗,消息不要?jiǎng)h除,重新放回隊(duì)列
   * @param taskTo
   * @param message
   *
   */
  @RabbitHandler
  public void handleStockLockedRelease(StockLockedTaskTo taskTo, Message message, Channel channel) throws IOException {
    System.out.println("收到解鎖庫(kù)存的消息");
    try{
      wareSkuService.unlockStock(taskTo);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
  }

  @RabbitHandler
  public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
    System.out.println("訂單關(guān)閉,準(zhǔn)備解鎖庫(kù)存");
    try{
      wareSkuService.unlockStock(orderTo);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
  }
}
  1. 自動(dòng)解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(StockLockedTaskTo taskTo) {
  // 判斷是否存在該任務(wù)
  Detail detail = detailService.getById(taskTo.detailId)
  if (detail != null){
    // 查詢訂單狀態(tài),訂單狀態(tài)為【取消】,則解鎖庫(kù)存
    // 遠(yuǎn)程調(diào)用訂單服務(wù),獲取訂單狀態(tài)
    R r = orderFeignService.getOrderStatus(bizOrderId);
    if (r.getCode() == 0) {
      // 遠(yuǎn)程查詢成功
      OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {
      });
      if (orderVo == null || orderVo.getOrderStatus() == OrderStatusConstant.CANCEL.getCode()) {
        // 訂單不存在,或者訂單已取消,都需要解鎖庫(kù)存
        if (detail.getLockStatus() == WareTaskStatusConstant.LOCKED.getCode()){
          //鎖定狀態(tài)下才需要解鎖,已解鎖的不用在解鎖
          unlockStock(detail.getId(),detail.getSkuId(),detail.getSkuNum());
          // 更新taskDetail狀態(tài)為已解鎖
          taskDetailService.updateById(taskDetailEntity);
        }
      }
    } else {
      throw new RuntimeException("遠(yuǎn)程調(diào)用訂單服務(wù)失敗");
    }
  }
}
  1. 訂單取消,解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(OrderTo orderTo) {
  // 無(wú)需查詢訂單最新?tīng)顟B(tài),能來(lái)到這,肯定更新了訂單狀態(tài)的
  // 判斷任務(wù)是否存在
  WmsOrderTaskEntity task = taskService.getOne(new QueryWrapper<WmsOrderTaskEntity>().eq("biz_order_id", bizOrderId));
  if (task != null){
    // 任務(wù)存在,獲取任務(wù)項(xiàng)狀態(tài)為鎖定狀態(tài)的所有任務(wù)項(xiàng)
    List<WmsOrderTaskDetailEntity> taskDetails = taskDetailService.list(new QueryWrapper<WmsOrderTaskDetailEntity>()
            .eq("task_id", task.getId())
            .eq("lock_status", WareTaskStatusConstant.LOCKED.getCode()));
    if (taskDetails != null && taskDetails.size() > 0){
      for (WmsOrderTaskDetailEntity taskDetail : taskDetails) {
        // 解鎖庫(kù)存
        unlockStock(taskDetail.getId(),taskDetail.getSkuId(),taskDetail.getSkuNum());
        // 更新taskDetail狀態(tài)為已解鎖
        taskDetailService.updateById(taskDetailEntity);
      }
    }
  }
}

使用RabbitMQ實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的大致流程解析完畢。如果對(duì)源碼感興趣的,歡迎到github倉(cāng)庫(kù)clone。

1. 分布式事務(wù)

所謂事務(wù),通俗一點(diǎn)講就是一系列操作要么同時(shí)成功,要么同時(shí)失敗。而分布式事務(wù)就是這一系列的操作在不同的節(jié)點(diǎn)上,那要如何保證事務(wù)的ACID特性呢。

  • 原子性(atomicity)。一個(gè)事務(wù)是一個(gè)不可分割的工作單位,事務(wù)中包括的操作要么都成功,要么都失敗。

  • 一致性(consistency)。事務(wù)必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性與原子性是密切相關(guān)的。

  • 隔離性(isolation)。一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)并發(fā)的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。

  • 持久性(durability)。持久性也稱永久性(permanence),指一個(gè)事務(wù)一旦提交或回滾,數(shù)據(jù)庫(kù)會(huì)對(duì)數(shù)據(jù)持久化的保存。

2. 最終一致性方案

首先,什么叫一致性?一致性指系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻具有同樣的值,所有節(jié)點(diǎn)訪問(wèn)同一份最新的數(shù)據(jù)副本。那么,什么又叫最終一致性呢。在此之前,先給大家介紹一下BASE理論。

  1. BA(Basically Available):基本可用。在分布式系統(tǒng)出現(xiàn)故障的時(shí)候,允許犧牲部分非核心功能的可用性,常用的手段是訪問(wèn)部分功能時(shí)進(jìn)入降級(jí)頁(yè)面,來(lái)保障核心業(yè)務(wù)的可用性。

  2. S(Soft state):軟狀態(tài)。允許系統(tǒng)中的數(shù)據(jù)存在中間狀態(tài),并且認(rèn)為該狀態(tài)是不影響系統(tǒng)的整體可用性的,即允許系統(tǒng)在不同節(jié)點(diǎn)上的數(shù)據(jù)備份短暫性的不一致。

  3. E(Eventually consistent):最終一致性。所謂最終一致性,就是數(shù)據(jù)不可能永久的處于軟狀態(tài),在一定的時(shí)間期限內(nèi),所有節(jié)點(diǎn)的數(shù)據(jù)備份應(yīng)當(dāng)是一致的,即數(shù)據(jù)延時(shí)一段時(shí)間后達(dá)到一致性。至于這個(gè)時(shí)間期限,取決于各種因素,包括業(yè)務(wù)需求、網(wǎng)絡(luò)延時(shí)、系統(tǒng)負(fù)載、存儲(chǔ)選型,數(shù)據(jù)復(fù)制方案設(shè)計(jì)等因素。

3. 可靠消息

所謂的可靠消息,即發(fā)布端消息不丟失,可靠抵達(dá)隊(duì)列,消費(fèi)端可靠接收。以RabbitMQ為例,消息的投遞消費(fèi)過(guò)程如下:

1.發(fā)布端確認(rèn)

  1. 如果使用標(biāo)準(zhǔn)的AMQP協(xié)議,保證消息不丟失的唯一方法就是使用事務(wù),使通道具有事務(wù)性,對(duì)每一條消息的發(fā)布和提交都是事務(wù)性的。在這種情況下,事務(wù)是不必要的重量級(jí),并將吞吐量降低了250倍,為了解決這個(gè)問(wèn)題,就引入了確認(rèn)機(jī)制。

    • confirmCallback確認(rèn)模式

      開(kāi)啟發(fā)布者確認(rèn)

      spring:
          rabbitmq: 
              publisher-confirm-type: correlated
      

      自定義RabbitTemplate

      @Autowired
      RabbitTemplate rabbitTemplate;
      
      @PostConstruct
      public void initRabbitTemplate(){
      
          rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
              /**
                   *
                   * @param correlationData 當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)
                   * @param b 消息是否成功收到
                   * @param s 失敗的原因
                   */
              @Override
              public void confirm(CorrelationData correlationData, boolean b, String s) {
                  System.out.println("當(dāng)前消息【"+correlationData+"】==》服務(wù)端是否收到:"+b+"==》失敗的原因【"+s+"】");
              }
          });
      }
      
    • returnCallback未投遞到隊(duì)列退回模式

      開(kāi)啟消息抵達(dá)隊(duì)列確認(rèn)

      spring: 
          rabbitmq: 
              publisher-returns: true # 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
              template:   #只有抵達(dá)隊(duì)列,以異步發(fā)送優(yōu)先回調(diào)returnCallback
                mandatory: true
      

      設(shè)置消息抵達(dá)隊(duì)列回調(diào)

      @PostConstruct
      public void initRabbitTemplate(){
      
          rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
              /**
                   * 只要消息沒(méi)有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗回調(diào)
                   * @param message 投遞失敗的消息詳細(xì)信息
                   * @param i 回復(fù)狀態(tài)碼
                   * @param s 回復(fù)的文本內(nèi)容
                   * @param s1 當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)交換機(jī)
                   * @param s2 當(dāng)時(shí)這個(gè)消息用哪個(gè)路由鍵
                   */
              @Override
              public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                  System.out.println("失敗信息【"+message+"】==》狀態(tài)碼【"+i+"】==》文本內(nèi)容【"+s+"】==》交換機(jī)【"+s1+"】==》路由鍵【"+s2+"】");
              }
          });
      }
      

2. 消費(fèi)端確認(rèn)

ack機(jī)制

默認(rèn)是自動(dòng)確認(rèn)的,只要消息收到,客戶端會(huì)自動(dòng)確認(rèn),服務(wù)端就會(huì)移除這個(gè)問(wèn)題

  1. 問(wèn)題:假如收到很多消息,自動(dòng)回復(fù)給服務(wù)器ack,如果一個(gè)消息處理成功,宕機(jī)了。發(fā)生消息丟失

  2. 消費(fèi)者手動(dòng)確認(rèn)模式:只要沒(méi)有明確告訴MQ,消息被接收,沒(méi)有Ack,消息就一直是unacked狀態(tài),即使服務(wù)器宕機(jī),消息也不會(huì)丟失,會(huì)重新變?yōu)镽eady狀態(tài)。

    1. 開(kāi)啟手動(dòng)確認(rèn)模式

      spring: 
        raabbitmq: 
              listener:
                simple:
                  acknowledge-mode: manual
      
    2. 手動(dòng)簽收

      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      try {
          // 簽收 long deliveryTag, boolean mulitiple(是否批量模式)
          channel.basicAck(deliveryTag,false);
      } catch (IOException e) {
          e.printStackTrace();
      }
      
      // 拒簽 long deliveryTag, boolean mulitiple(是否批量模式), boolean requeue(是否重新入隊(duì))
      channel.basicNack(deliveryTag,false,false);
      

4. 分布式事務(wù)案例

在電商背景下,以訂單和庫(kù)存系統(tǒng)之間的分布式事務(wù)為例,來(lái)介紹分布式事務(wù)基于消息隊(duì)列的最終一致性方案。下單和扣減庫(kù)存操作要么同時(shí)成功,要么同時(shí)失敗,是事務(wù)的。如果只是本地事務(wù)的話,操作同一數(shù)據(jù)庫(kù),依賴數(shù)據(jù)庫(kù)本身的事務(wù)特性,就可以完成。但是,對(duì)于分布式系統(tǒng)而言,訂單系統(tǒng)和庫(kù)存系統(tǒng)是操作不同的數(shù)據(jù)庫(kù)的,那要如何實(shí)現(xiàn)這樣的分布式事務(wù)。

1. 業(yè)務(wù)流程及問(wèn)題

  1. 一般的業(yè)務(wù)流程是:下單成功后,遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存。偽代碼如下

    public void saveOrder(){
        // 創(chuàng)建訂單
        Order order = createOrder();
        // 保存訂單
        orderService.save(order);
        // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù)
        wareFeignService.sub(order);
    }
    
  2. 訂單系統(tǒng)和庫(kù)存系統(tǒng)本地是滿足事務(wù)的。即訂單服務(wù)發(fā)生異常,訂單回滾;庫(kù)存服務(wù)發(fā)生異常,庫(kù)存是會(huì)回滾的。

  3. 由于訂單服務(wù)是以內(nèi)嵌的方式遠(yuǎn)程調(diào)用庫(kù)存服務(wù)的,也就是說(shuō),庫(kù)存服務(wù)發(fā)生異常,訂單服務(wù)感知到遠(yuǎn)程調(diào)用異常,從而訂單會(huì)回滾的,對(duì)于業(yè)務(wù)來(lái)說(shuō),這是沒(méi)有問(wèn)題的。

  4. 如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之前發(fā)生異常,訂單會(huì)回滾,并且也不會(huì)調(diào)用庫(kù)存服務(wù)來(lái)扣減庫(kù)存,這也是沒(méi)有問(wèn)題的。

  5. 如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之后,并且遠(yuǎn)程扣減庫(kù)存操作成功后,發(fā)生異常,則訂單會(huì)回滾,但是遠(yuǎn)程庫(kù)存服務(wù)是無(wú)法回滾的。這就導(dǎo)致了數(shù)據(jù)的不一致性。

2. 基于消息隊(duì)列的解決方案分析

我們使用RabbitMQ來(lái)實(shí)現(xiàn)分布式事務(wù)的最終一致性。

1. 業(yè)務(wù)分析

  1. 由于用戶下單和支付并不是同時(shí)進(jìn)行,一般都是下單成功后,30min內(nèi)可以支付。那我們來(lái)思考這樣一個(gè)問(wèn)題,如果我們?cè)谙聠纬晒涂蹨p庫(kù)存的話,會(huì)不會(huì)有什么問(wèn)題。

    • 惡意刷單。下單后不支付,導(dǎo)致其他人無(wú)法下單。
  2. 那如果支付成功后再扣減庫(kù)存呢?

    • 在支付訂單時(shí),會(huì)出現(xiàn)庫(kù)存不足,支付失敗。
  3. 綜合這兩種情況考慮,我們?cè)谙聠纬晒笙孺i定庫(kù)存,支付成功再去扣減庫(kù)存,如果超時(shí)未支付,則解鎖庫(kù)存。

2. 業(yè)務(wù)流程

  1. 下單成功,鎖定庫(kù)存

  2. 訂單支付超時(shí),則需要自動(dòng)關(guān)閉訂單。

  3. 訂單關(guān)閉,庫(kù)存需要解鎖。

3. 定時(shí)任務(wù)

如何確保下單后,30min內(nèi)保留訂單。最先想到的方法應(yīng)該時(shí)定時(shí)任務(wù)。

  1. 定時(shí)任務(wù)使用的是系統(tǒng)時(shí)間,我們無(wú)法為每一個(gè)訂單都生成一定時(shí)任務(wù)。

  2. 我想大家應(yīng)該都發(fā)現(xiàn)了使用定時(shí)任務(wù)會(huì)帶來(lái)的問(wèn)題,那就是每一個(gè)訂單的保留時(shí)間并不是一致的30min,訂單保留的時(shí)間區(qū)間為(0min,60min)。即在定時(shí)任務(wù)即將到來(lái)前完成下單,和定時(shí)任務(wù)剛結(jié)束完成下單。

所以使用定時(shí)任務(wù)來(lái)完成這個(gè)操作是不可行。

4. RabbitMQ延時(shí)隊(duì)列

利用消息的存活時(shí)間和死信來(lái)完成延時(shí)任務(wù)。

1. 消息的存活時(shí)間(TTL:Time To Live)

  1. RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。

  2. 對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。

  3. 如果隊(duì)列和消息都設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息的死亡時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。

  4. 單個(gè)消息的TTL,才是實(shí)現(xiàn)延遲任務(wù)的關(guān)鍵??梢酝ㄟ^(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。

2. 死信交換機(jī)(DLX:Dead Letter Exchanges)

  1. 一個(gè)消息如果滿足如下條件,就會(huì)進(jìn)入死信路由(不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)多個(gè)隊(duì)列)

    • 一個(gè)消息被消費(fèi)者拒收,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。

    • 消息的TTL到了,消息過(guò)期了。

    • 隊(duì)列長(zhǎng)度限制滿了,排在前面的消息會(huì)被丟棄或者扔到死信路由上。

  2. 在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去

  3. 先控制消息在一段時(shí)間后變成死信,然后控制變成死信的消息被路由到某個(gè)指定的交換機(jī)。二者結(jié)合就可以實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列。

在下單成功后,發(fā)送一條消息到死信隊(duì)列,經(jīng)過(guò)一段時(shí)間(TTL),死信路由到訂單釋放隊(duì)列,訂單服務(wù)監(jiān)聽(tīng)到消息釋放放訂單。

在這里插入圖片描述

5. 分布式解決方案

1. 對(duì)以下情況,庫(kù)存需要解鎖。

  1. 首先之前提到的,訂單服務(wù)在遠(yuǎn)程調(diào)用成功后,發(fā)生異常,導(dǎo)致訂單回滾,庫(kù)存也需要解鎖。

  2. 訂單延時(shí)取消,或者主動(dòng)取消訂單,都需要解鎖庫(kù)存。

  3. 訂單服務(wù)下單成功后,訂單服務(wù)宕機(jī),超過(guò)訂單支付時(shí)間,仍然無(wú)法恢復(fù),導(dǎo)致無(wú)法發(fā)送消息通知庫(kù)存服務(wù)解鎖庫(kù)存,故需要自動(dòng)解鎖庫(kù)存。

2. 解決方案

針對(duì)以上情形,庫(kù)存解鎖的方案。

  1. 第一種情況,可以利用庫(kù)存自動(dòng)解鎖來(lái)解決。庫(kù)存鎖定時(shí)發(fā)送消息到延時(shí)隊(duì)列,經(jīng)過(guò)TTL后,成為死信路由到庫(kù)存解鎖隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)到消息后,解鎖庫(kù)存。

    • 不能在訂單未支付時(shí)就解鎖庫(kù)存,所以庫(kù)存自動(dòng)解鎖的延遲時(shí)間應(yīng)該大于訂單延時(shí)取消的時(shí)間。
  2. 對(duì)于第二種情況,主動(dòng)取消或者延時(shí)取消,都可以通過(guò)庫(kù)存的自動(dòng)解鎖來(lái)完成庫(kù)存的解鎖。

  3. 自動(dòng)解鎖時(shí),需要判斷訂單的狀態(tài),只有為取消狀態(tài)的訂單才可以解鎖庫(kù)存。但是這樣仍然會(huì)存在問(wèn)題。

    • 訂單服務(wù)卡頓,導(dǎo)致訂單狀態(tài)消息一直改不了,而庫(kù)存消息先到期,查詢訂單狀態(tài)為新建狀態(tài),不解鎖庫(kù)存,并刪除消息,導(dǎo)致庫(kù)永遠(yuǎn)無(wú)法解鎖。

    • 解決:訂單超時(shí)取消的同時(shí),發(fā)送訂單取消的消息到隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)該消息,則解鎖庫(kù)存。

    • 為了防止重復(fù)解鎖,需要滿足冪等性。

3. 代碼實(shí)現(xiàn)

使用消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的流程圖如下:


在這里插入圖片描述

1. 訂單服務(wù)

  1. 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class OrderRabbitMQConfig {

   /**
    * 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
    * @return
    */
   @Bean
   public MessageConverter messageConverter(){
       return new Jackson2JsonMessageConverter();
   }
   @Bean
   public Exchange orderEventExchange(){
       return new TopicExchange("order-event-exchange",true,false);
   }

   @Bean
   public Queue orderReleaseOrderQueue(){
       return new Queue("order.release.order.queue",true,false,false);
   }

   @Bean
   public Queue orderDelayQueue(){
       HashMap<String, Object> args = new HashMap<>();
       args.put("x-dead-letter-exchange","order-event-exchange");
       args.put("x-dead-letter-routing-key","order.release.order");
       args.put("x-message-ttl",60000);
       return new Queue("order.delay.queue",true,false,false,args);
   }

   @Bean
   public Binding stockReleaseBinding(){
       return new Binding("order.release.order.queue",
               Binding.DestinationType.QUEUE,
               "order-event-exchange",
               "order.release.order",
               null);
   }

   @Bean
   public Binding stockLockedBinding(){
       return new Binding("order.delay.queue",
               Binding.DestinationType.QUEUE,
               "order-event-exchange",
               "order.create.order",
               null);
   }
   /**
    * 訂單釋放直接和庫(kù)存釋放進(jìn)行綁定
    */
   @Bean
   public Binding orderReleaseOtherBinding(){
       return new Binding("stock.release.stock.queue",
               Binding.DestinationType.QUEUE,
               "order-event-exchange",
               "order.release.other.#",
               null);
   }

}
  1. 訂單創(chuàng)建偽代碼
@Transactional
public Order createOrder(){
  // 創(chuàng)建訂單
  Order order = createOrder();
  // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),鎖定庫(kù)存
  R r = wareFeignService.orderLockStock(wareSkuLockVo);
  if (r.getCode() == 0) {
    // 鎖定成功,發(fā)送訂單創(chuàng)建消息到延時(shí)隊(duì)列
    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
  } else {
     // 遠(yuǎn)程調(diào)用失敗,拋出異常
     throw new Exception();
  }
  reture order;
}
  1. 訂單服務(wù)監(jiān)聽(tīng)訂單釋放信息,關(guān)閉訂單
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {

   @Autowired
   OmsOrderService orderService;

   @RabbitHandler
   public void listener(OmsOrderEntity orderEntity, Channel channel, Message message) throws IOException {
       System.out.println("收到過(guò)期訂單,準(zhǔn)備關(guān)單:"+orderEntity.getBizOrderId());
       try{
           orderService.closeOrder(orderEntity.getBizOrderId());
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
       }catch (Exception e){
           channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
       }
   }
}
  1. 關(guān)閉訂單,發(fā)送解鎖庫(kù)存消息。偽代碼如下
public void closeOrder(String bizOrderId) {
  // 查詢當(dāng)前訂單是否付款
  OmsOrderEntity order = getOne(new QueryWrapper<OmsOrderEntity>().eq("biz_order_id", bizOrderId));
  if (order != null) {
    // 判斷訂單狀態(tài),為新建狀態(tài)才取消
    if (order.getOrderStatus() == OrderStatusConstant.CREATE.getCode()){
      //過(guò)期未支付,取消訂單,設(shè)置訂單狀態(tài)為取消狀態(tài)
      updateOrder.setOrderStatus(OrderStatusConstant.CANCEL.getCode());
      // 發(fā)送MQ
      rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
      }
    }
}

2. 庫(kù)存服務(wù)

  1. 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class MyRabbitConfig {

    /**
     * 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public Exchange stockEventExchange(){
        return new TopicExchange("stock-event-exchange",true,true);
    }

    @Bean
    public Queue stockReleaseStockQueue(){
        return new Queue("stock.release.stock.queue",true,false,false);
    }

    @Bean
    public Queue stockDelayQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange","stock-event-exchange");
        args.put("x-dead-letter-routing-key","stock.release");
        args.put("x-message-ttl",120000);
        return new Queue("stock.delay.queue",true,false,false,args);
    }

    @Bean
    public Binding stockReleaseBinding(){
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }

    @Bean
    public Binding stockLockedBinding(){
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }
}

  1. 庫(kù)存鎖定,偽代碼如下
@Transactional
public void orderLockStock() {
  // 保存工作單
  taskService.save(task);
  // 為每件商品鎖定庫(kù)存
  for (OrderItemVo orderItem : orderItems) {
    //判斷庫(kù)存是否足夠
    Long count = skuWareDao.lockStock(orderItem);
    if (count == 1){
        // 鎖定成功
        // 保存工作單詳情
        taskDetailService.save(taskDetail);
        // 自動(dòng)解鎖,發(fā)送工作單詳情,防止回滾后找不到數(shù)據(jù)
        // 鎖定庫(kù)存,發(fā)送消息到延時(shí)隊(duì)列
        rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", taskTo);
    } else {
        // 鎖定失敗,拋出異常
        throw new Exception();
    }
  }
}
  1. 監(jiān)聽(tīng)?zhēng)齑娼怄i信息
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {

  @Autowired
  WmsSkuWareService wareSkuService;

  /**
   * 庫(kù)存自動(dòng)解鎖
   *只要解鎖庫(kù)存的消息的失敗,需要告訴MQ解鎖失敗,消息不要?jiǎng)h除,重新放回隊(duì)列
   * @param taskTo
   * @param message
   *
   */
  @RabbitHandler
  public void handleStockLockedRelease(StockLockedTaskTo taskTo, Message message, Channel channel) throws IOException {
    System.out.println("收到解鎖庫(kù)存的消息");
    try{
      wareSkuService.unlockStock(taskTo);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
  }

  @RabbitHandler
  public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
    System.out.println("訂單關(guān)閉,準(zhǔn)備解鎖庫(kù)存");
    try{
      wareSkuService.unlockStock(orderTo);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
  }
}
  1. 自動(dòng)解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(StockLockedTaskTo taskTo) {
  // 判斷是否存在該任務(wù)
  Detail detail = detailService.getById(taskTo.detailId)
  if (detail != null){
    // 查詢訂單狀態(tài),訂單狀態(tài)為【取消】,則解鎖庫(kù)存
    // 遠(yuǎn)程調(diào)用訂單服務(wù),獲取訂單狀態(tài)
    R r = orderFeignService.getOrderStatus(bizOrderId);
    if (r.getCode() == 0) {
      // 遠(yuǎn)程查詢成功
      OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {
      });
      if (orderVo == null || orderVo.getOrderStatus() == OrderStatusConstant.CANCEL.getCode()) {
        // 訂單不存在,或者訂單已取消,都需要解鎖庫(kù)存
        if (detail.getLockStatus() == WareTaskStatusConstant.LOCKED.getCode()){
          //鎖定狀態(tài)下才需要解鎖,已解鎖的不用在解鎖
          unlockStock(detail.getId(),detail.getSkuId(),detail.getSkuNum());
          // 更新taskDetail狀態(tài)為已解鎖
          taskDetailService.updateById(taskDetailEntity);
        }
      }
    } else {
      throw new RuntimeException("遠(yuǎn)程調(diào)用訂單服務(wù)失敗");
    }
  }
}
  1. 訂單取消,解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(OrderTo orderTo) {
  // 無(wú)需查詢訂單最新?tīng)顟B(tài),能來(lái)到這,肯定更新了訂單狀態(tài)的
  // 判斷任務(wù)是否存在
  WmsOrderTaskEntity task = taskService.getOne(new QueryWrapper<WmsOrderTaskEntity>().eq("biz_order_id", bizOrderId));
  if (task != null){
    // 任務(wù)存在,獲取任務(wù)項(xiàng)狀態(tài)為鎖定狀態(tài)的所有任務(wù)項(xiàng)
    List<WmsOrderTaskDetailEntity> taskDetails = taskDetailService.list(new QueryWrapper<WmsOrderTaskDetailEntity>()
            .eq("task_id", task.getId())
            .eq("lock_status", WareTaskStatusConstant.LOCKED.getCode()));
    if (taskDetails != null && taskDetails.size() > 0){
      for (WmsOrderTaskDetailEntity taskDetail : taskDetails) {
        // 解鎖庫(kù)存
        unlockStock(taskDetail.getId(),taskDetail.getSkuId(),taskDetail.getSkuNum());
        // 更新taskDetail狀態(tài)為已解鎖
        taskDetailService.updateById(taskDetailEntity);
      }
    }
  }
}

使用RabbitMQ實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的大致流程解析完畢。如果對(duì)源碼感興趣的,歡迎到github倉(cāng)庫(kù)clone。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容