RabbitMQ:Spring整合RabbitMQ

13F2925356653218AFBD71C70F752C7C.jpg

在前面的幾篇博客里面已經(jīng)把RabbitMQ的一些理論詳細(xì)了說明了,在這一篇中將記錄下Spring整合RabbitMQ,本文只是簡(jiǎn)單一個(gè)整合介紹,屬于拋磚引玉,具體實(shí)現(xiàn)還需大家深入研究哈..

代碼我會(huì)上傳到我的碼云上,如需下載請(qǐng)?jiān)谖恼碌哪┪矊ふ蚁螺d地址

1、POM引入

<!-- RabbitMQ -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

2、RabbitMQ配置信息

添加rabbitmq.properties配置文件

rabbit.hosts=127.0.0.1
rabbit.username=hrabbit
rabbit.password=123
rabbit.port=5672
rabbit.virtualHost=/hrabbit
# 統(tǒng)一XML配置中易變部分的命名
rabbit.queue=rabbitmq_test

3、添加FastJson轉(zhuǎn)化類

spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn)

package www.hrabbit.cn.configer;


import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.*;

import java.io.IOException;
import java.io.UnsupportedEncodingException;


/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午6:35
 * @Description:
 */


public class FastJsonMessageConverter  extends AbstractJsonMessageConverter {

    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    private static ClassMapper classMapper = new DefaultClassMapper();

    public FastJsonMessageConverter() {
        super();
    }

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        classMapper.fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = getClassMapper().toClass(message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return JSON.parseObject(contentAsString, clazz);
    }
}

4、添加amqp-application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

    <description>rabbitmq 連接服務(wù)配置</description>

    <!-- 連接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}"  virtual-host="${rabbit.virtualHost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring amqp默認(rèn)的是jackson 的一個(gè)插件,目的將生產(chǎn)者生產(chǎn)的數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列,由于fastjson的速度快于jackson,這里替換為fastjson的一個(gè)實(shí)現(xiàn) -->
    <bean id="jsonMessageConverter"  class="www.hrabbit.cn.util.FastJsonMessageConverter"></bean>

    <!-- spring template聲明-->
    <rabbit:template exchange="koms" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <!--
        durable:是否持久化

        exclusive: 僅創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除

        auto_delete: 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除隊(duì)列
     -->

    <!--  申明一個(gè)消息隊(duì)列Queue   -->
    <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
    <!--
     rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個(gè)特定的路由鍵完全匹配,才會(huì)轉(zhuǎn)發(fā)。

    rabbit:binding:設(shè)置消息queue匹配的key
     -->
    <!-- 交換機(jī)定義 -->
    <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
        <rabbit:bindings>
            <rabbit:binding queue="order" key="order"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--
         queues:監(jiān)聽的隊(duì)列,多個(gè)的話用逗號(hào)(,)分隔

        ref:監(jiān)聽器
     -->
    <bean class="www.hrabbit.cn.rabbitMq.listener.MessageListener" id="messageListener"></bean>
    <!-- 配置監(jiān)聽  acknowledeg = "manual"   設(shè)置手動(dòng)應(yīng)答  當(dāng)消息處理失敗時(shí):會(huì)一直重發(fā)  直到消息處理成功 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- 配置監(jiān)聽器 -->
        <rabbit:listener queues="order" ref="messageListener"/>
    </rabbit:listener-container>
</beans>

在這個(gè)項(xiàng)目中我的生產(chǎn)者和消費(fèi)者是放到同一個(gè)項(xiàng)目中的。項(xiàng)目中的監(jiān)聽器,即為消費(fèi)者。

5、生產(chǎn)者

注入AmqpTemplate模板,調(diào)用convertAndSend ();方法添加消息;

package www.hrabbit.cn.rabbitMq.service.impl;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import www.hrabbit.cn.rabbitMq.service.SpittleService;

import javax.annotation.Resource;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午4:26
 * @Description:
 */
@Service("spittleService")
public class SpittleServiceImpl implements SpittleService {

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 生產(chǎn)消息
     * @return
     */
    public Map<String,Object> spittleMsg(){

        Map<String,Object> dataList = new LinkedHashMap<>();

        for (int i=0;i<10;i++){
            dataList.put("order","msgResult:"+i);
            amqpTemplate.convertAndSend("order","msgResult:"+i);
        }
        return dataList;
    }

}

6、添加監(jiān)聽器(即消費(fèi)者)

package www.hrabbit.cn.rabbitMq.listener;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午4:47
 * @Description:
 */
@Component
public class MessageListener implements ChannelAwareMessageListener {

    private Logger logger= LoggerFactory.getLogger(MessageListener.class);

    @Transactional
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //業(yè)務(wù)處理,放到action層,并返回處理成功還是異常的flag
        //boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0);
        //還有一個(gè)點(diǎn)就是如何獲取mq消息的報(bào)文部分message?
        String result=new String(message.getBody(),"UTF-8");
        System.out.println("消息:"+result);
        if(true){
            basicACK(message,channel);//處理正常--ack
        }else{
            basicNACK(message,channel);//處理異常--nack
        }
    }


    //正常消費(fèi)掉后通知mq服務(wù)器移除此條mq
    private void basicACK(Message message,Channel channel){
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch(IOException e){
            logger.error("通知服務(wù)器移除mq時(shí)異常,異常信息:"+e);
        }
    }
    //處理異常,mq重回隊(duì)列
    private void basicNACK(Message message,Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e) {
            logger.error("mq重新進(jìn)入服務(wù)器時(shí)出現(xiàn)異常,異常信息:" + e);
        }
    }
}

7、啟動(dòng)項(xiàng)目,測(cè)試

訪問地址:http://localhost:8080/amqp/spittleMsg生產(chǎn)了10條消息,此時(shí)查看控制臺(tái)10條消息都被消費(fèi)了!

image.png

項(xiàng)目地址:https://gitee.com/hrabbit/spring-rabbitMQ

系列文章:

RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,527評(píng)論 19 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,200評(píng)論 2 11
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,261評(píng)論 6 342
  • 要加“m”說明是MB,否則就是KB了. -Xms:初始值 -Xmx:最大值 -Xmn:最小值 java -Xms1...
    阿B和阿C閱讀 7,519評(píng)論 0 7
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來...
    Chandler_玨瑜閱讀 6,764評(píng)論 2 39

友情鏈接更多精彩內(nèi)容