0. 前言
本文內(nèi)容分為如下三部分
RabbitMQ高級(jí)特性
消息可靠性投遞
Consumer ACK
消費(fèi)端限流
TTL
死信隊(duì)列
延遲隊(duì)列
日志與監(jiān)控
消息可靠性分析與追蹤
管理
RabbitMQ應(yīng)用問題
消息可靠性保障
消息冪等性處理
1. 高級(jí)特性
1.1 消息的可靠投遞
在使用 RabbitMQ 的時(shí)候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場(chǎng)景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
confirm 確認(rèn)模式
return 退回模式
rabbitmq 整個(gè)消息投遞的路徑為:
producer--->rabbitmq broker--->exchange--->queue--->consumer
消息從 producer 到 exchange 則會(huì)返回一個(gè) confirmCallback 。
消息從 exchange-->queue 投遞失敗則會(huì)返回一個(gè) returnCallback。
我們將利用這兩個(gè) callback 控制消息的可靠性投遞
confirm模式
在上一篇中的最后我們用spring-boot配置了rabbitMQ,
在這里在原來的基礎(chǔ)上繼續(xù)進(jìn)行,在生產(chǎn)者的application.yml添加
使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
rabbitmq:
host: xxx
port: 5672
username: root
password: root
virtual-host: /example
#生產(chǎn)端配置
#開啟發(fā)送確認(rèn),此配置在Springboot2.3.0版本中已經(jīng)@Deprecated了,默認(rèn)就是
# publisher-confirms: true
#
publisher-confirm-type: simple
#開啟發(fā)送失敗退回
publisher-returns: true
然后新增一個(gè)test
@Test
public void testConfirm() {
// 1. 設(shè)置ConnectionFactory的publisher-confirms="true" 開啟 確認(rèn)模式。
// 2. 使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。
// 當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。
// 在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相關(guān)配置信息
* @param ack exchange交換機(jī) 是否成功收到了消息。true 成功,false代表失敗
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被執(zhí)行了");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失敗
System.out.println("接收失敗消息" + cause);
//做一些處理,讓消息再次發(fā)送。
}
}
});
// 3. 發(fā)生消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.confirm", "confirm mq hello~~~~~~~~");
}
啟動(dòng)測(cè)試,結(jié)束后控制臺(tái)成功打印

登陸rabbitmq管理后臺(tái)也可以看到消息已經(jīng)寫入隊(duì)列中了,只是還沒有被消費(fèi)。
其中回調(diào)函數(shù)中 confirm的第一個(gè)參數(shù)correlationData會(huì)在發(fā)送消息的函數(shù)convertAndSend的重載函數(shù)中會(huì)使用,這里沒有使用這個(gè)參數(shù)。
ack比較重要,可以判斷交換機(jī)是否收到消息
cause失敗原因
return模式
使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當(dāng)消息從exchange路由到queue失敗后,如果設(shè)置了rabbitTemplate.setMandatory(true)參數(shù),則會(huì)將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。
application.yml添加
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
rabbitmq:
host: asjunor.site
port: 5672
username: root
password: root
virtual-host: /example
#生產(chǎn)端配置
#開啟發(fā)送確認(rèn),此配置在Springboot2.3.0版本中已經(jīng)@Deprecated了,默認(rèn)就是
# publisher-confirms: true
#
publisher-confirm-type: simple
#開啟發(fā)送失敗退回
publisher-returns: true
#開啟執(zhí)行return回調(diào)
template:
mandatory: true
編寫測(cè)試
/**
* 回退模式: 當(dāng)消息發(fā)送給Exchange后,Exchange路由到Queue失敗時(shí)候才會(huì)執(zhí)行 ReturnCallBack
* 步驟:
* 1. 開啟回退模式:publisher-returns="true"
* 2. 設(shè)置ReturnCallBack
* 3. 設(shè)置Exchange處理消息的模式:
* 1. 如果消息沒有路由到Queue,則丟棄消息(默認(rèn))
* 2. 如果消息沒有路由到Queue,返回給消息發(fā)送方ReturnCallBack
*/
@Test
public void testReturn(){
// 設(shè)置交換機(jī)處理失敗消息的模式
// rabbitTemplate.setMandatory(true);
// 2.設(shè)置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息對(duì)象
* @param replyCode 錯(cuò)誤碼
* @param replyText 錯(cuò)誤信息
* @param exchange 交換機(jī)
* @param routingKey 路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 被執(zhí)行了");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
// 3. 發(fā)送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boott.treturn", "return mq hello~~~~~~~~");
}
控制臺(tái)打印

不知道因?yàn)槭裁丛?,return回調(diào)經(jīng)常會(huì)不打印信息,有待研究
在RabbitMQ中也提供了事務(wù)機(jī)制,但是性能較差,此處不做講解。
使用channel下列方法,完成事務(wù)控制:
txSelect(), 用于將當(dāng)前channel設(shè)置成transaction模式
txCommit(),用于提交事務(wù)
txRollback(),用于回滾事務(wù)
1.2 Consumer Ack
ack指Acknowledge,確認(rèn)。 表示消費(fèi)端收到消息后的確認(rèn)方式。
有三種確認(rèn)方式:
- 自動(dòng)確認(rèn):
acknowledge="none" - 手動(dòng)確認(rèn):
acknowledge="manual" - 根據(jù)異常情況確認(rèn):
acknowledge="auto",(這種方式使用麻煩,不作講解)
其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。
配置consumer的監(jiān)聽器
對(duì)于消費(fèi)者,配置application.yml為
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
rabbitmq:
host: asjunor.site
port: 5672
username: root
password: root
virtual-host: /example
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
我們?cè)谠瓉鞷abbitMQListener上修改
新建一個(gè)監(jiān)聽器ackListener
@Component
@RabbitListener(queues = "boot_queue")
public class AckListener {
@RabbitHandler
public void process(String hello,Channel channel, Message message) throws IOException, InterruptedException {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 接受轉(zhuǎn)換消息
System.out.println("ackListener收到的消息為:" + new String(message.getBody()));
// 2. 處理業(yè)務(wù)邏輯
System.out.println("處理業(yè)務(wù)邏輯");
int i =3/0;
// 3. 手動(dòng)簽收
channel.basicAck(deliveryTag,true);
}
catch (Exception e){
// 4. 拒絕簽收
/*
第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端
*/
channel.basicNack(deliveryTag,true,true);
// channel.basicReject(deliveryTag,true); 單條數(shù)據(jù)
}
//消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack返回false,并重新回到隊(duì)列,api里面解釋得很清楚
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
@RabbitListener 可以標(biāo)注在類上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給 @RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型
設(shè)置acknowledge屬性,設(shè)置ack方式 none:自動(dòng)確認(rèn),manual:手動(dòng)確認(rèn)
如果在消費(fèi)端沒有出現(xiàn)異常,則調(diào)用channel.basicAck(deliveryTag,false);方法確認(rèn)簽收消息,簽收成功,消息就被消費(fèi)了。
如果出現(xiàn)異常,則在catch中調(diào)用 basicNack或 basicReject,拒絕消息,讓MQ重新發(fā)送消息。
消息可靠性小結(jié)
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 生產(chǎn)方確認(rèn)Confirm
- 消費(fèi)方確認(rèn)Ack
- Broker高可用,后面集群會(huì)講到
1.3 消費(fèi)端限流
回顧一下這個(gè)圖,我們說過MQ有個(gè)很重要的作用就是削峰填谷

接下來就學(xué)習(xí)如何實(shí)現(xiàn)限流
新建一個(gè)QosListener,這時(shí)候要把之前的AckListener注釋掉,
修改application.yml
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
rabbitmq:
host: asjunor.site
port: 5672
username: root
password: root
virtual-host: /example
listener:
direct:
acknowledge-mode: manual
#每次限流1條消息
prefetch: 1
simple:
acknowledge-mode: manual
prefetch: 1
其中新增了perfetch = 1,表示消費(fèi)端每次從mq拉去一條消息來消費(fèi),直到手動(dòng)確認(rèn)消費(fèi)完畢后,才會(huì)繼續(xù)拉去下一條消息。
/**
* Consumer 限流機(jī)制
* 1. 確保ack機(jī)制為手動(dòng)確認(rèn)。
* 2. listener-container配置屬性
* perfetch = 1,表示消費(fèi)端每次從mq拉去一條消息來消費(fèi),直到手動(dòng)確認(rèn)消費(fèi)完畢后,才會(huì)繼續(xù)拉去下一條消息。
*/
@Component
@RabbitListener(queues = "boot_queue")
public class QosListener {
@RabbitHandler
public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
Thread.sleep(1000);
//1.獲取消息
System.out.println("Qos:"+new String(message.getBody()));
//2. 處理業(yè)務(wù)邏輯
//3. 簽收
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
在這里我們把最后一行 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);注釋掉
啟動(dòng)消費(fèi)者
發(fā)現(xiàn)只打印了一次

小結(jié)
配置 prefetch屬性設(shè)置消費(fèi)端一次拉取多少消息
消費(fèi)端的確認(rèn)模式一定為手動(dòng)確認(rèn)。acknowledge="manual"
1.4 TTL
TTL 全稱 Time To Live(存活時(shí)間/過期時(shí)間)。
當(dāng)消息到達(dá)存活時(shí)間后,還沒有被消費(fèi),會(huì)被自動(dòng)清除。
RabbitMQ可以對(duì)消息設(shè)置過期時(shí)間,也可以對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過期時(shí)間。
比如我們有個(gè)訂單系統(tǒng),下訂單時(shí)候如果30分鐘內(nèi)未被支付,那么這條消息就失效了。

控制臺(tái)測(cè)試
打開我們的控制臺(tái),新建隊(duì)列的時(shí)候可以看到最下面的arguments,我們先設(shè)置一個(gè)message ttl為10秒

再創(chuàng)建一個(gè)交換機(jī)

將交換機(jī)和隊(duì)列綁定

再在這個(gè)頁面下面手動(dòng)發(fā)消息

十秒鐘這個(gè)消息就會(huì)消失
編寫代碼測(cè)試
刪除在控制臺(tái)創(chuàng)建的交換機(jī)和對(duì)壘
producer端新增一個(gè)TTLConfig
@Configuration
public class TTLConfig {
public static final String EXCHANGE_NAME = "test_exchange_ttl";
public static final String QUEUE_NAME = "test_queue_ttl";
//1.交換機(jī)
@Bean("ttlExchange")
public Exchange ttlExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.Queue 隊(duì)列
@Bean("ttlQueue")
public Queue ttlQueue(){
return QueueBuilder.durable(QUEUE_NAME).ttl(20000).build();
}
//3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
/*
1. 知道哪個(gè)隊(duì)列
2. 知道哪個(gè)交換機(jī)
3. routing key
*/
@Bean
public Binding bindTTLQueueExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
}
}
之后運(yùn)行測(cè)試文件就能創(chuàng)建exchange和queue,其中創(chuàng)建隊(duì)列時(shí)候設(shè)置了參數(shù)ttl為20秒,指隊(duì)列的過期時(shí)間。
再編寫測(cè)試函數(shù)
/**
* TTL:過期時(shí)間
* 1. 隊(duì)列統(tǒng)一過期
*
* 2. 消息單獨(dú)過期
*
*
* 如果設(shè)置了消息的過期時(shí)間,也設(shè)置了隊(duì)列的過期時(shí)間,它以時(shí)間短的為準(zhǔn)。
* 隊(duì)列過期后,會(huì)將隊(duì)列所有消息全部移除。
* 消息過期后,只有消息在隊(duì)列頂端,才會(huì)判斷其是否過期(移除掉)
*
*/
@Test
public void testTTL() {
// 消息后處理對(duì)象,設(shè)置一些消息的參數(shù)信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
/**
* Change (or replace) the message.
*
* @param message the message.
* @return the message.
* @throws AmqpException an exception.
*/
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.設(shè)置message的信息
message.getMessageProperties().setExpiration("10000");//消息的過期時(shí)間
//2.返回該消息
return message;
}
};
//消息單獨(dú)過期
//rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
for (int i = 0; i < 10; i++) {
if(i == 5){
//消息單獨(dú)過期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}else{
//不過期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
}
在這里,其實(shí)消息不會(huì)看到它過期了,因?yàn)?不是在隊(duì)列的頭部,只有在隊(duì)列頭部的才會(huì)被移除掉。
小結(jié)
設(shè)置隊(duì)列過期時(shí)間使用參數(shù):x-message-ttl,單位:ms(毫秒),會(huì)對(duì)整個(gè)隊(duì)列消息統(tǒng)一過期。
設(shè)置消息過期時(shí)間使用參數(shù):expiration。單位:ms(毫秒),當(dāng)該消息在隊(duì)列頭部時(shí)(消費(fèi)時(shí)),會(huì)單獨(dú)判斷這一消息是否過期。
如果兩者都進(jìn)行了設(shè)置,以時(shí)間短的為準(zhǔn)。
1.5 死信隊(duì)列
死信隊(duì)列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機(jī)),當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。

在前面的ttl例子中,當(dāng)我們的消息過期后,會(huì)被丟棄,但如果這個(gè)隊(duì)列綁定了死信交換機(jī),則消息不會(huì)被丟棄,而是發(fā)送到死信交換機(jī),而死信交換機(jī)又可以綁定其他隊(duì)列,從而可以重新被消費(fèi)者消費(fèi)。
考慮兩個(gè)問題:
- 第一個(gè)隊(duì)列如何綁定死信交換機(jī)?
- 消息什么時(shí)候成為死信
消息成為死信的三種情況
隊(duì)列消息長(zhǎng)度到達(dá)限制;
消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊(duì)列,requeue=false;
原隊(duì)列存在消息過期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);
tip:
死信交換機(jī)和死信隊(duì)列和正常的交換機(jī)和隊(duì)列沒有任何區(qū)別
- 聲明正常的隊(duì)列(redirect_queue)和交換機(jī)(redirect_exchange)
- 聲明死信隊(duì)列(dlx_queue)和死信交換機(jī)(dlx_exchange)
- 正常隊(duì)列綁定死信交換機(jī)
設(shè)置兩個(gè)參數(shù):- x-dead-letter-exchange:死信交換機(jī)名稱
- x-dead-letter-routing-key:發(fā)送給死信交換機(jī)的routingkey
修改application.yml
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
rabbitmq:
host: asjunor.site
port: 5672
username: root
password: root
virtual-host: /example
#生產(chǎn)端配置
#開啟發(fā)送確認(rèn),此配置在Springboot2.3.0版本中已經(jīng)@Deprecated了,默認(rèn)就是
# publisher-confirms: true
#
publisher-confirm-type: simple
#開啟發(fā)送失敗退回
publisher-returns: true
#開啟執(zhí)行return回調(diào)
template:
mandatory: true
retry:
# 允許消息消費(fèi)失敗的重試
enabled: true
# 消息最多消費(fèi)次數(shù)3次
max-attempts: 3
# 消息多次消費(fèi)的間隔1秒
initial-interval: 1000
listener:
direct:
# 設(shè)置為false,會(huì)丟棄消息或者重新發(fā)布到死信隊(duì)列
default-requeue-rejected: false
編寫DeadLetterConfig
package org.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName DeadLetterConfig
* @Description TODO
* @Author Patrick Star
* @Date 2020/12/7 9:37 下午
*/
@Configuration
public class DeadLetterConfig {
public static final String DL_EXCHANGE = "dl_exchange";
public static final String DL_QUEUE = "dl_queue";
public static final String REDIRECT_QUEUE = "redirect_queue";
public static final String REDIRECT_EXCHANGE = "redirect_exchange";
public static final String DL_REDIRECT_ROUTING_KEY = "dlx.hehe";
/**
* 死信隊(duì)列跟交換機(jī)類型沒有關(guān)系 不一定為directExchange 不影響該類型交換機(jī)的特性.
*/
@Bean("dlExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.topicExchange(DL_EXCHANGE).durable(true).build();
}
@Bean("dlQueue")
public Queue deadLetterQueue() {
// 設(shè)置正常隊(duì)列的長(zhǎng)度限制和ttl
return QueueBuilder.durable(DL_QUEUE).build();
}
@Bean("redirectQueue")
public Queue redirectQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 聲明 死信隊(duì)列Exchange
args.put("x-dead-letter-exchange", DL_EXCHANGE);
// x-dead-letter-routing-key 聲明 死信隊(duì)列拋出異常重定向隊(duì)列的routingKey("dlx.hehe")
args.put("x-dead-letter-routing-key", DL_REDIRECT_ROUTING_KEY);
args.put("x-message-ttl", 10000);
args.put("x-max-length", 10);
return QueueBuilder.durable(REDIRECT_QUEUE).withArguments(args).build();
}
//1.交換機(jī)
@Bean("redirectExchange")
public Exchange redirectExchange() {
return ExchangeBuilder.topicExchange(REDIRECT_EXCHANGE).durable(true).build();
}
/**
* 死信隊(duì)列綁定到死信交換器上.
*
* @return the binding
*/
@Bean
public Binding dlxBinding(Queue dlQueue, Exchange dlExchange) {
return BindingBuilder
.bind(dlQueue)
.to(dlExchange)
.with("dlx.#")
.noargs();
}
/**
* 將重定向隊(duì)列通過routingKey(“dlx.hehe”)綁定到死信隊(duì)列的Exchange上
*
* @return the binding
*/
@Bean
public Binding redirectToDLBinding(@Qualifier("redirectQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with(DL_REDIRECT_ROUTING_KEY)
.noargs();
}
/**
* 綁定正常的交換機(jī)和隊(duì)列
*
* @return the binding
*/
@Bean
public Binding redirectBinding() {
return BindingBuilder
.bind(redirectQueue())
.to(redirectExchange())
.with("test.dlx.#")
.noargs();
}
}
其中分別使用了幾種方法來綁定隊(duì)列,在Binding函數(shù)中
- 我們可以直接寫隊(duì)列的函數(shù)和交換機(jī)的函數(shù),如最后一個(gè)
- 我們可以用@Qualifier注解指定隊(duì)列和交換機(jī),如倒數(shù)第二個(gè)
- 我們可以在傳遞參數(shù)時(shí),將參數(shù)名字和定義的隊(duì)列和交換機(jī)匹配
對(duì)消息產(chǎn)生死信的三種情況進(jìn)行測(cè)試
/**
* 發(fā)送測(cè)試死信消息:
* 1. 過期時(shí)間
* 2. 長(zhǎng)度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
//1. 測(cè)試過期時(shí)間,死信消息
rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.hehe","我是一條消息,我會(huì)死嗎?");
//2. 測(cè)試長(zhǎng)度限制后,消息死信
/* for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.haha","我是一條消息,我會(huì)死嗎?");
}*/
//3. 測(cè)試消息拒收
// rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.haha","我是一條消息,我會(huì)死嗎?");
}
對(duì)第一種情況,消息過期后會(huì)自動(dòng)轉(zhuǎn)到死信隊(duì)列
對(duì)第二種情況,消息長(zhǎng)度超過了限制,超過的會(huì)自動(dòng)轉(zhuǎn)到死信隊(duì)列
對(duì)第三種情況,我們要編寫一個(gè)消費(fèi)者來監(jiān)聽正常的隊(duì)列,讓消息拒絕接收
新建一個(gè)DlxListener,注意,這里是監(jiān)聽我們的正常隊(duì)列,而不是死信隊(duì)列
@Component
@RabbitListener(queues = "redirect_queue")
public class DlxListener {
@RabbitHandler
public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 接受轉(zhuǎn)換消息
System.out.println("DlxListener收到的消息為:" + new String(message.getBody()));
// 2. 處理業(yè)務(wù)邏輯
System.out.println("處理業(yè)務(wù)邏輯");
int i = 3 / 0;
// 3. 手動(dòng)簽收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
/*
第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端
*/
System.out.println("出現(xiàn)異常,拒絕接收");
// 4. 拒絕簽收,不重回隊(duì)列 requeue = false
channel.basicNack(deliveryTag, true, false);
// channel.basicReject(deliveryTag,true); 單條數(shù)據(jù)
//消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack返回false,并重新回到隊(duì)列,api里面解釋得很清楚
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
之后運(yùn)行Consumer,將test的第三個(gè)注釋打開,進(jìn)行測(cè)試,控制臺(tái)打印

小結(jié)
死信交換機(jī)和死信隊(duì)列和普通的沒有區(qū)別
當(dāng)消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列
-
消息成為死信的三種情況:
- 隊(duì)列消息長(zhǎng)度到達(dá)限制;
- 消費(fèi)者拒接消費(fèi)消息,并且不重回隊(duì)列;
- 原隊(duì)列存在消息過期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);
1.6 延遲隊(duì)列
延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間后,才會(huì)被消費(fèi)。
應(yīng)用場(chǎng)景有:
比如用戶下單后,30分鐘未支付,取消訂單,回滾庫存。再比如新用戶注冊(cè)成功7天后,發(fā)送短信問候。
實(shí)現(xiàn)方式:
1. 定時(shí)器
2. 延遲隊(duì)列

但是很可惜,在RabbitMQ中并未提供延遲隊(duì)列功能。
但是可以使用:TTL+死信隊(duì)列 組合實(shí)現(xiàn)延遲隊(duì)列的效果

如何實(shí)現(xiàn)呢,加入我們給訂單設(shè)定30分鐘的支付時(shí)間,訂單系統(tǒng)一開始將消息發(fā)送到正常隊(duì)列,30分鐘后轉(zhuǎn)發(fā)到死信隊(duì)列,有一個(gè)專門的庫存系統(tǒng)保存去獲取這條消息,來判斷訂單是支付了還是未支付。
具體步驟
- 定義正常的交換機(jī)( order_exchange )和隊(duì)列( order_queue )
- 定義死信交換機(jī)及和隊(duì)列
- 綁定設(shè)置正常隊(duì)列過期時(shí)間為10秒鐘,測(cè)試的時(shí)候30分鐘太久了
新建一個(gè)OrderCondig,我們直接復(fù)制并修改之前的死信隊(duì)列和死信交換機(jī),將order_queue過期時(shí)間設(shè)置為30分鐘,將正常交換機(jī)到DLX的routing key改為order.dlx.cancel,將DLX和死信隊(duì)列的routing key改為order.dlx.#,正常交換機(jī)和正常隊(duì)列的routing key 就為order.#
package org.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName DeadLetterConfig
* @Description TODO
* @Author Patrick Star
* @Date 2020/12/7 9:37 下午
*/
@Configuration
public class OrderConfig {
public static final String ORDER_DL_EXCHANGE = "order_dl_exchange";
public static final String ORDER_DL_QUEUE = "order_dl_queue";
public static final String ORDER_QUEUE = "order_queue";
public static final String ORDER_EXCHANGE = "order_exchange";
public static final String ORDER_DL_ORDER_ROUTING_KEY = "order.dlx.cancel";
public static final String ORDER_ROUTING_KEY = "order.#";
public static final String DLX_ROUTING_KEY = "order.dlx.#";
/**
* 死信隊(duì)列跟交換機(jī)類型沒有關(guān)系 不一定為directExchange 不影響該類型交換機(jī)的特性.
*/
@Bean("orderDLExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.topicExchange(ORDER_DL_EXCHANGE).durable(true).build();
}
@Bean("orderDLQueue")
public Queue deadLetterQueue() {
return QueueBuilder.durable(ORDER_DL_QUEUE).build();
}
// order 隊(duì)列
@Bean("orderQueue")
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 聲明 死信隊(duì)列Exchange
args.put("x-dead-letter-exchange", ORDER_DL_EXCHANGE);
// x-dead-letter-routing-key 聲明 死信隊(duì)列拋出異常重定向隊(duì)列的routingKey("order.dlx.cancel")
args.put("x-dead-letter-routing-key", ORDER_DL_ORDER_ROUTING_KEY);
args.put("x-message-ttl", 10000);
args.put("x-max-length", 30);
return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
}
// order交換機(jī)
@Bean("orderExchange")
public Exchange orderExchange() {
return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();
}
/**
* 死信隊(duì)列綁定到死信交換器上.
*
* @return the binding
*/
@Bean
public Binding newDLBinding(Queue orderDLQueue, Exchange orderDLExchange) {
return BindingBuilder
.bind(orderDLQueue)
.to(orderDLExchange)
.with(DLX_ROUTING_KEY)
.noargs();
}
/**
* 將重定向隊(duì)列通過routingKey(“order.dlx.cancel”)綁定到死信隊(duì)列的Exchange上
*
* @return the binding
*/
@Bean
public Binding orderToDLBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderDLExchange") Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ORDER_DL_ORDER_ROUTING_KEY)
.noargs();
}
/**
* 綁定正常的交換機(jī)和隊(duì)列
*
* @return the binding
*/
@Bean
public Binding orderBinding() {
return BindingBuilder
.bind(orderQueue())
.to(orderExchange())
.with(ORDER_ROUTING_KEY)
.noargs();
}
}
Consumer端新建一個(gè)orderListener,一定要監(jiān)聽死信隊(duì)列
package org.example.rabbitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName AckListener
* @Description TODO
* @Author Patrick Star
* @Date 2020/12/7 5:50 下午
*/
@Component
@RabbitListener(queues = "order_dl_queue")
public class OrderListener {
@RabbitHandler
public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 1. 接受轉(zhuǎn)換消息
System.out.println("orderListener收到的消息為:" + new String(message.getBody()));
//2. 處理業(yè)務(wù)邏輯
System.out.println("處理業(yè)務(wù)邏輯...");
System.out.println("根據(jù)訂單id查詢其狀態(tài)...");
System.out.println("判斷狀態(tài)是否為支付成功");
System.out.println("取消訂單,回滾庫存....");
// 3. 手動(dòng)簽收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 4. 拒絕簽收
/*
第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端
*/
channel.basicNack(deliveryTag, true, true);
// channel.basicReject(deliveryTag,true); 單條數(shù)據(jù)
//消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack返回false,并重新回到隊(duì)列,api里面解釋得很清楚
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
我們主要看消費(fèi)者是不是延遲10秒后收到消息,在消費(fèi)者控制臺(tái)打印消息。
延遲隊(duì)列小結(jié)
- 延遲隊(duì)列 指消息進(jìn)入隊(duì)列后,可以被延遲一定時(shí)間,再進(jìn)行消費(fèi)。
- RabbitMQ沒有提供延遲隊(duì)列功能,但是可以使用 : TTL + DLX 來實(shí)現(xiàn)延遲隊(duì)列效果
1.7 日志與監(jiān)控
我的rabbitmq運(yùn)行在linux服務(wù)器上的一個(gè)docker容器中,我們可以進(jìn)入該容器查看日志信息,對(duì)于不是docker運(yùn)行的服務(wù),就直接在服務(wù)器上查找。
RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
我們使用docker ps查找我們運(yùn)行的rabbitmq的容器id
再使用
docker logs 47b
查看docker的輸出日志,其中47b是我rabbitmq容器id的前幾個(gè)字母,里面就可以看到我們的rabbitmq的日志了。
進(jìn)入容器內(nèi),使用命令rabbitmqctl status可以看到我們的Log文件是輸出到了輸出流,而沒有使用文件保存,如果需要查看log文件,參考下面的解決方法
https://blog.csdn.net/fvdfsdafdsafs/article/details/110097643

也可以打開web控制臺(tái),也可以看到很多的的參數(shù)信息

點(diǎn)擊name,可以看到一些負(fù)載參數(shù)

如果綠色的接近紅色,就應(yīng)該注意了。
也可以通過rabbitmq的控制命令查看
查看隊(duì)列
rabbitmqctl list_queues
查看exchanges
rabbitmqctl list_exchanges
查看用戶
rabbitmqctl list_users
查看連接
rabbitmqctl list_connections
查看消費(fèi)者信息
rabbitmqctl list_consumers
查看環(huán)境變量
rabbitmqctl environment
查看未被確認(rèn)的隊(duì)列
rabbitmqctl list_queues name messages_unacknowledged
查看單個(gè)隊(duì)列的內(nèi)存使用
rabbitmqctl list_queues name memory
查看準(zhǔn)備就緒的隊(duì)列
rabbitmqctl list_queues name messages_ready
1.8 消息追蹤
在使用任何消息中間件的過程中,難免會(huì)出現(xiàn)某條消息異常丟失的情況。對(duì)于RabbitMQ而言,可能是因?yàn)樯a(chǎn)者或消費(fèi)者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機(jī)制;也有可能是因?yàn)榻粨Q器與隊(duì)列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒有與任何隊(duì)列進(jìn)行綁定,生產(chǎn)者又不感知或者沒有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個(gè)時(shí)候就需要有一個(gè)較好的機(jī)制跟蹤記錄消息的投遞過程,以此協(xié)助開發(fā)和運(yùn)維人員進(jìn)行問題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實(shí)現(xiàn)消息追蹤。
firehose的機(jī)制是將生產(chǎn)者投遞給rabbitmq的消息,rabbitmq投遞給消費(fèi)者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個(gè)默認(rèn)的exchange的名稱為amq.rabbitmq.trace,它是一個(gè)topic類型的exchange。發(fā)送到這個(gè)exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實(shí)際exchange和queue的名稱,分別對(duì)應(yīng)生產(chǎn)者投遞到exchange的消息,和消費(fèi)者從queue上獲取的消息。
如何使用呢,我們可以將一個(gè)隊(duì)列綁定到默認(rèn)交換機(jī),routing key 就為test_trace好了,然后往這個(gè)隊(duì)列發(fā)消息,默認(rèn)交換機(jī)會(huì)把消息轉(zhuǎn)發(fā)到隊(duì)列,同時(shí),隊(duì)列還收到了兩條消息,是trace交換機(jī)發(fā)的詳細(xì)的日志消息。
注意:打開 trace 會(huì)影響消息寫入功能,適當(dāng)打開后請(qǐng)關(guān)閉。
rabbitmqctl trace_on:開啟Firehose命令
rabbitmqctl trace_off:關(guān)閉Firehose命令
rabbitmq_tracing和Firehose在實(shí)現(xiàn)上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。
我們要做的就是啟用插件:rabbitmq-plugins enable rabbitmq_tracing
首先進(jìn)入 docker 容器內(nèi)部,執(zhí)行rabbitmq-plugins list命令
root@47b96c4e50ef:/# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@47b96c4e50ef
|/
[ ] rabbitmq_amqp1_0 3.8.9
[ ] rabbitmq_auth_backend_cache 3.8.9
[ ] rabbitmq_auth_backend_http 3.8.9
[ ] rabbitmq_auth_backend_ldap 3.8.9
[ ] rabbitmq_auth_backend_oauth2 3.8.9
[ ] rabbitmq_auth_mechanism_ssl 3.8.9
[ ] rabbitmq_consistent_hash_exchange 3.8.9
[ ] rabbitmq_event_exchange 3.8.9
[ ] rabbitmq_federation 3.8.9
[ ] rabbitmq_federation_management 3.8.9
[ ] rabbitmq_jms_topic_exchange 3.8.9
[E*] rabbitmq_management 3.8.9
[e*] rabbitmq_management_agent 3.8.9
[ ] rabbitmq_mqtt 3.8.9
[ ] rabbitmq_peer_discovery_aws 3.8.9
[ ] rabbitmq_peer_discovery_common 3.8.9
[ ] rabbitmq_peer_discovery_consul 3.8.9
[ ] rabbitmq_peer_discovery_etcd 3.8.9
[ ] rabbitmq_peer_discovery_k8s 3.8.9
[E*] rabbitmq_prometheus 3.8.9
[ ] rabbitmq_random_exchange 3.8.9
[ ] rabbitmq_recent_history_exchange 3.8.9
[ ] rabbitmq_sharding 3.8.9
[ ] rabbitmq_shovel 3.8.9
[ ] rabbitmq_shovel_management 3.8.9
[ ] rabbitmq_stomp 3.8.9
[ ] rabbitmq_top 3.8.9
[ ] rabbitmq_tracing 3.8.9
[ ] rabbitmq_trust_store 3.8.9
[e*] rabbitmq_web_dispatch 3.8.9
[ ] rabbitmq_web_mqtt 3.8.9
[ ] rabbitmq_web_mqtt_examples 3.8.9
[ ] rabbitmq_web_stomp 3.8.9
[ ] rabbitmq_web_stomp_examples 3.8.9
查看我們啟用的插件,帶*的就是啟用了
然后輸入
rabbitmq-plugins enable rabbitmq_tracing
然后在web控制臺(tái),刷新一下,點(diǎn)擊admin標(biāo)簽,可以看到右邊多了一個(gè)tracing標(biāo)簽

可以在這里添加新的tracing

Virtual host:虛擬主機(jī)名
Name :tracing 的名稱,將來可以在這個(gè)tracing中記錄很多的日志信息
Format:日志信息的格式,一種是text,一種是json,text是給我們程序員看的,是明文的,json便于計(jì)算機(jī)解析,經(jīng)過base64編碼了
Max payload bytes:不填就把所有的消息體記錄起來,填個(gè)10就表示取前10個(gè)字節(jié)
Pattern:#表示的接收所有的消息,不管是發(fā)過來的還是消費(fèi)的都可,如果只想接受發(fā)過來的,就填publish.#,如果想接收消費(fèi)的,就填deliver.#

添加完后上邊就多了一些東西,點(diǎn)進(jìn)去右邊的mytrace.log現(xiàn)在是啥都沒有,我們往里面發(fā)一些消息,就可以看到一些日志信息了。

點(diǎn)開隊(duì)列列表可以看到多了一個(gè)隊(duì)列記錄我們的日志消息

這個(gè)隊(duì)列綁定的是amq.rabbitmq.trace交換機(jī)

2. RabbitMQ應(yīng)用問題
2.1 消息可靠性保障
我們想要消息100%發(fā)送成功似乎是不可能的,但是我們最起碼可以保證消息99.9%能發(fā)送成功吧。其中就用到了消息補(bǔ)償。
來看一張圖,其中producer 和 consumer都有自己對(duì)應(yīng)的數(shù)據(jù)庫,正常情況下,producer將業(yè)務(wù)數(shù)據(jù)入庫,發(fā)送消息到Q1,consumer監(jiān)聽Q1,接收消息去消費(fèi),完成相應(yīng)的DB操作。

考慮不正常的情況,producer操作數(shù)據(jù)庫成功了,但是第2步發(fā)送消息失敗了,這樣consumer收不到消息,業(yè)務(wù)操作也會(huì)失敗,這時(shí)候怎么辦呢,producer發(fā)送消息完成之后,到第3步,延遲發(fā)送消息到Q3,也就是說,發(fā)送一條消息之后等待一段時(shí)間再發(fā)一條消息到Q3,這兩個(gè)消息一模一樣,如果消息到Q1發(fā)送成功,consumer消費(fèi)成功后要向Q2發(fā)送確認(rèn)消息,相當(dāng)于consumer轉(zhuǎn)換了一次角色,變成了生產(chǎn)端,我們有個(gè)回調(diào)檢查服務(wù),監(jiān)聽了Q2的確認(rèn)消息,收到確認(rèn)消息,將消息寫入消息數(shù)據(jù)庫中。而回調(diào)檢查服務(wù)也監(jiān)聽著Q3,Q1、Q2、Q3中的消息ID是一樣的,收到Q3的消息后,回調(diào)檢查服務(wù)要去比對(duì)當(dāng)前這條消息是否和剛才寫入MDB的消息是否一致,檢查是否被消費(fèi)過,沒有被消費(fèi)過得話,MDB一定不存在記錄,則轉(zhuǎn)到第8步,producer重新發(fā)送消息。最后還有個(gè)問題,如果第2步第3步都失敗了呢,我們還有個(gè)最后的保障,也就是定時(shí)檢查服務(wù),檢查業(yè)務(wù)數(shù)據(jù)庫DB和消息MDB是否能匹配,檢查DB是不是比MDB的數(shù)據(jù)多了,或者匹配不上了,再去調(diào)用producer,重發(fā)那些多的消息。
2.2 消息冪等性保障
冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說,其任意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
在MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。
舉個(gè)栗子,我花了500買衣服,所以服務(wù)器會(huì)發(fā)消息到MQ去扣款,下訂單付款的時(shí)候,可能由于網(wǎng)絡(luò)的原因,不管是什么原因,總之我發(fā)了兩條扣款500的消息,總不能扣我1000塊吧,RabbitMQ采取了用數(shù)據(jù)庫樂觀鎖的機(jī)制來保障消息的冪等性。

這里的樂觀鎖,就是給消息加了個(gè)版本號(hào),由上圖的例子,consumer第一次執(zhí)行消息寫入數(shù)據(jù)庫,version是1,樂觀鎖會(huì)將id與version綁定,且version取出來加1,下一條消息來了的時(shí)候,consumer想寫數(shù)據(jù)庫的時(shí)候判斷條件 id = 1 and version = 1便不成立了。
冪等性其實(shí)有很多的保障機(jī)制,這里只介紹了樂觀鎖的機(jī)制。