文章首發(fā)于公眾號《程序員果果》
地址 : https://mp.weixin.qq.com/s/dYqGd9zi2mNelsNNLIribg
消息發(fā)送示例
導(dǎo)入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml:
rocketmq:
name-server: 172.16.250.129:9876
producer:
group: myGroup
普通消息
同步發(fā)送
原理:
同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后,會在收到服務(wù)端返回響應(yīng)之后才發(fā)下一條消息的通訊方式。

應(yīng)用場景:
這種可靠性同步地發(fā)送方式應(yīng)用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統(tǒng)等。
示例代碼:
public void sendMsg() throws Exception {
Message message = new Message(
// 普通消息所屬的Topic
"Topic-Normal",
// Message Tag可理解為Gmail中的標(biāo)簽,對消息進行再歸類,方便Consumer指定過濾條件在消息隊列 RocketMQ 的服務(wù)器過濾。
"TagA",
// Message Body可以是任何二進制形式的數(shù)據(jù)。
"Hello MQ".getBytes()
);
rocketMQTemplate.getProducer().send( message );
// 等同于上面的方式(常用)
//rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}
異步發(fā)送
原理:
異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不等服務(wù)端返回響應(yīng),接著發(fā)送下一條消息的通訊方式。RocketMQ異步發(fā)送,需要實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)。消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)端響應(yīng)即可發(fā)送第二條消息。發(fā)送方通過回調(diào)接口接收服務(wù)端響應(yīng),并處理響應(yīng)結(jié)果。

應(yīng)用場景:
異步發(fā)送一般用于鏈路耗時較長,對響應(yīng)時間較為敏感的業(yè)務(wù)場景,例如,您視頻上傳后通知啟動轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。
示例代碼:
public void sendAsyncMsg() {
Map<String , Object> map = new HashMap<>();
map.put( "name" , "zs" );
map.put( "age" , 20);
rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息發(fā)送成功。
log.info( "async send success" );
}
@Override
public void onException(Throwable throwable) {
// 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理。
log.info( "async send fail" );
}
} );
}
順序消息
全局順序消息
- 概念:對于指定的一個Topic,所有消息按照嚴(yán)格的先入先出(FIFO)的順序來發(fā)布和消費。
- 適用場景:適用于性能要求不高,所有的消息嚴(yán)格按照FIFO原則來發(fā)布和消費的場景。
- 示例:在證券處理中,以人民幣兌換美元為Topic,在價格相同的情況下,先出價者優(yōu)先處理,則可以按照FIFO的方式發(fā)布和消費全局順序消息。
分區(qū)順序消息
- 概念:對于指定的一個Topic,所有消息根據(jù)Sharding Key進行區(qū)塊分區(qū)。同一個分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進行發(fā)布和消費。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。
- 適用場景:適用于性能要求高,以Sharding Key作為分區(qū)字段,在同一個區(qū)塊中嚴(yán)格地按照FIFO原則進行消息發(fā)布和消費的場景。
- 示例:
- 用戶注冊需要發(fā)送發(fā)驗證碼,以用戶ID作為Sharding Key,那么同一個用戶發(fā)送的消息都會按照發(fā)布的先后順序來消費。
- 電商的訂單創(chuàng)建,以訂單ID作為Sharding Key,那么同一個訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發(fā)布的先后順序來消費。
無序消息、全局順序消息、分區(qū)順序消息的對比


示例代碼
public void sendOrderlyMsg() {
//根據(jù)指定的hashKey按順序發(fā)送
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
// 分區(qū)順序消息中區(qū)分不同分區(qū)的關(guān)鍵字段,Sharding Key與普通消息的key是完全不同的概念。
// 全局順序消息,該字段可以設(shè)置為任意非空字符串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
// 發(fā)送消息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
}
}
catch (Exception e) {
// 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理。
System.out.println(new Date() + " Send mq message failed");
e.printStackTrace();
}
}
}
延時消息
概念:
Producer將消息發(fā)送到消息隊列RocketMQ服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時間后才投遞到Consumer進行消費,該消息即延時消息。
適用場景:
消息生產(chǎn)和消費有時間窗口要求,例如在電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條延時消息。這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。如支付未完成,則關(guān)閉訂單。如已完成支付則忽略。
示例代碼:
public void sendDelayMsg() {
rocketMQTemplate.syncSend( "Topic-Delay",
MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
3000,
//設(shè)置延時等級3,這個消息將在10s之后發(fā)送(現(xiàn)在只支持固定的幾個時間,詳看delayTimeLevel)
//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
3 );
}
事務(wù)消息
概念:
- 事務(wù)消息:消息隊列RocketMQ提供類似X/Open XA的分布式事務(wù)功能,通過消息隊列RocketMQ事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
- 半事務(wù)消息:暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊列RocketMQ服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認(rèn),此時該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
- 消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊列RocketMQ服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback),該詢問過程即消息回查。
分布式事務(wù)消息的優(yōu)勢:
消息隊列RocketMQ分布式事務(wù)消息不僅可以實現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性。同時,傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會因為某一個關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。在極端情況下,如果關(guān)聯(lián)的某一個應(yīng)用始終無法處理成功,也只需對當(dāng)前應(yīng)用進行補償或數(shù)據(jù)訂正處理,而無需對整體業(yè)務(wù)進行回滾。
典型場景:
在電商購物車下單時,涉及到購物車系統(tǒng)和交易系統(tǒng),這兩個系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過分布式事務(wù)消息的異步處理實現(xiàn)。在這種場景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。而購物車系統(tǒng)只需要訂閱消息隊列RocketMQ的交易訂單消息,做相應(yīng)的業(yè)務(wù)處理,即可保證最終的數(shù)據(jù)一致性。
事務(wù)消息交互流程如下圖所示:

事務(wù)消息發(fā)送步驟如下:
- 發(fā)送方將半事務(wù)消息發(fā)送至消息隊列RocketMQ服務(wù)端。
- 消息隊列RocketMQ服務(wù)端將消息持久化成功之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半事務(wù)消息。
- 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
- 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit或是Rollback),服務(wù)端收到Commit狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到Rollback狀態(tài)則刪除半事務(wù)消息,訂閱方將不會接受該消息。
事務(wù)消息回查步驟如下:
- 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時間后服務(wù)端將對該消息發(fā)起消息回查。
- 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進行操作。
示例代碼:
發(fā)送事務(wù)消息包含以下兩個步驟:
- 發(fā)送半事務(wù)消息(Half Message,示例代碼如下
/**
* 事務(wù)消息
*/
public void sendTransactionMsg() {
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"Topic-Tx:TagA",
MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
null );
SendStatus sendStatus = transactionSendResult.getSendStatus();
LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
System.out.println( new Date() + " Send mq message status "+ sendStatus +" , localTransactionState "+ localTransactionState );
}
- 發(fā)送方開始執(zhí)行本地事務(wù)邏輯
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)
System.out.println("TX message listener execute local transaction");
RocketMQLocalTransactionState result;
try {
// 業(yè)務(wù)代碼( 例如下訂單 )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println("execute local transaction error");
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 檢查本地事務(wù)( 例如檢查下訂單是否成功 )
System.out.println("TX message listener check local transaction");
RocketMQLocalTransactionState result;
try {
//業(yè)務(wù)代碼( 根據(jù)檢查結(jié)果,決定是COMMIT或ROLLBACK )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 異常就回滾
System.out.println("check local transaction error");
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
- 發(fā)送方在本地事務(wù)執(zhí)行后,若向服務(wù)端提交二次確認(rèn)是Commit,RocketMQ服務(wù)端收到Commit狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方最終將收到該消息;訂閱方代碼如下
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive message:{}" , message);
}
}
源碼
https://github.com/gf-huanchupk/SpringBootLearning