個人專題目錄
1. RabbitMQ 高級特性
1.1 消息可靠性投遞
在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
confirm 確認(rèn)模式
return 退回模式
rabbitmq 整個消息投遞的路徑為:
? producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer
消息從 producer 到 exchange 則會返回一個 confirmCallback 。
消息從 exchange 到 queue 投遞失敗則會返回一個 returnCallback 。
我們將利用這兩個 callback 控制消息的可靠性投遞
confirm確認(rèn)模式代碼實現(xiàn)
創(chuàng)建maven工程,消息的生產(chǎn)者工程,項目模塊名稱:rabbitmq-producer-spring
-
添加依賴
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> -
在 resources 目錄下創(chuàng)建 rabbitmq.properties 配置文件,添加鏈接RabbitMQ相關(guān)信息
rabbitmq.host=172.16.98.133 rabbitmq.port=5672 rabbitmq.username=guest rabbitmq.password=guest rabbitmq.virtual-host=/ -
在 resources 目錄下創(chuàng)建 spring-rabbitmq-producer.xml 配置文件,添加以下配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加載配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定義rabbitmq connectionFactory 1. 設(shè)置 publisher-confirms="true" --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" /> <!--定義管理交換機、隊列--> <rabbit:admin connection-factory="connectionFactory"/> <!--定義rabbitTemplate對象操作可以在代碼中方便發(fā)送消息--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> <!--2. 消息可靠性投遞(生產(chǎn)端)--> <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue> <rabbit:direct-exchange name="test_exchange_confirm"> <rabbit:bindings> <rabbit:binding queue="test_queue_confirm" key="confirm"> </rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> </beans> -
編寫測試代碼
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /** * 確認(rèn)模式: * 步驟: * 1. 確認(rèn)模式開啟:ConnectionFactory中開啟publisher-confirms="true" * 2. 在rabbitTemplate定義ConfirmCallBack回調(diào)函數(shù) */ @Test public void testConfirm() { //2. 定義回調(diào) ** rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 相關(guān)配置信息 * @param ack exchange交換機 是否成功收到了消息。true 成功,false代表失敗 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被執(zhí)行了...."); if (ack) { //接收成功 System.out.println("接收成功消息" + cause); } else { //接收失敗 System.out.println("接收失敗消息" + cause); //做一些處理,讓消息再次發(fā)送。 } } }); //3. 發(fā)送消息 rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm...."); } } -
測試結(jié)果
1569161424745.png
return退回模式代碼實現(xiàn)
回退模式: 當(dāng)消息發(fā)送給Exchange后,Exchange路由到Queue失敗是 才會執(zhí)行 ReturnCallBack,具體實現(xiàn)如下:
-
在 spring-rabbitmq-producer.xml 配置文件,在 rabbit:connection-factory節(jié)點 添加配置:
publisher-returns="true" -
編寫測試方法
/** * 步驟: * 1. 開啟回退模式:publisher-returns="true" * 2. 設(shè)置ReturnCallBack * 3. 設(shè)置Exchange處理消息的模式: * 1. 如果消息沒有路由到Queue,則丟棄消息(默認(rèn)) * 2. 如果消息沒有路由到Queue,返回給消息發(fā)送方ReturnCallBack */ @Test public void testReturn() { //設(shè)置交換機處理失敗消息的模式 rabbitTemplate.setMandatory(true); //2.設(shè)置ReturnCallBack rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * * @param message 消息對象 * @param replyCode 錯誤碼 * @param replyText 錯誤信息 * @param exchange 交換機 * @param routingKey 路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 執(zhí)行了...."); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); //處理 } }); //3. 發(fā)送消息 rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm...."); }設(shè)置 routingKey 為一個不符合規(guī)則的key,觀察控制臺打印結(jié)果。
小結(jié)
對于確認(rèn)模式:
設(shè)置ConnectionFactory的publisher-confirms="true" 開啟 確認(rèn)模式。
使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。
對于退回模式
設(shè)置ConnectionFactory的publisher-returns="true" 開啟 退回模式。
使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當(dāng)消息從exchange路由到queue失敗后,如果設(shè)置了rabbitTemplate.setMandatory(true)參數(shù),則會將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。
在RabbitMQ中也提供了事務(wù)機制,但是性能較差,此處不做講解。
使用channel列方法,完成事務(wù)控制:
txSelect(), 用于將當(dāng)前channel設(shè)置成transaction模式
txCommit(),用于提交事務(wù)
txRollback(),用于回滾事務(wù)
1.2 Consumer ACK
ack指 Acknowledge,確認(rèn)。 表示消費端收到消息后的確認(rèn)方式。
有三種確認(rèn)方式:
? 自動確認(rèn):acknowledge="none"
? 手動確認(rèn):acknowledge="manual"
? 根據(jù)異常情況確認(rèn):acknowledge="auto",(這種方式使用麻煩)
其中自動確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會丟失。
如果設(shè)置了手動確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動重新發(fā)送消息。
代碼實現(xiàn)
創(chuàng)建maven工程,消息的消費者工程,項目模塊名稱:rabbitmq-consumer-spring
-
添加依賴
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> -
在 resources 目錄下創(chuàng)建 rabbitmq.properties 配置文件,添加鏈接RabbitMQ相關(guān)信息
rabbitmq.host=172.16.98.133 rabbitmq.port=5672 rabbitmq.username=guest rabbitmq.password=guest rabbitmq.virtual-host=/ -
在 resources 目錄下創(chuàng)建 spring-rabbitmq-consumer.xml 配置文件,添加以下配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--加載配置文件--> <context:property-placeholder location="classpath:rabbitmq.properties"/> <!-- 定義rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <context:component-scan base-package="com.itheima.listener" /> <!--定義監(jiān)聽器容器 添加 acknowledge="manual" 手動--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" > <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"> </rabbit:listener> </rabbit:listener-container> </beans> -
編寫ackListener 監(jiān)聽類實現(xiàn)ChannelAwareMessageListener接口
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * Consumer ACK機制: * 1. 設(shè)置手動簽收。acknowledge="manual" * 2. 讓監(jiān)聽器類實現(xiàn)ChannelAwareMessageListener接口 * 3. 如果消息成功處理,則調(diào)用channel的 basicAck()簽收 * 4. 如果消息處理失敗,則調(diào)用channel的basicNack()拒絕簽收,broker重新發(fā)送給consumer */ @Component public class AckListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1.接收轉(zhuǎn)換消息 System.out.println(new String(message.getBody())); //2. 處理業(yè)務(wù)邏輯 System.out.println("處理業(yè)務(wù)邏輯..."); int i = 3/0;//出現(xiàn)錯誤 //3. 手動簽收 channel.basicAck(deliveryTag,true); } catch (Exception e) { //e.printStackTrace(); //4.拒絕簽收 /* 第三個參數(shù):requeue:重回隊列。如果設(shè)置為true,則消息重新回到queue,broker會重新發(fā)送該消息給消費端 */ channel.basicNack(deliveryTag,true,true); // 了解 //channel.basicReject(deliveryTag,true); } } } -
編寫測試類,啟動容器監(jiān)聽MQ隊列
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test(){ while (true){ } } }
小結(jié)
在rabbit:listener-container標(biāo)簽中設(shè)置acknowledge屬性,設(shè)置ack方式 none:自動確認(rèn),manual:手動確認(rèn)
如果在消費端沒有出現(xiàn)異常,則調(diào)用channel.basicAck(deliveryTag,false);方法確認(rèn)簽收消息
如果出現(xiàn)異常,則在catch中調(diào)用 basicNack或 basicReject,拒絕消息,讓MQ重新發(fā)送消息。
如何保證消息的高可靠性傳輸?
1.持久化
? exchange要持久化
? queue要持久化
? message要持久化
2.生產(chǎn)方確認(rèn)Confirm
3.消費方確認(rèn)Ack
4.Broker高可用
1.3 消費端限流

如上圖所示:如果在A系統(tǒng)中需要維護相關(guān)的業(yè)務(wù)功能,可能需要將A系統(tǒng)的服務(wù)停止,那么這個時候消息的生產(chǎn)者還是一直會向MQ中發(fā)送待處理的消息,消費者此時服務(wù)已經(jīng)關(guān)閉,導(dǎo)致大量的消息都會在MQ中累積。如果當(dāng)A系統(tǒng)成功啟動后,默認(rèn)情況下消息的消費者會一次性將MQ中累積的大量的消息全部拉取到自己的服務(wù),導(dǎo)致服務(wù)在短時間內(nèi)會處理大量的業(yè)務(wù),可能會導(dǎo)致系統(tǒng)服務(wù)的崩潰。 所以消費端限流是非常有必要的。
可以通過MQ中的 listener-container 配置屬性
perfetch = 1,表示消費端每次從mq拉去一條消息來消費,直到手動確認(rèn)消費完畢后,才會繼續(xù)拉去下一條消息。
代碼實現(xiàn)
-
編寫 QosListener 監(jiān)聽類,保證當(dāng)前的監(jiān)聽類消息處理機制是 ACK 為手動方式
@Component public class QosListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(1000); //1.獲取消息 System.out.println(new String(message.getBody())); //2. 處理業(yè)務(wù)邏輯 //3. 簽收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } } -
在配置文件的 listener-container 配置屬性中添加配置
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >配置說明:
perfetch = 1,表示消費端每次從mq拉去一條消息來消費,直到手動確認(rèn)消費完畢后,才會繼續(xù)拉去下一條消息。
小結(jié)
在<rabbit:listener-container> 中配置 prefetch屬性設(shè)置消費端一次拉取多少消息
消費端的確認(rèn)模式一定為手動確認(rèn)。acknowledge="manual"
1.4 TTL
TTL 全稱 Time To Live(存活時間/過期時間)。當(dāng)消息到達存活時間后,還沒有被消費,會被自動清除。
RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。

可以在RabbitMQ管理控制臺設(shè)置過期時間.
代碼實現(xiàn)
設(shè)置隊列的過期時間
-
在消息的生產(chǎn)方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
<!--ttl--> <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"> <!--設(shè)置queue的參數(shù)--> <rabbit:queue-arguments> <!--x-message-ttl指隊列的過期時間--> <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="test_exchange_ttl" > <rabbit:bindings> <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> -
編寫發(fā)送消息測試方法
@Test public void testTtl() { for (int i = 0; i < 10; i++) { // 發(fā)送消息 rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl...."); } }測試結(jié)果:當(dāng)消息發(fā)送成功后,過10s后在RabbitMQ的管理控制臺會看到消息會自動刪除。
設(shè)置單個消息的過期時間
編寫代碼測試,并且設(shè)置隊列的過期時間為100s, 單個消息的過期時間為5s
@Test
public void testTtl() {
// 消息后處理對象,設(shè)置一些消息的參數(shù)信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.設(shè)置message的信息
message.getMessageProperties().setExpiration("5000");//消息的過期時間
//2.返回該消息
return message;
}
};
//消息單獨過期
rabbitTemplate.convertAndSend("test_exchange_ttl",
"ttl.hehe", "message ttl....",messagePostProcessor);
for (int i = 0; i < 10; i++) {
if(i == 5){
//消息單獨過期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
}else{
//不過期的消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
}
}
如果設(shè)置了消息的過期時間,也設(shè)置了隊列的過期時間,它以時間短的為準(zhǔn)。
- 隊列過期后,會將隊列所有消息全部移除。
- 消息過期后,只有消息在隊列頂端,才會判斷其是否過期(移除掉)
小結(jié)
設(shè)置隊列過期時間使用參數(shù):x-message-ttl,單位:ms(毫秒),會對整個隊列消息統(tǒng)一過期。
設(shè)置消息過期時間使用參數(shù):expiration。單位:ms(毫秒),當(dāng)該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。
如果兩者都進行了設(shè)置,以時間短的為準(zhǔn)。
1.5 死信隊列
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。

消息成為死信的三種情況:
隊列消息長度到達限制;
消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊列,requeue=false;
原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;
隊列綁定死信交換機:
給隊列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key

代碼實現(xiàn)
-
在消息的生產(chǎn)方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
-
聲明正常的隊列(test_queue_dlx)和交換機(test_exchange_dlx)
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> </rabbit:queue> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> -
聲明死信隊列(queue_dlx)和死信交換機(exchange_dlx)
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> -
正常隊列綁定死信交換機,并設(shè)置相關(guān)參數(shù)信息
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> <!--3. 正常隊列綁定死信交換機--> <rabbit:queue-arguments> <!--3.1 x-dead-letter-exchange:死信交換機名稱--> <entry key="x-dead-letter-exchange" value="exchange_dlx" /> <!--3.2 x-dead-letter-routing-key:發(fā)送給死信交換機的routingkey--> <entry key="x-dead-letter-routing-key" value="dlx.hehe" /> <!--4.1 設(shè)置隊列的過期時間 ttl--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> <!--4.2 設(shè)置隊列的長度限制 max-length --> <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange>
-
-
編寫測試方法
/** * 發(fā)送測試死信消息: * 1. 過期時間 * 2. 長度限制 * 3. 消息拒收 */ @Test public void testDlx(){ //1. 測試過期時間,死信消息 rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha","我是一條消息,我會死嗎?"); //2. 測試長度限制后,消息死信 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha","我是一條消息,我會死嗎?"); } //3. 測試消息拒收 rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha","我是一條消息,我會死嗎?"); }
小結(jié)
死信交換機和死信隊列和普通的沒有區(qū)別
當(dāng)消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
-
消息成為死信的三種情況:
隊列消息長度到達限制;
消費者拒接消費消息,并且不重回隊列;
原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;
1.6 延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
提出需求:
下單后,30分鐘未支付,取消訂單,回滾庫存。
新用戶注冊成功7天后,發(fā)送短信問候。
實現(xiàn)方式:
定時器
延遲隊列

注意:在RabbitMQ中并未提供延遲隊列功能。
但是可以使用:TTL+死信隊列 組合實現(xiàn)延遲隊列的效果。

代碼實現(xiàn)
-
在消息的生產(chǎn)方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
<!-- 1. 定義正常交換機(order_exchange)和隊列(order_queue)--> <rabbit:queue id="order_queue" name="order_queue"> <!-- 3. 綁定,設(shè)置正常隊列過期時間為30分鐘--> <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="order_exchange_dlx" /> <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" /> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> </rabbit:queue-arguments> </rabbit:queue> <rabbit:topic-exchange name="order_exchange"> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!-- 2. 定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)--> <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue> <rabbit:topic-exchange name="order_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> -
編寫測試方法
@Test public void testDelay() throws InterruptedException { //1.發(fā)送訂單消息。 將來是在訂單系統(tǒng)中,下單成功后,發(fā)送消息 rabbitTemplate.convertAndSend("order_exchange", "order.msg","訂單信息:id=1,time=2019年"); //2.打印倒計時10秒 for (int i = 10; i > 0 ; i--) { System.out.println(i+"..."); Thread.sleep(1000); } }
小結(jié)
延遲隊列 指消息進入隊列后,可以被延遲一定時間,再進行消費。
RabbitMQ沒有提供延遲隊列功能,但是可以使用 : TTL + DLX 來實現(xiàn)延遲隊列效果。
1.7 日志與監(jiān)控
RabbitMQ日志
RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
RabbitMQ 日志所在的目錄:
RabbitMQ日志詳細信息:
日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務(wù)節(jié)點名稱、cookie的hash值、RabbitMQ配置文件地址、內(nèi)存限制、磁盤限制、默認(rèn)賬戶guest的創(chuàng)建以及權(quán)限配置等等。
web管控臺監(jiān)控
直接訪問當(dāng)前的IP:15672,輸入用戶名和密碼(默認(rèn)是 guest),就可以查看RabbitMQ的管理控制臺。當(dāng)然也可通過命令的形式來查看。如下:
查看隊列:rabbitmqctl list_queues
查看用戶: rabbitmqctl list_users
查看連接:rabbitmqctl list_connections
其它相關(guān)命令(了解):
查看exchanges:rabbitmqctl list_exchanges
查看消費者信息:rabbitmqctl list_consumers
查看環(huán)境變量:rabbitmqctl environment
查看未被確認(rèn)的隊列:rabbitmqctl list_queues name messages_unacknowledged
查看單個隊列的內(nèi)存使用:rabbitmqctl list_queues name memory
查看準(zhǔn)備就緒的隊列:rabbitmqctl list_queues name messages_ready
1.8 消息追蹤
在使用任何消息中間件的過程中,難免會出現(xiàn)某條消息異常丟失的情況。對于RabbitMQ而言,可能是因為生產(chǎn)者或消費者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機制;也有可能是因為交換器與隊列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒有與任何隊列進行綁定,生產(chǎn)者又不感知或者沒有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個時候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協(xié)助開發(fā)和運維人員進行問題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現(xiàn)消息追蹤。
消息追蹤-Firehose
firehose的機制是將生產(chǎn)者投遞給rabbitmq的消息,rabbitmq投遞給消費者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個默認(rèn)的exchange的名稱為 amq.rabbitmq.trace,它是一個topic類型的exchange。發(fā)送到這個exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實際exchange和queue的名稱,分別對應(yīng)生產(chǎn)者投遞到exchange的消息,和消費者從queue上獲取的消息。
注意:打開 trace 會影響消息寫入功能,適當(dāng)打開后請關(guān)閉。
rabbitmqctl trace_on:開啟Firehose命令
消息追蹤驗證:
創(chuàng)建一個隊列 test_trace,并將當(dāng)前的隊列綁定到 amq.rabbitmq.trace 交換機上,設(shè)置RoutingKey為:#
-
未開啟消息追蹤之前,我們發(fā)送一個消息
當(dāng)前消息發(fā)送成功后,在控制臺我們可以看到當(dāng)前消息的具體信息
設(shè)置開啟消息追蹤,在發(fā)送一條消息
-
rabbitmqctl trace_on
1569202059745.png
我們發(fā)現(xiàn)當(dāng)前消息也正常存在,并且開啟消息追蹤后,會多出一條消息是 amq.rabbitmq.trace 交換機發(fā)給當(dāng)前隊列的消息,消息中的內(nèi)容是比較完整的。
建議:在開發(fā)階段我們可以開啟消息追蹤,在實際生產(chǎn)環(huán)境建議將其關(guān)閉
rabbitmqctl trace_off:關(guān)閉Firehose命令
消息追蹤-rabbitmq_tracing
rabbitmq_tracing和Firehose在實現(xiàn)上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。
啟用插件:rabbitmq-plugins enable rabbitmq_tracing
發(fā)送消息成功后,我們點擊日志文件,要求輸入RabbitMQ的登錄用戶名和密碼。
建議:在開發(fā)階段我們可以開啟消息追蹤插件,在實際生產(chǎn)環(huán)境不建議建議開啟,除非是非常特殊的業(yè)務(wù)場景,大家根據(jù)實際情況選擇開啟即可。

