RabbitMQ之八高級特性

個人專題目錄


1. RabbitMQ 高級特性

1.1 消息可靠性投遞

在使用 RabbitMQ 的時候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。

  • confirm 確認(rèn)模式

  • return 退回模式

rabbitmq 整個消息投遞的路徑為:

? producer ---> rabbitmq broker ---> exchange ---> queue ---> consumer

  • 消息從 producer 到 exchange 則會返回一個 confirmCallback 。

  • 消息從 exchange 到 queue 投遞失敗則會返回一個 returnCallback 。

我們將利用這兩個 callback 控制消息的可靠性投遞

confirm確認(rèn)模式代碼實現(xiàn)

  1. 創(chuàng)建maven工程,消息的生產(chǎn)者工程,項目模塊名稱:rabbitmq-producer-spring

  2. 添加依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    
  3. 在 resources 目錄下創(chuàng)建 rabbitmq.properties 配置文件,添加鏈接RabbitMQ相關(guān)信息

    rabbitmq.host=172.16.98.133
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    rabbitmq.virtual-host=/
    
  4. 在 resources 目錄下創(chuàng)建 spring-rabbitmq-producer.xml 配置文件,添加以下配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                     http://www.springframework.org/schema/beans/spring-beans.xsd
                     http://www.springframework.org/schema/context
                     https://www.springframework.org/schema/context/spring-context.xsd
                     http://www.springframework.org/schema/rabbit
                     http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加載配置文件-->
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        <!-- 定義rabbitmq connectionFactory  1. 設(shè)置  publisher-confirms="true" -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   
                                   publisher-confirms="true"
                                   />
        <!--定義管理交換機、隊列-->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!--定義rabbitTemplate對象操作可以在代碼中方便發(fā)送消息-->
        <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
    
        <!--2. 消息可靠性投遞(生產(chǎn)端)-->
       <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
        <rabbit:direct-exchange name="test_exchange_confirm">
            <rabbit:bindings>
                <rabbit:binding queue="test_queue_confirm" key="confirm">                           </rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
        
    </beans>
    
  5. 編寫測試代碼

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
    public class ProducerTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 確認(rèn)模式:
         * 步驟:
         * 1. 確認(rèn)模式開啟:ConnectionFactory中開啟publisher-confirms="true"
         * 2. 在rabbitTemplate定義ConfirmCallBack回調(diào)函數(shù)
         */
        @Test
        public void testConfirm() {
    
            //2. 定義回調(diào) **
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 *
                 * @param correlationData 相關(guān)配置信息
                 * @param ack   exchange交換機 是否成功收到了消息。true 成功,false代表失敗
                 * @param cause 失敗原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("confirm方法被執(zhí)行了....");
    
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失敗
                        System.out.println("接收失敗消息" + cause);
                        //做一些處理,讓消息再次發(fā)送。
                    }
                }
            });
    
            //3. 發(fā)送消息
            rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");
        }
    }
    
  6. 測試結(jié)果

    1569161424745.png

return退回模式代碼實現(xiàn)

回退模式: 當(dāng)消息發(fā)送給Exchange后,Exchange路由到Queue失敗是 才會執(zhí)行 ReturnCallBack,具體實現(xiàn)如下:

  1. 在 spring-rabbitmq-producer.xml 配置文件,在 rabbit:connection-factory節(jié)點 添加配置:

    publisher-returns="true"
    
  2. 編寫測試方法

    /**
     * 步驟:
     * 1. 開啟回退模式:publisher-returns="true"
     * 2. 設(shè)置ReturnCallBack
     * 3. 設(shè)置Exchange處理消息的模式:
     *  1. 如果消息沒有路由到Queue,則丟棄消息(默認(rèn))
     *  2. 如果消息沒有路由到Queue,返回給消息發(fā)送方ReturnCallBack
     */
    
    @Test
    public void testReturn() {
    
        //設(shè)置交換機處理失敗消息的模式
        rabbitTemplate.setMandatory(true);
    
        //2.設(shè)置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   消息對象
             * @param replyCode 錯誤碼
             * @param replyText 錯誤信息
             * @param exchange  交換機
             * @param routingKey 路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 執(zhí)行了....");
    
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
    
                //處理
            }
        });
    
    
        //3. 發(fā)送消息   
        rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
    }
    

    設(shè)置 routingKey 為一個不符合規(guī)則的key,觀察控制臺打印結(jié)果。

小結(jié)

對于確認(rèn)模式:

  • 設(shè)置ConnectionFactory的publisher-confirms="true" 開啟 確認(rèn)模式。

  • 使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。

對于退回模式

  • 設(shè)置ConnectionFactory的publisher-returns="true" 開啟 退回模式。

  • 使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當(dāng)消息從exchange路由到queue失敗后,如果設(shè)置了rabbitTemplate.setMandatory(true)參數(shù),則會將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。

在RabbitMQ中也提供了事務(wù)機制,但是性能較差,此處不做講解。

使用channel列方法,完成事務(wù)控制:

txSelect(), 用于將當(dāng)前channel設(shè)置成transaction模式

txCommit(),用于提交事務(wù)

txRollback(),用于回滾事務(wù)

1.2 Consumer ACK

ack指 Acknowledge,確認(rèn)。 表示消費端收到消息后的確認(rèn)方式。

有三種確認(rèn)方式:

? 自動確認(rèn):acknowledge="none"

? 手動確認(rèn):acknowledge="manual"

? 根據(jù)異常情況確認(rèn):acknowledge="auto",(這種方式使用麻煩)

其中自動確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會丟失。

如果設(shè)置了手動確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動重新發(fā)送消息。

代碼實現(xiàn)

  1. 創(chuàng)建maven工程,消息的消費者工程,項目模塊名稱:rabbitmq-consumer-spring

  2. 添加依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    
  3. 在 resources 目錄下創(chuàng)建 rabbitmq.properties 配置文件,添加鏈接RabbitMQ相關(guān)信息

    rabbitmq.host=172.16.98.133
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    rabbitmq.virtual-host=/
    
  4. 在 resources 目錄下創(chuàng)建 spring-rabbitmq-consumer.xml 配置文件,添加以下配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans.xsd
                 http://www.springframework.org/schema/context
                 https://www.springframework.org/schema/context/spring-context.xsd
                 http://www.springframework.org/schema/rabbit
                 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!--加載配置文件-->
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
    
        <!-- 定義rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"/>
    
    
        <context:component-scan base-package="com.itheima.listener" />
    
        <!--定義監(jiān)聽器容器  添加  acknowledge="manual" 手動-->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" >
            <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"> 
            </rabbit:listener>
        </rabbit:listener-container>
    
    </beans>
    
  5. 編寫ackListener 監(jiān)聽類實現(xiàn)ChannelAwareMessageListener接口

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * Consumer ACK機制:
     *  1. 設(shè)置手動簽收。acknowledge="manual"
     *  2. 讓監(jiān)聽器類實現(xiàn)ChannelAwareMessageListener接口
     *  3. 如果消息成功處理,則調(diào)用channel的 basicAck()簽收
     *  4. 如果消息處理失敗,則調(diào)用channel的basicNack()拒絕簽收,broker重新發(fā)送給consumer
     */
    @Component
    public class AckListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收轉(zhuǎn)換消息
                System.out.println(new String(message.getBody()));
    
                //2. 處理業(yè)務(wù)邏輯
                System.out.println("處理業(yè)務(wù)邏輯...");
                int i = 3/0;//出現(xiàn)錯誤
                //3. 手動簽收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
    
                //4.拒絕簽收
                /*
                第三個參數(shù):requeue:重回隊列。如果設(shè)置為true,則消息重新回到queue,broker會重新發(fā)送該消息給消費端
                 */
                channel.basicNack(deliveryTag,true,true);
                // 了解
                //channel.basicReject(deliveryTag,true);
            }
        }
    }
    
  6. 編寫測試類,啟動容器監(jiān)聽MQ隊列

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
    public class ConsumerTest {
    
        @Test
        public void test(){
            while (true){
    
            }
        }
    }
    

小結(jié)

  • 在rabbit:listener-container標(biāo)簽中設(shè)置acknowledge屬性,設(shè)置ack方式 none:自動確認(rèn),manual:手動確認(rèn)

  • 如果在消費端沒有出現(xiàn)異常,則調(diào)用channel.basicAck(deliveryTag,false);方法確認(rèn)簽收消息

  • 如果出現(xiàn)異常,則在catch中調(diào)用 basicNack或 basicReject,拒絕消息,讓MQ重新發(fā)送消息。

如何保證消息的高可靠性傳輸?

1.持久化

? exchange要持久化

? queue要持久化

? message要持久化

2.生產(chǎn)方確認(rèn)Confirm

3.消費方確認(rèn)Ack

4.Broker高可用

1.3 消費端限流

1569164559749.png

如上圖所示:如果在A系統(tǒng)中需要維護相關(guān)的業(yè)務(wù)功能,可能需要將A系統(tǒng)的服務(wù)停止,那么這個時候消息的生產(chǎn)者還是一直會向MQ中發(fā)送待處理的消息,消費者此時服務(wù)已經(jīng)關(guān)閉,導(dǎo)致大量的消息都會在MQ中累積。如果當(dāng)A系統(tǒng)成功啟動后,默認(rèn)情況下消息的消費者會一次性將MQ中累積的大量的消息全部拉取到自己的服務(wù),導(dǎo)致服務(wù)在短時間內(nèi)會處理大量的業(yè)務(wù),可能會導(dǎo)致系統(tǒng)服務(wù)的崩潰。 所以消費端限流是非常有必要的。

可以通過MQ中的 listener-container 配置屬性
perfetch = 1,表示消費端每次從mq拉去一條消息來消費,直到手動確認(rèn)消費完畢后,才會繼續(xù)拉去下一條消息。

代碼實現(xiàn)

  1. 編寫 QosListener 監(jiān)聽類,保證當(dāng)前的監(jiān)聽類消息處理機制是 ACK 為手動方式

    @Component
    public class QosListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
    
            Thread.sleep(1000);
            //1.獲取消息
            System.out.println(new String(message.getBody()));
            //2. 處理業(yè)務(wù)邏輯
            //3. 簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
    
  2. 在配置文件的 listener-container 配置屬性中添加配置

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
    

    配置說明:

    perfetch = 1,表示消費端每次從mq拉去一條消息來消費,直到手動確認(rèn)消費完畢后,才會繼續(xù)拉去下一條消息。

小結(jié)

  • 在<rabbit:listener-container> 中配置 prefetch屬性設(shè)置消費端一次拉取多少消息

  • 消費端的確認(rèn)模式一定為手動確認(rèn)。acknowledge="manual"

1.4 TTL

TTL 全稱 Time To Live(存活時間/過期時間)。當(dāng)消息到達存活時間后,還沒有被消費,會被自動清除。

RabbitMQ可以對消息設(shè)置過期時間,也可以對整個隊列(Queue)設(shè)置過期時間。

1569166173852.png

可以在RabbitMQ管理控制臺設(shè)置過期時間.

代碼實現(xiàn)

設(shè)置隊列的過期時間

  1. 在消息的生產(chǎn)方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:

    <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
        <!--設(shè)置queue的參數(shù)-->
        <rabbit:queue-arguments>
            <!--x-message-ttl指隊列的過期時間-->
            <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    
    <rabbit:topic-exchange name="test_exchange_ttl" >
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
  2. 編寫發(fā)送消息測試方法

    @Test
    public void testTtl() {
    
        for (int i = 0; i < 10; i++) {
            // 發(fā)送消息
            rabbitTemplate.convertAndSend("test_exchange_ttl",
                                          "ttl.hehe", "message ttl....");
        }
    }
    

    測試結(jié)果:當(dāng)消息發(fā)送成功后,過10s后在RabbitMQ的管理控制臺會看到消息會自動刪除。

設(shè)置單個消息的過期時間

編寫代碼測試,并且設(shè)置隊列的過期時間為100s, 單個消息的過期時間為5s

@Test
public void testTtl() {

  // 消息后處理對象,設(shè)置一些消息的參數(shù)信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //1.設(shè)置message的信息
            message.getMessageProperties().setExpiration("5000");//消息的過期時間
            //2.返回該消息
            return message;
        }
    };

    //消息單獨過期
    rabbitTemplate.convertAndSend("test_exchange_ttl", 
                                  "ttl.hehe", "message ttl....",messagePostProcessor);

    for (int i = 0; i < 10; i++) {
        if(i == 5){
            //消息單獨過期
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
        }else{
            //不過期的消息
            rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");

        }

    }
}

如果設(shè)置了消息的過期時間,也設(shè)置了隊列的過期時間,它以時間短的為準(zhǔn)。

  • 隊列過期后,會將隊列所有消息全部移除。
  • 消息過期后,只有消息在隊列頂端,才會判斷其是否過期(移除掉)

小結(jié)

  • 設(shè)置隊列過期時間使用參數(shù):x-message-ttl,單位:ms(毫秒),會對整個隊列消息統(tǒng)一過期。

  • 設(shè)置消息過期時間使用參數(shù):expiration。單位:ms(毫秒),當(dāng)該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。

  • 如果兩者都進行了設(shè)置,以時間短的為準(zhǔn)。

1.5 死信隊列

死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個交換機,這個交換機就是DLX。

1569167524589.png

消息成為死信的三種情況:

  1. 隊列消息長度到達限制;

  2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊列,requeue=false;

  3. 原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;

隊列綁定死信交換機:

給隊列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key

1569167616750.png

代碼實現(xiàn)

  1. 在消息的生產(chǎn)方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:

    • 聲明正常的隊列(test_queue_dlx)和交換機(test_exchange_dlx)

      <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
      </rabbit:queue>
      <rabbit:topic-exchange name="test_exchange_dlx">
          <rabbit:bindings>
              <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
          </rabbit:bindings>
      </rabbit:topic-exchange>
      
    • 聲明死信隊列(queue_dlx)和死信交換機(exchange_dlx)

      <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
      <rabbit:topic-exchange name="exchange_dlx">
          <rabbit:bindings>
              <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
          </rabbit:bindings>
      </rabbit:topic-exchange>
      
    • 正常隊列綁定死信交換機,并設(shè)置相關(guān)參數(shù)信息

      <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
          <!--3. 正常隊列綁定死信交換機-->
          <rabbit:queue-arguments>
              <!--3.1 x-dead-letter-exchange:死信交換機名稱-->
              <entry key="x-dead-letter-exchange" value="exchange_dlx" />
      
              <!--3.2 x-dead-letter-routing-key:發(fā)送給死信交換機的routingkey-->
              <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
      
              <!--4.1 設(shè)置隊列的過期時間 ttl-->
              <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
              <!--4.2 設(shè)置隊列的長度限制 max-length -->
              <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
          </rabbit:queue-arguments>
      </rabbit:queue>
      <rabbit:topic-exchange name="test_exchange_dlx">
          <rabbit:bindings>
              <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
          </rabbit:bindings>
      </rabbit:topic-exchange>
      
  2. 編寫測試方法

        /**
         * 發(fā)送測試死信消息:
         *  1. 過期時間
         *  2. 長度限制
         *  3. 消息拒收
         */
        @Test
        public void testDlx(){
            //1. 測試過期時間,死信消息
            rabbitTemplate.convertAndSend("test_exchange_dlx",
                                          "test.dlx.haha","我是一條消息,我會死嗎?");
    
            //2. 測試長度限制后,消息死信
           for (int i = 0; i < 20; i++) {
                rabbitTemplate.convertAndSend("test_exchange_dlx",
                                              "test.dlx.haha","我是一條消息,我會死嗎?");
            }
    
            //3. 測試消息拒收
            rabbitTemplate.convertAndSend("test_exchange_dlx",
                                          "test.dlx.haha","我是一條消息,我會死嗎?");
    
        }
    

小結(jié)

  1. 死信交換機和死信隊列和普通的沒有區(qū)別

  2. 當(dāng)消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列

  3. 消息成為死信的三種情況:

    • 隊列消息長度到達限制;

    • 消費者拒接消費消息,并且不重回隊列;

    • 原隊列存在消息過期設(shè)置,消息到達超時時間未被消費;

1.6 延遲隊列

延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。

提出需求:

  1. 下單后,30分鐘未支付,取消訂單,回滾庫存。

  2. 新用戶注冊成功7天后,發(fā)送短信問候。

實現(xiàn)方式:

  1. 定時器

  2. 延遲隊列

1569168202661.png

注意:在RabbitMQ中并未提供延遲隊列功能。

但是可以使用:TTL+死信隊列 組合實現(xiàn)延遲隊列的效果。

1569168255196.png

代碼實現(xiàn)

  1. 在消息的生產(chǎn)方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:

    <!-- 1. 定義正常交換機(order_exchange)和隊列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
        <!-- 3. 綁定,設(shè)置正常隊列過期時間為30分鐘-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
    <!--  2. 定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
  2. 編寫測試方法

    @Test
    public  void testDelay() throws InterruptedException {
        //1.發(fā)送訂單消息。 將來是在訂單系統(tǒng)中,下單成功后,發(fā)送消息
        rabbitTemplate.convertAndSend("order_exchange",
                                      "order.msg","訂單信息:id=1,time=2019年");
    
        //2.打印倒計時10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }
    

小結(jié)

  1. 延遲隊列 指消息進入隊列后,可以被延遲一定時間,再進行消費。

  2. RabbitMQ沒有提供延遲隊列功能,但是可以使用 : TTL + DLX 來實現(xiàn)延遲隊列效果。

1.7 日志與監(jiān)控

RabbitMQ日志

RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log

RabbitMQ 日志所在的目錄:

RabbitMQ日志詳細信息:

日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務(wù)節(jié)點名稱、cookie的hash值、RabbitMQ配置文件地址、內(nèi)存限制、磁盤限制、默認(rèn)賬戶guest的創(chuàng)建以及權(quán)限配置等等。

web管控臺監(jiān)控

直接訪問當(dāng)前的IP:15672,輸入用戶名和密碼(默認(rèn)是 guest),就可以查看RabbitMQ的管理控制臺。當(dāng)然也可通過命令的形式來查看。如下:

  • 查看隊列:rabbitmqctl list_queues

  • 查看用戶: rabbitmqctl list_users

  • 查看連接:rabbitmqctl list_connections

其它相關(guān)命令(了解):

查看exchanges:rabbitmqctl list_exchanges

查看消費者信息:rabbitmqctl list_consumers

查看環(huán)境變量:rabbitmqctl environment

查看未被確認(rèn)的隊列:rabbitmqctl list_queues name messages_unacknowledged

查看單個隊列的內(nèi)存使用:rabbitmqctl list_queues name memory

查看準(zhǔn)備就緒的隊列:rabbitmqctl list_queues name messages_ready

1.8 消息追蹤

在使用任何消息中間件的過程中,難免會出現(xiàn)某條消息異常丟失的情況。對于RabbitMQ而言,可能是因為生產(chǎn)者或消費者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機制;也有可能是因為交換器與隊列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒有與任何隊列進行綁定,生產(chǎn)者又不感知或者沒有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個時候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協(xié)助開發(fā)和運維人員進行問題的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現(xiàn)消息追蹤。

消息追蹤-Firehose

firehose的機制是將生產(chǎn)者投遞給rabbitmq的消息,rabbitmq投遞給消費者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個默認(rèn)的exchange的名稱為 amq.rabbitmq.trace,它是一個topic類型的exchange。發(fā)送到這個exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實際exchange和queue的名稱,分別對應(yīng)生產(chǎn)者投遞到exchange的消息,和消費者從queue上獲取的消息。

注意:打開 trace 會影響消息寫入功能,適當(dāng)打開后請關(guān)閉。

rabbitmqctl trace_on:開啟Firehose命令

消息追蹤驗證:

  1. 創(chuàng)建一個隊列 test_trace,并將當(dāng)前的隊列綁定到 amq.rabbitmq.trace 交換機上,設(shè)置RoutingKey為:#

  2. 未開啟消息追蹤之前,我們發(fā)送一個消息

    當(dāng)前消息發(fā)送成功后,在控制臺我們可以看到當(dāng)前消息的具體信息

  3. 設(shè)置開啟消息追蹤,在發(fā)送一條消息

  4. rabbitmqctl trace_on

    1569202059745.png

我們發(fā)現(xiàn)當(dāng)前消息也正常存在,并且開啟消息追蹤后,會多出一條消息是 amq.rabbitmq.trace 交換機發(fā)給當(dāng)前隊列的消息,消息中的內(nèi)容是比較完整的。

建議:在開發(fā)階段我們可以開啟消息追蹤,在實際生產(chǎn)環(huán)境建議將其關(guān)閉

rabbitmqctl trace_off:關(guān)閉Firehose命令

消息追蹤-rabbitmq_tracing

rabbitmq_tracing和Firehose在實現(xiàn)上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。

啟用插件:rabbitmq-plugins enable rabbitmq_tracing

發(fā)送消息成功后,我們點擊日志文件,要求輸入RabbitMQ的登錄用戶名和密碼。

建議:在開發(fā)階段我們可以開啟消息追蹤插件,在實際生產(chǎn)環(huán)境不建議建議開啟,除非是非常特殊的業(yè)務(wù)場景,大家根據(jù)實際情況選擇開啟即可。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容