1、什么是順序消息
順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發(fā)布和消費的消息類型。順序消息由兩個部分組成:順序發(fā)布和順序消費。
順序消息包含兩種類型:
- 分區(qū)順序:一個Partition內(nèi)所有的消息按照先進先出的順序進行發(fā)布和消費
- 全局順序:一個Topic內(nèi)所有的消息按照先進先出的順序進行發(fā)布和消費
那么多線程中發(fā)送消息算不算順序發(fā)布?
如上一部分介紹的,多線程中若沒有因果關(guān)系則沒有順序。那么用戶在多線程中去發(fā)消息就意味著用戶不關(guān)心那些在不同線程中被發(fā)送的消息的順序。即多線程發(fā)送的消息,不同線程間的消息不是順序發(fā)布的,同一線程的消息是順序發(fā)布的。這是需要用戶自己去保障的。
而對于順序消費,則需要保證哪些來自同一個發(fā)送線程的消息在消費時是按照相同的順序被處理的(為什么不說他們應(yīng)該在一個線程中被消費呢?)。
全局順序其實是分區(qū)順序的一個特例,即使Topic只有一個分區(qū)(以下不在討論全局順序,因為全局順序?qū)⒚媾R性能的問題,而且絕大多數(shù)場景都不需要全局順序)。
2、如何保證順序
在MQ的模型中,順序需要由3個階段去保障:
- 消息被發(fā)送時保持順序
- 消息被存儲時保持和發(fā)送的順序一致
- 消息被消費時保持和存儲的順序一致
發(fā)送時保持順序意味著對于有順序要求的消息,用戶應(yīng)該在同一個線程中采用同步的方式發(fā)送。存儲保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。
一言以蔽之,就是需要保證:生產(chǎn)者 - MQServer - 消費者 是一對一對一的關(guān)系。
如下圖所示:

對于兩個訂單的消息的原始數(shù)據(jù):a1、b1、b2、a2、a3、b3(絕對時間下發(fā)生的順序):
在發(fā)送時,a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒有順序關(guān)系,這意味著a、b訂單的消息可以在不同的線程中被發(fā)送出去
在存儲時,需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證
- a1、b1、b2、a2、a3、b3是可以接受的
- a1、a2、b1、b2、a3、b3也是可以接受的
- a1、a3、b1、b2、a2、b3是不能接受的
消費時保證順序的簡單方式就是“什么都不做”,不對收到的消息的順序進行調(diào)整,即只要一個分區(qū)的消息只由一個線程處理即可;當(dāng)然,如果a、b在一個分區(qū)中,在收到消息后也可以將他們拆分到不同線程中處理,不過要權(quán)衡一下收益
3、Producer端
Producer端確保消息順序唯一要做的事情就是將同一組的消息路由到同一個分區(qū),在RocketMQ中,通過MessageQueueSelector來實現(xiàn)分區(qū)的選擇。
public interface MessageQueueSelector {
MessageQueue select(List<MessageQueue> mqs, Message msg, Object var3);
}
- List<MessageQueue> mqs:消息要發(fā)送的Topic下所有的分區(qū)
- Message msg:消息對象
- 額外的參數(shù):用戶可以傳遞自己的參數(shù)
比如如下實現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):
long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

4、Consumer端
RocketMQ消費端有兩種類型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用戶控制線程,主動從服務(wù)端獲取消息,每次獲取到的是一個MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲順序一致,用戶需要再拿到這批消息后自己保證消費的順序。
對于PushConsumer,由用戶注冊MessageListener來消費消息,在客戶端中需要保證調(diào)用MessageListener時消息的順序性。RocketMQ中的實現(xiàn)如下:

- PullMessageService單線程的從Broker獲取消息
- PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個消息的緩存),之后提交一個消費任務(wù)到ConsumeMessageOrderService
- ConsumeMessageOrderService多線程執(zhí)行,每個線程在消費消息時需要拿到MessageQueue的鎖
- 拿到鎖之后從ProcessQueue中獲取消息
保證消費順序的核心思想是:
- 獲取到消息后添加到ProcessQueue中,單線程執(zhí)行,所以ProcessQueue中的消息是順序的
- 提交的消費任務(wù)時提交的是“對某個MQ進行一次消費”,這次消費請求是從ProcessQueue中獲取消息消費,所以也是順序的(無論哪個線程獲取到鎖,都是按照ProcessQueue中消息的順序進行消費)
5、副作用
順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區(qū),消息需要保證每個分區(qū)的數(shù)據(jù)只有一個線程消息,那么就會有一些缺陷:
- 發(fā)送順序消息無法利用集群的Failover特性,因為不能更換MessageQueue進行重試
- 因為發(fā)送的路由策略導(dǎo)致的熱點問題,可能某一些MessageQueue的數(shù)據(jù)量特別大
- 消費的并行讀依賴于分區(qū)數(shù)量
- 消費失敗時無法跳過
不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過Raft、Paxos之類的算法保證有可用的副本,或者通過其他高可用的存儲設(shè)備來存儲MessageQueue。
熱點問題好像沒有什么好的解決辦法,只能通過拆分MessageQueue和優(yōu)化路由方法來盡量均衡的將消息分配到不同的MessageQueue。
消費并行度理論上不會有太大問題,因為MessageQueue的數(shù)量可以調(diào)整。
消費失敗的無法跳過是不可避免的,因為跳過可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯誤的。不過可以提供一些策略,由用戶根據(jù)錯誤類型來決定是否跳過,并且提供重試隊列之類的功能,在跳過之后用戶可以在“其他”地方重新消費到這條消息。
6、代碼實例
在默認的情況下消息發(fā)送會采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊列);而消費消息的時候從多個queue上拉取消息,這種情況發(fā)送和消費是不能保證順序。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區(qū)有序,即相對每個queue,消息都是有序的。
下面用訂單進行分區(qū)有序的示例。一個訂單的順序流程是:創(chuàng)建、付款、推送、完成。訂單號相同的消息會被先后發(fā)送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
public class Producer {
public static void main (String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 訂單列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加個時間前綴
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根據(jù)訂單id選擇發(fā)送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//訂單id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(), body));
}
producer.shutdown();
}
/**
* 訂單的步驟
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId () {
return orderId;
}
public void setOrderId (long orderId) {
this.orderId = orderId;
}
public String getDesc () {
return desc;
}
public void setDesc (String desc) {
this.desc = desc;
}
@Override
public String toString () {
return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}';
}
}
/**
* 生成模擬訂單數(shù)據(jù)
*/
private List<OrderStep> buildOrders () {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
日志如下:
SeSendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='創(chuàng)建'}
SendResult status:SEND_OK, queueId:1, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111065, desc='創(chuàng)建'}
SendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103117235, desc='創(chuàng)建'}
SendResult status:SEND_OK, queueId:1, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111065, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103117235, desc='付款'}
SendResult status:SEND_OK, queueId:1, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111065, desc='完成'}
SendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103117235, desc='完成'}
SendResult status:SEND_OK, queueId:3, body:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='完成'}
消費者代碼:
public class ConsumerInOrder {
public static void main (String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
* 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區(qū))有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模擬業(yè)務(wù)邏輯處理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
日志如下:
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='創(chuàng)建'}
consumeThread=ConsumeMessageThread_2queueId=1, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111065, desc='創(chuàng)建'}
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='付款'}
consumeThread=ConsumeMessageThread_2queueId=1, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111065, desc='付款'}
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103117235, desc='創(chuàng)建'}
consumeThread=ConsumeMessageThread_2queueId=1, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111065, desc='完成'}
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103117235, desc='付款'}
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='推送'}
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103117235, desc='完成'}
consumeThread=ConsumeMessageThread_1queueId=3, content:2020-08-12 11:14:37 Hello RocketMQ OrderStep{orderId=15103111039, desc='完成'}