安裝可視化界面
-
RocketMQ有一個(gè)對(duì)其擴(kuò)展的開源項(xiàng)目rocketmq-externals,該項(xiàng)目中有一個(gè)子模塊叫rocketmq-console,是一個(gè)管理控制臺(tái)的項(xiàng)目。下載整個(gè)擴(kuò)展項(xiàng)目并編譯打包子項(xiàng)目:
git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console
#修改 rocketmq-console 中配置 namesrv 集群地址:
rocketmq.config.namesrvAddr=192.168.10.102:9876;192.168.10.103:9876
#跳過測試打包
mvn clean package -Dmaven.test.skip=true
- 將生成的
rocketmq-console-ng-2.0.0.jar隨便放在兩臺(tái)機(jī)器中的一臺(tái),然后啟動(dòng)rocketmq-console:java -jar rocketmq-console-ng-2.0.0.jar,瀏覽器輸入網(wǎng)址:http://192.168.10.102:8080進(jìn)行訪問。

安裝成功
- 新建 SpringBoot 項(xiàng)目,導(dǎo)入MQ客戶端依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
- 生產(chǎn)者發(fā)送同步消息:
package com.zzw.rmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
* 發(fā)送同步消息
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
//1、創(chuàng)建消息生產(chǎn)者 producer ,并指定生產(chǎn)者組名 group1
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2、指定 NameServer 的地址
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
//設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
producer.setSendMsgTimeout(60000);
//3、啟動(dòng) producer
producer.start();
for (int i = 0; i < 10; i++) {
//4、創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("test-base-mq", "Tag1", ("Hello World" + i).getBytes());
//5、發(fā)送消息
SendResult result = producer.send(msg);
//發(fā)送狀態(tài)
SendStatus status = result.getSendStatus();
//消息id
String msgId = result.getMsgId();
//接收消息的隊(duì)列id
int queueId = result.getMessageQueue().getQueueId();
System.out.println("發(fā)送狀態(tài):" + status + ",消息id:" + msgId + ",接收消息的隊(duì)列id:" + queueId);
//線程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6、關(guān)閉生產(chǎn)者 producer
producer.shutdown();
}
}
/*
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655AD480000,接收消息的隊(duì)列id:1
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655B1AD0001,接收消息的隊(duì)列id:2
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655B59B0002,接收消息的隊(duì)列id:3
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655B9880003,接收消息的隊(duì)列id:0
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655BD7E0004,接收消息的隊(duì)列id:1
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655C16B0005,接收消息的隊(duì)列id:2
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655C5580006,接收消息的隊(duì)列id:3
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655C9450007,接收消息的隊(duì)列id:0
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655CD300008,接收消息的隊(duì)列id:1
發(fā)送狀態(tài):SEND_OK,消息id:C0A87C173EF418B4AAC24655D11D0009,接收消息的隊(duì)列id:2
*/
- 生產(chǎn)者發(fā)送異步消息:通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場景,即發(fā)送端不能容忍長時(shí)間地等待Broker的響應(yīng)。
package com.zzw.rmq.base.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
* 發(fā)送異步消息
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//1、創(chuàng)建消息生產(chǎn)者 producer,并指定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2、指定 NameServer 地址
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
//設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
producer.setSendMsgTimeout(60000);
//3、啟動(dòng)producer
producer.start();
for (int i = 0; i < 10; i++) {
//4、創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("test-base-mq", "Tag2", ("Hello World" + i).getBytes());
//5、發(fā)送異步消息
producer.send(msg, new SendCallback() {
/**
* 發(fā)送成功回調(diào)函數(shù)
* @param sendResult
*/
public void onSuccess(SendResult sendResult) {
System.out.println("發(fā)送結(jié)果:" + sendResult);
}
/**
* 發(fā)送失敗回調(diào)函數(shù)
* @param e
*/
public void onException(Throwable e) {
System.out.println("發(fā)送異常:" + e);
}
});
//線程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6、關(guān)閉生產(chǎn)者 producer
producer.shutdown();
}
}
/*
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0000, offsetMsgId=C0A80A6600002A9F00000000000004D7, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=3], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0002, offsetMsgId=C0A80A6600002A9F0000000000000426, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=3], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0003, offsetMsgId=C0A80A6700002A9F0000000000000375, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=3], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650766E0001, offsetMsgId=C0A80A6700002A9F0000000000000426, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=1], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC24650767F0004, offsetMsgId=C0A80A6700002A9F00000000000004D7, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=2], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465078AE0005, offsetMsgId=C0A80A6700002A9F0000000000000588, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=3], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC246507C960006, offsetMsgId=C0A80A6700002A9F0000000000000639, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=0], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465080800007, offsetMsgId=C0A80A6600002A9F0000000000000588, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=0], queueOffset=2]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465084690008, offsetMsgId=C0A80A6700002A9F00000000000006EA, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-b, queueId=3], queueOffset=3]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C173D6418B4AAC2465088530009, offsetMsgId=C0A80A6600002A9F0000000000000639, messageQueue=MessageQueue [topic=test-base-mq, brokerName=broker-a, queueId=0], queueOffset=3]
*/
- 生產(chǎn)者單向發(fā)送消息:主要用在不特別關(guān)心發(fā)送結(jié)果的場景,例如日志發(fā)送。
package com.zzw.rmq.base.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class OneWayProducer {
public static void main(String[] args) throws Exception, MQBrokerException {
//1、創(chuàng)建消息生產(chǎn)者 producer,并指定生產(chǎn)者組名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2、指定 NameServer 地址
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
//設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
producer.setSendMsgTimeout(60000);
//3、啟動(dòng) producer
producer.start();
for (int i = 0; i < 3; i++) {
//4、創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("base", "Tag3", ("Hello World,單向消息" + i).getBytes());
//5、發(fā)送單向消息
producer.sendOneway(msg);
//線程睡1秒
TimeUnit.SECONDS.sleep(5);
}
//6、關(guān)閉生產(chǎn)者producer
producer.shutdown();
}
}
- 消費(fèi)者消費(fèi)消息的兩種模式:
負(fù)載均衡和廣播模式,默認(rèn)是負(fù)載均衡模式。
package com.zzw.rmq.base.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
//1、創(chuàng)建消費(fèi)者 Consumer,并指定消費(fèi)者組名(push消費(fèi)模式)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2、指定 NameServer 地址
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
//3、訂閱主題Topic和Tag, * 表示所有 tag
consumer.subscribe("test-base-mq", "*");
/**
* 設(shè)定消費(fèi)模式:負(fù)載均衡(默認(rèn)) | 廣播模式
* 負(fù)載均衡:多個(gè)消費(fèi)者共同消費(fèi)隊(duì)列消息,每個(gè)消費(fèi)者處理的消息不同
* 廣播模式:不同的消費(fèi)者接收到的消息是一樣的
*/
consumer.setMessageModel(MessageModel.BROADCASTING);
//4、設(shè)置回調(diào)函數(shù),處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息內(nèi)容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5、啟動(dòng)消費(fèi)者consumer
consumer.start();
}
}
- 順序消息:消息有序指可以按照消息的發(fā)送順序來消費(fèi)(FIFO)。RocketMQ可以嚴(yán)格地保證消息有序,分為
分區(qū)有序或全局有序。 - 在默認(rèn)的情況下消息發(fā)送會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);而消費(fèi)消息時(shí)從多個(gè)queue上拉取消息,這種情況不能保證消費(fèi)順序。但若控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)時(shí)只從這個(gè)queue上依次拉取,則保證了消費(fèi)順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),即為全局有序;若多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。
- 下面用訂單進(jìn)行分區(qū)有序的示例。一個(gè)訂單的順序流程是:創(chuàng)建、付款、推送、完成。訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,消費(fèi)時(shí)同一個(gè)OrderId獲取到的肯定是同一個(gè)隊(duì)列。
- 訂單構(gòu)建者:
package com.zzw.rmq.order;
import java.util.ArrayList;
import java.util.List;
/**
* 訂單構(gòu)建者
*/
public class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
public static List<OrderStep> buildOrders() {
// 1039L:創(chuàng)建 付款 推送 完成
// 1065L:創(chuàng)建 付款
// 7235L:創(chuàng)建 付款
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1065L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(7235L);
orderDemo.setDesc("創(chuàng)建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(7235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(7235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
- 生產(chǎn)者發(fā)送順序消息:
package com.zzw.rmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
//設(shè)置生產(chǎn)者發(fā)送消息的超時(shí)時(shí)間長一點(diǎn)
producer.setSendMsgTimeout(60000);
producer.start();
//構(gòu)建消息集合
List<OrderStep> orderSteps = OrderStep.buildOrders();
//發(fā)送消息
for (int i = 0; i < orderSteps.size(); i++) {
String body = orderSteps.get(i) + "";
/**
* 參數(shù)一:消息對(duì)象
* 參數(shù)二:消息隊(duì)列的選擇器
* 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí)(訂單ID)
*/
Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
* @param mqs:隊(duì)列集合
* @param msg:消息對(duì)象
* @param arg:業(yè)務(wù)標(biāo)識(shí)的參數(shù)
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderSteps.get(i).getOrderId());
System.out.println("發(fā)送結(jié)果:" + sendResult);
}
producer.shutdown();
}
}
- 消費(fèi)者順序消費(fèi)消息:
package com.zzw.rmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
/**
* 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)
* 若非第一次啟動(dòng),則按照上次消費(fèi)的位置繼續(xù)消費(fèi)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
//同一個(gè)消息隊(duì)列只會(huì)被同一個(gè)線程進(jìn)行處理
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("線程名稱:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()) + ",消息隊(duì)列id:" + msg.getQueueId());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
/*
消費(fèi)者啟動(dòng)
線程名稱:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='創(chuàng)建'},消息隊(duì)列id:1
線程名稱:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='創(chuàng)建'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='付款'},消息隊(duì)列id:1
線程名稱:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='付款'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_1】:OrderStep{orderId=1065, desc='完成'},消息隊(duì)列id:1
線程名稱:【ConsumeMessageThread_2】:OrderStep{orderId=7235, desc='完成'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='創(chuàng)建'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='付款'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='推送'},消息隊(duì)列id:3
線程名稱:【ConsumeMessageThread_3】:OrderStep{orderId=1039, desc='完成'},消息隊(duì)列id:3
*/
- 延遲消息:比如在電商里提交了一個(gè)訂單就發(fā)送一個(gè)延時(shí)消息,1h后去檢查這個(gè)訂單的狀態(tài),若還是未付款則取消訂單釋放庫存。
- 使用限制:RocketMq并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),從1s到2h分別對(duì)應(yīng)著等級(jí)1到18:
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- 生產(chǎn)者發(fā)送延遲消息:
package com.zzw.rmq.delay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
producer.setSendMsgTimeout(60000);
producer.start();
for (int i = 0; i < 10; i++) {
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());
//多了這行代碼:設(shè)定延遲等級(jí)為1,即這個(gè)消息將在1s后發(fā)送
msg.setDelayTimeLevel(1);
SendResult result = producer.send(msg);
System.out.println("發(fā)送結(jié)果:" + result);
//線程睡1秒
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
- 消費(fèi)者消費(fèi)延遲消息:
package com.zzw.rmq.delay;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
consumer.subscribe("DelayTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息ID:【" + msg.getMsgId() + "】,延遲時(shí)間:" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
/*
消費(fèi)者啟動(dòng)
消息ID:【C0A87C17250818B4AAC24A4FE0650001】,延遲時(shí)間:588ms later
消息ID:【C0A87C17250818B4AAC24A4FDBFD0000】,延遲時(shí)間:1698ms later
消息ID:【C0A87C17250818B4AAC24A4FE4BD0002】,延遲時(shí)間:108ms later
消息ID:【C0A87C17250818B4AAC24A4FE91C0003】,延遲時(shí)間:101ms later
消息ID:【C0A87C17250818B4AAC24A4FED0D0004】,延遲時(shí)間:7ms later
消息ID:【C0A87C17250818B4AAC24A4FF1630005】,延遲時(shí)間:5ms later
消息ID:【C0A87C17250818B4AAC24A4FF5550006】,延遲時(shí)間:5ms later
消息ID:【C0A87C17250818B4AAC24A4FF9430007】,延遲時(shí)間:7ms later
消息ID:【C0A87C17250818B4AAC24A4FFD3A0008】,延遲時(shí)間:105ms later
消息ID:【C0A87C17250818B4AAC24A5001290009】,延遲時(shí)間:1ms later
*/
- 批量消息:批量發(fā)送消息能顯著提高傳遞小消息的性能,限制是這些批量消息應(yīng)該有相同的topic,相同的waitStoreMsgOK,不能是延時(shí)消息,并且這批消息的總大小不能超過4MB。
- 生產(chǎn)者發(fā)送批量消息:若每次發(fā)送的消息大小不超過4MB,則使用以下模板1:
package com.zzw.rmq.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
producer.setSendMsgTimeout(60000);
producer.start();
List<Message> msgs = new ArrayList<>();
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());
Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());
Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());
msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);
SendResult result = producer.send(msgs);
System.out.println("發(fā)送結(jié)果:" + result);
TimeUnit.SECONDS.sleep(1);
producer.shutdown();
}
}
/*
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C171BD418B4AAC24A5798310000,C0A87C171BD418B4AAC24A5798320001,C0A87C171BD418B4AAC24A5798320002, offsetMsgId=C0A80A6700002A9F0000000000002BCC,C0A80A6700002A9F0000000000002C7B,C0A80A6700002A9F0000000000002D2A, messageQueue=MessageQueue [topic=BatchTopic, brokerName=broker-b, queueId=3], queueOffset=0]
*/
- 生產(chǎn)者發(fā)送批量消息:若發(fā)送消息的總長度大于4MB,則使用以下模板2:
package com.zzw.rmq.batch;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class ProducerOther {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
producer.setSendMsgTimeout(60000);
producer.start();
List<Message> messages = new ArrayList<>();
//把大的消息分裂成若干個(gè)小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class ListSplitter implements Iterator<List<Message>> {
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的開銷20字節(jié)
int SIZE_LIMIT = 1024 * 1024 * 4;
if (tmpSize > SIZE_LIMIT) {
//單個(gè)消息超過了最大的限制
//忽略,否則會(huì)阻塞分裂的進(jìn)程
if (nextIndex - currIndex == 0) {
//若下一個(gè)子列表沒有元素,則添加這個(gè)子列表然后退出循環(huán),否則只是退出循環(huán)
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
- 消費(fèi)者消費(fèi)消息:
package com.zzw.rmq.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
consumer.subscribe("BatchTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_2,Hello World2
consumeThread=ConsumeMessageThread_1,Hello World1
consumeThread=ConsumeMessageThread_3,Hello World3
*/
- TAG過濾消息,生產(chǎn)者發(fā)送消息:
package com.zzw.rmq.filter.tag;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
producer.setSendMsgTimeout(60000);
producer.start();
for (int i = 0; i < 3; i++) {
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("FilterTagTopic", "Tag1", ("Hello World" + i).getBytes());
SendResult result = producer.send(msg);
System.out.println("發(fā)送結(jié)果:" + result);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
- 消費(fèi)者按TAG過濾并消費(fèi)消息:
package com.zzw.rmq.filter.tag;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
consumer.subscribe("FilterTagTopic", "Tag1 || Tag2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_1,Hello World2
consumeThread=ConsumeMessageThread_2,Hello World1
consumeThread=ConsumeMessageThread_3,Hello World0
consumeThread=ConsumeMessageThread_4,Hello World0
consumeThread=ConsumeMessageThread_5,Hello World1
consumeThread=ConsumeMessageThread_6,Hello World2
*/
- SQL過濾消息:RocketMQ配置文件默認(rèn)是不開啟屬性過濾的,所以測試過程中會(huì)出現(xiàn)這樣的錯(cuò)誤:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92,解決方案如下: - 修改兩臺(tái)機(jī)器中RocketMQ的安裝目錄的
conf/2m-2s-async下的配置文件:- 服務(wù)器IP(192.168.10.102):
vim /opt/module/rocketmq/conf/2m-2s-sync/broker-a.properties,vim /opt/module/rocketmq/conf/2m-2s-sync/broker-b-s.properties - 服務(wù)器IP(192.168.10.103):
vim /opt/module/rocketmq/conf/2m-2s-sync/broker-b.properties,vim /opt/module/rocketmq/conf/2m-2s-sync/broker-a-s.properties - 在文件末尾追加以下內(nèi)容:
- 服務(wù)器IP(192.168.10.102):
#是否支持根據(jù)屬性過濾 若使用基于標(biāo)準(zhǔn)的sql92模式過濾消息,則改參數(shù)必須設(shè)置為true
enablePropertyFilter=true
- 關(guān)閉broker集群:
sh bin/mqshutdown broker,重新啟動(dòng)broker集群,重啟后使用jps命令查看啟動(dòng)進(jìn)程的狀態(tài):
#啟動(dòng)Broker集群
#啟動(dòng) Master1 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.102):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-a.properties &
#啟動(dòng) Slave2 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.102):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
#啟動(dòng) Master2 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.103):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-b.properties &
#啟動(dòng) Slave1 節(jié)點(diǎn)(服務(wù)器ip:192.168.10.103):
nohup sh bin/mqbroker -c /opt/module/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
- 生產(chǎn)者發(fā)送消息,通過
putUserProperty來設(shè)置自定義過濾屬性:
package com.zzw.rmq.filter.sql;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
producer.setSendMsgTimeout(60000);
producer.start();
for (int i = 0; i < 10; i++) {
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes());
//設(shè)置自定義的屬性
msg.putUserProperty("i", String.valueOf(i));
SendResult result = producer.send(msg);
System.out.println("發(fā)送結(jié)果:" + result);
TimeUnit.SECONDS.sleep(2);
}
producer.shutdown();
}
}
- RocketMQ只定義了一些基本語法來支持SQL過濾消息:
- 數(shù)值比較:
>,>=,<,<=,BETWEEN,= - 字符比較:
=,<>,IN -
IS NULL,IS NOT NULL - 邏輯符號(hào):
AND,OR,NOT - 數(shù)值,如:123,3.1415
- 字符,如:
'abc',必須用單引號(hào)包裹起來 - 特殊的常量:
NULL - 布爾值:
TRUE,FALSE
- 數(shù)值比較:
- 只有使用push模式的消費(fèi)者才能用使用SQL92標(biāo)準(zhǔn)的sql語句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)。 - 消費(fèi)者消費(fèi)消息,通過消息選擇器來實(shí)現(xiàn)消息過濾:
package com.zzw.rmq.filter.sql;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
//參數(shù)2:消息選擇器
consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_1,Hello World6
consumeThread=ConsumeMessageThread_2,Hello World7
consumeThread=ConsumeMessageThread_3,Hello World9
consumeThread=ConsumeMessageThread_4,Hello World8
*/

事務(wù)消息的解決方案
- 事務(wù)消息發(fā)送及提交流程分析:
- 發(fā)送half消息;
- 服務(wù)端響應(yīng)消息寫入結(jié)果;
- 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(若寫入失敗,則half消息對(duì)業(yè)務(wù)不可見,本地邏輯不執(zhí)行);
- 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引,消息對(duì)消費(fèi)者可見)。
- 事務(wù)補(bǔ)償流程分析:
- 對(duì)沒有
Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”; - Producer收到回查消息,檢查回查消息對(duì)應(yīng)本地事務(wù)的狀態(tài);
- 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback。
- 對(duì)沒有
- 事務(wù)消息共有三種狀態(tài):提交狀態(tài)、回滾狀態(tài)、中間狀態(tài)。
-
TransactionStatus.CommitTransaction:提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。 -
TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。 -
TransactionStatus.Unknown:中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。
-
- 生產(chǎn)者發(fā)送事務(wù)消息:
package com.zzw.rmq.transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group5");
producer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
producer.setSendMsgTimeout(60000);
//添加事務(wù)監(jiān)聽器
producer.setTransactionListener(new TransactionListener() {
//在該方法中執(zhí)行本地事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if (StringUtils.equals("TAGA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TAGB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("TAGC", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
//該方法是MQ進(jìn)行消息事務(wù)狀態(tài)回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("消息的Tag:" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;//提交消息后就能被消費(fèi)
}
});
producer.start();
String[] tags = {"TAGA", "TAGB", "TAGC"};
for (int i = 0; i < 3; i++) {
/**
* 參數(shù)一:消息主題Topic
* 參數(shù)二:消息Tag
* 參數(shù)三:消息內(nèi)容
*/
Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("發(fā)送結(jié)果:" + result);
TimeUnit.SECONDS.sleep(2);
}
//不能關(guān)閉,不然回查方法不能被執(zhí)行
//producer.shutdown();
}
}
/*
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C1733B418B4AAC24AA52C540000, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-b, queueId=1], queueOffset=0]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C1733B418B4AAC24AA534690001, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-b, queueId=2], queueOffset=1]
發(fā)送結(jié)果:SendResult [sendStatus=SEND_OK, msgId=C0A87C1733B418B4AAC24AA53C490002, offsetMsgId=null, messageQueue=MessageQueue [topic=TransactionTopic, brokerName=broker-b, queueId=3], queueOffset=2]
消息的Tag:TAGC
*/
- 消費(fèi)者消費(fèi)事務(wù)消息:
package com.zzw.rmq.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.10.102:9876;192.168.10.103:9876");
consumer.subscribe("TransactionTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消費(fèi)者啟動(dòng)");
}
}
/*
消費(fèi)者啟動(dòng)
consumeThread=ConsumeMessageThread_1,Hello World0
consumeThread=ConsumeMessageThread_2,Hello World2
*/
- 實(shí)現(xiàn)事務(wù)的監(jiān)聽接口:當(dāng)發(fā)送半消息成功時(shí),使用
executeLocalTransaction方法來執(zhí)行本地事務(wù),它將返回前面提到的三個(gè)事務(wù)狀態(tài)之一。checkLocalTranscation方法用于檢查本地事務(wù)狀態(tài),并響應(yīng)消息隊(duì)列的檢查請(qǐng)求,它也是返回前面提到的三個(gè)事務(wù)狀態(tài)之一。 - 事務(wù)消息的使用限制:
- 不支持延時(shí)消息和批量消息。
- 為了避免單個(gè)消息被檢查多次而導(dǎo)致半隊(duì)列消息累積,默認(rèn)將單個(gè)消息的檢查次數(shù)限制為 15 次,用戶可以通過 Broker 配置文件中的
transactionCheckMax參數(shù)來修改此限制。若已經(jīng)檢查某條消息超過 N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,同時(shí)打印錯(cuò)誤日志。用戶可以通過重寫AbstractTransactionCheckListener類來修改這個(gè)行為。 - 事務(wù)消息將在 Broker 配置文件中的參數(shù)
transactionMsgTimeout特定時(shí)間長度之后被檢查。當(dāng)發(fā)送事務(wù)消息時(shí),用戶還可以通過設(shè)置用戶屬性CHECK_IMMUNITY_TIME_IN_SECONDS來改變這個(gè)限制,該參數(shù)優(yōu)先于transactionMsgTimeout參數(shù)。 - 事務(wù)性消息可能不止一次被檢查或消費(fèi)。
- 提交給用戶的目標(biāo)主題消息可能會(huì)失敗,目前這種依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機(jī)制來保證,若希望事務(wù)消息不丟失、事務(wù)完整性得到保證,則建議使用
同步的雙重寫入機(jī)制。 - 事務(wù)消息的生產(chǎn)者 ID 不能與其它類型消息的生產(chǎn)者 ID 共享。與其它類型的消息不同,事務(wù)消息允許反向查詢、MQ服務(wù)器能通過它們的生產(chǎn)者 ID 查詢到消費(fèi)者。