RocketMQ學(xué)習(xí)筆記(二)

安裝可視化界面

  • RocketMQ有一個(gè)對(duì)其擴(kuò)展的開源項(xiàng)目rocketmq-externals,該項(xiàng)目中有一個(gè)子模塊叫rocketmq-console,是一個(gè)管理控制臺(tái)的項(xiàng)目。下載整個(gè)擴(kuò)展項(xiàng)目并編譯打包子項(xiàng)目:
git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console 
#修改 rocketmq-console 中配置 namesrv 集群地址:
rocketmq.config.namesrvAddr=192.168.10.102:9876;192.168.10.103:9876
#跳過測試打包
mvn clean package -Dmaven.test.skip=true
  • 將生成的rocketmq-console-ng-2.0.0.jar隨便放在兩臺(tái)機(jī)器中的一臺(tái),然后啟動(dòng)rocketmq-consolejava -jar rocketmq-console-ng-2.0.0.jar,瀏覽器輸入網(wǎng)址:http://192.168.10.102:8080進(jìn)行訪問。
安裝成功
  • 新建 SpringBoot 項(xiàng)目,導(dǎo)入MQ客戶端依賴:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
  • 生產(chǎn)者發(fā)送同步消息:
package com.zzw.rmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
 * 發(fā)送同步消息
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //1、創(chuàng)建消息生產(chǎn)者 producer ,并指定生產(chǎn)者組名 group1
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定 NameServer 的地址
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        //設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
        producer.setSendMsgTimeout(60000);
        //3、啟動(dòng) producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4、創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("test-base-mq", "Tag1", ("Hello World" + i).getBytes());
            //5、發(fā)送消息
            SendResult result = producer.send(msg);
            //發(fā)送狀態(tài)
            SendStatus status = result.getSendStatus();
            //消息id
            String msgId = result.getMsgId();
            //接收消息的隊(duì)列id
            int queueId = result.getMessageQueue().getQueueId();
            System.out.println("發(fā)送狀態(tài):" + status + ",消息id:" + msgId + ",接收消息的隊(duì)列id:" + queueId);
            //線程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //6、關(guān)閉生產(chǎn)者 producer
        producer.shutdown();
    }
}
/*
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655AD480000,接收消息的隊(duì)列id:1
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655B1AD0001,接收消息的隊(duì)列id:2
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655B59B0002,接收消息的隊(duì)列id:3
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655B9880003,接收消息的隊(duì)列id:0
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655BD7E0004,接收消息的隊(duì)列id:1
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655C16B0005,接收消息的隊(duì)列id:2
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655C5580006,接收消息的隊(duì)列id:3
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655C9450007,接收消息的隊(duì)列id:0
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655CD300008,接收消息的隊(duì)列id:1
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655D11D0009,接收消息的隊(duì)列id:2
*/
  • 生產(chǎn)者發(fā)送異步消息:通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場景,即發(fā)送端不能容忍長時(shí)間地等待Broker的響應(yīng)。
package com.zzw.rmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
 * 發(fā)送異步消息
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //1、創(chuàng)建消息生產(chǎn)者 producer,并指定生產(chǎn)者組名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定 NameServer 地址
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        //設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
        producer.setSendMsgTimeout(60000);
        //3、啟動(dòng)producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4、創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("test-base-mq", "Tag2", ("Hello World" + i).getBytes());
            //5、發(fā)送異步消息
            producer.send(msg, new SendCallback() {
                /**
                 * 發(fā)送成功回調(diào)函數(shù)
                 * @param sendResult
                 */
                public void onSuccess(SendResult sendResult) {
                    System.out.println("發(fā)送結(jié)果:" + sendResult);
                }
                /**
                 * 發(fā)送失敗回調(diào)函數(shù)
                 * @param e
                 */
                public void onException(Throwable e) {
                    System.out.println("發(fā)送異常:" + e);
                }
            });
            //線程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        //6、關(guān)閉生產(chǎn)者 producer
        producer.shutdown();
    }
}
/*
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0000, offsetMsgId=C0A80A6600002A9F00000000000004D7, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=3], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0002, offsetMsgId=C0A80A6600002A9F0000000000000426, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=3], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0003, offsetMsgId=C0A80A6700002A9F0000000000000375, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=3], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0001, offsetMsgId=C0A80A6700002A9F0000000000000426, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=1], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650767F0004, offsetMsgId=C0A80A6700002A9F00000000000004D7, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=2], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465078AE0005, offsetMsgId=C0A80A6700002A9F0000000000000588, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=3], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC246507C960006, offsetMsgId=C0A80A6700002A9F0000000000000639, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=0], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465080800007, offsetMsgId=C0A80A6600002A9F0000000000000588, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=0], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465084690008, offsetMsgId=C0A80A6700002A9F00000000000006EA, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=3], queueOffset=3]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465088530009, offsetMsgId=C0A80A6600002A9F0000000000000639, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=0], queueOffset=3]
*/
  • 生產(chǎn)者單向發(fā)送消息:主要用在不特別關(guān)心發(fā)送結(jié)果的場景,例如日志發(fā)送。
package com.zzw.rmq.base.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class OneWayProducer {
    public static void main(String[] args) throws Exception, MQBrokerException {
        //1、創(chuàng)建消息生產(chǎn)者 producer,并指定生產(chǎn)者組名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2、指定 NameServer 地址
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        //設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
        producer.setSendMsgTimeout(60000);
        //3、啟動(dòng) producer
        producer.start();
        for (int i = 0; i < 3; i++) {
            //4、創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("base", "Tag3", ("Hello World,單向消息" + i).getBytes());
            //5、發(fā)送單向消息
            producer.sendOneway(msg);
            //線程睡1秒
            TimeUnit.SECONDS.sleep(5);
        }
        //6、關(guān)閉生產(chǎn)者producer
        producer.shutdown();
    }
}
  • 消費(fèi)者消費(fèi)消息的兩種模式:負(fù)載均衡廣播模式,默認(rèn)是負(fù)載均衡模式。
package com.zzw.rmq.base.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1、創(chuàng)建消費(fèi)者 Consumer,并指定消費(fèi)者組名(push消費(fèi)模式)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2、指定 NameServer 地址
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        //3、訂閱主題Topic和Tag, * 表示所有 tag
        consumer.subscribe("test-base-mq", "*");
        /**
         *  設(shè)定消費(fèi)模式:負(fù)載均衡(默認(rèn)) | 廣播模式
         *      負(fù)載均衡:多個(gè)消費(fèi)者共同消費(fèi)隊(duì)列消息,每個(gè)消費(fèi)者處理的消息不同
         *      廣播模式:不同的消費(fèi)者接收到的消息是一樣的
         */
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4、設(shè)置回調(diào)函數(shù),處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息內(nèi)容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5、啟動(dòng)消費(fèi)者consumer
        consumer.start();
    }
}
  • 順序消息:消息有序指可以按照消息的發(fā)送順序來消費(fèi)(FIFO)。RocketMQ可以嚴(yán)格地保證消息有序,分為分區(qū)有序全局有序
  • 在默認(rèn)的情況下消息發(fā)送會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);而消費(fèi)消息時(shí)從多個(gè)queue上拉取消息,這種情況不能保證消費(fèi)順序。但若控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)時(shí)只從這個(gè)queue上依次拉取,則保證了消費(fèi)順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),即為全局有序;若多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。
  • 下面用訂單進(jìn)行分區(qū)有序的示例。一個(gè)訂單的順序流程是:創(chuàng)建、付款、推送、完成。訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,消費(fèi)時(shí)同一個(gè)OrderId獲取到的肯定是同一個(gè)隊(duì)列。
  • 訂單構(gòu)建者:
package com.zzw.rmq.order;
import java.util.ArrayList;
import java.util.List;
/**
 * 訂單構(gòu)建者
 */
public class OrderStep {
    private long orderId;
    private String desc;
    public long getOrderId() {
        return orderId;
    }
    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }
    public String getDesc() {
        return desc;
    }
    public void setDesc(String desc) {
        this.desc = desc;
    }
    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
    public static List<OrderStep> buildOrders() {
        //  1039L:創(chuàng)建  付款  推送  完成
        //  1065L:創(chuàng)建  付款
        //  7235L:創(chuàng)建  付款
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("創(chuàng)建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("創(chuàng)建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("創(chuàng)建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(7235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    }
}
  • 生產(chǎn)者發(fā)送順序消息:
package com.zzw.rmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        //設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
        producer.setSendMsgTimeout(60000);
        producer.start();
        //構(gòu)建消息集合
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        //發(fā)送消息
        for (int i = 0; i < orderSteps.size(); i++) {
            String body = orderSteps.get(i) + "";
            /**
             * 參數(shù)一:消息對(duì)象
             * 參數(shù)二:消息隊(duì)列的選擇器
             * 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí)(訂單ID)
             */
            Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 * @param mqs:隊(duì)列集合
                 * @param msg:消息對(duì)象
                 * @param arg:業(yè)務(wù)標(biāo)識(shí)的參數(shù)
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long orderId = (long) arg;
                    long index = orderId % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderSteps.get(i).getOrderId());
            System.out.println("發(fā)送結(jié)果:" + sendResult);
        }
        producer.shutdown();
    }
}
  • 消費(fèi)者順序消費(fèi)消息:
package com.zzw.rmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        /**
         * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
         * 若非第一次啟動(dòng),則按照上次消費(fèi)的位置繼續(xù)消費(fèi)
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("OrderTopic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //同一個(gè)消息隊(duì)列只會(huì)被同一個(gè)線程進(jìn)行處理
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()) + ",消息隊(duì)列id:" + msg.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}

/*
消費(fèi)者啟動(dòng)
線程名稱:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='創(chuàng)建'},消息隊(duì)列id:1
線程名稱:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='創(chuàng)建'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='付款'},消息隊(duì)列id:1
線程名稱:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='付款'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='完成'},消息隊(duì)列id:1
線程名稱:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='完成'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='創(chuàng)建'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='付款'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='推送'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='完成'},消息隊(duì)列id:3
*/
  • 延遲消息:比如在電商里提交了一個(gè)訂單就發(fā)送一個(gè)延時(shí)消息,1h后去檢查這個(gè)訂單的狀態(tài),若還是未付款則取消訂單釋放庫存。
  • 使用限制:RocketMq并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),從1s到2h分別對(duì)應(yīng)著等級(jí)1到18:
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • 生產(chǎn)者發(fā)送延遲消息:
package com.zzw.rmq.delay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class Producer {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        producer.setSendMsgTimeout(60000);
        producer.start();
        for (int i = 0; i < 10; i++) {
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());
            //多了這行代碼:設(shè)定延遲等級(jí)為1,即這個(gè)消息將在1s后發(fā)送
            msg.setDelayTimeLevel(1);
            SendResult result = producer.send(msg);
            System.out.println("發(fā)送結(jié)果:" + result);
            //線程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}
  • 消費(fèi)者消費(fèi)延遲消息:
package com.zzw.rmq.delay;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        consumer.subscribe("DelayTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息ID:【" + msg.getMsgId() + "】,延遲時(shí)間:" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}
/*
消費(fèi)者啟動(dòng)
消息ID:【C0A87C17250818B4AAC24A4FE0650001】,延遲時(shí)間:588ms later
消息ID:【C0A87C17250818B4AAC24A4FDBFD0000】,延遲時(shí)間:1698ms later
消息ID:【C0A87C17250818B4AAC24A4FE4BD0002】,延遲時(shí)間:108ms later
消息ID:【C0A87C17250818B4AAC24A4FE91C0003】,延遲時(shí)間:101ms later
消息ID:【C0A87C17250818B4AAC24A4FED0D0004】,延遲時(shí)間:7ms later
消息ID:【C0A87C17250818B4AAC24A4FF1630005】,延遲時(shí)間:5ms later
消息ID:【C0A87C17250818B4AAC24A4FF5550006】,延遲時(shí)間:5ms later
消息ID:【C0A87C17250818B4AAC24A4FF9430007】,延遲時(shí)間:7ms later
消息ID:【C0A87C17250818B4AAC24A4FFD3A0008】,延遲時(shí)間:105ms later
消息ID:【C0A87C17250818B4AAC24A5001290009】,延遲時(shí)間:1ms later
*/
  • 批量消息:批量發(fā)送消息能顯著提高傳遞小消息的性能,限制是這些批量消息應(yīng)該有相同的topic,相同的waitStoreMsgOK,不能是延時(shí)消息,并且這批消息的總大小不能超過4MB。
  • 生產(chǎn)者發(fā)送批量消息:若每次發(fā)送的消息大小不超過4MB,則使用以下模板1:
package com.zzw.rmq.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        producer.setSendMsgTimeout(60000);
        producer.start();
        List<Message> msgs = new ArrayList<>();
        /**
         * 參數(shù)一:消息主題Topic
         * 參數(shù)二:消息Tag
         * 參數(shù)三:消息內(nèi)容
         */
        Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());
        Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());
        Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());
        msgs.add(msg1);
        msgs.add(msg2);
        msgs.add(msg3);
        SendResult result = producer.send(msgs);
        System.out.println("發(fā)送結(jié)果:" + result);
        TimeUnit.SECONDS.sleep(1);
        producer.shutdown();
    }
}
/*
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C171BD418B4AAC24A5798310000,C0A87C171BD418B4AAC24A5798320001,C0A87C171BD418B4AAC24A5798320002, offsetMsgId=C0A80A6700002A9F0000000000002BCC,C0A80A6700002A9F0000000000002C7B,C0A80A6700002A9F0000000000002D2A, messageQueue=MessageQueue [topic=BatchTopic, brokerName=broker-b, queueId=3], queueOffset=0]
*/
  • 生產(chǎn)者發(fā)送批量消息:若發(fā)送消息的總長度大于4MB,則使用以下模板2:
package com.zzw.rmq.batch;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class ProducerOther {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        producer.setSendMsgTimeout(60000);
        producer.start();
        List<Message> messages = new ArrayList<>();
        //把大的消息分裂成若干個(gè)小的消息
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message> listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
class ListSplitter implements Iterator<List<Message>> {
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的開銷20字節(jié)
            int SIZE_LIMIT = 1024 * 1024 * 4;
            if (tmpSize > SIZE_LIMIT) {
                //單個(gè)消息超過了最大的限制
                //忽略,否則會(huì)阻塞分裂的進(jìn)程
                if (nextIndex - currIndex == 0) {
                    //若下一個(gè)子列表沒有元素,則添加這個(gè)子列表然后退出循環(huán),否則只是退出循環(huán)
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
  • 消費(fèi)者消費(fèi)消息:
package com.zzw.rmq.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        consumer.subscribe("BatchTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_2,Hello World2
consumeThread=ConsumeMessageThread_1,Hello World1
consumeThread=ConsumeMessageThread_3,Hello World3
*/
  • TAG過濾消息,生產(chǎn)者發(fā)送消息:
package com.zzw.rmq.filter.tag;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        producer.setSendMsgTimeout(60000);
        producer.start();
        for (int i = 0; i < 3; i++) {
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("FilterTagTopic", "Tag1", ("Hello World" + i).getBytes());
            SendResult result = producer.send(msg);
            System.out.println("發(fā)送結(jié)果:" + result);
            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}
  • 消費(fèi)者按TAG過濾并消費(fèi)消息:
package com.zzw.rmq.filter.tag;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        consumer.subscribe("FilterTagTopic", "Tag1 || Tag2");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_1,Hello World2
consumeThread=ConsumeMessageThread_2,Hello World1
consumeThread=ConsumeMessageThread_3,Hello World0
consumeThread=ConsumeMessageThread_4,Hello World0
consumeThread=ConsumeMessageThread_5,Hello World1
consumeThread=ConsumeMessageThread_6,Hello World2
*/
  • SQL過濾消息:RocketMQ配置文件默認(rèn)是不開啟屬性過濾的,所以測試過程中會(huì)出現(xiàn)這樣的錯(cuò)誤:Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92,解決方案如下:
  • 修改兩臺(tái)機(jī)器中RocketMQ的安裝目錄的conf/2m-2s-async下的配置文件:
    • 服務(wù)器IP(192.168.10.102):vim /opt/module/rocketmq/conf/2m-2s-sync/broker-a.properties,vim /opt/module/rocketmq/conf/2m-2s-sync/broker-b-s.properties
    • 服務(wù)器IP(192.168.10.103):vim /opt/module/rocketmq/conf/2m-2s-sync/broker-b.propertiesvim /opt/module/rocketmq/conf/2m-2s-sync/broker-a-s.properties
    • 在文件末尾追加以下內(nèi)容:
#是否支持根據(jù)屬性過濾 若使用基于標(biāo)準(zhǔn)的sql92模式過濾消息,則改參數(shù)必須設(shè)置為true
enablePropertyFilter=true
  • 關(guān)閉broker集群:sh bin/mqshutdown broker,重新啟動(dòng)broker集群,重啟后使用jps命令查看啟動(dòng)進(jìn)程的狀態(tài):
#啟動(dòng)Broker集群
#啟動(dòng) Master1 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.102):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-a.properties &
#啟動(dòng) Slave2 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.102):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
#啟動(dòng) Master2 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.103):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-b.properties &
#啟動(dòng) Slave1 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.103):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
  • 生產(chǎn)者發(fā)送消息,通過putUserProperty來設(shè)置自定義過濾屬性:
package com.zzw.rmq.filter.sql;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        producer.setSendMsgTimeout(60000);
        producer.start();
        for (int i = 0; i < 10; i++) {
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes());
            //設(shè)置自定義的屬性
            msg.putUserProperty("i", String.valueOf(i));
            SendResult result = producer.send(msg);
            System.out.println("發(fā)送結(jié)果:" + result);
            TimeUnit.SECONDS.sleep(2);
        }
        producer.shutdown();
    }
}
  • RocketMQ只定義了一些基本語法來支持SQL過濾消息:
    • 數(shù)值比較:>,>=<,<=,BETWEEN=
    • 字符比較:=,<>,IN
    • IS NULLIS NOT NULL
    • 邏輯符號(hào):AND,OR,NOT
    • 數(shù)值,如:123,3.1415
    • 字符,如:'abc',必須用單引號(hào)包裹起來
    • 特殊的常量:NULL
    • 布爾值:TRUE,FALSE
  • 只有使用push模式的消費(fèi)者才能用使用SQL92標(biāo)準(zhǔn)的sql語句,接口如下:public void subscribe(finalString topic, final MessageSelector messageSelector)。
  • 消費(fèi)者消費(fèi)消息,通過消息選擇器來實(shí)現(xiàn)消息過濾:
package com.zzw.rmq.filter.sql;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        //參數(shù)2:消息選擇器
        consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_1,Hello World6
consumeThread=ConsumeMessageThread_2,Hello World7
consumeThread=ConsumeMessageThread_3,Hello World9
consumeThread=ConsumeMessageThread_4,Hello World8
*/
事務(wù)消息的解決方案
  • 事務(wù)消息發(fā)送及提交流程分析:
    • 發(fā)送half消息;
    • 服務(wù)端響應(yīng)消息寫入結(jié)果;
    • 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(若寫入失敗,則half消息對(duì)業(yè)務(wù)不可見,本地邏輯不執(zhí)行);
    • 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引,消息對(duì)消費(fèi)者可見)。
  • 事務(wù)補(bǔ)償流程分析:
    • 對(duì)沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”;
    • Producer收到回查消息,檢查回查消息對(duì)應(yīng)本地事務(wù)的狀態(tài);
    • 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback。
  • 事務(wù)消息共有三種狀態(tài):提交狀態(tài)、回滾狀態(tài)、中間狀態(tài)。
    • TransactionStatus.CommitTransaction:提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。
    • TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
    • TransactionStatus.Unknown:中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。
  • 生產(chǎn)者發(fā)送事務(wù)消息:
package com.zzw.rmq.transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.TimeUnit;
public class Producer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("group5");
        producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        producer.setSendMsgTimeout(60000);
        //添加事務(wù)監(jiān)聽器
        producer.setTransactionListener(new TransactionListener() {
            //在該方法中執(zhí)行本地事務(wù)
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                if (StringUtils.equals("TAGA", msg.getTags())) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", msg.getTags())) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (StringUtils.equals("TAGC", msg.getTags())) {
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }
            //該方法是MQ進(jìn)行消息事務(wù)狀態(tài)回查
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("消息的Tag:" + msg.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;//提交消息后就能被消費(fèi)
            }
        });
        producer.start();
        String[] tags = {"TAGA", "TAGB", "TAGC"};
        for (int i = 0; i < 3; i++) {
            /**
             * 參數(shù)一:消息主題Topic
             * 參數(shù)二:消息Tag
             * 參數(shù)三:消息內(nèi)容
             */
            Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
            SendResult result = producer.sendMessageInTransaction(msg, null);
            System.out.println("發(fā)送結(jié)果:" + result);
            TimeUnit.SECONDS.sleep(2);
        }
        //不能關(guān)閉,不然回查方法不能被執(zhí)行
        //producer.shutdown();
    }
}
/*
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C1733B418B4AAC24AA52C540000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-b, queueId=1], queueOffset=0]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C1733B418B4AAC24AA534690001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-b, queueId=2], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C1733B418B4AAC24AA53C490002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-b, queueId=3], queueOffset=2]
消息的Tag:TAGC
*/
  • 消費(fèi)者消費(fèi)事務(wù)消息:
package com.zzw.rmq.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
        consumer.subscribe("TransactionTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消費(fèi)者啟動(dòng)");
    }
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_1,Hello World0
consumeThread=ConsumeMessageThread_2,Hello World2
*/
  • 實(shí)現(xiàn)事務(wù)的監(jiān)聽接口:當(dāng)發(fā)送半消息成功時(shí),使用executeLocalTransaction方法來執(zhí)行本地事務(wù),它將返回前面提到的三個(gè)事務(wù)狀態(tài)之一。checkLocalTranscation 方法用于檢查本地事務(wù)狀態(tài),并響應(yīng)消息隊(duì)列的檢查請(qǐng)求,它也是返回前面提到的三個(gè)事務(wù)狀態(tài)之一。
  • 事務(wù)消息的使用限制:
    • 不支持延時(shí)消息和批量消息。
    • 為了避免單個(gè)消息被檢查多次而導(dǎo)致半隊(duì)列消息累積,默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次,用戶可以通過 Broker 配置文件中的transactionCheckMax參數(shù)來修改此限制。若已經(jīng)檢查某條消息超過 N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,同時(shí)打印錯(cuò)誤日志。用戶可以通過重寫 AbstractTransactionCheckListener類來修改這個(gè)行為。
    • 事務(wù)消息將在 Broker 配置文件中的參數(shù)transactionMsgTimeout特定時(shí)間長度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶還可以通過設(shè)置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS來改變這個(gè)限制,該參數(shù)優(yōu)先于transactionMsgTimeout參數(shù)。
    • 事務(wù)性消息可能不止一次被檢查或消費(fèi)。
    • 提交給用戶的目標(biāo)主題消息可能會(huì)失敗,目前這種依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機(jī)制來保證,若希望事務(wù)消息不丟失、事務(wù)完整性得到保證,則建議使用同步的雙重寫入機(jī)制
    • 事務(wù)消息的生產(chǎn)者 ID 不能與其它類型消息的生產(chǎn)者 ID 共享。與其它類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費(fèi)者。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容