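Preface
This post is an introduction to RocketMQ: what it is used for and how to use it at a basic level.
RocketMQ offers:
- A cluster model with load balancing and horizontal scalability
- The ability to accumulate messages on the order of hundreds of millions
- Zero-copy I/O, with sequential disk writes and random reads
- A rich set of APIs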
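To make "sequential writes, random reads" more concrete, here is a minimal sketch of an append-only log on top of a memory-mapped file, using only the JDK. The class name `MappedLogSketch` and the record layout (a 4-byte length followed by the payload) are assumptions made for illustration; RocketMQ's real storage layer is far more involved.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Toy append-only log backed by a memory-mapped file (illustration only).
class MappedLogSketch {
    private final MappedByteBuffer buffer;
    private int writePosition = 0;

    MappedLogSketch(Path file, int capacity) throws IOException {
        MappedByteBuffer mapped;
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
        }
        this.buffer = mapped;
    }

    // Sequential write: each record is [4-byte length][payload], appended at the tail.
    int append(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        int offset = writePosition;
        buffer.putInt(offset, bytes.length);
        for (int i = 0; i < bytes.length; i++) {
            buffer.put(offset + 4 + i, bytes[i]);
        }
        writePosition = offset + 4 + bytes.length;
        return offset; // the caller keeps this offset as its index entry
    }

    // Random read: jump straight to a known offset without scanning the file.
    String read(int offset) {
        int length = buffer.getInt(offset);
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = buffer.get(offset + 4 + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".bin");
        MappedLogSketch log = new MappedLogSketch(file, 1024);
        int first = log.append("Hello RocketMQ0");
        int second = log.append("Hello RocketMQ1");
        // Read in the opposite order to show that reads are addressed by offset.
        System.out.println(log.read(second));
        System.out.println(log.read(first));
    }
}
```

Writes only ever go to the tail of the file, while a reader that knows an offset can fetch any record directly. Mapping the file into memory is one common way to avoid an extra copy between kernel and user space.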
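Basic concepts
Like RabbitMQ, RocketMQ has producers and consumers:
- Producer: the message producer, responsible for producing messages
- Consumer: the message consumer, responsible for consuming messages
- Push Consumer: a kind of consumer; you register a listener on the Consumer object
- Pull Consumer: a kind of consumer; it actively asks the broker for messages
- Producer Group: a set of producers, usually used to send one category of messages
- Consumer Group: a set of consumers, usually used to receive and consume one category of messages
- Broker: the MQ message service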
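The difference between the push and pull styles listed above can be sketched with a toy in-memory broker. `ToyBroker` and its methods are hypothetical names invented for this illustration and have nothing to do with the RocketMQ client API; the sketch only contrasts "register a listener" with "ask for the next message".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy in-memory broker, only to contrast the two consumption styles.
class ToyBroker {
    private final Queue<String> queue = new ArrayDeque<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();

    // Push style: the consumer registers a callback once.
    void registerListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Producer side: hand the message to listeners if any, otherwise store it for pulling.
    void send(String message) {
        if (listeners.isEmpty()) {
            queue.add(message);
        } else {
            listeners.forEach(listener -> listener.accept(message));
        }
    }

    // Pull style: the consumer asks for the next message; returns null when empty.
    String pull() {
        return queue.poll();
    }

    public static void main(String[] args) {
        ToyBroker pullBroker = new ToyBroker();
        pullBroker.send("Hello RocketMQ0");
        System.out.println("pulled: " + pullBroker.pull());

        ToyBroker pushBroker = new ToyBroker();
        pushBroker.registerListener(msg -> System.out.println("pushed: " + msg));
        pushBroker.send("Hello RocketMQ1");
    }
}
```

With the pull style the consumer decides when to fetch; with the push style it only supplies the handling logic and the delivery timing is decided elsewhere.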
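Installing RocketMQ on Windows
For installation, look up an installation tutorial and then adjust a few configuration settings.
RocketMQ client
Before starting, download the client (the RocketMQ dashboard) from GitHub:
https://github.com/apache/rocketmq-dashboard
After downloading, edit the configuration file application.yml: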
rocketmq:
  config:
    # if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
    # configure multiple namesrv addresses to manage multiple different clusters
    # replace with your local address, or with the address and port of a running RocketMQ service you can reach
    namesrvAddrs:
      - 127.0.0.1:9876
      - 127.0.0.2:9876
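The comment in application.yml describes a fallback order: the configured list first, then an externally supplied value, then localhost:9876. As a rough sketch of that order (not the dashboard's actual code), the logic could look like the following. `NamesrvAddrResolver`, its `resolve` method, and the use of `;` to separate several addresses in the fallback value are all assumptions of this example.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the fallback order described in the application.yml comment.
class NamesrvAddrResolver {
    static final String DEFAULT_ADDR = "localhost:9876";

    // Returns the configured addresses if present, else the fallback value, else the default.
    static List<String> resolve(List<String> configured, String fallback) {
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        if (fallback != null && !fallback.trim().isEmpty()) {
            // Assumption: several addresses in one value are separated by ';'.
            return Arrays.asList(fallback.trim().split(";"));
        }
        return Arrays.asList(DEFAULT_ADDR);
    }

    public static void main(String[] args) {
        // Names taken from the yml comment: a system property, then an environment variable.
        String fallback = System.getProperty("rocketmq.config.namesrvAddr");
        if (fallback == null) {
            fallback = System.getenv("NAMESRV_ADDR");
        }
        System.out.println(resolve(null, fallback));
    }
}
```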
Creating the project
Create a Maven project
- pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <rocketmq.version>4.3.0</rocketmq.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>
</dependencies>
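The producer and consumer code in the following sections reads the NameServer address from `Const.NAMESRV_ADDR_SINGLE`, but the `Const` class itself is not shown in this post. A plausible stand-in, assuming a single NameServer on the local machine at the default port, might look like this. Both the value and the class layout are guesses; place it in the same package as the examples and adjust the address to your environment.

```java
// Hypothetical stand-in for the Const class referenced by the examples.
final class Const {
    // Assumed value: a single local NameServer on port 9876.
    static final String NAMESRV_ADDR_SINGLE = "127.0.0.1:9876";

    private Const() {
        // constants holder, not meant to be instantiated
    }
}
```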
Creating the Producer class
Create a DefaultMQProducer object
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("test_producer_name");
        // Const is the project's own constants class (not shown here); it holds the NameServer address
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        producer.start();
        for (int i = 0; i < 5; i++) {
            // 1. Create the message
            Message message = new Message("test_topic", // topic
                    "TagA", // tag
                    "key" + i, // user-defined key, a unique identifier
                    ("Hello RocketMQ" + i).getBytes()); // message body (byte[])
            // 2. Send the message; a topic has 4 queues by default
            SendResult sendResult = producer.send(message);
            System.out.println("Message sent: " + sendResult);
        }
        // Shut down only after all messages have been sent
        producer.shutdown();
    }
}
Output
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B760000, offsetMsgId=C0A8010500002A9F000000000005812E, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=1], queueOffset=1]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B800001, offsetMsgId=C0A8010500002A9F00000000000581F0, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=1]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B820002, offsetMsgId=C0A8010500002A9F00000000000582B2, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=3], queueOffset=2]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B850003, offsetMsgId=C0A8010500002A9F0000000000058374, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=0], queueOffset=1]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B8B0004, offsetMsgId=C0A8010500002A9F0000000000058436, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=1], queueOffset=2]
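In the output above the queueId values run 1, 2, 3, 0, 1, which looks like the five messages being spread over the topic's 4 queues in round-robin order. The following is a minimal sketch of that idea in plain Java. `RoundRobinQueueSelector` is a made-up name, and the starting index of 1 is chosen only to reproduce the sequence seen in the log; it is not how the client necessarily picks its starting point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin selection over a fixed number of queues (illustration only).
class RoundRobinQueueSelector {
    private final int queueCount;
    private final AtomicInteger counter;

    RoundRobinQueueSelector(int queueCount, int startIndex) {
        this.queueCount = queueCount;
        this.counter = new AtomicInteger(startIndex);
    }

    int nextQueueId() {
        // floorMod keeps the result non-negative even if the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        RoundRobinQueueSelector selector = new RoundRobinQueueSelector(4, 1);
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            ids.add(selector.nextQueueId());
        }
        System.out.println(ids); // [1, 2, 3, 0, 1]
    }
}
```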
Creating the Consumer class
- Create a DefaultMQPushConsumer object
- Set the NamesrvAddr and the consume position (ConsumeFromWhere)
- Subscribe to a topic with subscribe
- Register a listener and consume messages with registerMessageListener
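import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_name");
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        // For a new consumer group, start from the latest offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Subscribe to every tag of test_topic
        consumer.subscribe("test_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // With the default batch size of 1, each call delivers a single message
                MessageExt me = msgs.get(0);
                try {
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys = me.getKeys();
                    String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Ask the broker to redeliver this message later
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.err.println("consumer start...");
    }
}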
Output
topic: test_topic,tags: TagA, keys: key0,body: Hello RocketMQ0
topic: test_topic,tags: TagA, keys: key1,body: Hello RocketMQ1
topic: test_topic,tags: TagA, keys: key2,body: Hello RocketMQ2
topic: test_topic,tags: TagA, keys: key3,body: Hello RocketMQ3
topic: test_topic,tags: TagA, keys: key4,body: Hello RocketMQ4
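All five messages arrive because the consumer subscribes with the expression "*", which accepts every tag. A subscription expression can also name specific tags joined by `||`, for example "TagA || TagB". The sketch below shows roughly how such an expression could be matched against a message tag. `TagFilterSketch` is a hypothetical helper written for this post, not part of the RocketMQ client.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Matches a message tag against a "*" or "TagA || TagB" style expression (illustration only).
class TagFilterSketch {
    static boolean matches(String subExpression, String tag) {
        // null or "*" means: subscribe to everything
        if (subExpression == null || subExpression.trim().equals("*")) {
            return true;
        }
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));            // true
        System.out.println(matches("TagA || TagB", "TagB")); // true
        System.out.println(matches("TagA || TagB", "TagC")); // false
    }
}
```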
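Message retry
Suppose some code in the consumer throws an exception, so the message is redelivered. Once the retry count reaches a chosen threshold, record the failure and leave it to a scheduled task to compensate.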
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_name");
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("test_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt me = msgs.get(0);
                try {
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys = me.getKeys();
                    // Simulate a failure for one specific message
                    if ("key1".equals(keys)) {
                        System.err.println("Message consumption failed..");
                        int a = 1 / 0;
                    }
                    String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
                } catch (Exception e) {
                    e.printStackTrace();
                    int reconsumeTimes = me.getReconsumeTimes();
                    System.err.println("reconsumeTimes: " + reconsumeTimes);
                    // After 3 retries, stop retrying and hand the message over to compensation
                    if (reconsumeTimes >= 3) {
                        // Write a log record....
                        // Do the compensation handling
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.err.println("consumer start...");
    }
}
Log output
reconsumeTimes: 3
Message consumption failed..
java.lang.ArithmeticException: / by zero
at com.bfxy.rocketmq.quickstart.Consumer$1.consumeMessage(Consumer.java:42)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
reconsumeTimes: 3
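The retry flow can also be tried out without a running broker. The sketch below simulates it in plain Java: a loop stands in for the broker's redelivery, and an in-memory list stands in for the record that a scheduled compensation task would later pick up. Every name here (`RetryCompensationSketch`, `deliver`, `compensationLog`) is invented for the illustration, and the retry delays are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Broker-free simulation of "retry up to 3 times, then record for compensation".
class RetryCompensationSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static final int MAX_RECONSUME_TIMES = 3;
    // Stand-in for a durable record that a scheduled job would process later.
    static final List<String> compensationLog = new ArrayList<>();

    // Mirrors the listener logic: fail on "key1", give up after 3 retries.
    static Status consume(String key, int reconsumeTimes) {
        try {
            if ("key1".equals(key)) {
                throw new IllegalStateException("simulated failure");
            }
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
                compensationLog.add(key);
                return Status.CONSUME_SUCCESS; // acknowledge so redelivery stops
            }
            return Status.RECONSUME_LATER;
        }
    }

    // Stand-in for the broker: redeliver until the listener acknowledges.
    static int deliver(String key) {
        int reconsumeTimes = 0;
        while (consume(key, reconsumeTimes) == Status.RECONSUME_LATER) {
            reconsumeTimes++;
        }
        return reconsumeTimes;
    }

    public static void main(String[] args) {
        System.out.println(deliver("key0")); // 0
        System.out.println(deliver("key1")); // 3
        System.out.println(compensationLog); // [key1]
    }
}
```

Returning CONSUME_SUCCESS after the threshold is what stops the redelivery, so the compensation record has to be written before that return; otherwise the failed message would be lost silently.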
Conclusion
This post walked through a RocketMQ producer sending messages and a consumer receiving them, and then simulated a failure in the consumer to show where logging or message compensation can be added.