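Preface
If you can implement your business logic without a message queue, avoid using one: it adds learning overhead.
At my company we have to process information from a large number of devices. Because of concurrency issues and message-ordering requirements, we recently adopted a message queue.
Below is how I integrated it.
What you will learn
- How to send messages through a message queue.
- How to configure one type of exchange.
- How message confirmation works, to avoid lost messages and out-of-order processing.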
Environment
Java version
java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
Development tools
IntelliJ IDEA 2018.2.1 (Ultimate Edition)
Build #IU-182.3911.36, built on August 6, 2018
Licensed to lan yu
Subscription is active until May 5, 2019
JRE: 1.8.0_152-release-1248-b8 x86_64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
macOS 10.13.6
docker
docker --version
Docker version 17.12.0-ce, build c97c6d6
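The code uses Lombok, which requires the Lombok plugin for IDEA.
[Figure: the Lombok plugin in IDEA] If you are not sure how to install it, a quick web search will help.
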
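Installing RabbitMQ
Docker is used for the installation here.
docker run -d -p 15672:15672 -p 5672:5672 --name some-rabbit rabbitmq:management
The command above creates a container that runs RabbitMQ.
Two ports are mapped: 15672 is the web console port, and 5672 is the messaging port.
After running it, visit
http://127.0.0.1:15672/
to see the web console. The default username and password are both guest.

Managing RabbitMQ through the web console is outside the scope of this article and is not covered.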
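Integrating Spring and RabbitMQ
- Create a Spring project.
This step is skipped here. If you are unsure how, download the code for this article and import it.
- Add the dependencies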
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<!-- Spring core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
Pay attention to the versions. Versions that are too new or too old relative to each other may cause errors.
- Configure RabbitMQ
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
<!-- Connection parameters -->
<rabbit:connection-factory
id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username=""
password=""
publisher-confirms="true"
/>
<rabbit:admin connection-factory="connectionFactory" />
<!-- Configure the template --><!-- mandatory must be set to true for the return callback to take effect -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"
/>
<!-- Declare the queues -->
<rabbit:queue name="SERVER_DOWN1" />
<rabbit:queue name="CLIENT_UP1" />
<!-- Declare the exchange and bind the queues. direct is one of the exchange types. -->
<rabbit:direct-exchange name="DIRECT_EX1" id="DIRECT_EX1" >
<rabbit:bindings>
<rabbit:binding queue="CLIENT_UP1" key="CLIENT_UP1" />
<rabbit:binding queue="SERVER_DOWN1" key="SERVER_DOWN1" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- Configure the consumer -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" >
<rabbit:listener queues="CLIENT_UP1" ref="lightaiConsumer" />
</rabbit:listener-container>
</beans>
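A brief introduction to the exchange types:
Direct exchange: for example, if a queue is bound with routing key "abc", only messages published with routing key "abc" are delivered to that queue.
Topic exchange: "#" matches zero or more words, and "*" matches exactly one word. For example, "abc.#" matches "abc.def.ghi", while "abc.*" matches "abc.def" but not "abc.def.ghi".
Fanout exchange: broadcast mode. An incoming message is delivered to every queue bound to the exchange.
The configuration above uses only a direct exchange. It means:
Messages with routing key CLIENT_UP1 go to the CLIENT_UP1 queue.
Messages with routing key SERVER_DOWN1 go to the SERVER_DOWN1 queue.
Only one consumer is configured, so only the messages in CLIENT_UP1 are consumed.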
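To make the topic wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It is only an illustration of the "#" and "*" semantics described above, not RabbitMQ's actual implementation; the class name TopicMatchSketch and its methods are hypothetical.

```java
class TopicMatchSketch {
    // Returns true if the routing key matches the binding pattern.
    // Words are separated by '.', '*' matches exactly one word,
    // '#' matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (int n = j; n <= k.length; n++) {
                if (match(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("abc.#", "abc.def.ghi")); // true
        System.out.println(matches("abc.*", "abc.def"));     // true
        System.out.println(matches("abc.*", "abc.def.ghi")); // false
        System.out.println(matches("abc.#", "abc"));         // true
    }
}
```

A direct exchange is the degenerate case: the routing key must equal the binding key exactly, which is what the configuration in this article relies on.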
- Consumer
package cn.wuwenfu.rabbitmqdemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;
@Service("lightaiConsumer")
@Slf4j
public class LightaiConsumer implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
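try{
String m = new String(message.getBody());
if (log.isInfoEnabled()){
log.info("Received '" + m+ "'");
}
// Acknowledge only this delivery (multiple=false) once processing has succeeded.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
// Acknowledge on failure as well, so the message does not stay unacknowledged.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// Log the exception with its stack trace through the logger.
log.error("Exception while handling message", e);
}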
}
}
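Always catch exceptions here and still acknowledge the message. Otherwise the message stays in the unacknowledged state until the channel closes, which disrupts the business flow.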
- Producer
package cn.wuwenfu.rabbitmqdemo.main;
import cn.wuwenfu.rabbitmqdemo.publish.PublishService;
import cn.wuwenfu.rabbitmqdemo.publish.RabbitTemplatePublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author hiwower@gmail.com
* @date 2018/11/6 9:03 AM
*/
@Component
public class Test {
private final String EXCHANGE_NAME="DIRECT_EX1";
@Autowired
private PublishService publishService;
@Autowired
private RabbitTemplatePublishService rabbitTemplatePublishService;
public void test(){
String message = System.currentTimeMillis()+"";
publishService.send(EXCHANGE_NAME,"SERVER_DOWN1",message);
publishService.send(EXCHANGE_NAME,"CLIENT_UP1",message);
publishService.send(EXCHANGE_NAME,"CLIENT_UP2",message);
rabbitTemplatePublishService.send(EXCHANGE_NAME,"SERVER_DOWN1",message);
rabbitTemplatePublishService.send(EXCHANGE_NAME,"CLIENT_UP1",message);
rabbitTemplatePublishService.send(EXCHANGE_NAME,"CLIENT_UP2",message);
}
}
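The code above sends six identical messages, two to each of three routing keys.
The SERVER_DOWN1 queue receives two messages. No consumer is bound to it, so they are not consumed.
The CLIENT_UP1 queue receives two messages, which are consumed normally.
CLIENT_UP2 has no matching queue, so the return callback on the sending side is triggered.
RabbitTemplate is an implementation of AmqpTemplate; either one can be used to send messages.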
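The routing outcome described above can be reproduced with a small in-memory model. This is a sketch in plain Java that imitates a direct exchange, not code that talks to RabbitMQ; the class DirectRoutingSketch and its methods are hypothetical. The 312 / NO_ROUTE values and the fact that every publish is still confirmed are taken from the log output shown later in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DirectRoutingSketch {
    // binding key -> messages sitting in the bound queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // messages handed back to the publisher (what the return callback sees)
    final List<String> returned = new ArrayList<>();
    // number of publishes accepted by the exchange (what the confirm callback sees)
    int confirms = 0;

    // Declare a queue and bind it with a key equal to its name.
    void bind(String name) {
        queues.put(name, new ArrayList<>());
    }

    void publish(String routingKey, String body) {
        List<String> queue = queues.get(routingKey);
        if (queue == null) {
            // mandatory=true: an unroutable message is returned to the sender
            returned.add("replyCode:312,replyText:NO_ROUTE,routingKey:" + routingKey);
        } else {
            queue.add(body);
        }
        // The exchange received the message in both cases.
        confirms++;
    }

    int depth(String name) {
        return queues.get(name).size();
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("SERVER_DOWN1");
        exchange.bind("CLIENT_UP1");
        String message = String.valueOf(System.currentTimeMillis());
        for (int i = 0; i < 2; i++) { // two publishing services
            exchange.publish("SERVER_DOWN1", message);
            exchange.publish("CLIENT_UP1", message);
            exchange.publish("CLIENT_UP2", message);
        }
        System.out.println("SERVER_DOWN1: " + exchange.depth("SERVER_DOWN1")); // 2
        System.out.println("CLIENT_UP1: " + exchange.depth("CLIENT_UP1"));     // 2
        System.out.println("returned: " + exchange.returned.size());           // 2
        System.out.println("confirms: " + exchange.confirms);                  // 6
    }
}
```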
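- Publisher confirmation
When a message is sent, it first reaches the exchange and is then routed to a queue by its routing key.
Two callbacks are involved. Only when the confirm callback reports ack=true and the return callback does not fire can you be sure the message reached a queue.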
package cn.wuwenfu.rabbitmqdemo.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;
/**
* Invoked when the broker confirms a publish: ack=true if the message reached the exchange, false otherwise.
*/
@Service("confirmCallBackListener")
@Slf4j
public class ConfirmCallBackListener implements ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (log.isInfoEnabled()){
log.info("交換機confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
}
}
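This callback runs for every message that is sent. ack=true means success; ack=false means failure.
You can add your own monitoring and error handling here. Under normal conditions ack=false does not occur.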
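One possible shape for that monitoring is to remember each outgoing message under its correlation id and drop it once the confirm arrives. The sketch below is plain Java with no RabbitMQ dependency; PendingConfirmsSketch and its methods are hypothetical names. In the real callback the id would come from correlationData.getId(), which is null-safe only if a CorrelationData was passed when sending (the log in this article shows both cases).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingConfirmsSketch {
    // correlation id -> message body still waiting for a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // bodies the broker refused (ack=false), kept for retry or alerting
    private final List<String> failed = new ArrayList<>();

    // Call just before publishing.
    void beforeSend(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    // Call from the confirm callback.
    synchronized void onConfirm(String correlationId, boolean ack, String cause) {
        if (correlationId == null) {
            return; // sent without correlation data: nothing to track
        }
        String body = pending.remove(correlationId);
        if (!ack && body != null) {
            failed.add(body);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    synchronized List<String> failedBodies() {
        return new ArrayList<>(failed);
    }

    public static void main(String[] args) {
        PendingConfirmsSketch tracker = new PendingConfirmsSketch();
        tracker.beforeSend("id-1", "first");
        tracker.beforeSend("id-2", "second");
        tracker.onConfirm("id-1", true, null);
        tracker.onConfirm("id-2", false, "simulated failure");
        System.out.println(tracker.pendingCount()); // 0
        System.out.println(tracker.failedBodies()); // [second]
    }
}
```

Anything still in the pending map after a timeout could also be treated as lost; that policy is left out here.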
package cn.wuwenfu.rabbitmqdemo.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.stereotype.Service;
/**
* Invoked when a message cannot be routed to any queue.
*/
@Service("returnCallBackListener")
@Slf4j
public class ReturnCallBackListener implements ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("路由return--message: ,msgBody:" + new String(message.getBody())
+ ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:"
+ routingKey);
}
}
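If the message is delivered to a queue normally, this callback is not invoked. Otherwise it fires, and the usual cause is that no route was found.
In the log below, for example: replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2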
[ ERROR ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ReturnCallBackListener - ReturnCallBackListener.java(17) - 路由return--message: ,msgBody:1542861998264,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2
[ ERROR ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ReturnCallBackListener - ReturnCallBackListener.java(17) - 路由return--message: ,msgBody:1542861998264,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX1,routingKey:CLIENT_UP2
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:CorrelationData [id=efd7b1da-bda8-4784-b4a0-e5856fb7641b],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.consumer.LightaiConsumer - LightaiConsumer.java(18) - Received '1542861998264'
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:null,ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:CorrelationData [id=fe1a8a1b-fccb-47d0-a887-dcac9fdd5595],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.callback.ConfirmCallBackListener - ConfirmCallBackListener.java(17) - 交換機confirm--:correlationData:CorrelationData [id=8c5535a4-91dc-435c-9308-28098c29ee30],ack:true,cause:null
[ INFO ] 2018-11-22 12:46:38 - cn.wuwenfu.rabbitmqdemo.consumer.LightaiConsumer - LightaiConsumer.java(18) - Received '1542861998264'
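- Consumer acknowledgement
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
acknowledges the message. Be sure to acknowledge messages whose processing throws an exception as well; otherwise unexpected problems occur.
Because messages are acknowledged manually, the timing of processing is guaranteed: the next message is handled only after the business logic for the current one has finished.
I previously used automatic acknowledgement here, and it caused messages to be processed out of order.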
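The second argument to basicAck is the multiple flag: false acknowledges only the given delivery tag, while true acknowledges every outstanding delivery up to and including that tag. The following plain-Java sketch models a channel's set of unacknowledged deliveries to show the difference; AckSketch is a hypothetical class for illustration, not part of the RabbitMQ client.

```java
import java.util.TreeSet;

class AckSketch {
    // delivery tags that have been delivered but not yet acknowledged
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple).
    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // acknowledge everything up to and including deliveryTag
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // acknowledge this single delivery only
            unacked.remove(deliveryTag);
        }
    }

    TreeSet<Long> unacked() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckSketch channel = new AckSketch();
        channel.deliver(1);
        channel.deliver(2);
        channel.deliver(3);
        channel.basicAck(2, false);
        System.out.println(channel.unacked()); // [1, 3]
        channel.basicAck(3, true);
        System.out.println(channel.unacked()); // []
    }
}
```

Using multiple=false, as the consumer in this article does, keeps each acknowledgement tied to exactly one processed message.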
Finally
Code download: available on CSDN.