01_Spring集成RabbitMQ之聲明式注解

@Author Jacky Wang
日常積累,轉(zhuǎn)載請注明出處,http://www.itdecent.cn/p/b081f1fd1480
最近新開發(fā)的一個(gè)項(xiàng)目是由傳統(tǒng)的SSM架構(gòu)進(jìn)行開發(fā)的,之前介紹了SpringBoot集成RabbitMQ的方式,這次特地對Spring集成RabbitMQ做一次記錄及介紹。
如需較詳細(xì)了解RabbitMQ的相關(guān)知識,可參考我的另一篇文章:03_SpringBoot集成RabbitMQ

由Spring集成RabbitMQ一般采用Xml配置或注解式兩種方式來進(jìn)行集成。由于個(gè)人不喜歡過多的xml文件,因此這里僅對注解方式進(jìn)行記錄。

1. 聲明式注解集成RabbitMQ

1.1 步驟

  1. 引入pom依賴
  2. 創(chuàng)建配置文件,包含mq的基礎(chǔ)配置
  3. 創(chuàng)建RabbitMQ監(jiān)聽器
  4. 聲明RabbitMQ配置類
  5. 創(chuàng)建消息生產(chǎn)者
  6. 測試

1.2 集成

1.2.1 引入Pom依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.5.RELEASE</version>
</dependency>
<!-- 解決沖突 -->
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
1.2.2 創(chuàng)建配置文件 rabbitmq.properties
#rabbitmq.host=127.0.0.1
#rabbitmq服務(wù)器
rabbitmq.host=192.168.3.171
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual.host=/

#交換機(jī)
exchange.mes.com.add=mes.fanout.com.add
exchange.mes.com.del=mes.fanout.com.delete
exchange.mes.com.update=mes.fanout.com.update
exchange.mes.user.add=mes.fanout.user.add
exchange.mes.user.del=mes.fanout.user.delete
exchange.mes.user.update=mes.fanout.user.update

#隊(duì)列
queue.mes.com.add=mes_company_add
queue.mes.com.del=mes_company_delete
queue.mes.com.update=mes_company_update
queue.mes.user.add=mes_user_add
queue.mes.user.del=mes_user_delete
queue.mes.user.update=mes_user_update
1.2.3 創(chuàng)建RabbitMQ監(jiān)聽器
/**
 * 消費(fèi)監(jiān)聽類
 */
@Component
@Transactional
public class QueueListener {

    public static final Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Autowired
    private AppInterfaceService appInterfaceService;
    @Autowired
    private RabbitMQConfig rabbitMQConfig;
    @Autowired
    private OfficeMapper officeMapper;
    @Autowired
    private SystemService systemService;
    @Autowired
    private UserMapper userMapper;

    /**
    * 如果監(jiān)聽隊(duì)列指定的方法不存在則執(zhí)行默認(rèn)方法
    */
    public void onMessage(byte[] msg) {
        try {
            logger.info("onMessage : [{}]", new String(msg, "UTF-8"));
        } catch (Exception e) {
            logger.error("Error : [{}]", e);
        }
    }

    /**
     * 公司信息同步
     *
     * @param message
     */
    public void addCompany(byte[] message) {
        logger.info("RabbitMQ Method addCompany Get Msg : [{}]", new String(message));
    }

    public void updateCompany(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
    }

    /**
     * 用戶信息同步
     *
     * @param message
     */
    public void addUser(byte[] message) {
        logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
    }

    public void delUser(byte[] message) {
        logger.info("RabbitMQ Method delUser Get Msg : [{}]", new String(message));
    }

    public void updateUser(byte[] message) {
        logger.info("RabbitMQ Method updateUser Get Msg : [{}]", new String(message));
    }
}
1.2.4 創(chuàng)建RabbitMQ聲明配置類
  • 此次使用消費(fèi)者容器進(jìn)行消息消費(fèi),可支持單類多方法消費(fèi)不同隊(duì)列
  • 若無須使用消費(fèi)者容器,可取消下面 聲明消費(fèi)者監(jiān)聽執(zhí)行類的注釋即可
  • 示例總共創(chuàng)建了六個(gè)交換機(jī)與六個(gè)隊(duì)列,具體根據(jù)實(shí)際情況創(chuàng)建即可
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

import java.util.HashMap;
import java.util.Map;

@Configuration
@PropertySource(value = "classpath:/properties/rabbitmq.properties")
public class RabbitMQConfig {
    @Autowired
    private QueueListener queueListener;

    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.port}")
    private int port;
    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.virtual.host}")
    private String vhost;

    @Value("${exchange.mes.com.add}")
    private String companyAddExchangeName;
    @Value("${exchange.mes.com.del}")
    private String companyDelExchangeName;
    @Value("${exchange.mes.com.update}")
    private String companyUpdateExchangeName;
    @Value("${exchange.mes.user.add}")
    private String userAddExchangeName;
    @Value("${exchange.mes.user.del}")
    private String userDelExchangeName;
    @Value("${exchange.mes.user.update}")
    private String userUpdateExchangeName;

    @Value("${queue.mes.com.add}")
    private String companyAddQueueName;
    @Value("${queue.mes.com.del}")
    private String companyDelQueueName;
    @Value("${queue.mes.com.update}")
    private String companyUpdateQueueName;
    @Value("${queue.mes.user.add}")
    private String userAddQueueName;
    @Value("${queue.mes.user.del}")
    private String userDelQueueName;
    @Value("${queue.mes.user.update}")
    private String userUpdateQueueName;

    /**
     * rabbitmq連接配置
     *
     * @return
     */
    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin admin = new RabbitAdmin(rabbitConnectionFactory());
        admin.setIgnoreDeclarationExceptions(true); //即使有關(guān)rabbitmq的bean初始化失敗整個(gè)web應(yīng)用還能正常啟動
        return admin;
    }

    /**
     * 聲明交換機(jī)Exchange
     *
     * @return
     */
    @Bean
    public FanoutExchange companyAddExchange() {
        return new FanoutExchange(companyAddExchangeName, true, false);
    }

    @Bean
    public FanoutExchange companyDelExchange() {
        return new FanoutExchange(companyDelExchangeName, true, false);
    }

    @Bean
    public FanoutExchange companyUpdateExchange() {
        return new FanoutExchange(companyUpdateExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userAddExchange() {
        return new FanoutExchange(userAddExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userDelExchange() {
        return new FanoutExchange(userDelExchangeName, true, false);
    }

    @Bean
    public FanoutExchange userUpdateExchange() {
        return new FanoutExchange(userUpdateExchangeName, true, false);
    }

    /**
     * 聲明隊(duì)列Queue
     *
     * @return
     */
    @Bean
    public Queue companyAddQueue() {
        return new Queue(companyAddQueueName, true, false, false);
    }

    @Bean
    public Queue companyDelQueue() {
        return new Queue(companyDelQueueName, true, false, false);
    }

    @Bean
    public Queue companyUpdateQueue() {
        return new Queue(companyUpdateQueueName, true, false, false);
    }

    @Bean
    public Queue userAddQueue() {
        return new Queue(userAddQueueName, true, false, false);
    }

    @Bean
    public Queue userDelQueue() {
        return new Queue(userDelQueueName, true, false, false);
    }

    @Bean
    public Queue userUpdateQueue() {
        return new Queue(userUpdateQueueName, true, false, false);
    }

    /**
     * 將隊(duì)列綁定到指定的交換機(jī)
     *
     * @param companyAddQueue
     * @param companyAddExchange
     * @return
     */
    @Bean
    public Binding companyAddBinding(Queue companyAddQueue, FanoutExchange companyAddExchange) {
        return BindingBuilder.bind(companyAddQueue).to(companyAddExchange);
    }

    @Bean
    public Binding companyDelBinding(Queue companyDelQueue, FanoutExchange companyDelExchange) {
        return BindingBuilder.bind(companyDelQueue).to(companyDelExchange);
    }

    @Bean
    public Binding companyUpdateBinding(Queue companyUpdateQueue, FanoutExchange companyUpdateExchange) {
        return BindingBuilder.bind(companyUpdateQueue).to(companyUpdateExchange);
    }

    @Bean
    public Binding userAddBinding(Queue userAddQueue, FanoutExchange userAddExchange) {
        return BindingBuilder.bind(userAddQueue).to(userAddExchange);
    }

    @Bean
    public Binding userDelBinding(Queue userDelQueue, FanoutExchange userDelExchange) {
        return BindingBuilder.bind(userDelQueue).to(userDelExchange);
    }

    @Bean
    public Binding userUpdateBinding(Queue userUpdateQueue, FanoutExchange userUpdateExchange) {
        return BindingBuilder.bind(userUpdateQueue).to(userUpdateExchange);
    }

    /**
     * 聲明消費(fèi)者監(jiān)聽執(zhí)行類
     * @param receiver
     * @return
     */
    /*@Bean
    public MessageListenerAdapter listenerAdapter(QueueListener receiver) {
        MessageListenerAdapter m = new MessageListenerAdapter(receiver, "process");
        m.setMessageConverter(jsonMessageConverter());
        return m;
    }*/

    /**
     * 消費(fèi)者容器
     * 為不同隊(duì)列指定不同的執(zhí)行方法
     * @param rabbitConnectionFactory
     * @return
     */
    @Bean
    SimpleMessageListenerContainer listenerContainer(ConnectionFactory rabbitConnectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory);
        //container.setMessageConverter(jsonMessageConverter());
        //container.setConcurrentConsumers(1);
        //container.setMaxConcurrentConsumers(5);
        //container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(q -> projectKey + "_" + q);
        //container.setQueues(companyAddQueue(), companyDelQueue(), companyUpdateQueue(), userAddQueue(), userDelQueue(), userUpdateQueue());
        container.setQueueNames(companyAddQueueName, companyDelQueueName, companyUpdateQueueName, userAddQueueName, userDelQueueName, userUpdateQueueName);

        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(queueListener);
        listenerAdapter.setDefaultListenerMethod("onMessage");
        listenerAdapter.setMessageConverter(jsonMessageConverter());
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(companyAddQueueName, "addCompany");
        queueOrTagToMethodName.put(companyDelQueueName, "delCompany");
        queueOrTagToMethodName.put(companyUpdateQueueName, "updateCompany");
        queueOrTagToMethodName.put(userAddQueueName, "addUser");
        queueOrTagToMethodName.put(userDelQueueName, "delUser");
        queueOrTagToMethodName.put(userUpdateQueueName, "updateUser");
        listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}
1.2.5 創(chuàng)建消息生產(chǎn)者
@Service
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final static Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

    public void sendDataToQueue(String exchange, String routingKey, Object object) {
        try {
            rabbitTemplate.setMessagePropertiesConverter(new MessagePropertiesConverter() {
                @Override
                public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
                    MessageProperties messageProperties = new MessageProperties();
                    messageProperties.setContentType("application/json");
                    messageProperties.setContentEncoding("UTF-8");
                    return messageProperties;
                }

                @Override
                public AMQP.BasicProperties fromMessageProperties(MessageProperties source, String charset) {
                    return null;
                }
            });

            rabbitTemplate.convertAndSend(exchange, routingKey, object);
        } catch (Exception e) {
            logger.error("發(fā)送mq消息異常,Cause:[]", e);
        }

    }
}
1.2.6 測試
自定義Test或自定義Controller測試調(diào)用生產(chǎn)者發(fā)送消息,查看消費(fèi)者是否消費(fèi)即可。

eg:

@Controller
@RequestMapping("${adminPath}/rabbitmq")
public class RabbitMQController {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @RequestMapping("/sendToCompanyAdd")
    @ResponseBody
    public String sendToCompanyAdd(String id) {
        HashMap<String, String> map = new HashMap<>();
        map.put("id", id);
        rabbitMQSender.sendDataToQueue("mes.fanout.com.add", null, map);
        return "SUCCESS";
    }
}

3 注意事項(xiàng)

  • 若配置失敗,請檢查Spring的注解掃描是否能掃描到配置類。
  • 檢查RabbitMQ基礎(chǔ)配置信息是否有錯誤

2. Xml方式集成配置文件參考

spring-rabbitmq.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <description>rabbitmq 連接服務(wù)配置</description>

    <!-- 加載配置屬性文件 -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:/properties/rabbitmq.properties"/>

    <!-- 連接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}"
                               password="${rabbitmq.password}" port="${rabbitmq.port}"
                               virtual-host="${rabbitmq.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring template聲明-->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"/>

    <!-- 消息對象json轉(zhuǎn)換類 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- 申明一個(gè)消息隊(duì)列Queue -->
    <rabbit:queue id="testQueueId" name="${rabbitmq.queue}" durable="false" auto-delete="false" exclusive="false"/>

    <!-- 定義交換機(jī) -->
    <rabbit:direct-exchange id="testExchangeId" name="${rabbitmq.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="testQueueId" key="${rabbitmq.routingKey}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- MQ監(jiān)聽配置 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="testQueueId" ref="queueListener" method="process"/>
    </rabbit:listener-container>
</beans>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容