SpringBoot集成RocketMQ

1. 導入依賴

compile 'org.apache.rocketmq:rocketmq-client:4.5.2'

image.png

2. 編寫application.yml配置

image.png

3. 引入配置信息

為了方便,在這里消費者和生產(chǎn)者都放在一個項目里

引入生產(chǎn)者配置信息

package utry.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author
 * 消息生產(chǎn)者配置信息
 */
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class ProducerPropertiesConfig {

    @Value("${namesrvAddr}")
    private String namesrvAddr;

    private String groupName;

    private Integer maxMessageSize;

    private Integer sendMsgTimeout;

    private Integer retryTimesWhenSendFailed;

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public Integer getMaxMessageSize() {
        return maxMessageSize;
    }

    public void setMaxMessageSize(Integer maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    public Integer getSendMsgTimeout() {
        return sendMsgTimeout;
    }

    public void setSendMsgTimeout(Integer sendMsgTimeout) {
        this.sendMsgTimeout = sendMsgTimeout;
    }

    public Integer getRetryTimesWhenSendFailed() {
        return retryTimesWhenSendFailed;
    }

    public void setRetryTimesWhenSendFailed(Integer retryTimesWhenSendFailed) {
        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    }

    @Override
    public String toString() {
        return "ProducerConfig [namesrvAddr=" + namesrvAddr + ", groupName=" + groupName + "]";
    }
}

編寫生產(chǎn)者



import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author
 * 消息生產(chǎn)者
 */
@Configuration
public class ProducerConfigure {

    Logger logger = LoggerFactory.getLogger(ProducerConfigure.class);

    @Autowired
    private ProducerPropertiesConfig producerPropertiesConfig;

    @Bean
    public DefaultMQProducer defaultProducer() throws MQClientException {
        logger.info(producerPropertiesConfig.toString());
        logger.info("defaultProducer 正在創(chuàng)建---------------------------------------");
        DefaultMQProducer producer = new DefaultMQProducer(producerPropertiesConfig.getGroupName());
        producer.setNamesrvAddr(producerPropertiesConfig.getNamesrvAddr());
        producer.setVipChannelEnabled(false);
        //其他屬性自行設置,這里才用默認
        producer.start();
        logger.info("rocketmq producer server開啟成功---------------------------------.");
        return producer;
    }
}

引入消費者配置信息

package utry.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author
 * 消費者屬性配置類
 */
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
public class ConsumerPropertiesConfig {
    private String groupName;

    @Value("${namesrvAddr}")
    private String namesrvAddr;

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    @Override
    public String toString() {
        return "ConsumerConfig [groupName=" + groupName + ", namesrvAddr=" + namesrvAddr + "]";
    }

}

編寫消費者
先編寫一個抽象類,再寫具體的實現(xiàn)

  1. 編寫抽象類
package utry.config;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author
 * 抽象的消息消費者
 */
@Service
public abstract class DefaultConsumerConfigure {
    Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class);

    @Autowired
    private ConsumerPropertiesConfig consumerConfig;

    /**
     * 開啟消費者監(jiān)聽服務
     * @param topic
     * @param tag
     * @throws MQClientException
     */
    public void listener(String topic, String tag) throws MQClientException {
        log.info("開啟" + topic + ":" + tag + "消費者-------------------");
        log.info(consumerConfig.toString());

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());

        consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());

        consumer.subscribe(topic, tag);

        // 開啟內(nèi)部類實現(xiàn)監(jiān)聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                return DefaultConsumerConfigure.this.dealBody(messageExtList);
            }
        });
        consumer.start();

        log.info("rocketmq啟動成功---------------------------------------");

    }

    /**
     * 處理body的業(yè)務
     * @param messageExtList
     * @return
     */
    public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList);

}

  1. 編寫消費者
package utry.config;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

import java.util.List;

/**
 * @author
 * 消息消費者
 */
@Configuration
public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> {

    Logger log = LoggerFactory.getLogger(CustomConsumer.class);

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            super.listener("t_TopicTest", "Tag1");
        } catch (MQClientException e) {
            log.error("消費者監(jiān)聽器啟動失敗", e);
        }

    }

    @Override
    public ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList) {
        log.info("接收到消息");
        for (MessageExt msg : messageExtList) {
            try {
                String msgStr = new String(msg.getBody(), "utf-8");
                log.info(msgStr);
            } catch (Exception e) {
                log.error("body轉(zhuǎn)字符串解析失敗");
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

編寫Controller測試

package utry.controller;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import utry.config.CustomConsumer;

@RestController
public class ProducerController {

    Logger log = LoggerFactory.getLogger(CustomConsumer.class);

    @Autowired
    private DefaultMQProducer producer;


    @GetMapping("/msg/product")
    public void test(String info) throws Exception {
        Message message = new Message("t_TopicTest", "Tag1", "12345", info.getBytes());
        // 這里用到了這個mq的異步處理,類似ajax,可以得到發(fā)送到mq的情況,并做相應的處理
        // 不過要注意的是這個是異步的
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("傳輸成功");
                log.info(JSON.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                log.error("傳輸失敗", e);
            }
        });
    }

}

github:

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容