第三章 RocketMQ 順序消息

消息有序指的是可以按照消息的 發(fā)送順序 來(lái)消費(fèi)(需要注意的是,有的時(shí)候在并發(fā)情況下,應(yīng)用發(fā)送消息的順序本身可能有問(wèn)題,這一點(diǎn)需要應(yīng)用端去做保證)。RocketMQ 可以嚴(yán)格的保證消息有序,可以分為分區(qū)(messageQueue)有序或者全局有序。

順序消費(fèi)的原理解析,在默認(rèn)的情況下消息發(fā)送會(huì)采取輪詢方式把消息發(fā)送到不同的 queue (分區(qū)隊(duì)列);而消費(fèi)消息的時(shí)候從多個(gè) queue 上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。

下面用訂單進(jìn)行分區(qū)有序的示例。一個(gè)訂單的順序流程是:創(chuàng)建、付款、推送、完成。訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,消費(fèi)時(shí),同一個(gè) OrderId 獲取到的肯定是同一個(gè)隊(duì)列。

一、順序消息生產(chǎn)者

public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("127.0.0.1:9876");
        // https://blog.csdn.net/heihaozi/article/details/119145266
        DefaultChannelId.newInstance();
        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // 訂單列表
        List<OrderStep> orderList = buildOrders();

        String dateStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        for (int i = 0; i < 10; i++) {
            // 加個(gè)時(shí)間前綴
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

            // 將orderId相同的消息發(fā)送到同一個(gè)queue中
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                // arg: orderList.get(i).getOrderId()
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    System.out.println(mqs.size()); // 4 - 為什么
                    Long id = (Long) arg;  //根據(jù)訂單id選擇發(fā)送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//訂單id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    /**
     * 訂單的步驟
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模擬訂單數(shù)據(jù)
     */
    private static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("創(chuàng)建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("創(chuàng)建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("創(chuàng)建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

二、順序消息消費(fèi)者

public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開(kāi)始消費(fèi)還是隊(duì)列尾部開(kāi)始消費(fèi)<br>
         * 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();
            // MessageListenerOrderly:每個(gè) queue 有唯一的 consume 線程來(lái)消費(fèi)
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每個(gè)queue有唯一的consume線程來(lái)消費(fèi), 訂單對(duì)每個(gè)queue(分區(qū))有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                try {
                    //模擬業(yè)務(wù)邏輯處理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // https://blog.csdn.net/heihaozi/article/details/119145266
        DefaultChannelId.newInstance();
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

參考

RocketMQ 基本樣例

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容