精華推薦 | 【深入淺出RocketMQ原理及實(shí)戰(zhàn)】「性能原理挖掘系列」透徹剖析貫穿RocketMQ的事務(wù)性消息的底層原理并在分析其實(shí)際開(kāi)發(fā)場(chǎng)景

什么是事務(wù)消息

事務(wù)消息(Transactional Message)是指應(yīng)用本地事務(wù)和發(fā)送消息操作可以被定義到全局事務(wù)中,要么同時(shí)成功,要么同時(shí)失敗。RocketMQ的事務(wù)消息提供類(lèi)似 X/Open XA 的分布事務(wù)功能,通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。

事務(wù)消息所對(duì)應(yīng)的場(chǎng)景

在一些對(duì)數(shù)據(jù)一致性有強(qiáng)需求的場(chǎng)景,可以用 Apache RocketMQ 事務(wù)消息來(lái)解決,從而保證上下游數(shù)據(jù)的一致性。

image

以秒殺購(gòu)物商城的商品下單交易場(chǎng)景為例,用戶(hù)支付訂單這一核心操作的同時(shí)會(huì)涉及到下游物流發(fā)貨、庫(kù)存變更、購(gòu)物車(chē)狀態(tài)清空等多個(gè)子系統(tǒng)的變更。

事務(wù)性業(yè)務(wù)的處理分支包括:

  1. 主分支訂單系統(tǒng)狀態(tài)更新:由未支付變更為支付成功。
  2. 調(diào)用第三方物流系統(tǒng)狀態(tài)新增:新增待發(fā)貨物流記錄,創(chuàng)建訂單物流記錄。
  3. 積分系統(tǒng)狀態(tài)變更:變更用戶(hù)積分,更新用戶(hù)積分表。
  4. 購(gòu)物車(chē)系統(tǒng)狀態(tài)變更:清空購(gòu)物車(chē),更新用戶(hù)購(gòu)物車(chē)記錄。
image

RocketMQ的事務(wù)消息

Apache RocketMQ在4.3.0版的時(shí)候已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息。

RocketMQ事務(wù)消息流程

針對(duì)于事務(wù)消息的總體運(yùn)作流程,主要分為兩個(gè)部分:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。

事務(wù)消息發(fā)送及提交基本流程概要(后面會(huì)詳細(xì)分析原理)
事務(wù)消息發(fā)送步驟如下
  1. 消息發(fā)送者:生產(chǎn)者將半事務(wù)消息發(fā)送至RocketMQ Broker。
  2. Broker服務(wù)端:RocketMQ Broker 將消息持久化成功之后,向生產(chǎn)者返回 Ack 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息暫不能投遞,為半事務(wù)消息。
  3. 業(yè)務(wù)系統(tǒng):生產(chǎn)者開(kāi)始執(zhí)行本地事務(wù)邏輯。
  4. 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback)。
    • 如果本地操作成功,Commit操作生成消息索引,消息對(duì)消費(fèi)者可見(jiàn)
    • 如果本地操作失敗,此時(shí)對(duì)應(yīng)的half消息對(duì)業(yè)務(wù)不可見(jiàn),本地邏輯不執(zhí)行,Rollback均進(jìn)行回滾。
服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下
  • 確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者。
  • 確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會(huì)將半事務(wù)消息投遞給消費(fèi)者。
消息出現(xiàn)異常情況的補(bǔ)償流程如下

在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果(Commit/Rollback)或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過(guò)固定時(shí)間后,服務(wù)端將對(duì)消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。

注意:服務(wù)端僅僅會(huì)按照參數(shù)嘗試指定次數(shù),超過(guò)次數(shù)后事務(wù)會(huì)強(qiáng)制回滾,因此未決事務(wù)的回查時(shí)效性非常關(guān)鍵,需要按照業(yè)務(wù)的實(shí)際風(fēng)險(xiǎn)來(lái)設(shè)置

事務(wù)消息回查步驟如下
  • 生產(chǎn)者收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  • 生產(chǎn)者根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行處理。
補(bǔ)償總結(jié)
  1. 對(duì)沒(méi)有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),Broker服務(wù)端會(huì)發(fā)起一次“回查”。
  2. 生產(chǎn)者Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)。
  3. 生產(chǎn)者根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback。

補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況

RocketMQ事務(wù)消息實(shí)現(xiàn)原理

事務(wù)消息在一階段對(duì)用戶(hù)不可見(jiàn)

在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對(duì)用戶(hù)不可見(jiàn)。

實(shí)現(xiàn)技術(shù)要點(diǎn)一:事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶(hù)是不可見(jiàn)的。如何做到寫(xiě)入消息但是對(duì)用戶(hù)不可見(jiàn)呢?

RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的Topic與消息消費(fèi)隊(duì)列,然后,改變Topic為RMQ_SYS_TRANS_HALF_TOPIC。

由于消費(fèi)組未訂閱該主題,故消費(fèi)端無(wú)法消費(fèi)half類(lèi)型的消息,然后RocketMQ會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個(gè)服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請(qǐng)求,根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾消息。

RocketMQ中,消息在服務(wù)端的存儲(chǔ)結(jié)構(gòu)如下,每條消息都會(huì)有對(duì)應(yīng)的索引信息,Consumer通過(guò)ConsumeQueue這個(gè)二級(jí)索引來(lái)讀取消息實(shí)體內(nèi)容,其流程如下:

image

RocketMQ的底層實(shí)現(xiàn)原理

  1. 寫(xiě)入的如果事務(wù)消息,對(duì)消息的Topic和Queue等屬性進(jìn)行替換,同時(shí)將原來(lái)的Topic和Queue信息存儲(chǔ)到消息的屬性中,正因?yàn)橄⒅黝}被替換,故消息并不會(huì)轉(zhuǎn)發(fā)到該原主題的消息消費(fèi)隊(duì)列。
  2. 由于沒(méi)有直接發(fā)送到目標(biāo)的topic的隊(duì)列里面,故此消費(fèi)者無(wú)法感知消息的存在,不會(huì)消費(fèi),其實(shí)改變消息主題是RocketMQ的常用“套路”,回想一下延時(shí)消息的實(shí)現(xiàn)機(jī)制。
image
發(fā)送一個(gè)半事務(wù)消息

半事務(wù)消息是指暫不能投遞的消息,生產(chǎn)者已經(jīng)成功地將消息發(fā)送到了 Broker,但是Broker未收到生產(chǎn)者對(duì)該消息的二次確認(rèn),此時(shí)該消息被標(biāo)記成“暫不能投遞(pending)”狀態(tài),如果發(fā)送成功則執(zhí)行本地事務(wù),并根據(jù)本地事務(wù)執(zhí)行成功與否,向Broker半事務(wù)消息狀態(tài)(commit或者rollback),半事務(wù)消息只有commit狀態(tài)才會(huì)真正向下游投遞。

Commit和Rollback操作以及Op消息的底層實(shí)現(xiàn)原理

Rollback的情況,對(duì)于Rollback,本身一階段的消息對(duì)用戶(hù)是不可見(jiàn)的,其實(shí)不需要真正撤銷(xiāo)消息(實(shí)際上RocketMQ也無(wú)法去真正的刪除一條消息,因?yàn)槭琼樞驅(qū)懳募模?/p>

但是區(qū)別于這條消息沒(méi)有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個(gè)操作來(lái)標(biāo)識(shí)這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標(biāo)識(shí)事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。

如果一條事務(wù)消息沒(méi)有對(duì)應(yīng)的Op消息,說(shuō)明這個(gè)事務(wù)的狀態(tài)還無(wú)法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無(wú)論是Commit或者Rollback都會(huì)記錄一個(gè)Op操作。Commit相對(duì)于Rollback只是在寫(xiě)入Op消息前創(chuàng)建Half消息的索引

Op消息的存儲(chǔ)和對(duì)應(yīng)關(guān)系

Op消息寫(xiě)入到全局特定的Topic中通過(guò)源碼中的方法

TransactionalMessageUtil.buildOpTopic();

這個(gè)Topic是一個(gè)內(nèi)部的Topic(像Half消息的Topic一樣),不會(huì)被用戶(hù)消費(fèi)。Op消息的內(nèi)容為對(duì)應(yīng)的Half消息的存儲(chǔ)的Offset,這樣通過(guò)Op消息能索引到Half消息進(jìn)行后續(xù)的回查操作。

image
Half消息的索引構(gòu)建

執(zhí)行二階段Commit操作時(shí),需要構(gòu)建出Half消息的索引。

  • 一階段的Half消息由于是寫(xiě)到一個(gè)特殊的Topic,
  • 二階段構(gòu)建索引時(shí)需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過(guò)一次普通消息的寫(xiě)入操作來(lái)生成一條對(duì)用戶(hù)可見(jiàn)的消息。

所以,RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲(chǔ)的消息的內(nèi)容,在二階段時(shí)恢復(fù)出一條完整的普通消息,然后走一遍消息寫(xiě)入流程。

補(bǔ)償控制要點(diǎn)

如果由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,Broker端會(huì)通過(guò)掃描發(fā)現(xiàn)某條消息長(zhǎng)期處于"半事務(wù)消息"時(shí),需要主動(dòng)向消息生產(chǎn)者詢(xún)問(wèn)該消息的最終狀態(tài)(Commit或是Rollback)。

這樣最終保證了本地事務(wù)執(zhí)行成功,下游就能收到消息,本地事務(wù)執(zhí)行失敗,下游就收不到消息??偠WC了上下游數(shù)據(jù)的一致性。

image

注意:事務(wù)消息的生產(chǎn)組名稱(chēng) ProducerGroupName不能隨意設(shè)置。事務(wù)消息有回查機(jī)制,回查時(shí)Broker端如果發(fā)現(xiàn)原始生產(chǎn)者已經(jīng)崩潰,則會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例回查本地事務(wù)執(zhí)行情況以Commit或Rollback半事務(wù)消息。

RocketMQ的回查功能實(shí)現(xiàn)原理

如果在RocketMQ事務(wù)消息的二階段過(guò)程中失敗了,例如在做Commit操作時(shí),出現(xiàn)網(wǎng)絡(luò)問(wèn)題導(dǎo)致Commit失敗,那么需要通過(guò)一定的策略使這條消息最終被Commit。RocketMQ采用了一種補(bǔ)償機(jī)制,稱(chēng)為“回查”。

  • 回查次數(shù)的配置化

    • Broker端對(duì)未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對(duì)應(yīng)的Producer端(同一個(gè)Group的Producer),由Producer根據(jù)消息來(lái)檢查本地事務(wù)的狀態(tài),進(jìn)而執(zhí)行Commit或者Rollback。Broker端通過(guò)對(duì)比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。

    • 為了避免單個(gè)消息被檢查太多次而導(dǎo)致半隊(duì)列消息累積,我們默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次,但是用戶(hù)可以通過(guò) Broker 配置文件的 transactionCheckMax參數(shù)來(lái)修改此限制

    • 如果已經(jīng)檢查某條消息超過(guò) N 次的話(huà)( N = transactionCheckMax ) 則 Broker 將丟棄此消息,并在默認(rèn)情況下同時(shí)打印錯(cuò)誤日志,執(zhí)行回滾Rollback操作。

  • 回查行為的定制化d

    • 此外用戶(hù)可以通過(guò)重寫(xiě)AbstractTransactionalMessageCheckListener 類(lèi)來(lái)修改這個(gè)Rollback的行為,比如改寫(xiě)為Commit,或者其他的記錄日志或者發(fā)送消息郵件推送給指定人進(jìn)行人工跟進(jìn)。
  • 回查觸發(fā)時(shí)間定制化

事務(wù)消息將在 Broker配置文件中的參數(shù)transactionTimeout 這樣的特定時(shí)間長(zhǎng)度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶(hù)還可以通過(guò)設(shè)置用戶(hù)屬性CHECK_IMMUNITY_TIME_IN_SECONDS 來(lái)改變這個(gè)限制,該參數(shù)優(yōu)先于 transactionTimeout 參數(shù)。

事務(wù)性消息可能不止一次被檢查或消費(fèi)。
  • 發(fā)送給用戶(hù)的目標(biāo)topic消息可能會(huì)失敗,目前這依日志的記錄而定。它的高可用性通過(guò) RocketMQ 本身的高可用性機(jī)制來(lái)保證,如果希望確保事務(wù)消息不丟失、并且事務(wù)完整性得到保證,建議使用同步的雙重寫(xiě)入機(jī)制。

  • 事務(wù)消息的生產(chǎn)者 ID 不能與其他類(lèi)型消息的生產(chǎn)者 ID 共享。與其他類(lèi)型的消息不同,事務(wù)消息允許反向查詢(xún)、MQ服務(wù)器能通過(guò)它們的生產(chǎn)者 ID 查詢(xún)到消費(fèi)者。


消息事務(wù)樣例

事務(wù)消息共有三種狀態(tài),提交狀態(tài)、回滾狀態(tài)、中間狀態(tài)。

  • TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。
  • TransactionStatus.RollbackTransaction: 回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
  • TransactionStatus.Unknown: 中間狀態(tài)(Pending),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)。

開(kāi)發(fā)實(shí)現(xiàn)案例

發(fā)送事務(wù)消息樣例

創(chuàng)建事務(wù)性生產(chǎn)者

使用 TransactionMQProducer類(lèi)創(chuàng)建生產(chǎn)者,并指定唯一的 ProducerGroup,就可以設(shè)置自定義線(xiàn)程池來(lái)處理這些檢查請(qǐng)求。執(zhí)行本地事務(wù)后、需要根據(jù)執(zhí)行結(jié)果對(duì)消息隊(duì)列進(jìn)行回復(fù)。

    TransactionListener transactionListener = new TransactionListenerImpl();
    TransactionMQProducer producer = new
        TransactionMQProducer("please_rename_unique_group_name");
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
                                                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
           @Override
           public Thread newThread(Runnable r) {
               Thread thread = new Thread(r);
               thread.setName("client-transaction-msg-check-thread");
               return thread;
           }
       });
       producer.setExecutorService(executorService);
       producer.setTransactionListener(transactionListener);
       producer.start();
       String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
       for (int i = 0; i < 10; i++) {
           try {
               Message msg =
                   new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
               SendResult sendResult = producer.sendMessageInTransaction(msg, null);
               System.out.printf("%s%n", sendResult);
               Thread.sleep(10);
           } catch (MQClientException | UnsupportedEncodingException e) {
               e.printStackTrace();
           }
       }
       for (int i = 0; i < 100000; i++) {
           Thread.sleep(1000);
       }
       producer.shutdown();
   }
實(shí)現(xiàn)事務(wù)的監(jiān)聽(tīng)接口

TransactionListener接口的定義如下:

public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

當(dāng)發(fā)送半消息成功時(shí),我們使用 executeLocalTransaction 方法來(lái)執(zhí)行本地事務(wù)。它返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。checkLocalTransaction 方法用于檢查本地事務(wù)狀態(tài),并回應(yīng)消息隊(duì)列的檢查請(qǐng)求。它也是返回前一節(jié)中提到的三個(gè)事務(wù)狀態(tài)之一。

public class TransactionListenerImpl implements TransactionListener {
  private AtomicInteger transactionIndex = new AtomicInteger(0);
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      int value = transactionIndex.getAndIncrement();
      int status = value % 3;
      localTrans.put(msg.getTransactionId(), status);
      return LocalTransactionState.UNKNOW;
  }
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      Integer status = localTrans.get(msg.getTransactionId());
      if (null != status) {
          switch (status) {
              case 0:
                  return LocalTransactionState.UNKNOW;
              case 1:
                  return LocalTransactionState.COMMIT_MESSAGE;
              case 2:
                  return LocalTransactionState.ROLLBACK_MESSAGE;
          }
      }
      return LocalTransactionState.COMMIT_MESSAGE;
  }
}

executeLocalTransaction 是半事務(wù)消息發(fā)送成功后,執(zhí)行本地事務(wù)的方法,具體執(zhí)行完本地事務(wù)后,可以在該方法中返回以下三種狀態(tài):

  • LocalTransactionState.COMMIT_MESSAGE:提交事務(wù),允許消費(fèi)者消費(fèi)該消息
  • LocalTransactionState.ROLLBACK_MESSAGE:回滾事務(wù),消息將被丟棄不允許消費(fèi)。
  • LocalTransactionState.UNKNOW:暫時(shí)無(wú)法判斷狀態(tài),等待固定時(shí)間以后Broker端根據(jù)回查規(guī)則向生產(chǎn)者進(jìn)行消息回查。

checkLocalTransaction是由于二次確認(rèn)消息沒(méi)有收到,Broker端回查事務(wù)狀態(tài)的方法?;夭橐?guī)則:本地事務(wù)執(zhí)行完成后,若Broker端收到的本地事務(wù)返回狀態(tài)為L(zhǎng)ocalTransactionState.UNKNOW,或生產(chǎn)者應(yīng)用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)。則Broker端會(huì)向消息生產(chǎn)者發(fā)起事務(wù)回查,第一次回查后仍未獲取到事務(wù)狀態(tài),則之后每隔一段時(shí)間會(huì)再次回查。

事務(wù)消息使用上的限制

事務(wù)消息不支持延時(shí)消息和批量消息。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容