RocketMQ 順序消息:消息有序是指可以按照消息發(fā)送順序來消費。RocketMQ 可以嚴格的保證消息有序,但是這個順序逼格不是全局順序,只是分區(qū)(queue)順序。要保證群居順序,只能有一個分區(qū)。
順序消息
在 MQ 模型中,順序要由三個階段保證:
- 消息被發(fā)送時,保持順序
- 消息被存儲時的順序和發(fā)送的順序一致
- 消息被消費時的順序和存儲的順序一致
發(fā)送時保持順序,意味著對于有順序要求的消息,用戶應(yīng)該在同一個線程中采用同步的方式發(fā)送。存儲保持和發(fā)送的順序一致,則要求在同一線程中被發(fā)送出來的消息 A/B,存儲時 A 要在 B 之前。而消費保持和存儲一致,則要求消息 A/B 到達 Consumer 之后必須按照先后順序被處理。

生產(chǎn)者
package com.laiyy.study.rocketmqprovider.order;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author laiyy
* @date 2019/4/21 16:18
* @description
*/
public class OrderProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1、創(chuàng)建 DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo-producer");
// 2、設(shè)置 name server
producer.setNamesrvAddr("192.168.52.200:9876");
// 3、開啟 producer
producer.start();
// 連續(xù)發(fā)送 5 條信息
for (int index = 1; index <= 5; index++) {
// 創(chuàng)建消息
Message message = new Message("TOPIC_DEMO", "TAG_A", "KEYS_!", ("HELLO!" + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 指定 MessageQueue,順序發(fā)送消息
// 第一個參數(shù):消息體
// 第二個參數(shù):選中指定的消息隊列對象(會將所有的消息隊列傳進來,需要自己選擇)
// 第三個參數(shù):選擇對應(yīng)的隊列下標
SendResult result = producer.send(message, new MessageQueueSelector() {
// 第一個參數(shù):所有的消息隊列對象
// 第二個參數(shù):消息體
// 第三個參數(shù):傳入的消息隊列下標
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 獲取隊列下標
int index = (int) o;
return list.get(index);
}
}, 0);
System.out.println("發(fā)送第:" + index + " 條信息成功:" + result);
}
// 關(guān)閉 producer
producer.shutdown();
}
}
控制臺輸出結(jié)果:
發(fā)送第:1 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66560000, offsetMsgId=C0A834C800002A9F00000000000000B8, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=1]
發(fā)送第:2 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66630001, offsetMsgId=C0A834C800002A9F0000000000000171, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=2]
發(fā)送第:3 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66660002, offsetMsgId=C0A834C800002A9F000000000000022A, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=3]
發(fā)送第:4 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66690003, offsetMsgId=C0A834C800002A9F00000000000002E3, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=4]
發(fā)送第:5 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE666C0004, offsetMsgId=C0A834C800002A9F000000000000039C, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=5]
17:45:11.545 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10909] result: true
17:45:11.548 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:9876] result: true
17:45:11.549 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10911] result: true
Process finished with exit code 0
可以看到,所有消息的 queueId 都為 0,順序消息生產(chǎn)成功。
消費者
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
// 1、創(chuàng)建 DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo-consumer");
// 2、設(shè)置 name server
consumer.setNamesrvAddr("192.168.52.200:9876");
// 設(shè)置消息拉取最大數(shù)
consumer.setConsumeMessageBatchMaxSize(2);
// 3、設(shè)置 subscribe
consumer.subscribe("TOPIC_DEMO", // 要消費的主題
"*" // 過濾規(guī)則
);
// 4、創(chuàng)建消息監(jiān)聽
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
// 5、獲取消息信息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer 消費信息:topic:" + topic+ ",tags:" + tags + ",消息體:" + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
// 6、返回消息讀取狀態(tài)
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 啟動消費者
consumer.start();
}
}
順序消費者與之前的 demo 最大的不同,在于 message listener 從 MessageListenerConcurrently 變?yōu)?MessageListenerOrderly,消費標識從 ConsumeConcurrentlyStatus 變?yōu)?ConsumeOrderlyStatus。
查看控制臺輸出:
Consumer 消費信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!1
Consumer 消費信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!2
Consumer 消費信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!3
Consumer 消費信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!4
Consumer 消費信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!5
事務(wù)消息
在 RocketMQ 4.3 版本后,開放了事務(wù)消息。
RocketMQ 事務(wù)消息流程
RocketMQ 的事務(wù)消息,只要是通過消息的異步處理,可以保證本地事務(wù)和消息發(fā)送同事成功執(zhí)行或失敗,從而保證數(shù)據(jù)的最終一致性。

MQ 事務(wù)消息解決分布式事務(wù)問題,但是第三方 MQ 支持事務(wù)消息的中間件不多,如 RockctMQ,它們支持事務(wù)的方式也是類似于采用二階段提交,但是市面上一些主流的 MQ 都是不支持事務(wù)消息的,如:Kafka、RabbitMQ
以 RocketMQ 為例,事務(wù)消息實現(xiàn)思路大致為:
- 第一階段的 Prepared 消息,會拿到消息的地址
- 第二階段執(zhí)行本地事務(wù)
- 第三階段通過第一階段拿到的地址去訪問消息,并修改狀態(tài)
也就是說,在業(yè)務(wù)方法內(nèi)想要消息隊列提交兩次消息,一次發(fā)送消息和一次確認消息。如果確認消息發(fā)送失敗,RocketMQ 會定期掃描消息集群中的事務(wù)消息。這時候發(fā)現(xiàn)了 prepared 消息,它會向消息發(fā)送者確認,所以生產(chǎn)方需要實現(xiàn)一個 check 接口。RocketMQ 會根據(jù)發(fā)送端設(shè)置的策略來決定是回滾還是繼續(xù)發(fā)送確認消息。這樣就保證了消息發(fā)送與本地事務(wù)同時成功或同時失敗。

事務(wù)消息的成功投遞需要三個 Topic,分別是
- Half Topic:用于記錄所有的 prepare 消息
- Op Half Topic:記錄以及提交了狀態(tài)的 prepare 消息
- Real Topic:事務(wù)消息真正的 topic,在 commit 后才會將消息寫入該 topic,從而進行消息投遞。
事務(wù)消息實現(xiàn)
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1、創(chuàng)建 TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("transaction-producer");
// 2、設(shè)置 name server
producer.setNamesrvAddr("192.168.52.200:9876");
// 3、指定消息監(jiān)聽對象,用于執(zhí)行本地事務(wù)和消息回查
TransactionListenerImpl transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
// 4、線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-thread");
return thread;
}
});
producer.setExecutorService(executor);
// 5、開啟 producer
producer.start();
// 6、創(chuàng)建消息
Message message = new Message("TRANSACTION_TOPIC", "TAG_A", "KEYS_!", "HELLO!TRANSACTION!".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 7、發(fā)送消息
TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
System.out.println(result);
// 關(guān)閉 producer
producer.shutdown();
}
}
事務(wù)消息監(jiān)聽器:
public class TransactionListenerImpl implements TransactionListener {
/**
* 存儲對應(yīng)書屋的狀態(tài)信息, key:事務(wù)id,value:事務(wù)執(zhí)行的狀態(tài)
*/
private ConcurrentMap<String, Integer> maps = new ConcurrentHashMap<>();
/**
* 執(zhí)行本地事務(wù)
*
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 事務(wù)id
String transactionId = message.getTransactionId();
// 0:執(zhí)行中,狀態(tài)未知
// 1:本地事務(wù)執(zhí)行成功
// 2:本地事務(wù)執(zhí)行失敗
maps.put(transactionId, 0);
try {
System.out.println("正在執(zhí)行本地事務(wù)。。。。");
// 模擬本地事務(wù)
TimeUnit.SECONDS.sleep(65);
System.out.println("本地事務(wù)執(zhí)行成功。。。。");
maps.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
maps.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 消息回查
*
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String transactionId = messageExt.getTransactionId();
System.out.println("正在執(zhí)行消息回查,事務(wù)id:" + transactionId);
// 獲取事務(wù)id的執(zhí)行狀態(tài)
if (maps.containsKey(transactionId)) {
int status = maps.get(transactionId);
System.out.println("消息回查狀態(tài):" + status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
default:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.UNKNOW;
}
}
運行生產(chǎn)者,查看控制臺輸出:
正在執(zhí)行本地事務(wù)。。。。
正在執(zhí)行消息回查,事務(wù)id:C0A800678F0818B4AAC26AEDDEB10000
消息回查狀態(tài):0
本地事務(wù)執(zhí)行成功。。。。
需要注意:消息回查會隔一段時間執(zhí)行一次,如果執(zhí)行本地事務(wù)的時間太短,則控制臺不會輸出事務(wù)回查日志。
廣播消息
生產(chǎn)者
public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1、創(chuàng)建 DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("boardcast-producer");
// 2、設(shè)置 name server
producer.setNamesrvAddr("192.168.52.200:9876");
// 3、開啟 producer
producer.start();
for (int index = 1; index <= 10; index++) {
Message message = new Message("BOARD_CAST_TOPIC", "TAG_A", "KEYS_" + index, ("HELLO!" + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
System.out.println(result);
}
// 關(guān)閉 producer
producer.shutdown();
}
}
消費者
消費者需要將消費模式修改為 廣播消費: consumer.setMessageModel(MessageModel.BROADCASTING);
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1、創(chuàng)建 DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("boardcast-consumer");
// 2、設(shè)置 name server
consumer.setNamesrvAddr("192.168.52.200:9876");
// 設(shè)置消息拉取最大數(shù)
consumer.setConsumeMessageBatchMaxSize(2);
// 修改消費模式,默認是集群消費模式,修改為廣播消費模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 3、設(shè)置 subscribe
consumer.subscribe("BOARD_CAST_TOPIC", // 要消費的主題
"*" // 過濾規(guī)則
);
// 4、創(chuàng)建消息監(jiān)聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 5、獲取消息信息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("A Consumer 消費信息:topic:" + topic+ ",tags:" + tags + ",消息體:" + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 6、返回消息讀取狀態(tài)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
驗證
生產(chǎn)者控制臺輸出
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965570000, offsetMsgId=C0A834C800002A9F00000000000026D0, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965660001, offsetMsgId=C0A834C800002A9F000000000000278F, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29656C0002, offsetMsgId=C0A834C800002A9F000000000000284E, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965700003, offsetMsgId=C0A834C800002A9F000000000000290D, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29657B0004, offsetMsgId=C0A834C800002A9F00000000000029CC, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965880005, offsetMsgId=C0A834C800002A9F0000000000002A8B, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29658E0006, offsetMsgId=C0A834C800002A9F0000000000002B4A, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965960007, offsetMsgId=C0A834C800002A9F0000000000002C09, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29659D0008, offsetMsgId=C0A834C800002A9F0000000000002CC8, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965AB0009, offsetMsgId=C0A834C800002A9F0000000000002D87, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=12]
19:24:35.135 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10911] result: true
19:24:35.140 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:9876] result: true
19:24:35.140 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10909] result: true
消費者控制臺輸出
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!1
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!2
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!5
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!4
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!3
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!7
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!6
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!8
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!9
A Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!10
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!1
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!2
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!3
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!5
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!4
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!6
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!7
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!8
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!9
B Consumer 消費信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!10