注:默認(rèn)已安裝jdk,maven的前提下
1.下載
http://rocketmq.apache.org/release_notes/release-notes-4.2.0/

2.啟動(dòng)
解壓后進(jìn)入bin目錄:D:\rocketmq-4.2\bin
2.1 執(zhí)行命令:start mqnamesrv.cmd 啟動(dòng)nameserver,成功后不要關(guān)閉窗口
啟動(dòng)成功可以看到success:

2.2 執(zhí)行命令:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 啟動(dòng)broker 執(zhí)行成功后不要關(guān)閉窗口
啟動(dòng)成功是一個(gè)空的命令行窗口:

2.3 執(zhí)行完已經(jīng)啟動(dòng)成功,會(huì)有三個(gè)窗口,完整的執(zhí)行命令的窗口圖:

3.安裝rocketmq-console插件(可選,不是必須)
下載地址:https://github.com/apache/rocketmq-externals
3.1 如果已安裝git,使用git命令clone https://github.com/apache/rocketmq-externals即可下載
3.2 如果未安裝git,點(diǎn)擊下載:

3.3 下載完成后進(jìn)入解壓目錄的rocketmq-console項(xiàng)目找到application.properties配置文件,此處我的目錄是:D:\rocketmq-externals-master\rocketmq-console\src\main\resources

3.4 打開(kāi)目錄配置rocketmq-console的啟動(dòng)端口(它本質(zhì)是一個(gè)spring-boot項(xiàng)目,這里的端口相當(dāng)于啟動(dòng)項(xiàng)目,也就是tomcat端口,不要與其它項(xiàng)目端口沖突),以及剛才啟動(dòng)的rocketmq的ip及端口號(hào):

3.5 配置完成后,打開(kāi)cmd進(jìn)入rocketmq-console項(xiàng)目根目錄,我的目錄是D:\rocketmq-externals-master\rocketmq-console
執(zhí)行maven命令跳過(guò)測(cè)試打包:mvn clean package -Dmaven.test.skip=true 第一次需要下載依賴(lài)會(huì)有些慢

3.6 打包成功后,target目錄下會(huì)生成rocketmq-console-ng-1.0.0.jar,我的目錄為D:\rocketmq-externals-master\rocketmq-console\target

3.7 使用java命令java -jar rocketmq-console-ng-1.0.0.jar運(yùn)行這個(gè)jar包

3.8 訪問(wèn)rocketmq監(jiān)控頁(yè)面
http://localhost:8001 以實(shí)際地址為準(zhǔn)

4 測(cè)試rocketmq
4.1 啟動(dòng)消費(fèi)者監(jiān)聽(tīng),然后模擬生產(chǎn)者發(fā)送一條數(shù)據(jù)

4.2 測(cè)試結(jié)果,第一條打印信息是生產(chǎn)者打印的,可以看到SEND_OK代表生產(chǎn)成功,第二條打印信息是消費(fèi)者打印的,已經(jīng)成功消費(fèi)到數(shù)據(jù),如果數(shù)據(jù)中含有中文,消費(fèi)時(shí)注意要用UTF-8,否則亂碼

4.3 安裝的監(jiān)控頁(yè)面也可以看到剛才生產(chǎn)的消息,點(diǎn)擊MESSAGE DETAIL按鈕可以查看消息內(nèi)容:


6 測(cè)試代碼
6.1 依賴(lài):
<!-- RocketMQ -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.5.8</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.8</version>
</dependency>
6.2 完整測(cè)試代碼:
package com.haocang.clean.util;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.alibaba.rocketmq.shade.com.alibaba.fastjson.JSON;
import com.haocang.itc.pojo.MqBean;
/**
* RocketMq alibaba版本操作工具類(lèi)
* 生產(chǎn)/消費(fèi)
* @author shenke
*/
public class RocketMqUtil {
private RocketMqUtil(){
}
// 默認(rèn)生產(chǎn)者組名稱(chēng)
private static final String DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME = "test_producer";
// 默認(rèn)連接地址
private static final String DEFAULT_ROCKETMQ_ADDRESS = "localhost:9876";
// 默認(rèn)生產(chǎn)者應(yīng)用名稱(chēng)
private static final String DEFAULT_ROCKETMQ_INSTANCENAME = "test";
// 默認(rèn)生產(chǎn)者最大消息長(zhǎng)度
private static final Integer DEFAULT_ROCKETMQ_MAXMESSAGESIZE = Integer.MAX_VALUE;
// 默認(rèn)編碼
public static final String DEFAULT_ROCKETMQ_ENCODING = "UTF-8";
// 默認(rèn)消費(fèi)者組名稱(chēng)
private static final String DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME = "test_consumer";
// 默認(rèn)監(jiān)聽(tīng)主題
private static final String DEFAULT_ROCKETMQ_TOPIC = "test";
// 默認(rèn)監(jiān)聽(tīng)過(guò)濾條件
private static final String DEFAULT_ROCKETMQ_SUB_EXPRESSION = "test1";
/**
* 初始化生產(chǎn)者服務(wù)
* 默認(rèn)配置
* @return
*/
private static synchronized DefaultMQProducer initProducer() {
return initProducer(
DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME,
DEFAULT_ROCKETMQ_ADDRESS,
DEFAULT_ROCKETMQ_INSTANCENAME,
DEFAULT_ROCKETMQ_MAXMESSAGESIZE
);
}
/**
* 初始化生產(chǎn)者服務(wù)
* 可選配置
* @param rocketmqAddress
* @return
*/
private static synchronized DefaultMQProducer initProducer(
String groupName, String rocketmqAddress,String instanceName, int maxMessageSize) {
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(rocketmqAddress);
producer.setInstanceName(instanceName);
producer.setMaxMessageSize(maxMessageSize);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
/**
* 關(guān)閉生產(chǎn)者服務(wù)
*/
public static synchronized void closeProducer(DefaultMQProducer producer){
if (producer != null) {
producer.shutdown();
}
}
/**
* 初始化消費(fèi)者服務(wù)
* 使用默認(rèn)配置
* @return
*/
private static DefaultMQPushConsumer initConsumer(){
return initConsumer(DEFAULT_ROCKETMQ_ADDRESS, DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME, DEFAULT_ROCKETMQ_TOPIC, DEFAULT_ROCKETMQ_SUB_EXPRESSION);
}
/**
* 初始化消費(fèi)者服務(wù)
* 使用自定義配置
* @param topic
* @param subExpression
* @return
*/
private static DefaultMQPushConsumer initConsumer(String namesrvAddr, String consumerGroup, String topic, String subExpression) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
consumer.subscribe(topic, subExpression);
} catch (MQClientException e1) {
e1.printStackTrace();
}
return consumer;
}
/**
* 關(guān)閉消費(fèi)者服務(wù)
*/
public static void closeConsumer(DefaultMQPushConsumer consumer){
if (consumer != null) {
consumer.shutdown();
}
}
/**
* 生產(chǎn)消息
* 使用默認(rèn)配置
* @param message
* @param close 生產(chǎn)完畢是否關(guān)閉,若不關(guān)閉則會(huì)阻塞
* @return
*/
public static SendResult send(Message message, boolean close){
SendResult sendResult = null;
DefaultMQProducer producer = initProducer();
try {
sendResult = producer.send(message);
if(close){
closeProducer(producer);
}
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
/**
* 生產(chǎn)消息
* 使用自定義配置
* @param message
* @param groupName
* @param rocketmqAddress
* @param instanceName
* @param maxMessageSize
* @param close 生產(chǎn)完畢是否關(guān)閉,若不關(guān)閉則會(huì)阻塞
* @return
*/
public static SendResult send(Message message, String groupName,String rocketmqAddress,String instanceName, int maxMessageSize, boolean close){
SendResult sendResult = null;
try {
DefaultMQProducer producer = initProducer(groupName, rocketmqAddress, instanceName, maxMessageSize);
sendResult = producer.send(message);
if(close){
closeProducer(producer);
}
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
/**
* 消費(fèi)消息
* 使用默認(rèn)配置
* @param messageListenerConcurrently
* @throws MQClientException
*/
public static void consumer(MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
DefaultMQPushConsumer consumer = initConsumer();
consumer.registerMessageListener(messageListenerConcurrently);
consumer.start();
}
/**
* 消費(fèi)消息
* 使用自定義配置
* @param topic
* @param subExpression
* @param messageListenerConcurrently
* @throws MQClientException
*/
public static void consumer(String namesrvAddr, String consumerGroup, String topic, String subExpression, MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
DefaultMQPushConsumer consumer = initConsumer(namesrvAddr, consumerGroup, topic, subExpression);
consumer.registerMessageListener(messageListenerConcurrently);
consumer.start();
}
public static void main(String[] args) {
try {
consumer(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody(), "UTF-8"));
}
} catch (Exception e) {
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
} catch (Exception e) {
}
String data = "[{\"id\":\"001\",\"name\":\"張三\",\"age\":\"21\"},{\"id\":\"002\",\"name\":\"李四\",\"age\":\"22\"}]";
SendResult send = send(new Message("test", "test1", data.getBytes()), true);
System.out.println(send.toString());
}
}
7 總結(jié)
本文只是demo,詳細(xì)闡述如果在windows上搭建rocketmq服務(wù)及測(cè)試,實(shí)際生產(chǎn)環(huán)境中需要更精準(zhǔn)的配置,以及具體的業(yè)務(wù)問(wèn)題,比如訂單系統(tǒng)中如何保證消息順序消費(fèi),當(dāng)存在多個(gè)監(jiān)聽(tīng)時(shí)如何避免消息重復(fù)消費(fèi),如何讓每個(gè)監(jiān)聽(tīng)都能消費(fèi)到,如何回溯消息,如何定時(shí)消費(fèi)消息,延時(shí)消費(fèi)消息等一系列的問(wèn)題,可以參考官方文檔:http://rocketmq.apache.org/docs/quick-start/