RabbitMQ高級(jí)特性

0. 前言

本文內(nèi)容分為如下三部分
RabbitMQ高級(jí)特性

消息可靠性投遞
Consumer ACK
消費(fèi)端限流
TTL
死信隊(duì)列
延遲隊(duì)列
日志與監(jiān)控
消息可靠性分析與追蹤
管理

RabbitMQ應(yīng)用問題

消息可靠性保障
消息冪等性處理

1. 高級(jí)特性

1.1 消息的可靠投遞

在使用 RabbitMQ 的時(shí)候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場(chǎng)景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
confirm 確認(rèn)模式
return 退回模式

rabbitmq 整個(gè)消息投遞的路徑為:
producer--->rabbitmq broker--->exchange--->queue--->consumer
消息從 producer 到 exchange 則會(huì)返回一個(gè) confirmCallback 。
消息從 exchange-->queue 投遞失敗則會(huì)返回一個(gè) returnCallback。
我們將利用這兩個(gè) callback 控制消息的可靠性投遞

confirm模式

在上一篇中的最后我們用spring-boot配置了rabbitMQ,
在這里在原來的基礎(chǔ)上繼續(xù)進(jìn)行,在生產(chǎn)者的application.yml添加

使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理
#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: xxx
    port: 5672
    username: root
    password: root
    virtual-host: /example
    #生產(chǎn)端配置
    #開啟發(fā)送確認(rèn),此配置在Springboot2.3.0版本中已經(jīng)@Deprecated了,默認(rèn)就是
    # publisher-confirms: true
    #
    publisher-confirm-type: simple
    #開啟發(fā)送失敗退回
    publisher-returns: true

然后新增一個(gè)test

@Test
    public void testConfirm() {
        // 1. 設(shè)置ConnectionFactory的publisher-confirms="true" 開啟 確認(rèn)模式。

        // 2. 使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。
        //      當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。
        //      在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相關(guān)配置信息
             * @param ack   exchange交換機(jī) 是否成功收到了消息。true 成功,false代表失敗
             * @param cause 失敗原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被執(zhí)行了");
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失敗
                    System.out.println("接收失敗消息" + cause);
                    //做一些處理,讓消息再次發(fā)送。
                }
            }
        });
        // 3. 發(fā)生消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.confirm", "confirm mq hello~~~~~~~~");

    }

啟動(dòng)測(cè)試,結(jié)束后控制臺(tái)成功打印

登陸rabbitmq管理后臺(tái)也可以看到消息已經(jīng)寫入隊(duì)列中了,只是還沒有被消費(fèi)。
其中回調(diào)函數(shù)中 confirm的第一個(gè)參數(shù)correlationData會(huì)在發(fā)送消息的函數(shù)convertAndSend的重載函數(shù)中會(huì)使用,這里沒有使用這個(gè)參數(shù)。
ack比較重要,可以判斷交換機(jī)是否收到消息
cause失敗原因

return模式

使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當(dāng)消息從exchange路由到queue失敗后,如果設(shè)置了rabbitTemplate.setMandatory(true)參數(shù),則會(huì)將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。

application.yml添加

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    #生產(chǎn)端配置
    #開啟發(fā)送確認(rèn),此配置在Springboot2.3.0版本中已經(jīng)@Deprecated了,默認(rèn)就是
    # publisher-confirms: true
    #
    publisher-confirm-type: simple
    #開啟發(fā)送失敗退回
    publisher-returns: true
    #開啟執(zhí)行return回調(diào)
    template:
      mandatory: true

編寫測(cè)試

 /**
     * 回退模式: 當(dāng)消息發(fā)送給Exchange后,Exchange路由到Queue失敗時(shí)候才會(huì)執(zhí)行 ReturnCallBack
     * 步驟:
     * 1. 開啟回退模式:publisher-returns="true"
     * 2. 設(shè)置ReturnCallBack
     * 3. 設(shè)置Exchange處理消息的模式:
     *      1. 如果消息沒有路由到Queue,則丟棄消息(默認(rèn))
     *      2. 如果消息沒有路由到Queue,返回給消息發(fā)送方ReturnCallBack
     */
    @Test
    public void testReturn(){
        // 設(shè)置交換機(jī)處理失敗消息的模式
        // rabbitTemplate.setMandatory(true);
        // 2.設(shè)置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   消息對(duì)象
             * @param replyCode 錯(cuò)誤碼
             * @param replyText 錯(cuò)誤信息
             * @param exchange  交換機(jī)
             * @param routingKey 路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 被執(zhí)行了");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });
        // 3. 發(fā)送消息
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boott.treturn", "return mq hello~~~~~~~~");

    }

控制臺(tái)打印

不知道因?yàn)槭裁丛?,return回調(diào)經(jīng)常會(huì)不打印信息,有待研究

在RabbitMQ中也提供了事務(wù)機(jī)制,但是性能較差,此處不做講解。
使用channel下列方法,完成事務(wù)控制:
txSelect(), 用于將當(dāng)前channel設(shè)置成transaction模式
txCommit(),用于提交事務(wù)
txRollback(),用于回滾事務(wù)

1.2 Consumer Ack

ack指Acknowledge,確認(rèn)。 表示消費(fèi)端收到消息后的確認(rèn)方式。
有三種確認(rèn)方式:

  • 自動(dòng)確認(rèn):acknowledge="none"
  • 手動(dòng)確認(rèn):acknowledge="manual"
  • 根據(jù)異常情況確認(rèn):acknowledge="auto",(這種方式使用麻煩,不作講解)

其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。

配置consumer的監(jiān)聽器

對(duì)于消費(fèi)者,配置application.yml為

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

我們?cè)谠瓉鞷abbitMQListener上修改
新建一個(gè)監(jiān)聽器ackListener

@Component
@RabbitListener(queues = "boot_queue")
public class AckListener  {
    @RabbitHandler
    public void process(String hello,Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 接受轉(zhuǎn)換消息
            System.out.println("ackListener收到的消息為:" + new String(message.getBody()));

            // 2. 處理業(yè)務(wù)邏輯
            System.out.println("處理業(yè)務(wù)邏輯");
            int i =3/0;
            // 3. 手動(dòng)簽收
            channel.basicAck(deliveryTag,true);
        }
        catch (Exception e){
            // 4. 拒絕簽收
            /*
            第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端
             */
            channel.basicNack(deliveryTag,true,true);
//            channel.basicReject(deliveryTag,true); 單條數(shù)據(jù)
           
        }
            //消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //ack返回false,并重新回到隊(duì)列,api里面解釋得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒絕消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

    }
}

@RabbitListener 可以標(biāo)注在類上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給 @RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型

設(shè)置acknowledge屬性,設(shè)置ack方式 none:自動(dòng)確認(rèn),manual:手動(dòng)確認(rèn)

如果在消費(fèi)端沒有出現(xiàn)異常,則調(diào)用channel.basicAck(deliveryTag,false);方法確認(rèn)簽收消息,簽收成功,消息就被消費(fèi)了。

如果出現(xiàn)異常,則在catch中調(diào)用 basicNack或 basicReject,拒絕消息,讓MQ重新發(fā)送消息。

消息可靠性小結(jié)

  1. 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  2. 生產(chǎn)方確認(rèn)Confirm
  3. 消費(fèi)方確認(rèn)Ack
  4. Broker高可用,后面集群會(huì)講到

1.3 消費(fèi)端限流

回顧一下這個(gè)圖,我們說過MQ有個(gè)很重要的作用就是削峰填谷

接下來就學(xué)習(xí)如何實(shí)現(xiàn)限流
新建一個(gè)QosListener,這時(shí)候要把之前的AckListener注釋掉,
修改application.yml

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    listener:
      direct:
        acknowledge-mode: manual
        #每次限流1條消息
        prefetch: 1
      simple:
        acknowledge-mode: manual
        prefetch: 1

其中新增了perfetch = 1,表示消費(fèi)端每次從mq拉去一條消息來消費(fèi),直到手動(dòng)確認(rèn)消費(fèi)完畢后,才會(huì)繼續(xù)拉去下一條消息。

/**
 * Consumer 限流機(jī)制
 * 1. 確保ack機(jī)制為手動(dòng)確認(rèn)。
 * 2. listener-container配置屬性
 * perfetch = 1,表示消費(fèi)端每次從mq拉去一條消息來消費(fèi),直到手動(dòng)確認(rèn)消費(fèi)完畢后,才會(huì)繼續(xù)拉去下一條消息。
 */
@Component
@RabbitListener(queues = "boot_queue")
public class QosListener {
    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        //1.獲取消息
        System.out.println("Qos:"+new String(message.getBody()));

        //2. 處理業(yè)務(wù)邏輯

        //3. 簽收
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
}

在這里我們把最后一行 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);注釋掉
啟動(dòng)消費(fèi)者
發(fā)現(xiàn)只打印了一次

小結(jié)

配置 prefetch屬性設(shè)置消費(fèi)端一次拉取多少消息

消費(fèi)端的確認(rèn)模式一定為手動(dòng)確認(rèn)。acknowledge="manual"

1.4 TTL

TTL 全稱 Time To Live(存活時(shí)間/過期時(shí)間)。

當(dāng)消息到達(dá)存活時(shí)間后,還沒有被消費(fèi),會(huì)被自動(dòng)清除。

RabbitMQ可以對(duì)消息設(shè)置過期時(shí)間,也可以對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過期時(shí)間。
比如我們有個(gè)訂單系統(tǒng),下訂單時(shí)候如果30分鐘內(nèi)未被支付,那么這條消息就失效了。


控制臺(tái)測(cè)試

打開我們的控制臺(tái),新建隊(duì)列的時(shí)候可以看到最下面的arguments,我們先設(shè)置一個(gè)message ttl為10秒

再創(chuàng)建一個(gè)交換機(jī)

將交換機(jī)和隊(duì)列綁定

再在這個(gè)頁面下面手動(dòng)發(fā)消息

十秒鐘這個(gè)消息就會(huì)消失

編寫代碼測(cè)試

刪除在控制臺(tái)創(chuàng)建的交換機(jī)和對(duì)壘
producer端新增一個(gè)TTLConfig

@Configuration
public class TTLConfig {

    public static final String EXCHANGE_NAME = "test_exchange_ttl";
    public static final String QUEUE_NAME = "test_queue_ttl";

    //1.交換機(jī)
    @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }


    //2.Queue 隊(duì)列
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(QUEUE_NAME).ttl(20000).build();
    }

    //3. 隊(duì)列和交互機(jī)綁定關(guān)系 Binding
    /*
        1. 知道哪個(gè)隊(duì)列
        2. 知道哪個(gè)交換機(jī)
        3. routing key
     */
    @Bean
    public Binding bindTTLQueueExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
    }

}

之后運(yùn)行測(cè)試文件就能創(chuàng)建exchange和queue,其中創(chuàng)建隊(duì)列時(shí)候設(shè)置了參數(shù)ttl為20秒,指隊(duì)列的過期時(shí)間。
再編寫測(cè)試函數(shù)

/**
     * TTL:過期時(shí)間
     *  1. 隊(duì)列統(tǒng)一過期
     *
     *  2. 消息單獨(dú)過期
     *
     *
     * 如果設(shè)置了消息的過期時(shí)間,也設(shè)置了隊(duì)列的過期時(shí)間,它以時(shí)間短的為準(zhǔn)。
     * 隊(duì)列過期后,會(huì)將隊(duì)列所有消息全部移除。
     * 消息過期后,只有消息在隊(duì)列頂端,才會(huì)判斷其是否過期(移除掉)
     *
     */
    @Test
    public void testTTL() {
        // 消息后處理對(duì)象,設(shè)置一些消息的參數(shù)信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){

            /**
             * Change (or replace) the message.
             *
             * @param message the message.
             * @return the message.
             * @throws AmqpException an exception.
             */
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1.設(shè)置message的信息
                message.getMessageProperties().setExpiration("10000");//消息的過期時(shí)間
                //2.返回該消息
                return message;
            }
        };


     
        //消息單獨(dú)過期
        //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);


        for (int i = 0; i < 10; i++) {
            if(i == 5){
                //消息單獨(dú)過期
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
            }else{
                //不過期的消息
                rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");

            }

        }
    }

在這里,其實(shí)消息不會(huì)看到它過期了,因?yàn)?不是在隊(duì)列的頭部,只有在隊(duì)列頭部的才會(huì)被移除掉。

小結(jié)

設(shè)置隊(duì)列過期時(shí)間使用參數(shù):x-message-ttl,單位:ms(毫秒),會(huì)對(duì)整個(gè)隊(duì)列消息統(tǒng)一過期。

設(shè)置消息過期時(shí)間使用參數(shù):expiration。單位:ms(毫秒),當(dāng)該消息在隊(duì)列頭部時(shí)(消費(fèi)時(shí)),會(huì)單獨(dú)判斷這一消息是否過期。

如果兩者都進(jìn)行了設(shè)置,以時(shí)間短的為準(zhǔn)。

1.5 死信隊(duì)列

死信隊(duì)列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機(jī)),當(dāng)消息成為Dead message后,可以被重新發(fā)送到另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。

在前面的ttl例子中,當(dāng)我們的消息過期后,會(huì)被丟棄,但如果這個(gè)隊(duì)列綁定了死信交換機(jī),則消息不會(huì)被丟棄,而是發(fā)送到死信交換機(jī),而死信交換機(jī)又可以綁定其他隊(duì)列,從而可以重新被消費(fèi)者消費(fèi)。

考慮兩個(gè)問題:

  • 第一個(gè)隊(duì)列如何綁定死信交換機(jī)?
  • 消息什么時(shí)候成為死信

消息成為死信的三種情況

  1. 隊(duì)列消息長(zhǎng)度到達(dá)限制;

  2. 消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊(duì)列,requeue=false;

  3. 原隊(duì)列存在消息過期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);

tip:
死信交換機(jī)和死信隊(duì)列和正常的交換機(jī)和隊(duì)列沒有任何區(qū)別

  1. 聲明正常的隊(duì)列(redirect_queue)和交換機(jī)(redirect_exchange)
  2. 聲明死信隊(duì)列(dlx_queue)和死信交換機(jī)(dlx_exchange)
  3. 正常隊(duì)列綁定死信交換機(jī)
    設(shè)置兩個(gè)參數(shù):
    • x-dead-letter-exchange:死信交換機(jī)名稱
    • x-dead-letter-routing-key:發(fā)送給死信交換機(jī)的routingkey

修改application.yml

#配置RabbitMQ的基本信息 ip 端口 username password
spring:
  rabbitmq:
    host: asjunor.site
    port: 5672
    username: root
    password: root
    virtual-host: /example
    #生產(chǎn)端配置
    #開啟發(fā)送確認(rèn),此配置在Springboot2.3.0版本中已經(jīng)@Deprecated了,默認(rèn)就是
    # publisher-confirms: true
    #
    publisher-confirm-type: simple
    #開啟發(fā)送失敗退回
    publisher-returns: true
    #開啟執(zhí)行return回調(diào)
    template:
      mandatory: true
      retry:
        # 允許消息消費(fèi)失敗的重試
        enabled: true
        # 消息最多消費(fèi)次數(shù)3次
        max-attempts: 3
        # 消息多次消費(fèi)的間隔1秒
        initial-interval: 1000
    listener:
      direct:
        #  設(shè)置為false,會(huì)丟棄消息或者重新發(fā)布到死信隊(duì)列
        default-requeue-rejected: false

編寫DeadLetterConfig

package org.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName DeadLetterConfig
 * @Description TODO
 * @Author Patrick Star
 * @Date 2020/12/7 9:37 下午
 */
@Configuration
public class DeadLetterConfig {
    public static final String DL_EXCHANGE = "dl_exchange";

    public static final String DL_QUEUE = "dl_queue";
    public static final String REDIRECT_QUEUE = "redirect_queue";
    public static final String REDIRECT_EXCHANGE = "redirect_exchange";
    public static final String DL_REDIRECT_ROUTING_KEY = "dlx.hehe";


    /**
     * 死信隊(duì)列跟交換機(jī)類型沒有關(guān)系 不一定為directExchange  不影響該類型交換機(jī)的特性.
     */
    @Bean("dlExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.topicExchange(DL_EXCHANGE).durable(true).build();
    }

    @Bean("dlQueue")
    public Queue deadLetterQueue() {
        // 設(shè)置正常隊(duì)列的長(zhǎng)度限制和ttl
        return QueueBuilder.durable(DL_QUEUE).build();
    }

    @Bean("redirectQueue")
    public Queue redirectQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    聲明  死信隊(duì)列Exchange
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
//       x-dead-letter-routing-key    聲明 死信隊(duì)列拋出異常重定向隊(duì)列的routingKey("dlx.hehe")
        args.put("x-dead-letter-routing-key", DL_REDIRECT_ROUTING_KEY);
        args.put("x-message-ttl", 10000);
        args.put("x-max-length", 10);
        return QueueBuilder.durable(REDIRECT_QUEUE).withArguments(args).build();
    }

    //1.交換機(jī)
    @Bean("redirectExchange")
    public Exchange redirectExchange() {
        return ExchangeBuilder.topicExchange(REDIRECT_EXCHANGE).durable(true).build();
    }


    /**
     * 死信隊(duì)列綁定到死信交換器上.
     *
     * @return the binding
     */
    @Bean
    public Binding dlxBinding(Queue dlQueue, Exchange dlExchange) {
        return BindingBuilder
                .bind(dlQueue)
                .to(dlExchange)
                .with("dlx.#")
                .noargs();

    }

    /**
     * 將重定向隊(duì)列通過routingKey(“dlx.hehe”)綁定到死信隊(duì)列的Exchange上
     *
     * @return the binding
     */
    @Bean
    public Binding redirectToDLBinding(@Qualifier("redirectQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(DL_REDIRECT_ROUTING_KEY)
                .noargs();

    }

    /**
     * 綁定正常的交換機(jī)和隊(duì)列
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return BindingBuilder
                .bind(redirectQueue())
                .to(redirectExchange())
                .with("test.dlx.#")
                .noargs();

    }
}


其中分別使用了幾種方法來綁定隊(duì)列,在Binding函數(shù)中

  1. 我們可以直接寫隊(duì)列的函數(shù)和交換機(jī)的函數(shù),如最后一個(gè)
  2. 我們可以用@Qualifier注解指定隊(duì)列和交換機(jī),如倒數(shù)第二個(gè)
  3. 我們可以在傳遞參數(shù)時(shí),將參數(shù)名字和定義的隊(duì)列和交換機(jī)匹配

對(duì)消息產(chǎn)生死信的三種情況進(jìn)行測(cè)試

  /**
     * 發(fā)送測(cè)試死信消息:
     *  1. 過期時(shí)間
     *  2. 長(zhǎng)度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx(){
        //1. 測(cè)試過期時(shí)間,死信消息
        rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.hehe","我是一條消息,我會(huì)死嗎?");

        //2. 測(cè)試長(zhǎng)度限制后,消息死信
       /* for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.haha","我是一條消息,我會(huì)死嗎?");
        }*/

        //3. 測(cè)試消息拒收
//        rabbitTemplate.convertAndSend(DeadLetterConfig.REDIRECT_EXCHANGE,"test.dlx.haha","我是一條消息,我會(huì)死嗎?");

    }

對(duì)第一種情況,消息過期后會(huì)自動(dòng)轉(zhuǎn)到死信隊(duì)列
對(duì)第二種情況,消息長(zhǎng)度超過了限制,超過的會(huì)自動(dòng)轉(zhuǎn)到死信隊(duì)列
對(duì)第三種情況,我們要編寫一個(gè)消費(fèi)者來監(jiān)聽正常的隊(duì)列,讓消息拒絕接收

新建一個(gè)DlxListener,注意,這里是監(jiān)聽我們的正常隊(duì)列,而不是死信隊(duì)列

@Component
@RabbitListener(queues = "redirect_queue")
public class DlxListener {
    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 接受轉(zhuǎn)換消息
            System.out.println("DlxListener收到的消息為:" + new String(message.getBody()));

            // 2. 處理業(yè)務(wù)邏輯
            System.out.println("處理業(yè)務(wù)邏輯");
            int i = 3 / 0;
            // 3. 手動(dòng)簽收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {

            /*
            第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端
             */
            System.out.println("出現(xiàn)異常,拒絕接收");

            // 4. 拒絕簽收,不重回隊(duì)列 requeue = false
            channel.basicNack(deliveryTag, true, false);
//            channel.basicReject(deliveryTag,true); 單條數(shù)據(jù)

            //消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //ack返回false,并重新回到隊(duì)列,api里面解釋得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒絕消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }


    }


}

之后運(yùn)行Consumer,將test的第三個(gè)注釋打開,進(jìn)行測(cè)試,控制臺(tái)打印


小結(jié)

  1. 死信交換機(jī)和死信隊(duì)列和普通的沒有區(qū)別

  2. 當(dāng)消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列

  3. 消息成為死信的三種情況:

    1. 隊(duì)列消息長(zhǎng)度到達(dá)限制;
    2. 消費(fèi)者拒接消費(fèi)消息,并且不重回隊(duì)列;
    3. 原隊(duì)列存在消息過期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);

1.6 延遲隊(duì)列

延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間后,才會(huì)被消費(fèi)。

應(yīng)用場(chǎng)景有:
比如用戶下單后,30分鐘未支付,取消訂單,回滾庫存。再比如新用戶注冊(cè)成功7天后,發(fā)送短信問候。

實(shí)現(xiàn)方式:
1. 定時(shí)器
2. 延遲隊(duì)列

但是很可惜,在RabbitMQ中并未提供延遲隊(duì)列功能。

但是可以使用:TTL+死信隊(duì)列 組合實(shí)現(xiàn)延遲隊(duì)列的效果

image.png

如何實(shí)現(xiàn)呢,加入我們給訂單設(shè)定30分鐘的支付時(shí)間,訂單系統(tǒng)一開始將消息發(fā)送到正常隊(duì)列,30分鐘后轉(zhuǎn)發(fā)到死信隊(duì)列,有一個(gè)專門的庫存系統(tǒng)保存去獲取這條消息,來判斷訂單是支付了還是未支付。

具體步驟

  1. 定義正常的交換機(jī)( order_exchange )和隊(duì)列( order_queue )
  2. 定義死信交換機(jī)及和隊(duì)列
  3. 綁定設(shè)置正常隊(duì)列過期時(shí)間為10秒鐘,測(cè)試的時(shí)候30分鐘太久了

新建一個(gè)OrderCondig,我們直接復(fù)制并修改之前的死信隊(duì)列和死信交換機(jī),將order_queue過期時(shí)間設(shè)置為30分鐘,將正常交換機(jī)到DLX的routing key改為order.dlx.cancel,將DLX和死信隊(duì)列的routing key改為order.dlx.#,正常交換機(jī)和正常隊(duì)列的routing key 就為order.#

package org.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName DeadLetterConfig
 * @Description TODO
 * @Author Patrick Star
 * @Date 2020/12/7 9:37 下午
 */
@Configuration
public class OrderConfig {
    public static final String ORDER_DL_EXCHANGE = "order_dl_exchange";

    public static final String ORDER_DL_QUEUE = "order_dl_queue";
    public static final String ORDER_QUEUE = "order_queue";
    public static final String ORDER_EXCHANGE = "order_exchange";
    public static final String ORDER_DL_ORDER_ROUTING_KEY = "order.dlx.cancel";
    public static final String ORDER_ROUTING_KEY = "order.#";
    public static final String DLX_ROUTING_KEY = "order.dlx.#";

    /**
     * 死信隊(duì)列跟交換機(jī)類型沒有關(guān)系 不一定為directExchange  不影響該類型交換機(jī)的特性.
     */
    @Bean("orderDLExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.topicExchange(ORDER_DL_EXCHANGE).durable(true).build();
    }

    @Bean("orderDLQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(ORDER_DL_QUEUE).build();
    }
    // order 隊(duì)列
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>(2);
                // x-dead-letter-exchange    聲明  死信隊(duì)列Exchange
        args.put("x-dead-letter-exchange", ORDER_DL_EXCHANGE);
                // x-dead-letter-routing-key    聲明 死信隊(duì)列拋出異常重定向隊(duì)列的routingKey("order.dlx.cancel")
        args.put("x-dead-letter-routing-key", ORDER_DL_ORDER_ROUTING_KEY);
        args.put("x-message-ttl", 10000);
        args.put("x-max-length", 30);
        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }

    // order交換機(jī)
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();
    }


    /**
     * 死信隊(duì)列綁定到死信交換器上.
     *
     * @return the binding
     */
    @Bean
    public Binding newDLBinding(Queue orderDLQueue, Exchange orderDLExchange) {
        return BindingBuilder
                .bind(orderDLQueue)
                .to(orderDLExchange)
                .with(DLX_ROUTING_KEY)
                .noargs();

    }

    /**
     * 將重定向隊(duì)列通過routingKey(“order.dlx.cancel”)綁定到死信隊(duì)列的Exchange上
     *
     * @return the binding
     */
    @Bean
    public Binding orderToDLBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderDLExchange") Exchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ORDER_DL_ORDER_ROUTING_KEY)
                .noargs();

    }

    /**
     * 綁定正常的交換機(jī)和隊(duì)列
     *
     * @return the binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder
                .bind(orderQueue())
                .to(orderExchange())
                .with(ORDER_ROUTING_KEY)
                .noargs();

    }
}

Consumer端新建一個(gè)orderListener,一定要監(jiān)聽死信隊(duì)列

package org.example.rabbitmq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @ClassName AckListener
 * @Description TODO
 * @Author Patrick Star
 * @Date 2020/12/7 5:50 下午
 */
@Component
@RabbitListener(queues = "order_dl_queue")
public class OrderListener {
    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException, InterruptedException {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1. 接受轉(zhuǎn)換消息
            System.out.println("orderListener收到的消息為:" + new String(message.getBody()));

            //2. 處理業(yè)務(wù)邏輯
            System.out.println("處理業(yè)務(wù)邏輯...");
            System.out.println("根據(jù)訂單id查詢其狀態(tài)...");
            System.out.println("判斷狀態(tài)是否為支付成功");
            System.out.println("取消訂單,回滾庫存....");
            // 3. 手動(dòng)簽收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 4. 拒絕簽收
            /*
            第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端
             */
            channel.basicNack(deliveryTag, true, true);
//            channel.basicReject(deliveryTag,true); 單條數(shù)據(jù)

            //消息的標(biāo)識(shí),false只確認(rèn)當(dāng)前一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //ack返回false,并重新回到隊(duì)列,api里面解釋得很清楚
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            //拒絕消息
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }


    }


}

我們主要看消費(fèi)者是不是延遲10秒后收到消息,在消費(fèi)者控制臺(tái)打印消息。

延遲隊(duì)列小結(jié)

  1. 延遲隊(duì)列 指消息進(jìn)入隊(duì)列后,可以被延遲一定時(shí)間,再進(jìn)行消費(fèi)。
  2. RabbitMQ沒有提供延遲隊(duì)列功能,但是可以使用 : TTL + DLX 來實(shí)現(xiàn)延遲隊(duì)列效果

1.7 日志與監(jiān)控

我的rabbitmq運(yùn)行在linux服務(wù)器上的一個(gè)docker容器中,我們可以進(jìn)入該容器查看日志信息,對(duì)于不是docker運(yùn)行的服務(wù),就直接在服務(wù)器上查找。

RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log

我們使用docker ps查找我們運(yùn)行的rabbitmq的容器id

再使用

docker logs 47b

查看docker的輸出日志,其中47b是我rabbitmq容器id的前幾個(gè)字母,里面就可以看到我們的rabbitmq的日志了。

進(jìn)入容器內(nèi),使用命令rabbitmqctl status可以看到我們的Log文件是輸出到了輸出流,而沒有使用文件保存,如果需要查看log文件,參考下面的解決方法

https://blog.csdn.net/fvdfsdafdsafs/article/details/110097643

也可以打開web控制臺(tái),也可以看到很多的的參數(shù)信息

點(diǎn)擊name,可以看到一些負(fù)載參數(shù)

如果綠色的接近紅色,就應(yīng)該注意了。

也可以通過rabbitmq的控制命令查看

查看隊(duì)列

rabbitmqctl list_queues

查看exchanges

rabbitmqctl list_exchanges

查看用戶

rabbitmqctl list_users

查看連接

rabbitmqctl list_connections

查看消費(fèi)者信息

rabbitmqctl list_consumers

查看環(huán)境變量

rabbitmqctl environment

查看未被確認(rèn)的隊(duì)列

rabbitmqctl list_queues name messages_unacknowledged

查看單個(gè)隊(duì)列的內(nèi)存使用

rabbitmqctl list_queues name memory

查看準(zhǔn)備就緒的隊(duì)列

rabbitmqctl list_queues name messages_ready

1.8 消息追蹤

在使用任何消息中間件的過程中,難免會(huì)出現(xiàn)某條消息異常丟失的情況。對(duì)于RabbitMQ而言,可能是因?yàn)樯a(chǎn)者或消費(fèi)者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機(jī)制;也有可能是因?yàn)榻粨Q器與隊(duì)列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒有與任何隊(duì)列進(jìn)行綁定,生產(chǎn)者又不感知或者沒有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個(gè)時(shí)候就需要有一個(gè)較好的機(jī)制跟蹤記錄消息的投遞過程,以此協(xié)助開發(fā)和運(yùn)維人員進(jìn)行問題的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實(shí)現(xiàn)消息追蹤。

firehose的機(jī)制是將生產(chǎn)者投遞給rabbitmq的消息,rabbitmq投遞給消費(fèi)者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個(gè)默認(rèn)的exchange的名稱為amq.rabbitmq.trace,它是一個(gè)topic類型的exchange。發(fā)送到這個(gè)exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實(shí)際exchange和queue的名稱,分別對(duì)應(yīng)生產(chǎn)者投遞到exchange的消息,和消費(fèi)者從queue上獲取的消息。

如何使用呢,我們可以將一個(gè)隊(duì)列綁定到默認(rèn)交換機(jī),routing key 就為test_trace好了,然后往這個(gè)隊(duì)列發(fā)消息,默認(rèn)交換機(jī)會(huì)把消息轉(zhuǎn)發(fā)到隊(duì)列,同時(shí),隊(duì)列還收到了兩條消息,是trace交換機(jī)發(fā)的詳細(xì)的日志消息。

注意:打開 trace 會(huì)影響消息寫入功能,適當(dāng)打開后請(qǐng)關(guān)閉。

  • rabbitmqctl trace_on:開啟Firehose命令

  • rabbitmqctl trace_off:關(guān)閉Firehose命令

rabbitmq_tracing和Firehose在實(shí)現(xiàn)上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。

我們要做的就是啟用插件:rabbitmq-plugins enable rabbitmq_tracing

首先進(jìn)入 docker 容器內(nèi)部,執(zhí)行rabbitmq-plugins list命令

root@47b96c4e50ef:/# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@47b96c4e50ef
 |/
[  ] rabbitmq_amqp1_0                  3.8.9
[  ] rabbitmq_auth_backend_cache       3.8.9
[  ] rabbitmq_auth_backend_http        3.8.9
[  ] rabbitmq_auth_backend_ldap        3.8.9
[  ] rabbitmq_auth_backend_oauth2      3.8.9
[  ] rabbitmq_auth_mechanism_ssl       3.8.9
[  ] rabbitmq_consistent_hash_exchange 3.8.9
[  ] rabbitmq_event_exchange           3.8.9
[  ] rabbitmq_federation               3.8.9
[  ] rabbitmq_federation_management    3.8.9
[  ] rabbitmq_jms_topic_exchange       3.8.9
[E*] rabbitmq_management               3.8.9
[e*] rabbitmq_management_agent         3.8.9
[  ] rabbitmq_mqtt                     3.8.9
[  ] rabbitmq_peer_discovery_aws       3.8.9
[  ] rabbitmq_peer_discovery_common    3.8.9
[  ] rabbitmq_peer_discovery_consul    3.8.9
[  ] rabbitmq_peer_discovery_etcd      3.8.9
[  ] rabbitmq_peer_discovery_k8s       3.8.9
[E*] rabbitmq_prometheus               3.8.9
[  ] rabbitmq_random_exchange          3.8.9
[  ] rabbitmq_recent_history_exchange  3.8.9
[  ] rabbitmq_sharding                 3.8.9
[  ] rabbitmq_shovel                   3.8.9
[  ] rabbitmq_shovel_management        3.8.9
[  ] rabbitmq_stomp                    3.8.9
[  ] rabbitmq_top                      3.8.9
[  ] rabbitmq_tracing                  3.8.9
[  ] rabbitmq_trust_store              3.8.9
[e*] rabbitmq_web_dispatch             3.8.9
[  ] rabbitmq_web_mqtt                 3.8.9
[  ] rabbitmq_web_mqtt_examples        3.8.9
[  ] rabbitmq_web_stomp                3.8.9
[  ] rabbitmq_web_stomp_examples       3.8.9

查看我們啟用的插件,帶*的就是啟用了

然后輸入

rabbitmq-plugins enable rabbitmq_tracing

然后在web控制臺(tái),刷新一下,點(diǎn)擊admin標(biāo)簽,可以看到右邊多了一個(gè)tracing標(biāo)簽

可以在這里添加新的tracing

Virtual host:虛擬主機(jī)名

Name :tracing 的名稱,將來可以在這個(gè)tracing中記錄很多的日志信息

Format:日志信息的格式,一種是text,一種是json,text是給我們程序員看的,是明文的,json便于計(jì)算機(jī)解析,經(jīng)過base64編碼了

Max payload bytes:不填就把所有的消息體記錄起來,填個(gè)10就表示取前10個(gè)字節(jié)

Pattern:#表示的接收所有的消息,不管是發(fā)過來的還是消費(fèi)的都可,如果只想接受發(fā)過來的,就填publish.#,如果想接收消費(fèi)的,就填deliver.#

添加完后上邊就多了一些東西,點(diǎn)進(jìn)去右邊的mytrace.log現(xiàn)在是啥都沒有,我們往里面發(fā)一些消息,就可以看到一些日志信息了。

點(diǎn)開隊(duì)列列表可以看到多了一個(gè)隊(duì)列記錄我們的日志消息

這個(gè)隊(duì)列綁定的是amq.rabbitmq.trace交換機(jī)

2. RabbitMQ應(yīng)用問題

2.1 消息可靠性保障

我們想要消息100%發(fā)送成功似乎是不可能的,但是我們最起碼可以保證消息99.9%能發(fā)送成功吧。其中就用到了消息補(bǔ)償。

來看一張圖,其中producer 和 consumer都有自己對(duì)應(yīng)的數(shù)據(jù)庫,正常情況下,producer將業(yè)務(wù)數(shù)據(jù)入庫,發(fā)送消息到Q1,consumer監(jiān)聽Q1,接收消息去消費(fèi),完成相應(yīng)的DB操作。

考慮不正常的情況,producer操作數(shù)據(jù)庫成功了,但是第2步發(fā)送消息失敗了,這樣consumer收不到消息,業(yè)務(wù)操作也會(huì)失敗,這時(shí)候怎么辦呢,producer發(fā)送消息完成之后,到第3步,延遲發(fā)送消息到Q3,也就是說,發(fā)送一條消息之后等待一段時(shí)間再發(fā)一條消息到Q3,這兩個(gè)消息一模一樣,如果消息到Q1發(fā)送成功,consumer消費(fèi)成功后要向Q2發(fā)送確認(rèn)消息,相當(dāng)于consumer轉(zhuǎn)換了一次角色,變成了生產(chǎn)端,我們有個(gè)回調(diào)檢查服務(wù),監(jiān)聽了Q2的確認(rèn)消息,收到確認(rèn)消息,將消息寫入消息數(shù)據(jù)庫中。而回調(diào)檢查服務(wù)也監(jiān)聽著Q3,Q1、Q2、Q3中的消息ID是一樣的,收到Q3的消息后,回調(diào)檢查服務(wù)要去比對(duì)當(dāng)前這條消息是否和剛才寫入MDB的消息是否一致,檢查是否被消費(fèi)過,沒有被消費(fèi)過得話,MDB一定不存在記錄,則轉(zhuǎn)到第8步,producer重新發(fā)送消息。最后還有個(gè)問題,如果第2步第3步都失敗了呢,我們還有個(gè)最后的保障,也就是定時(shí)檢查服務(wù),檢查業(yè)務(wù)數(shù)據(jù)庫DB和消息MDB是否能匹配,檢查DB是不是比MDB的數(shù)據(jù)多了,或者匹配不上了,再去調(diào)用producer,重發(fā)那些多的消息。

2.2 消息冪等性保障

冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說,其任意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

在MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。

舉個(gè)栗子,我花了500買衣服,所以服務(wù)器會(huì)發(fā)消息到MQ去扣款,下訂單付款的時(shí)候,可能由于網(wǎng)絡(luò)的原因,不管是什么原因,總之我發(fā)了兩條扣款500的消息,總不能扣我1000塊吧,RabbitMQ采取了用數(shù)據(jù)庫樂觀鎖的機(jī)制來保障消息的冪等性。

這里的樂觀鎖,就是給消息加了個(gè)版本號(hào),由上圖的例子,consumer第一次執(zhí)行消息寫入數(shù)據(jù)庫,version是1,樂觀鎖會(huì)將id與version綁定,且version取出來加1,下一條消息來了的時(shí)候,consumer想寫數(shù)據(jù)庫的時(shí)候判斷條件 id = 1 and version = 1便不成立了。

冪等性其實(shí)有很多的保障機(jī)制,這里只介紹了樂觀鎖的機(jī)制。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容