RocketMQ 常用消息類型

文章首發(fā)于公眾號《程序員果果》
地址 : https://mp.weixin.qq.com/s/dYqGd9zi2mNelsNNLIribg

消息發(fā)送示例

導(dǎo)入依賴:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

application.yml:

rocketmq:
  name-server: 172.16.250.129:9876
  producer:
    group: myGroup

普通消息

同步發(fā)送

原理:

同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后,會在收到服務(wù)端返回響應(yīng)之后才發(fā)下一條消息的通訊方式。


應(yīng)用場景:

這種可靠性同步地發(fā)送方式應(yīng)用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統(tǒng)等。

示例代碼:

public void sendMsg() throws Exception {

    Message message = new Message(
            // 普通消息所屬的Topic
            "Topic-Normal",
            // Message Tag可理解為Gmail中的標(biāo)簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列 RocketMQ 的服務(wù)器過濾。
            "TagA",
            // Message Body可以是任何二進制形式的數(shù)據(jù)。
            "Hello MQ".getBytes()
    );
    rocketMQTemplate.getProducer().send( message );
    // 等同于上面的方式(常用)
    //rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}

異步發(fā)送

原理:

異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不等服務(wù)端返回響應(yīng),接著發(fā)送下一條消息的通訊方式。RocketMQ異步發(fā)送,需要實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)。消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)端響應(yīng)即可發(fā)送第二條消息。發(fā)送方通過回調(diào)接口接收服務(wù)端響應(yīng),并處理響應(yīng)結(jié)果。


應(yīng)用場景:

異步發(fā)送一般用于鏈路耗時較長,對響應(yīng)時間較為敏感的業(yè)務(wù)場景,例如,您視頻上傳后通知啟動轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。

示例代碼:

public void sendAsyncMsg() {
    Map<String , Object> map = new HashMap<>();
    map.put( "name" , "zs" );
    map.put( "age" , 20);

    rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 消息發(fā)送成功。
            log.info( "async send success" );
        }

        @Override
        public void onException(Throwable throwable) {
            // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理。
            log.info( "async send fail" );
        }
    } );
}

順序消息

全局順序消息

  • 概念:對于指定的一個Topic,所有消息按照嚴(yán)格的先入先出(FIFO)的順序來發(fā)布和消費。
  • 適用場景:適用于性能要求不高,所有的消息嚴(yán)格按照FIFO原則來發(fā)布和消費的場景。
  • 示例:在證券處理中,以人民幣兌換美元為Topic,在價格相同的情況下,先出價者優(yōu)先處理,則可以按照FIFO的方式發(fā)布和消費全局順序消息。

分區(qū)順序消息

  • 概念:對于指定的一個Topic,所有消息根據(jù)Sharding Key進行區(qū)塊分區(qū)。同一個分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進行發(fā)布和消費。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。
  • 適用場景:適用于性能要求高,以Sharding Key作為分區(qū)字段,在同一個區(qū)塊中嚴(yán)格地按照FIFO原則進行消息發(fā)布和消費的場景。
  • 示例:
  • 用戶注冊需要發(fā)送發(fā)驗證碼,以用戶ID作為Sharding Key,那么同一個用戶發(fā)送的消息都會按照發(fā)布的先后順序來消費。
  • 電商的訂單創(chuàng)建,以訂單ID作為Sharding Key,那么同一個訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發(fā)布的先后順序來消費。

無序消息、全局順序消息、分區(qū)順序消息的對比


示例代碼

public void sendOrderlyMsg() {
    //根據(jù)指定的hashKey按順序發(fā)送
    for (int i = 0; i < 1000; i++) {
        String orderId = "biz_" + i % 10;
        // 分區(qū)順序消息中區(qū)分不同分區(qū)的關(guān)鍵字段,Sharding Key與普通消息的key是完全不同的概念。
        // 全局順序消息,該字段可以設(shè)置為任意非空字符串。
        String shardingKey = String.valueOf(orderId);
        try {
            SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
            // 發(fā)送消息,只要不拋異常就是成功。
            if (sendResult != null) {
                System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
            }
        }
        catch (Exception e) {
            // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理。
            System.out.println(new Date() + " Send mq message failed");
            e.printStackTrace();
        }
    }
}

延時消息

概念:

Producer將消息發(fā)送到消息隊列RocketMQ服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進行消費,該消息即延時消息。

適用場景:

消息生產(chǎn)和消費有時間窗口要求,例如在電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條延時消息。這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。如支付未完成,則關(guān)閉訂單。如已完成支付則忽略。

示例代碼:

public void sendDelayMsg() {
    rocketMQTemplate.syncSend( "Topic-Delay",
            MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
            3000,
            //設(shè)置延時等級3,這個消息將在10s之后發(fā)送(現(xiàn)在只支持固定的幾個時間,詳看delayTimeLevel)
            //messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            3 );
}

事務(wù)消息

概念:

  • 事務(wù)消息:消息隊列RocketMQ提供類似X/Open XA的分布式事務(wù)功能,通過消息隊列RocketMQ事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
  • 半事務(wù)消息:暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊列RocketMQ服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認(rèn),此時該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
  • 消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊列RocketMQ服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback),該詢問過程即消息回查。

分布式事務(wù)消息的優(yōu)勢:

消息隊列RocketMQ分布式事務(wù)消息不僅可以實現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性。同時,傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會因為某一個關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。在極端情況下,如果關(guān)聯(lián)的某一個應(yīng)用始終無法處理成功,也只需對當(dāng)前應(yīng)用進行補償或數(shù)據(jù)訂正處理,而無需對整體業(yè)務(wù)進行回滾。

典型場景:

在電商購物車下單時,涉及到購物車系統(tǒng)和交易系統(tǒng),這兩個系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過分布式事務(wù)消息的異步處理實現(xiàn)。在這種場景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。而購物車系統(tǒng)只需要訂閱消息隊列RocketMQ的交易訂單消息,做相應(yīng)的業(yè)務(wù)處理,即可保證最終的數(shù)據(jù)一致性。

事務(wù)消息交互流程如下圖所示:

事務(wù)消息發(fā)送步驟如下:

  1. 發(fā)送方將半事務(wù)消息發(fā)送至消息隊列RocketMQ服務(wù)端。
  2. 消息隊列RocketMQ服務(wù)端將消息持久化成功之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半事務(wù)消息。
  3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
  4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit或是Rollback),服務(wù)端收到Commit狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到Rollback狀態(tài)則刪除半事務(wù)消息,訂閱方將不會接受該消息。

事務(wù)消息回查步驟如下:

  1. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時間后服務(wù)端將對該消息發(fā)起消息回查。
  2. 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  3. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進行操作。

示例代碼:

發(fā)送事務(wù)消息包含以下兩個步驟:

    1. 發(fā)送半事務(wù)消息(Half Message,示例代碼如下
/**
 * 事務(wù)消息
 */
public void sendTransactionMsg() {
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
            "Topic-Tx:TagA",
            MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
            null );
    SendStatus sendStatus = transactionSendResult.getSendStatus();
    LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
    System.out.println( new Date() + " Send mq message status "+ sendStatus +" ,  localTransactionState "+ localTransactionState );
}
    1. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 執(zhí)行本地事務(wù)
        System.out.println("TX message listener execute local transaction");
        RocketMQLocalTransactionState result;
        try {
            // 業(yè)務(wù)代碼( 例如下訂單 )
            result = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            System.out.println("execute local transaction error");
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 檢查本地事務(wù)( 例如檢查下訂單是否成功 )
        System.out.println("TX message listener check local transaction");
        RocketMQLocalTransactionState result;
        try {
            //業(yè)務(wù)代碼( 根據(jù)檢查結(jié)果,決定是COMMIT或ROLLBACK )
            result = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 異常就回滾
            System.out.println("check local transaction error");
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }

}
    1. 發(fā)送方在本地事務(wù)執(zhí)行后,若向服務(wù)端提交二次確認(rèn)是Commit,RocketMQ服務(wù)端收到Commit狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;訂閱方代碼如下
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:{}" , message);
    }

}

源碼

https://github.com/gf-huanchupk/SpringBootLearning

系列文章

RocketMQ 簡介
RocketMQ 安裝

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容