RocketMQ 順序消息:消息有序是指可以按照消息發(fā)送順序來(lái)消費(fèi)。RocketMQ 可以嚴(yán)格的保證消息有序,但是這個(gè)順序逼格不是全局順序,只是分區(qū)(queue)順序。要保證群居順序,只能有一個(gè)分區(qū)。
順序消息
在 MQ 模型中,順序要由三個(gè)階段保證:
- 消息被發(fā)送時(shí),保持順序
- 消息被存儲(chǔ)時(shí)的順序和發(fā)送的順序一致
- 消息被消費(fèi)時(shí)的順序和存儲(chǔ)的順序一致
發(fā)送時(shí)保持順序,意味著對(duì)于有順序要求的消息,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲(chǔ)保持和發(fā)送的順序一致,則要求在同一線程中被發(fā)送出來(lái)的消息 A/B,存儲(chǔ)時(shí) A 要在 B 之前。而消費(fèi)保持和存儲(chǔ)一致,則要求消息 A/B 到達(dá) Consumer 之后必須按照先后順序被處理。

生產(chǎn)者
package com.laiyy.study.rocketmqprovider.order;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author laiyy
* @date 2019/4/21 16:18
* @description
*/
public class OrderProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1、創(chuàng)建 DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo-producer");
// 2、設(shè)置 name server
producer.setNamesrvAddr("192.168.52.200:9876");
// 3、開(kāi)啟 producer
producer.start();
// 連續(xù)發(fā)送 5 條信息
for (int index = 1; index <= 5; index++) {
// 創(chuàng)建消息
Message message = new Message("TOPIC_DEMO", "TAG_A", "KEYS_!", ("HELLO!" + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 指定 MessageQueue,順序發(fā)送消息
// 第一個(gè)參數(shù):消息體
// 第二個(gè)參數(shù):選中指定的消息隊(duì)列對(duì)象(會(huì)將所有的消息隊(duì)列傳進(jìn)來(lái),需要自己選擇)
// 第三個(gè)參數(shù):選擇對(duì)應(yīng)的隊(duì)列下標(biāo)
SendResult result = producer.send(message, new MessageQueueSelector() {
// 第一個(gè)參數(shù):所有的消息隊(duì)列對(duì)象
// 第二個(gè)參數(shù):消息體
// 第三個(gè)參數(shù):傳入的消息隊(duì)列下標(biāo)
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 獲取隊(duì)列下標(biāo)
int index = (int) o;
return list.get(index);
}
}, 0);
System.out.println("發(fā)送第:" + index + " 條信息成功:" + result);
}
// 關(guān)閉 producer
producer.shutdown();
}
}
控制臺(tái)輸出結(jié)果:
發(fā)送第:1 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66560000, offsetMsgId=C0A834C800002A9F00000000000000B8, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=1]
發(fā)送第:2 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66630001, offsetMsgId=C0A834C800002A9F0000000000000171, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=2]
發(fā)送第:3 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66660002, offsetMsgId=C0A834C800002A9F000000000000022A, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=3]
發(fā)送第:4 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE66690003, offsetMsgId=C0A834C800002A9F00000000000002E3, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=4]
發(fā)送第:5 條信息成功:SendResult [sendStatus=SEND_OK, msgId=C0A800677E4C18B4AAC26ACE666C0004, offsetMsgId=C0A834C800002A9F000000000000039C, messageQueue=MessageQueue [topic=TOPIC_DEMO, brokerName=broker-a, queueId=0], queueOffset=5]
17:45:11.545 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10909] result: true
17:45:11.548 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:9876] result: true
17:45:11.549 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10911] result: true
Process finished with exit code 0
可以看到,所有消息的 queueId 都為 0,順序消息生產(chǎn)成功。
消費(fèi)者
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
// 1、創(chuàng)建 DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo-consumer");
// 2、設(shè)置 name server
consumer.setNamesrvAddr("192.168.52.200:9876");
// 設(shè)置消息拉取最大數(shù)
consumer.setConsumeMessageBatchMaxSize(2);
// 3、設(shè)置 subscribe
consumer.subscribe("TOPIC_DEMO", // 要消費(fèi)的主題
"*" // 過(guò)濾規(guī)則
);
// 4、創(chuàng)建消息監(jiān)聽(tīng)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
// 5、獲取消息信息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標(biāo)簽
String tags = msg.getTags();
// 獲取信息
try {
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer 消費(fèi)信息:topic:" + topic+ ",tags:" + tags + ",消息體:" + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
// 6、返回消息讀取狀態(tài)
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 啟動(dòng)消費(fèi)者
consumer.start();
}
}
順序消費(fèi)者與之前的 demo 最大的不同,在于 message listener 從 MessageListenerConcurrently 變?yōu)?MessageListenerOrderly,消費(fèi)標(biāo)識(shí)從 ConsumeConcurrentlyStatus 變?yōu)?ConsumeOrderlyStatus。
查看控制臺(tái)輸出:
Consumer 消費(fèi)信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!1
Consumer 消費(fèi)信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!2
Consumer 消費(fèi)信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!3
Consumer 消費(fèi)信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!4
Consumer 消費(fèi)信息:topic:TOPIC_DEMO,tags:TAG_A,消息體:HELLO!5
事務(wù)消息
在 RocketMQ 4.3 版本后,開(kāi)放了事務(wù)消息。
RocketMQ 事務(wù)消息流程
RocketMQ 的事務(wù)消息,只要是通過(guò)消息的異步處理,可以保證本地事務(wù)和消息發(fā)送同事成功執(zhí)行或失敗,從而保證數(shù)據(jù)的最終一致性。

MQ 事務(wù)消息解決分布式事務(wù)問(wèn)題,但是第三方 MQ 支持事務(wù)消息的中間件不多,如 RockctMQ,它們支持事務(wù)的方式也是類似于采用二階段提交,但是市面上一些主流的 MQ 都是不支持事務(wù)消息的,如:Kafka、RabbitMQ
以 RocketMQ 為例,事務(wù)消息實(shí)現(xiàn)思路大致為:
- 第一階段的 Prepared 消息,會(huì)拿到消息的地址
- 第二階段執(zhí)行本地事務(wù)
- 第三階段通過(guò)第一階段拿到的地址去訪問(wèn)消息,并修改狀態(tài)
也就是說(shuō),在業(yè)務(wù)方法內(nèi)想要消息隊(duì)列提交兩次消息,一次發(fā)送消息和一次確認(rèn)消息。如果確認(rèn)消息發(fā)送失敗,RocketMQ 會(huì)定期掃描消息集群中的事務(wù)消息。這時(shí)候發(fā)現(xiàn)了 prepared 消息,它會(huì)向消息發(fā)送者確認(rèn),所以生產(chǎn)方需要實(shí)現(xiàn)一個(gè) check 接口。RocketMQ 會(huì)根據(jù)發(fā)送端設(shè)置的策略來(lái)決定是回滾還是繼續(xù)發(fā)送確認(rèn)消息。這樣就保證了消息發(fā)送與本地事務(wù)同時(shí)成功或同時(shí)失敗。

事務(wù)消息的成功投遞需要三個(gè) Topic,分別是
- Half Topic:用于記錄所有的 prepare 消息
- Op Half Topic:記錄以及提交了狀態(tài)的 prepare 消息
- Real Topic:事務(wù)消息真正的 topic,在 commit 后才會(huì)將消息寫(xiě)入該 topic,從而進(jìn)行消息投遞。
事務(wù)消息實(shí)現(xiàn)
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1、創(chuàng)建 TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("transaction-producer");
// 2、設(shè)置 name server
producer.setNamesrvAddr("192.168.52.200:9876");
// 3、指定消息監(jiān)聽(tīng)對(duì)象,用于執(zhí)行本地事務(wù)和消息回查
TransactionListenerImpl transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
// 4、線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-thread");
return thread;
}
});
producer.setExecutorService(executor);
// 5、開(kāi)啟 producer
producer.start();
// 6、創(chuàng)建消息
Message message = new Message("TRANSACTION_TOPIC", "TAG_A", "KEYS_!", "HELLO!TRANSACTION!".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 7、發(fā)送消息
TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
System.out.println(result);
// 關(guān)閉 producer
producer.shutdown();
}
}
事務(wù)消息監(jiān)聽(tīng)器:
public class TransactionListenerImpl implements TransactionListener {
/**
* 存儲(chǔ)對(duì)應(yīng)書(shū)屋的狀態(tài)信息, key:事務(wù)id,value:事務(wù)執(zhí)行的狀態(tài)
*/
private ConcurrentMap<String, Integer> maps = new ConcurrentHashMap<>();
/**
* 執(zhí)行本地事務(wù)
*
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 事務(wù)id
String transactionId = message.getTransactionId();
// 0:執(zhí)行中,狀態(tài)未知
// 1:本地事務(wù)執(zhí)行成功
// 2:本地事務(wù)執(zhí)行失敗
maps.put(transactionId, 0);
try {
System.out.println("正在執(zhí)行本地事務(wù)。。。。");
// 模擬本地事務(wù)
TimeUnit.SECONDS.sleep(65);
System.out.println("本地事務(wù)執(zhí)行成功。。。。");
maps.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
maps.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 消息回查
*
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String transactionId = messageExt.getTransactionId();
System.out.println("正在執(zhí)行消息回查,事務(wù)id:" + transactionId);
// 獲取事務(wù)id的執(zhí)行狀態(tài)
if (maps.containsKey(transactionId)) {
int status = maps.get(transactionId);
System.out.println("消息回查狀態(tài):" + status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
default:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.UNKNOW;
}
}
運(yùn)行生產(chǎn)者,查看控制臺(tái)輸出:
正在執(zhí)行本地事務(wù)。。。。
正在執(zhí)行消息回查,事務(wù)id:C0A800678F0818B4AAC26AEDDEB10000
消息回查狀態(tài):0
本地事務(wù)執(zhí)行成功。。。。
需要注意:消息回查會(huì)隔一段時(shí)間執(zhí)行一次,如果執(zhí)行本地事務(wù)的時(shí)間太短,則控制臺(tái)不會(huì)輸出事務(wù)回查日志。
廣播消息
生產(chǎn)者
public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1、創(chuàng)建 DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("boardcast-producer");
// 2、設(shè)置 name server
producer.setNamesrvAddr("192.168.52.200:9876");
// 3、開(kāi)啟 producer
producer.start();
for (int index = 1; index <= 10; index++) {
Message message = new Message("BOARD_CAST_TOPIC", "TAG_A", "KEYS_" + index, ("HELLO!" + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
System.out.println(result);
}
// 關(guān)閉 producer
producer.shutdown();
}
}
消費(fèi)者
消費(fèi)者需要將消費(fèi)模式修改為 廣播消費(fèi): consumer.setMessageModel(MessageModel.BROADCASTING);
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1、創(chuàng)建 DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("boardcast-consumer");
// 2、設(shè)置 name server
consumer.setNamesrvAddr("192.168.52.200:9876");
// 設(shè)置消息拉取最大數(shù)
consumer.setConsumeMessageBatchMaxSize(2);
// 修改消費(fèi)模式,默認(rèn)是集群消費(fèi)模式,修改為廣播消費(fèi)模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 3、設(shè)置 subscribe
consumer.subscribe("BOARD_CAST_TOPIC", // 要消費(fèi)的主題
"*" // 過(guò)濾規(guī)則
);
// 4、創(chuàng)建消息監(jiān)聽(tīng)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 5、獲取消息信息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標(biāo)簽
String tags = msg.getTags();
// 獲取信息
try {
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("A Consumer 消費(fèi)信息:topic:" + topic+ ",tags:" + tags + ",消息體:" + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 6、返回消息讀取狀態(tài)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
驗(yàn)證
生產(chǎn)者控制臺(tái)輸出
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965570000, offsetMsgId=C0A834C800002A9F00000000000026D0, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965660001, offsetMsgId=C0A834C800002A9F000000000000278F, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=10]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29656C0002, offsetMsgId=C0A834C800002A9F000000000000284E, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965700003, offsetMsgId=C0A834C800002A9F000000000000290D, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29657B0004, offsetMsgId=C0A834C800002A9F00000000000029CC, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965880005, offsetMsgId=C0A834C800002A9F0000000000002A8B, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=11]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29658E0006, offsetMsgId=C0A834C800002A9F0000000000002B4A, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965960007, offsetMsgId=C0A834C800002A9F0000000000002C09, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B29659D0008, offsetMsgId=C0A834C800002A9F0000000000002CC8, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A80067971418B4AAC26B2965AB0009, offsetMsgId=C0A834C800002A9F0000000000002D87, messageQueue=MessageQueue [topic=BOARD_CAST_TOPIC, brokerName=broker-a, queueId=2], queueOffset=12]
19:24:35.135 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10911] result: true
19:24:35.140 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:9876] result: true
19:24:35.140 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.52.200:10909] result: true
消費(fèi)者控制臺(tái)輸出
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!1
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!2
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!5
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!4
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!3
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!7
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!6
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!8
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!9
A Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!10
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!1
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!2
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!3
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!5
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!4
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!6
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!7
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!8
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!9
B Consumer 消費(fèi)信息:topic:BOARD_CAST_TOPIC,tags:TAG_A,消息體:HELLO!10