消息有序指的是可以按照消息的
發(fā)送順序來(lái)消費(fèi)(需要注意的是,有的時(shí)候在并發(fā)情況下,應(yīng)用發(fā)送消息的順序本身可能有問(wèn)題,這一點(diǎn)需要應(yīng)用端去做保證)。RocketMQ 可以嚴(yán)格的保證消息有序,可以分為分區(qū)(messageQueue)有序或者全局有序。
順序消費(fèi)的原理解析,在默認(rèn)的情況下消息發(fā)送會(huì)采取輪詢方式把消息發(fā)送到不同的 queue (分區(qū)隊(duì)列);而消費(fèi)消息的時(shí)候從多個(gè) queue 上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。但是如果控制發(fā)送的順序消息只
依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。
下面用訂單進(jìn)行分區(qū)有序的示例。一個(gè)訂單的順序流程是:創(chuàng)建、付款、推送、完成。訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,消費(fèi)時(shí),同一個(gè) OrderId 獲取到的肯定是同一個(gè)隊(duì)列。
一、順序消息生產(chǎn)者
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
// https://blog.csdn.net/heihaozi/article/details/119145266
DefaultChannelId.newInstance();
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 訂單列表
List<OrderStep> orderList = buildOrders();
String dateStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
for (int i = 0; i < 10; i++) {
// 加個(gè)時(shí)間前綴
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
// 將orderId相同的消息發(fā)送到同一個(gè)queue中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
// arg: orderList.get(i).getOrderId()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
System.out.println(mqs.size()); // 4 - 為什么
Long id = (Long) arg; //根據(jù)訂單id選擇發(fā)送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//訂單id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 訂單的步驟
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模擬訂單數(shù)據(jù)
*/
private static List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
二、順序消息消費(fèi)者
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開(kāi)始消費(fèi)還是隊(duì)列尾部開(kāi)始消費(fèi)<br>
* 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
// MessageListenerOrderly:每個(gè) queue 有唯一的 consume 線程來(lái)消費(fèi)
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每個(gè)queue有唯一的consume線程來(lái)消費(fèi), 訂單對(duì)每個(gè)queue(分區(qū))有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模擬業(yè)務(wù)邏輯處理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// https://blog.csdn.net/heihaozi/article/details/119145266
DefaultChannelId.newInstance();
consumer.start();
System.out.println("Consumer Started.");
}
}