RabbitMQ

存在的問題:比如客戶端調用商品服務添加一條數(shù)據(jù)到數(shù)據(jù)庫,數(shù)據(jù)庫添加成功后商品服務還必須要等待搜索服務、緩存服務都完成后才能給客戶端響應結果。RabbitMQ就是為了解決這種問題。商品服務完成添加數(shù)據(jù)到數(shù)據(jù)庫之后可以將其余的消息交給RabbitMQ,然后商品服務就可以先向客戶端響應結果,RabbitMQ再通知搜索服務和緩存服務添加數(shù)據(jù)。
一、RabbitMQ安裝
用docker-compose安裝即可
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672 # 這個端口是RabbitMQ自帶的圖形監(jiān)控界面
volumes:
- ./mydata/rabbitmq:/var/lib/rabbitmq
二、RabbitMQ架構
2.1 架構圖

由生產(chǎn)者發(fā)送消息到一個指定Exchange中,然后指定Exchange和某個Queue的路由關系,Exchange就會將消息發(fā)送到Queue中,消費者和Virtual Host建立連接,和Queue建立Channel后,就能拿到最新的消息。
注意:一個Queue中的消息只會被一個消費者消費一次(即消費完之后消息直接出隊),另一個消費者消費不到同一個Queue中的同一條消息
2.2 查看圖形化界面
直接訪問http://ip:15672,默認用戶/密碼都是guest,是一個管理員用戶。
在界面的Connections可以查看客戶端與Virtual Host建立的連接;在Channels可以查看建立連接后建立的管道;在Exchange可以查看已有的Exchange,如果發(fā)送消息的時候沒有指定Exchange,會默認用 “/”D的Exchange;在Queues可以查看隊列,隊列需要我們手動創(chuàng)建;在Admin,可以添加其他用戶,可以添加Virtual Host,可以指定哪個用戶可以管理哪幾個Virtual Host。
三、RabbitMQ的使用
3.1 RabbitMQ的路由策略
- HelloWorld
- WorkQueues
- Publish/Subscribe
- Routing
- Topics
- Publisher confirms
3.2 Java連接RabbitMQ
Step1:創(chuàng)建Maven工程,導入RabbitMQ依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
Step2:創(chuàng)建一個用于建立Connection的工具類
public class RabbitMQUtils {
public static Connection getConnection {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.199.109"); // 指定ip
factory.setPort(5672); // 指定端口
factory.setUsername("test"); // 指定用戶名密碼
factory.setPassword("test");
factory.setVirtualHost("/test"); // 指定當前用戶管理的Virtual Host
Connection connection = null;
connection = factory.newConnection();
return connection;
}
}
Step3:測試
@Test
public void testConnection() {
Connection connection = RabbitMQUtils.getConnection();
connection.close(); // 在這里打個斷點, 就可以在圖形化界面查看到建立的連接
}
3.3 HelloWorld路由

一個生產(chǎn)者,創(chuàng)建一個Channel,一個默認的Exchange(不用手動指定),一個Queue,一個消費者。
Step1:創(chuàng)建生產(chǎn)者,創(chuàng)建一個Channel,發(fā)布消息到Exchange,指定路由規(guī)則
public class Productor {
@Test
public void publish() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 發(fā)布消息到Exchange, 同時指定路由規(guī)則
String msg = "一條消息";
/**
* 四個參數(shù):
(1) 指定Exchange, 如果要使用默認的就給一個空串
(2) 指定路由規(guī)則, 使用具體的隊列名稱(當使用默認Exchange的時候, 這個參數(shù)同時也是隊列名)
(當使用自定義Exchange的時候, 這個參數(shù)就寫Exchange名)
(3) 指定傳遞消息所攜帶的屬性, 沒有給就null
(4) 指定發(fā)布的具體消息, byte[]類型
*/
channel.basicPublish("", "HelloWorld", null, msg.getBytes());
System.in.read();
channel.close();
connection.close();
}
}
Step2:創(chuàng)建消費者,創(chuàng)建一個Channel,創(chuàng)建一個Queue,并且去消費當前隊列
public class Costumer {
@Test
public void consume() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 聲明隊列
/**
* 五個參數(shù):
(1) 指定隊列名稱
(2) 當前隊列是否持久化(只有在Queue中才能持久化, Exchange中不行)
(3) 是否排外, 即當前隊列是否只能被一個消費者消費
(4) 如果這個隊列沒有消費者在消費, 則隊列自動刪除
(5) 指定當前隊列的其他信息
*/
channel.queueDeclare("HelloWorld", true, false, false, null);
// 4. 開啟監(jiān)聽Queue
// 4.1 消息回調方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println("接受到消息" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
}
}
// 4.2 開始監(jiān)聽
/**
* 三個參數(shù):
(1) 指定消費哪個隊列
(2) 指定是否自動ACK(即接收到消息以后會立即告訴RabbitMQ該消息已經(jīng)被消費掉)
(3) 指定消息回調犯法(即消費者具體要做的事情)
*/
channel.basicConsume("HelloWorld", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
Step3:測試。注意:先啟動消費者再啟動生產(chǎn)者,不然無法發(fā)送消息到Exchange,因為發(fā)送消息到Exchange需要Queue
3.4 WorkQueues路由

一個生產(chǎn)者,一個默認的Exchange,一個Queue,多個消費者
Step1:創(chuàng)建生產(chǎn)者,同上
Step2:創(chuàng)建消費者,同上。不同的是要創(chuàng)建兩個消費者
/**
* 二號消費者
*/
public class Costumer2 {
@Test
public void consume() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 聲明隊列
channel.queueDeclare("WorkQueues", true, false, false, null);
// 3.1 可以指定當前消費者一次能消費多少條消息, 如果不指定就是兩個消費者均攤
channel.basicQos(2);
// 4. 開啟監(jiān)聽Queue
// 4.1 消息回調方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println("消費者二號接受到消息" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
}
}
// 4.2 開始監(jiān)聽
channel.basicConsume("Work", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
3.5 Publish/Subscribe路由

一個生產(chǎn)者,一個自定義的Exchange,多個Queue,多個消費者。此時兩個Queue中的消息是一模一樣的。
Step1:創(chuàng)建生產(chǎn)者,需要手動創(chuàng)建一個Exchange
public class Productor {
@Test
public void publish() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 創(chuàng)建Exchange
/**
* 參數(shù)1: 指定Exchange的名稱; 參數(shù)2: 指定Exchange的類型
FANOUT - 對應Publish/Subscribe路由
DIRECT - 對應Routing路由
TOPIC - 對應Topics路由
*/
channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT);
// 4. 將自定義的Exchange和Queue綁定起來(也可以在消費者中綁定)
// 參數(shù)3是RouteKey, Publish/Subscribe路由的RouteKey給空串即可
channel.queueBind("pubsub_queue1", "pubsub_exchange", "");
channel.queueBind("pubsub_queue2", "pubsub_exchange", "");
// 5. 發(fā)布消息到Exchange, 同時指定路由規(guī)則
for (int i = 0; i < 10; i++) {
String msg = "一條消息" + i;
channel.basicPublish("pubsub_exchange", "Work", null, msg.getBytes());
}
System.in.read();
channel.close();
connection.close();
}
}
Step2:創(chuàng)建消費者,同時指定當前消費者消費的是哪個隊列
/**
* 二號消費者
*/
public class Costumer2 {
@Test
public void consume() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 聲明隊列
channel.queueDeclare("pubsub_queue2", true, false, false, null);
// 4. 開啟監(jiān)聽Queue
// 4.1 消息回調方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println("消費者二號接受到消息" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
}
}
// 4.2 開始監(jiān)聽
channel.basicConsume("pubsub_queue2", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
3.6 Routing路由

一個生產(chǎn)者,一自定義的Exchange,多個Queue,多個消費者,Exchange和Queue之間不是直接綁定,而是通過RouteKey綁定,就是指定哪些類型的消息發(fā)送到哪個Queue中,此時兩個Queue中的消息不一定相同。
Step1:創(chuàng)建生產(chǎn)者,綁定時指定Queue對應的RouteKey
public class Productor {
@Test
public void publish() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 創(chuàng)建Exchange
/**
* 參數(shù)1: 指定Exchange的名稱; 參數(shù)2: 指定Exchange的類型
FANOUT - 對應Publish/Subscribe路由
DIRECT - 對應Routing路由
TOPIC - 對應Topics路由
*/
channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
// 4. 將自定義的Exchange和Queue綁定起來(也可以在消費者中綁定)
// 參數(shù)3指定每個隊列對應的RouteKey
channel.queueBind("routing_queue1", "routing_exchange", "ERROR");
channel.queueBind("routing_queue2", "routing_exchange", "INFO");
// 5. 發(fā)布消息到Exchange, 同時指定RouteKey, Exchange會根據(jù)RouteKey把消息發(fā)送到對應的Queue
channel.basicPublish("pubsub_exchange", "ERROR", null, "error msg1".getBytes());
channel.basicPublish("pubsub_exchange", "INFO", null, "info msg1".getBytes());
channel.basicPublish("pubsub_exchange", "INFO", null, "info msg2".getBytes());
channel.basicPublish("pubsub_exchange", "INFO", null, "info msg3".getBytes());
System.in.read();
channel.close();
connection.close();
}
}
Step2:創(chuàng)建消費者,同3.5
3.7 Topics路由

同Routing路由方式,只是RouteKey的形式不同
Step1:創(chuàng)建生產(chǎn)者
public class Productor {
@Test
public void publish() {
// 1. 獲取Connection
Connection connection = RabbitMQUtils.getConnection();
// 2. 創(chuàng)建Channel
Channel channel = connection.createChannel();
// 3. 創(chuàng)建Exchange
/**
* 參數(shù)1: 指定Exchange的名稱; 參數(shù)2: 指定Exchange的類型
FANOUT - 對應Publish/Subscribe路由
DIRECT - 對應Routing路由
TOPIC - 對應Topics路由
*/
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
// 4. 將自定義的Exchange和Queue綁定起來(也可以在消費者中綁定)
// 參數(shù)3指定每個隊列對應的RouteKey
/**
* 關于此時的RouteKey:
* "*.red.*": 表示有三個條件, 只要中間的條件符合就可以將消息發(fā)送到該隊列, *是占位符
* "fast.#": #是通配符, 效果等同于"fast.*.*"
*/
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
channel.queueBind("topics_queue2", "topics_exchange", "fast.#");
// 5. 發(fā)布消息到Exchange, 同時指定RouteKey, Exchange會根據(jù)RouteKey把消息發(fā)送到對應的Queue
channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快紅猴".getBytes());
channel.basicPublish("topics_exchange", "fast.white.dog", null, "快白狗".getBytes());
channel.basicPublish("topics_exchange", "slow.red.cat", null, "快紅猴".getBytes());
// 此時第1條、第3條消息會發(fā)送給topics_queue1
// 第1條、第2條消息會發(fā)送給topics_queue2
System.in.read();
channel.close();
connection.close();
}
}
四、SpringBoot整合RabbitMQ
4.1 快速入門
Step1:創(chuàng)建SpringBoot工程,導入SpringBoot整合RabbitMQ的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step2:編寫配置
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
Step3:編寫配置類,聲明Exchange和Queue,并且綁定
@Configuration
public class RabbitMQConfig {
// 創(chuàng)建Topic路由的Exchange
@Bean("TOPIC_EXCHANGE")
public TopicExchange getTopicExchange() {
/**
* 參數(shù)1: 指定Exchange的名稱
* 參數(shù)2: 是否持久化
* 參數(shù)3: 是否自動刪除
*/
return new TopicExchange("boot_topic_exchange", true, false);
}
// 創(chuàng)建Queue1
@Bean("TOPIC_QUEUE1")
public Queue getQueue1() {
/**
* 參數(shù)1: 指定Queue名稱
* 參數(shù)2: 是否持久化
* 參數(shù)3: 是否排外
* 參數(shù)4: 是否自動刪除
* 參數(shù)5: 攜帶的參數(shù)
*/
return new Queue("boot_topic_queue1", true, false, false, null);
}
// 創(chuàng)建Queue2
@Bean("TOPIC_QUEUE2")
public Queue getQueue2() {
return new Queue("boot_topic_queue2", true, false, false, null);
}
// 綁定
@Bean
public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
// 綁定
@Bean
public Binding getBindingSlow(@Qualifier("TOPIC_QUEUE2") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("slow.*.*");
}
}
Step4:準備生產(chǎn)者,發(fā)送消息
public class Productor {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publish() {
/**
* 參數(shù)1: 指定發(fā)送到的Exchange名稱
* 參數(shù)2: 指定此次發(fā)送的RouteKey
* 參數(shù)3: 具體消息
*/
rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快紅狗");
rabbitTemplate.convertAndSend("boot_topic_exchange", "slow.white.pig", "慢白豬");
System.in.read();
}
}
Step5:準備消費者
@Component
public class Costumer {
// 指定當前消費者監(jiān)聽的隊列, SpringBoot工程啟動后就會一直監(jiān)聽
@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(Object msg) {
// 該方法內(nèi)是消費者接收到消息之后的具體動作
System.out.println("消費者1號接收到消息" + msg);
}
@RabbitListener(queues = "boot_topic_queue2")
public void consumer2(Object msg) {
System.out.println("消費者2號接收到消息" + msg);
}
}
Step6:測試。啟動SpringBoot工程,可以在打印中查看消費者打印的消息,也可以在監(jiān)控界面中查看消息的發(fā)送和被消費的情況
4.2 在SpringBoot中手動ACK
Step1:修改配置
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手動ACK配置
Step2:修改消費者的接收行為
@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
System.out.println("消費者1號接收到消息" + msg);
// 手動ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
五、RabbitMQ的其他操作
5.1 消息的可靠性
-
Q:如果消息已經(jīng)到達了RabbitMQ,但是RabbitMQ宕機了,消息會丟失嗎?
A:不會,RabbitMQ的Queue有持久化機制
-
Q:消費者在消費消息時,如果執(zhí)行到一半,消費者宕機了怎么辦?
A:消費者中最好使用手動ACK來避免消息沒消息完卻宕機的情況,這樣消息就還會存在隊列中
-
Q:生產(chǎn)者發(fā)送消息時,由于網(wǎng)絡問題,導致消息沒發(fā)送到RabbitMQ怎么辦?
A:RabbitMQ提供了事務操作和Confirm機制,可以保證生產(chǎn)者把消息發(fā)送到Exchange(但是事務操作效率太低,主要用Confirm機制)
5.1.1 RabbitMQ的Confirm機制
可以保證生產(chǎn)者把消息發(fā)送到Exchange
5.1.1.1 普通Confirm方式
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在消息發(fā)送之前, 開啟Confirm
channel.confirmSelect();
channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快紅猴".getBytes());
// Step2:在消息發(fā)送之后確認消息是否發(fā)送成功
if (channel.waitForConfirms()) {
/**
* 消息發(fā)送成功時的處理
*/
} else {
/**
* 消息發(fā)送失敗時的處理
*/
}
System.in.read();
channel.close();
connection.close();
}
}
5.1.1.2 批量Confirm方式
主要用于發(fā)送多條消息時,只要有一條發(fā)送失敗,則全部發(fā)送失敗,拋出異常。
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在消息發(fā)送之前, 開啟Confirm
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
String msg = "快紅猴" + i;
channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
}
// Step2:在消息發(fā)送之后確認消息是否發(fā)送成功
channel.waitForConfirmsOrDie(); // 該方法不會返回布爾值, 如果有一條發(fā)送失敗就會直接拋異常
System.in.read();
channel.close();
connection.close();
}
}
5.1.1.3 異步Confirm方式
因為是異步的,所以不管生產(chǎn)者的消息是否發(fā)送成功,都會接著執(zhí)行生產(chǎn)者后面的代碼,單獨有會有一個線程出來進行Confirm判斷。效率最高,最常用。
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在消息發(fā)送之前, 開啟Confirm
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
String msg = "快紅猴" + i;
channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
}
// Step2:在消息發(fā)送之后確認消息是否發(fā)送成功
channel.addConfirmListener(new ConfirmListener {
// 消息發(fā)送成功時的回調方法
@Override
public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息發(fā)送成功。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
}
// 消息發(fā)送失敗時的回調方法
@Override
public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息發(fā)送失敗。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
}
});
System.in.read();
channel.close();
connection.close();
}
}
5.1.2 RabbitMQ的Return機制
Confirm機制只能保證消息到達Exchange,無法保證消息可以被Exchange分發(fā)到Queue。而Exchange是不能持久化消息的,Queue才可以持久化消息。所以可以使用Return機制保證Exchange把消息送達到指定的Queue
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
// Step1:在開啟Confirm之前先開啟Return
channel.addReturnListener(new ReturnListener() {
// 消息沒有到達Queue時執(zhí)行的回調方法
@Override
public void handleReturn(int replyCode, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] msg) throws Exception {
System.out.println(new String(msg, "UTF-8") + "沒有送到到Queue中");
}
})
// Step2:在消息發(fā)送之前, 開啟Confirm
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
String msg = "快紅猴" + i;
// 在發(fā)送消息時, 在RouteKey參數(shù)后面追加一個參數(shù), 置為true表示開啟Return
channel.basicPublish("topics_exchange", "fast.red.monkey", true, null, msg.getBytes());
}
// Step3:在消息發(fā)送之后確認消息是否發(fā)送成功
channel.addConfirmListener(new ConfirmListener {
@Override
public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息發(fā)送成功。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
}
@Override
public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息發(fā)送失敗。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
}
});
System.in.read();
channel.close();
connection.close();
}
}
5.1.3 在SpringBoot中實現(xiàn)Confirm和Return
Step1:修改配置文件
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手動ACK配置
publisher-confirm-type: simple
publisher-returns: true
Step2:編寫配置類,指定RabbitTemplate對象,開啟Confirm和Return,并編寫回調方法
@Component // 這里不能用Configuration, 因為要實現(xiàn)接口
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTempalte;
// 定義初始化方法, 這個方法在構建對象時會執(zhí)行
@PostConstruct
public void initMethod() {
// 給RabbitTemplate指定Confirm和Return的回調方法
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
/**
* 消息發(fā)送成功時的處理
*/
} else {
/**
* 消息發(fā)送失敗時的處理
*/
}
}
@Override
public void returnMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
/**
* Exchange中的消息沒有送達到Queue時的處理
*/
}
}
Step3:生產(chǎn)者和消費者不變
5.2 消息重復消費

重復消費同一個Queue的消息,會對非冪等行操作(增刪)造成問題;重復消費消息的原因是,消費者沒有給RabbitMQ一個Ack。
為了解決重復消費消息的問題,可以采用Redis,在消費者消費消息之前,先將消息的id作為key放到Redis中(用setnx方法,key不存在就創(chuàng)建key,key存在則獲取key的value),并把對應的value置0,表示正在執(zhí)行任務,等任務執(zhí)行完畢之后可以把value置1。如果消費者一號ACK失敗,在RabbitMQ將消息交給消費者二號時,會先執(zhí)行setnx方法判斷key是否存在,如果key存在則獲取key的value,如果value是0,則消費者二號就什么都不做,如果value是1,則表示消費者一號已經(jīng)執(zhí)行完了任務,但是ACK失敗,消費者二號直接執(zhí)行ACK幫消費者一號ACK即可。
極端情況:消費者一號出現(xiàn)了死鎖,則會一直卡在key存在且value=0的情況。解決方法是:在setnx設置key的時候,給key指定上一個生存時間即可。
5.2.1 實現(xiàn)避免消息重復消費
Step1:在Docker中啟動Redis,然后在項目中導入Redis的依賴
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
Step2:生產(chǎn)者在發(fā)送消息前指定上消息的ID
public class Productor {
@Test
public void publish() throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] msg) throws Exception {
System.out.println(new String(msg, "UTF-8") + "沒有送到到Queue中");
}
})
channel.confirmSelect();
for (int i = 0 ; i < 100; i++) {
// 創(chuàng)建一個properties屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) // 指定消息是否需要持久化, 1表示需要持久化, 2不需要
.messageId(UUID.randomUUID().toString()) // 給每一條消息隨機分配了一個UUID
.build();
String msg = "快紅猴" + i;
// 將properties屬性放到第4個參數(shù)就可以隨著消息一起發(fā)出
channel.basicPublish("topics_exchange", "fast.red.monkey", true, properties, msg.getBytes());
}
channel.addConfirmListener(new ConfirmListener {
@Override
public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息發(fā)送成功。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
}
@Override
public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
System.out.println("消息發(fā)送失敗。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
}
});
System.in.read();
channel.close();
connection.close();
}
}
Step3:消費者調用Redis進行操作
public class Costumer1 {
@Test
public void consume() {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueues", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelopo,
AMQP.BasicProperties properties, byte[] body) throws IOException{
Jedis jedis = new Jedis("192.168.199.109", 6379); // 連接Redis
String messageId = properties.getMessageId(); // 取出properties中的messageId
// 1. setnx到Redis中, 默認指定Value為0, 指定生存時間為10秒
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if (result != null && result.equalsIgnoreCase("OK")) {
System.out.println("消費者一號接受到消息" + new String(body, "UTF-8"));
// 2. 消費成功, set messageId的value為1
jedis.set(messageId, "1");
channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
} else {
// 3. 如果1中的setnx失敗, 獲取key對應的value
// 如果是0, 則直接return, 如果是1則先ACK再return
String value = jedis.get(messageId);
if ("1".equalsIgnoreCase(s)) {
channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
}
}
}
}
channel.basicConsume("topics_queue1", false, consumer);
System.in.read();
channel.close();
connection.close();
}
}
Step4:測試,啟動生產(chǎn)者和消費者,然后到Redis中查看是否有新的key
5.2.2 在SpringBoot實現(xiàn)避免消息重復消費
Step1:導入SpringBoot整合Redis的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Step2:在配置文件中連接Redis
spring:
redis:
host: 192.168.199.109
port: 6379
Step3:生產(chǎn)者發(fā)送消息之前指定消息的ID
public class Productor {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publish() {
// 給消息的屬性分配一個隨機的UUID
CorrelationData messageId = new CorrelationData(UUID.randomUUID.toString());
// 把messageId放入第4個參數(shù)
rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快紅狗", messageId);
System.in.read();
}
}
Step4:消費者調用Redis進行操作
@Component
public class Costumer {
// 用StringRedisTemplate可以直接用String存入Redis(可以不用轉為字節(jié)數(shù)組)
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
// 先取出messageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
// 1. 設置key到Redis
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
System.out.println("消費者1號接收到消息" + msg);
// 2. 消費成功, set messageId的value為1
redisTemplate.opsForValue().set(messageId, "1");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動ACK
} else {
// 3. 如果1中的setnx失敗, 獲取key對應的value
// 如果是0, 則直接return, 如果是1則先ACK再return
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手動ACK
}
}
}
}
Step5:測試,啟動生產(chǎn)者和消費者,然后到Redis中查看是否有新的key
六、RabbitMQ的簡單應用
例子:添加一條數(shù)據(jù)時,客戶模塊在MySQL添后會調用搜索模塊在ES中也添加

客戶模塊:
Step1:在客戶模塊的SpringBoot工程中導入SpringBoot整合RabbitMQ的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step2:編寫配置文件連接RabbitMQ
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手動ACK配置
Step3:編寫配置類
@Configuration
public class RabbitMQConfig {
// 創(chuàng)建Topic路由的Exchange
@Bean("TOPIC_EXCHANGE")
public TopicExchange getTopicExchange() {
return new TopicExchange("openapi_customer_exchange", true, false);
}
// 創(chuàng)建Queue1
@Bean("TOPIC_QUEUE1")
public Queue getQueue1() {
return new Queue("openapi_customer_queue", true, false, false, null);
}
// 綁定
@Bean
public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
Step4:在service中直接把消息發(fā)送給Exchange(生產(chǎn)者)(要先注入RabbitTemplate)
// 原來的寫法: 直接把消息發(fā)送到
/**
String json = JSON.toJSON(customer);
HttpHeaders headers = new HttpHeaders();
headers.setContenType(MediaType.parseMediaType("application/json;charset=utf-8"));
HttpEntity<String> entity = new HttpEntity<>(json.headers);
restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);
*/
// 新的寫法: 將消息發(fā)送到Exchange
rabbitTemplate.converAndSend("openapi_customer_exchange", "openapi.customer.add", JSON.toJSON(customer));
搜索模塊:
Step1:在客戶模塊的SpringBoot工程中導入SpringBoot整合RabbitMQ的依賴
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Step2:編寫配置文件,連接RabbitMQ和Redis
spring:
rabbtimq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
ackonwledge-mode: manual # 手動ACK配置
Step3:編寫配置類
@Configuration
public class RabbitMQConfig {
// 創(chuàng)建Topic路由的Exchange
@Bean("TOPIC_EXCHANGE")
public TopicExchange getTopicExchange() {
return new TopicExchange("openapi_customer_exchange", true, false);
}
// 創(chuàng)建Queue1
@Bean("TOPIC_QUEUE1")
public Queue getQueue1() {
return new Queue("openapi_customer_queue", true, false, false, null);
}
// 綁定
@Bean
public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
@Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
// 指定Queue, 指定Exchange, 指定RouteKey
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
Step4:準備消費者
@Component
public class CostumerListener {
@Autowired
private CustomerService customerService;
// 指定當前消費者監(jiān)聽的隊列, SpringBoot工程啟動后就會一直監(jiān)聽
@RabbitListener(queues = "openapi_customer_queue")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
// 1. 獲取RoutingKey
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
// 2. 根據(jù)RoutingKey來選擇調用的方法
switch (receivedRoutingKey) {
case "openapi.customer.add":
// 3. 調用Service完成添加數(shù)據(jù)到ES(先把json字符串轉為Customer對象再存)
customerService.saveCustomer(JSON.parseJSON(msg, Customer.class));
// 4. 手動ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
break;
/**
* ......
* 其他RoutingKey對應的處理
*/
}
}
}