RocketMq-Windows單機(jī)安裝

注:默認(rèn)已安裝jdk,maven的前提下

1.下載

http://rocketmq.apache.org/release_notes/release-notes-4.2.0/

image.png

2.啟動(dòng)

解壓后進(jìn)入bin目錄:D:\rocketmq-4.2\bin
2.1 執(zhí)行命令:start mqnamesrv.cmd 啟動(dòng)nameserver,成功后不要關(guān)閉窗口
啟動(dòng)成功可以看到success:


nameserver啟動(dòng)成功窗口

2.2 執(zhí)行命令:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 啟動(dòng)broker 執(zhí)行成功后不要關(guān)閉窗口
啟動(dòng)成功是一個(gè)空的命令行窗口:


broker啟動(dòng)成功窗口

2.3 執(zhí)行完已經(jīng)啟動(dòng)成功,會(huì)有三個(gè)窗口,完整的執(zhí)行命令的窗口圖:
執(zhí)行命令的窗口圖

3.安裝rocketmq-console插件(可選,不是必須)

下載地址:https://github.com/apache/rocketmq-externals
3.1 如果已安裝git,使用git命令clone https://github.com/apache/rocketmq-externals即可下載
3.2 如果未安裝git,點(diǎn)擊下載:

下載rocktemq-console

3.3 下載完成后進(jìn)入解壓目錄的rocketmq-console項(xiàng)目找到application.properties配置文件,此處我的目錄是:D:\rocketmq-externals-master\rocketmq-console\src\main\resources
application.properties配置文件目錄

3.4 打開(kāi)目錄配置rocketmq-console的啟動(dòng)端口(它本質(zhì)是一個(gè)spring-boot項(xiàng)目,這里的端口相當(dāng)于啟動(dòng)項(xiàng)目,也就是tomcat端口,不要與其它項(xiàng)目端口沖突),以及剛才啟動(dòng)的rocketmq的ip及端口號(hào):
配置rocketmq-console

3.5 配置完成后,打開(kāi)cmd進(jìn)入rocketmq-console項(xiàng)目根目錄,我的目錄是D:\rocketmq-externals-master\rocketmq-console
執(zhí)行maven命令跳過(guò)測(cè)試打包:mvn clean package -Dmaven.test.skip=true 第一次需要下載依賴(lài)會(huì)有些慢
build項(xiàng)目成功的命令行窗口

3.6 打包成功后,target目錄下會(huì)生成rocketmq-console-ng-1.0.0.jar,我的目錄為D:\rocketmq-externals-master\rocketmq-console\target
maven打包的jar

3.7 使用java命令java -jar rocketmq-console-ng-1.0.0.jar運(yùn)行這個(gè)jar包
啟動(dòng)成功,配置的端口號(hào)為8001

3.8 訪問(wèn)rocketmq監(jiān)控頁(yè)面
http://localhost:8001 以實(shí)際地址為準(zhǔn)
監(jiān)控頁(yè)面

4 測(cè)試rocketmq

4.1 啟動(dòng)消費(fèi)者監(jiān)聽(tīng),然后模擬生產(chǎn)者發(fā)送一條數(shù)據(jù)


測(cè)試生產(chǎn)消費(fèi)

4.2 測(cè)試結(jié)果,第一條打印信息是生產(chǎn)者打印的,可以看到SEND_OK代表生產(chǎn)成功,第二條打印信息是消費(fèi)者打印的,已經(jīng)成功消費(fèi)到數(shù)據(jù),如果數(shù)據(jù)中含有中文,消費(fèi)時(shí)注意要用UTF-8,否則亂碼


測(cè)試結(jié)果

4.3 安裝的監(jiān)控頁(yè)面也可以看到剛才生產(chǎn)的消息,點(diǎn)擊MESSAGE DETAIL按鈕可以查看消息內(nèi)容:
生產(chǎn)成功的消息

消息詳細(xì)

6 測(cè)試代碼

6.1 依賴(lài):

<!-- RocketMQ -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.5.8</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.5.8</version>
</dependency>

6.2 完整測(cè)試代碼:

package com.haocang.clean.util;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.alibaba.rocketmq.shade.com.alibaba.fastjson.JSON;
import com.haocang.itc.pojo.MqBean;

/**
 * RocketMq alibaba版本操作工具類(lèi)
 * 生產(chǎn)/消費(fèi)
 * @author shenke
 */
public class RocketMqUtil {
    
    private RocketMqUtil(){
        
    }
    
    // 默認(rèn)生產(chǎn)者組名稱(chēng)
    private static final String DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME = "test_producer";
    // 默認(rèn)連接地址
    private static final String DEFAULT_ROCKETMQ_ADDRESS = "localhost:9876";
    // 默認(rèn)生產(chǎn)者應(yīng)用名稱(chēng)
    private static final String DEFAULT_ROCKETMQ_INSTANCENAME = "test";
    // 默認(rèn)生產(chǎn)者最大消息長(zhǎng)度
    private static final Integer DEFAULT_ROCKETMQ_MAXMESSAGESIZE = Integer.MAX_VALUE;
    // 默認(rèn)編碼
    public static final String DEFAULT_ROCKETMQ_ENCODING = "UTF-8";
    // 默認(rèn)消費(fèi)者組名稱(chēng)
    private static final String DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME = "test_consumer";
    // 默認(rèn)監(jiān)聽(tīng)主題
    private static final String DEFAULT_ROCKETMQ_TOPIC = "test";
    // 默認(rèn)監(jiān)聽(tīng)過(guò)濾條件
    private static final String DEFAULT_ROCKETMQ_SUB_EXPRESSION = "test1";
    
    /**
     * 初始化生產(chǎn)者服務(wù)
     * 默認(rèn)配置
     * @return
     */
    private static synchronized DefaultMQProducer initProducer() {
        return initProducer(
                DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME,
                DEFAULT_ROCKETMQ_ADDRESS,
                DEFAULT_ROCKETMQ_INSTANCENAME,
                DEFAULT_ROCKETMQ_MAXMESSAGESIZE
        );
    }
 
    /**
     * 初始化生產(chǎn)者服務(wù)
     * 可選配置
     * @param rocketmqAddress
     * @return
     */
    private static synchronized DefaultMQProducer initProducer(
        String groupName, String rocketmqAddress,String instanceName, int maxMessageSize) {
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(rocketmqAddress);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return producer;
    }
    
    /**
     * 關(guān)閉生產(chǎn)者服務(wù)
     */
    public static synchronized void closeProducer(DefaultMQProducer producer){
        if (producer != null) {
            producer.shutdown();
        }
    }
    
    /**
     * 初始化消費(fèi)者服務(wù)
     * 使用默認(rèn)配置
     * @return
     */
    private static DefaultMQPushConsumer initConsumer(){
        return initConsumer(DEFAULT_ROCKETMQ_ADDRESS, DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME, DEFAULT_ROCKETMQ_TOPIC, DEFAULT_ROCKETMQ_SUB_EXPRESSION);
    }
    
    /**
     * 初始化消費(fèi)者服務(wù)
     * 使用自定義配置
     * @param topic
     * @param subExpression
     * @return
     */
    private static DefaultMQPushConsumer initConsumer(String namesrvAddr, String consumerGroup, String topic, String subExpression) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe(topic, subExpression);
        } catch (MQClientException e1) {
            e1.printStackTrace();
        }
        return consumer;
    }
    
    /**
     * 關(guān)閉消費(fèi)者服務(wù)
     */
    public static void closeConsumer(DefaultMQPushConsumer consumer){
        if (consumer != null) {
            consumer.shutdown();
        }
    }
    
    /**
     * 生產(chǎn)消息
     * 使用默認(rèn)配置
     * @param message
     * @param close 生產(chǎn)完畢是否關(guān)閉,若不關(guān)閉則會(huì)阻塞
     * @return
     */
    public static SendResult send(Message message, boolean close){
        SendResult sendResult = null;
        DefaultMQProducer producer = initProducer();
        try {
            sendResult = producer.send(message);
            if(close){
                closeProducer(producer);
            }
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
        return sendResult;
    }
    
    /**
     * 生產(chǎn)消息
     * 使用自定義配置
     * @param message
     * @param groupName
     * @param rocketmqAddress
     * @param instanceName
     * @param maxMessageSize
     * @param close 生產(chǎn)完畢是否關(guān)閉,若不關(guān)閉則會(huì)阻塞
     * @return
     */
    public static SendResult send(Message message, String groupName,String rocketmqAddress,String instanceName, int maxMessageSize, boolean close){
        SendResult sendResult = null;
        try {
            DefaultMQProducer producer = initProducer(groupName, rocketmqAddress, instanceName, maxMessageSize);
            sendResult = producer.send(message);
            if(close){
                closeProducer(producer);
            }
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
        return sendResult;
    }
    
    /**
     * 消費(fèi)消息
     * 使用默認(rèn)配置
     * @param messageListenerConcurrently
     * @throws MQClientException 
     */
    public static void consumer(MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
        DefaultMQPushConsumer consumer = initConsumer();
        consumer.registerMessageListener(messageListenerConcurrently);
        consumer.start();
    }
    
    /**
     * 消費(fèi)消息
     * 使用自定義配置
     * @param topic
     * @param subExpression
     * @param messageListenerConcurrently
     * @throws MQClientException 
     */
    public static void consumer(String namesrvAddr, String consumerGroup, String topic, String subExpression, MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
        DefaultMQPushConsumer consumer = initConsumer(namesrvAddr, consumerGroup, topic, subExpression);
        consumer.registerMessageListener(messageListenerConcurrently);
        consumer.start();
    }
    
    public static void main(String[] args) {
        try {
            consumer(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt messageExt : msgs) {
                            System.out.println(new String(messageExt.getBody(), "UTF-8"));
                        }
                    } catch (Exception e) {
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        } catch (Exception e) {
        }
        
        String data = "[{\"id\":\"001\",\"name\":\"張三\",\"age\":\"21\"},{\"id\":\"002\",\"name\":\"李四\",\"age\":\"22\"}]";
        SendResult send = send(new Message("test", "test1", data.getBytes()), true);
        System.out.println(send.toString());
    }

}

7 總結(jié)

本文只是demo,詳細(xì)闡述如果在windows上搭建rocketmq服務(wù)及測(cè)試,實(shí)際生產(chǎn)環(huán)境中需要更精準(zhǔn)的配置,以及具體的業(yè)務(wù)問(wèn)題,比如訂單系統(tǒng)中如何保證消息順序消費(fèi),當(dāng)存在多個(gè)監(jiān)聽(tīng)時(shí)如何避免消息重復(fù)消費(fèi),如何讓每個(gè)監(jiān)聽(tīng)都能消費(fèi)到,如何回溯消息,如何定時(shí)消費(fèi)消息,延時(shí)消費(fèi)消息等一系列的問(wèn)題,可以參考官方文檔:http://rocketmq.apache.org/docs/quick-start/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容