RocketMq訂閱與發(fā)送消息

使用阿里云的rocketMq訂閱與發(fā)送消息,直接上代碼:

1.引入pom依賴

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.Final</version>
</dependency>

2.新建RocketMqUtil類

package com.**.***.***.utils;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.***.***.***.listener.MqMessageListener;
import com.***.***.***.listener.MqTimeMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Slf4j
@Configuration
public class RocketMqUtil {

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Autowired
    private MqMessageListener messageListener;

    @Autowired
    private MqTimeMessageListener timeMessageListener;

    /**
     * 創(chuàng)建消息生產(chǎn)者
     * @return
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer producer() {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        properties.put(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        ProducerBean producer = new ProducerBean();
        producer.setProperties(properties);

        log.info("rocketMq創(chuàng)建生產(chǎn)者成功");
        return producer;
    }


    /**
     * 創(chuàng)建消息訂閱
     * @return
     */
    @Bean(initMethod = "start")
    public ConsumerBean consumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);

        consumerBean.setProperties(properties);

        // 訂閱消息
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        // 訂閱普通消息
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression("start_approve||process_approve");
        subscriptionTable.put(subscription, messageListener);
        // 訂閱定時/延時消息
        Subscription subscriptionTime = new Subscription();
        subscriptionTime.setTopic(topic);
        subscriptionTime.setExpression("start_approve||process_approve");
        subscriptionTable.put(subscriptionTime, timeMessageListener);

        consumerBean.setSubscriptionTable(subscriptionTable);
        log.info("rocketMq訂閱成功");
        return consumerBean;
    }
}

3.普通消息監(jiān)聽類MqMessageListener

package com.***.***.***.listener;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


@Component
public class MqMessageListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ普通消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(),message.getKey(), new String(message.getBody()));
        try {
            // 處理業(yè)務(wù)
            return Action.CommitMessage;
        } catch (Exception e) {
            logger.error("消費MQ消息失??! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
            return Action.ReconsumeLater;
        }
    }

}

4.異步/定時/延時消息監(jiān)聽類

package com.***.***.***.listener;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
public class MqTimeMessageListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${rocketMq.startedApprovalTag}")
    private String startedApprovalTag;

    @Value("${rocketMq.processedApprove}")
    private String processedApprove;


    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ定時/延時消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(),message.getKey(), new String(message.getBody()));


        try {
            // 處理業(yè)務(wù)

        } catch (Exception e) {

            logger.error("消費MQ消息失??! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
        } 
        return Action.CommitMessage;
    }




}

5.消息發(fā)送RocketMqProducer

package com.***.***.***.utils;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import java.util.Properties;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RocketMqProducer {

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.sendMegGroup}")
    private String sendMegGroup;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Resource
    private ProducerBean producer;


    /**
     * 發(fā)送異步消息
     * @param tag
     * @param msgKey
     * @param messageBody
     */
    public void sendAsyncMsg(String tag, String msgKey, byte[] messageBody) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, sendMegGroup);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        Message msg = new Message(topic, tag, msgKey, messageBody);
        try {
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    log.info("rocketMq發(fā)送成功,msg:{}", JSON.toJSONString(msg));
                }

                @Override
                public void onException(final OnExceptionContext context) {
                    log.info("rocketMq發(fā)送失敗:tag:{},topic:{},body:{}", tag, context.getTopic(), new String(messageBody), context.getException());
                    // todo 持久化失敗消息,定時補償
                }
            });
        } catch (ONSClientException e) {
            log.info("rocketMq發(fā)送異常:", e);
        }
    }
}


6.調(diào)用消息發(fā)送

@Autowired
private RocketMqProducer rocketMqProducer;
rocketMqProducer.sendAsyncMsg("tag", "msgKey", "msg";
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容