spring和RabbitMQ消息隊列的整合

前言

如果能不用消息隊列可以實現業(yè)務邏輯,盡量別用。因為增加了學習成本。

我所在的公司,需要處理大量設備的信息。由于并發(fā)的問題以及信息順序的要求,近期用上了消息隊列。

下面提供下我整合的過程。

你可以學到

  • 使用消息隊列發(fā)送消息。
  • 掌握一種交換器的配置。
  • 消息的確認機制。避免消息丟失和消息的執(zhí)行順序問題。

環(huán)境說明

java版本

java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

開發(fā)工具

IntelliJ IDEA 2018.2.1 (Ultimate Edition)
Build #IU-182.3911.36, built on August 6, 2018
Licensed to lan yu
Subscription is active until May 5, 2019
JRE: 1.8.0_152-release-1248-b8 x86_64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
macOS 10.13.6

docker

docker --version
Docker version 17.12.0-ce, build c97c6d6

代碼中用到了idea插件lombok
如下圖。如果不會百度搜下吧。

image.png

安裝rabbitmq

這里采用docker安裝。

docker run -d -p 15672:15672 -p 5672:5672 --name some-rabbit rabbitmq:management

上述命令會創(chuàng)建一個容器,該容器包含了rabbitmq
端口映射了2個。其中15672 是web控制臺的端口, 5672是通信端口。
執(zhí)行之后,訪問
http://127.0.0.1:15672/
即可看到web控制臺。賬號和密碼默認是guest

image.png

具體web控制臺的管理,非本文重點。不介紹。

整合spring和rabbitmq

  • 先創(chuàng)建一個spring的項目。
    這里跳過。如果不會,請去下載本文的代碼,導入。
  • 引入依賴文件
 <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>

        <!-- spring核心包 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.3</version>
        </dependency>

注意版本。如果版本高低,可能會導致錯誤。

  • 配置rabbitmq
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">

<!--配置鏈接參數-->
    <rabbit:connection-factory
            id="connectionFactory"
            host="${rabbitmq.host}"
            port="${rabbitmq.port}"
            username=""
            password=""
            publisher-confirms="true"
    />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 給模板指定轉換器 --><!-- mandatory必須設置true,return callback才生效 -->
    <rabbit:template id="amqpTemplate"    connection-factory="connectionFactory"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true"
    />

    <!--配置隊列-->
    <rabbit:queue name="SERVER_DOWN1" />
    <rabbit:queue name="CLIENT_UP1" />

    <!--配置交換器,并綁定隊列。direct 是 交換器的一種類型。-->
    <rabbit:direct-exchange name="DIRECT_EX1" id="DIRECT_EX1" >
        <rabbit:bindings>
            <rabbit:binding queue="CLIENT_UP1" key="CLIENT_UP1" />
            <rabbit:binding queue="SERVER_DOWN1" key="SERVER_DOWN1" />
        </rabbit:bindings>
    </rabbit:direct-exchange>



    <!-- 配置消費者 -->
    <rabbit:listener-container
            connection-factory="connectionFactory" acknowledge="manual" >
        <rabbit:listener queues="CLIENT_UP1" ref="lightaiConsumer" />
    </rabbit:listener-container>



</beans>

這里介紹下消息隊列的類型。
Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。
Topic交換機,符號”#”匹配一個或多個詞,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。
Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列

以上配置只使用了Direct交換機。表示的含義:
CLIENT_UP1 的消息 會進入 CLIENT_UP1 隊列
SERVER_DOWN1 的消息會進入 SERVER_DOWN1 隊列。

消費者只配置一個,因此只有CLIENT_UP1的消息會被消費。

  • 消費者
package cn.wuwenfu.rabbitmqdemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;

@Service("lightaiConsumer")
@Slf4j
public class LightaiConsumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try{

            String m = new String(message.getBody());
            if (log.isInfoEnabled()){
                log.info("Received '" + m+ "'");
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch(Exception e){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.error("出現異常");
            e.printStackTrace();
        }
    }

}

這里一定捕捉異常,進行消息確認。否則會導致業(yè)務異常。

  • 生產者
package cn.wuwenfu.rabbitmqdemo.main;

import cn.wuwenfu.rabbitmqdemo.publish.PublishService;
import cn.wuwenfu.rabbitmqdemo.publish.RabbitTemplatePublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Author hiwower@gmail.com
 * @date 2018/11/6 上午9:03
 */
@Component
public class Test {

    private final String EXCHANGE_NAME="DIRECT_EX1";
    @Autowired
    private PublishService publishService;

    @Autowired
    private RabbitTemplatePublishService rabbitTemplatePublishService;

    public void  test(){
        String message = System.currentTimeMillis()+"";
       publishService.send(EXCHANGE_NAME,"SERVER_DOWN1",message);
        publishService.send(EXCHANGE_NAME,"CLIENT_UP1",message);
        
        publishService.send(EXCHANGE_NAME,"CLIENT_UP2",message);

        rabbitTemplatePublishService.send(EXCHANGE_NAME,"SERVER_DOWN1",message);
        rabbitTemplatePublishService.send(EXCHANGE_NAME,"CLIENT_UP1",message);
        rabbitTemplatePublishService.send(EXCHANGE_NAME,"CLIENT_UP2",message);


    }


}

上面代碼發(fā)送6條相同消息,分別發(fā)送到3個路由中。
其中SERVER_DOWN1 隊列收到2條,沒綁定消費者。不會被消費。
其中CLIENT_UP1 隊列收到2條,正常消費。
CLIENT_UP2 沒有對應的隊列。會 觸發(fā) 發(fā)送消息的回調。

RabbitTemplate是AmqpTemplate的一個實現??梢匀我膺x擇其中一個進行發(fā)送。

  • 發(fā)送消息確認

發(fā)送消息的時候,消息會先到達交換器,再通過路由Key 發(fā)送到隊列。
其中涉及2個回調方法。只有這2個回調正常,才確保消息正確發(fā)送。

package cn.wuwenfu.rabbitmqdemo.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

@Service("confirmCallBackListener")
@Slf4j
/**
 *  到達交換器會觸發(fā).ack=true 反之false
 */
public class ConfirmCallBackListener implements ConfirmCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (log.isInfoEnabled()){
            log.info("交換機confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
        }
    }

}

消息只要發(fā)送了,這里肯定會執(zhí)行。ack的值如果是true 表示成功。ack=false表示失敗。

這里可以自行加入異常監(jiān)控和處理。正常情況下不會出現。

package cn.wuwenfu.rabbitmqdemo.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;

@Service("returnCallBackListener")
@Slf4j
/**
 * 路由異常會觸發(fā)這里
 */
public class ReturnCallBackListener implements ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.error("路由return--message: ,msgBody:" + new String(message.getBody())
                    + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:"
                    + routingKey);
    }

}

如果消息正常投遞到消息隊列,這里不會執(zhí)行。否則會觸發(fā)。通常原因是找不到路由。
如下的replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2

/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=52802:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/tools.jar:/Users/ft521/Documents/study/項目/javaproject/rabbitmqdemo/target/classes:/Users/ft521/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/Users/ft521/.m2/repository/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar:/Users/ft521/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/ft521/.m2/repository/org/springframework/spring-core/4.2.6.RELEASE/spring-core-4.2.6.RELEASE.jar:/Users/ft521/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/ft521/.m2/repository/org/springframework/spring-context-support/4.2.6.RELEASE/spring-context-support-4.2.6.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/spring-beans/4.2.6.RELEASE/spring-beans-4.2.6.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/spring-context/4.2.6.RELEASE/spring-context-4.2.6.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/spring-aop/4.2.6.RELEASE/spring-aop-4.2.6.RELEASE.jar:/Users/ft521/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/Users/ft521/.m2/repository/org/springframework/spring-expression/4.2.6.RELEASE/spring-expression-4.2.6.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/amqp/spring-rabbit/1.7.5.RELEASE/spring-rabbit-1.7.5.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/spring-tx/4.3.13.RELEASE/spring-tx-4.3.13.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/retry/spring-retry/1.2.1.RELEASE/spring-retry-1.2.1.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/spring-messaging/4.3.13.RELEASE/spring-messaging-4.3.13.RELEASE.jar:/Users/ft521/.m2/repository/com/rabbitmq/http-client/1.1.1.RELEASE/http-client-1.1.1.RELEASE.jar:/Users/ft521/.m2/repository/org/apache/httpcomponents/httpclient/4.3.6/httpclient-4.3.6.jar:/Users/ft521/.m2/repository/org/apache/httpcomponents/httpcore/4.3.3/httpcore-4.3.3.jar:/Users/ft521/.m2/repository/commons-codec/commons-codec/1.6/commons-codec-1.6.jar:/Users/ft521/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.8.4/jackson-databind-2.8.4.jar:/Users/ft521/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.8.0/jackson-annotations-2.8.0.jar:/Users/ft521/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.8.4/jackson-core-2.8.4.jar:/Users/ft521/.m2/repository/org/springframework/amqp/spring-amqp/1.7.5.RELEASE/spring-amqp-1.7.5.RELEASE.jar:/Users/ft521/.m2/repository/org/springframework/spring-web/4.3.13.RELEASE/spring-web-4.3.13.RELEASE.jar:/Users/ft521/.m2/repository/com/rabbitmq/amqp-client/4.0.3/amqp-client-4.0.3.jar cn.wuwenfu.rabbitmqdemo.main.Send
objc[33289]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java (0x10316e4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1031fa4e0). One of the two will be used. Which one is undefined.
[ ERROR ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ReturnCallBackListener - ReturnCallBackListener.java(17) - 路由return--message: ,msgBody:1542861998264,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2
[ ERROR ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ReturnCallBackListener - ReturnCallBackListener.java(17) - 路由return--message: ,msgBody:1542861998264,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:CorrelationData [id=efd7b1da-bda8-4784-b4a0-e5856fb7641b],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.consumer.LightaiConsumer - LightaiConsumer.java(18) - Received '1542861998264'
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:CorrelationData [id=fe1a8a1b-fccb-47d0-a887-dcac9fdd5595],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:CorrelationData [id=8c5535a4-91dc-435c-9308-28098c29ee30],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.consumer.LightaiConsumer - LightaiConsumer.java(18) - Received '1542861998264'

  • 消費消息確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

表示對消息進行確認。一定要處理異常的消息確認。否則會造成意外。

由于對消息進行了手動確認,這就確保了消息的執(zhí)行的時間上保證,確保業(yè)務邏輯執(zhí)行完畢之后才進入下一條消息。
之前我這里出現問題,采用的自動確認,導致消息順序異常。

最后

代碼下載地址: csdn下載

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容