消息中間件——RabbitMQ(八)高級(jí)特性全在這里!(下)

求關(guān)注
高級(jí)特性全在這里!(下)

前言

上一篇消息中間件——RabbitMQ(七)高級(jí)特性全在這里!(上)中我們介紹了消息如何保障100%的投遞成功?,冪等性概念詳解,在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費(fèi)的問(wèn)題?,Confirm確認(rèn)消息、Return返回消息。這篇我們來(lái)介紹下下面內(nèi)容。

  • 自定義消費(fèi)者
  • 消息的限流(防止占用內(nèi)存過(guò)多,節(jié)點(diǎn)宕機(jī))
  • 消息的ACK與重回隊(duì)列
  • TTL消息
  • 死信隊(duì)列

1. 自定義消費(fèi)者

1.1 消費(fèi)端自定義監(jiān)聽(tīng)

我們一般就在代碼中編寫(xiě)while循環(huán),進(jìn)行consumer.nextDelivery方法進(jìn)行獲取下一條消息,然后進(jìn)行消費(fèi)處理!

但是這種輪訓(xùn)的方式肯定是不好的,代碼也比較low。

  • 我們使用自定義的Consumer更加的方便,解耦性更加的強(qiáng),也是在實(shí)際工作中最常見(jiàn)的使用方式!
消費(fèi)端自定義監(jiān)聽(tīng)

1.2 代碼演示

1.2.1 生產(chǎn)者


/**
 * 
* @ClassName: Producer 
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51 
*
 */
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";
        
        String msg = "Hello RabbitMQ Consumer Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}


1.2.2 消費(fèi)者


/**
 * 
* @ClassName: Consumer 
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51 
*
 */
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        // 創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //實(shí)現(xiàn)自己的MyConsumer()
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}


1.2.3 自定義類(lèi):MyConsumer

/**
 * 
* @ClassName: MyConsumer 
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55 
*
 */
public class MyConsumer extends DefaultConsumer {


    public MyConsumer(Channel channel) {
        super(channel);
    }

    //根據(jù)需求,重寫(xiě)自己需要的方法。
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        //消費(fèi)標(biāo)簽
        System.err.println("consumerTag: " + consumerTag);
        //這個(gè)對(duì)象包含許多關(guān)鍵信息
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }


}


1.3 打印結(jié)果

打印結(jié)果

2. 消費(fèi)端限流

2.1 什么是消費(fèi)端的限流?

  • 假設(shè)一個(gè)場(chǎng)景,首先,我們Rabbitmq服務(wù)器有上萬(wàn)條未處理的消息,我們隨便打開(kāi)一個(gè)消費(fèi)者客戶(hù)端,會(huì)出現(xiàn)下面情況:
  • 巨量的消息瞬間全部推送過(guò)來(lái),但是我們單個(gè)客戶(hù)端無(wú)法同時(shí)處理這么多數(shù)據(jù)!這個(gè)時(shí)候很容易導(dǎo)致服務(wù)器崩潰,出現(xiàn)故障。

為什么不在生產(chǎn)端進(jìn)行限流呢?

因?yàn)樵诟卟l(fā)的情況下,客戶(hù)量就是非常大,所以很難在生產(chǎn)端做限制。因此我們可以用MQ在消費(fèi)端做限流。

  • RabbitMQ提供了一種qos(服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過(guò)基于consume或者channel設(shè)置Qos的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
    在限流的情況下,千萬(wàn)不要設(shè)置自動(dòng)簽收,要設(shè)置為手動(dòng)簽收。
  • void BasicQos(uint prfetchSize,ushort prefetchCount,bool global);

參數(shù)解釋?zhuān)?/strong>
prefetchSize:0
prefetchCount:會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送多于N個(gè)消息,即一旦有N個(gè)消息還沒(méi)有ack,則該consumer將block掉,直到有消息ack。
global: true\false 是否將上面設(shè)置應(yīng)用于channel,簡(jiǎn)單點(diǎn)說(shuō),就是上面限制是channel級(jí)別還是consumer級(jí)別。
prefetchSize和global這兩項(xiàng),rabbitmq沒(méi)有實(shí)現(xiàn),暫且不研究prefetch_count在no_ask = false的情況下生效,即在自動(dòng)應(yīng)答的情況下這兩個(gè)值是不生效的。

第一個(gè)參數(shù):消息的限制大小,消息多少兆。一般不做限制,設(shè)置為0
第二個(gè)參數(shù):一次最多處理多少條,實(shí)際工作中設(shè)置為1就好
第三個(gè)參數(shù):限流策略在什么上應(yīng)用。在RabbitMQ一般有兩個(gè)應(yīng)用級(jí)別:1.通道 2.Consumer級(jí)別。一般設(shè)置為false,true 表示channel級(jí)別,false表示在consumer級(jí)別

2.2 代碼演示

2.2.1 生產(chǎn)者


/**
 * 
* @ClassName: Producer 
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51 
*
 */
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}


2.2.2 消費(fèi)者


/**
 * 
* @ClassName: Consumer 
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51 
*
 */
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //1 限流方式  第一件事就是 autoAck設(shè)置為 false
        //設(shè)置為1,表示一條一條數(shù)據(jù)處理
        channel.basicQos(0, 1, false);
        
        channel.basicConsume(queueName, false, new MyConsumer(channel));
        
        
    }
}


2.2.3 自定義類(lèi):MyConsumer


/**
 * 
* @ClassName: MyConsumer 
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55 
*
 */
public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        
        //需要做簽收,false表示不支持批量簽收
        channel.basicAck(envelope.getDeliveryTag(), false);
        
    }


}


2.2.4 測(cè)試結(jié)果

我們先注釋掉:channel.basicAck(envelope.getDeliveryTag(), false);然后啟動(dòng)Consumer。
查看Exchange

Exchange

查看Queues


Queues

然后再啟動(dòng)Producer。查看打印結(jié)果:


打印結(jié)果

我們會(huì)發(fā)現(xiàn)消費(fèi)端,只收到了一條消息。這是為什么呢?

第一點(diǎn)因?yàn)槲覀冊(cè)赾onsumer中

channel.basicConsume(queueName, false, new MyConsumer(channel));

第二個(gè)參數(shù)設(shè)置為false為手動(dòng)簽收。

第二點(diǎn)在qos中設(shè)置只接受一條消息。如果這一條消息不給Broker Ack應(yīng)答的話(huà),那么Broker會(huì)認(rèn)為你并沒(méi)有消費(fèi)完這一條消息,那么就不會(huì)繼續(xù)發(fā)送消息。

channel.basicQos(0, 1, false);

可以看下管控臺(tái),unack=1,Ready=4,total=5.

queues

接下來(lái)我們放開(kāi)注釋channel.basicAck(envelope.getDeliveryTag(), false); 進(jìn)行消息簽收。重啟服務(wù)。

3.1 打印結(jié)果

打印結(jié)果

可以看到正常打印五條結(jié)果

4. 消費(fèi)端ACK與重回隊(duì)列

4.1 消費(fèi)端的手工ACK和NACK

消費(fèi)端進(jìn)行消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償!

如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,那我們就需要手工進(jìn)行ACK保障消費(fèi)端消費(fèi)成功!

4.2 消費(fèi)端的重回隊(duì)列

消費(fèi)端重回隊(duì)列是為了對(duì)沒(méi)有處理成功的消息,把消息重新傳遞給Broker!

一般我們?cè)趯?shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是設(shè)置為False.

4.3 代碼演示

4.3.1 生產(chǎn)者


/**
 * 
* @ClassName: Producer 
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51 
*
 */
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        //1創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        
        
        for(int i =0; i<5; i ++){
            
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            
            //添加屬性,后續(xù)會(huì)使用到
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) //投遞模式,持久化
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}


4.3.2 消費(fèi)者


/**
 * 
* @ClassName: Consumer 
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51 
*
 */
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        // 手工簽收 必須要關(guān)閉 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
        
        
    }
}


4.3.3 自定義類(lèi):MyConsumer

/**
 * 
* @ClassName: MyConsumer 
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55 
*
 */
public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            //Nack三個(gè)參數(shù)  第二個(gè)參數(shù):是否是批量,第三個(gè)參數(shù):是否重回隊(duì)列(需要注意可能會(huì)發(fā)生重復(fù)消費(fèi),造成死循環(huán))
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
        
    }


}


5.1 打印結(jié)果:

打印結(jié)果

注意:
可以看到重回隊(duì)列會(huì)出現(xiàn)重復(fù)消費(fèi)導(dǎo)致死循環(huán)的問(wèn)題,這時(shí)候最好設(shè)置重試次數(shù),比如超過(guò)三次后,消息還是消費(fèi)失敗,就將消息丟棄。

6. TTL隊(duì)列/消息

6.1 TTL

  • TTL是Time To Live的縮寫(xiě),也就是生存時(shí)間
  • RabbitMQ支持消息的過(guò)期時(shí)間,在消息發(fā)送時(shí)可以進(jìn)行指定
  • RabbitMQ支持隊(duì)列的過(guò)期時(shí)間,從消息入隊(duì)列開(kāi)始計(jì)算,只要超過(guò)了隊(duì)列的超時(shí)時(shí)間配置,那么消息會(huì)自動(dòng)的清除

6.2 代碼演示

6.2.1 直接通過(guò)管控臺(tái)進(jìn)行演示

添加queue

通過(guò)管控臺(tái)創(chuàng)建一個(gè)隊(duì)列

x-max-length 隊(duì)列的最大大小
x-message-ttl 設(shè)置10秒鐘,如果消息還沒(méi)有被消費(fèi)的話(huà),就會(huì)被清除。

queue添加成功

添加exchange
添加exchange
exchange添加成功

Queue與Exchange進(jìn)行綁定

點(diǎn)擊 test_ttl_exchange 進(jìn)行綁定

進(jìn)行綁定
查看是否綁定成功

綁定1

綁定2

通過(guò)管控臺(tái)發(fā)送消息
發(fā)送消息
queue中有消息
消息未處理自動(dòng)清除
十秒后消息消失

生產(chǎn)端設(shè)置過(guò)期時(shí)間

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .headers(headers)
                .build();

這兩個(gè)屬性并不相同,一個(gè)對(duì)應(yīng)的是消息體,一個(gè)對(duì)應(yīng)的是隊(duì)列的過(guò)期。

7. 死信隊(duì)列

7.1 概念理解

死信隊(duì)列:DLX,Dead-Letter-Exchange
RabbitMQ的死信隊(duì)里與Exchange息息相關(guān)

  • 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX

消息變成死信有以下幾種情況

  • 消息被拒絕(basic.reject/basic.nack)并且requeue=false
  • 消息TTL過(guò)期
  • 隊(duì)列達(dá)到最大長(zhǎng)度

DLX也是一個(gè)正常的Exchange,和一般的Exchange沒(méi)有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性

當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列。

可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理,這個(gè)特征可以彌補(bǔ)RabbitMQ3.0以前支持的immediate參數(shù)的功能。

7.2 代碼演示

  • 死信隊(duì)列設(shè)置:
  • 首先需要設(shè)置死信隊(duì)列的exchange和queue,然后進(jìn)行綁定:
    Exchange:dlx.exchange
    Queue:dlx.queue
    RoutingKey:#
  • 然后我們進(jìn)行正常聲明交換機(jī)、隊(duì)列、綁定,只不過(guò)我們需要在隊(duì)列加上一個(gè)參數(shù)即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
  • 這樣消息在過(guò)期、requeue、隊(duì)列在達(dá)到最大長(zhǎng)度時(shí),消息就可以直接路由到死信隊(duì)列!

7.2.1 生產(chǎn)者

/**
 * 
* @ClassName: Producer 
* @Description: 生產(chǎn)者
* @author Coder編程
* @date2019年7月30日 下午23:15:51 
*
 */
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        //創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        for(int i =0; i<1; i ++){
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}


7.2.2 消費(fèi)者

/**
 * 
* @ClassName: Consumer 
* @Description: 消費(fèi)者
* @author Coder編程
* @date2019年7月30日 下午23:13:51 
*
 */
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        //創(chuàng)建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        
        // 這就是一個(gè)普通的交換機(jī) 和 隊(duì)列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        
        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //這個(gè)agruments屬性,要設(shè)置到聲明隊(duì)列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //要進(jìn)行死信隊(duì)列的聲明:
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
        
        channel.basicConsume(queueName, true, new MyConsumer(channel));
        
        
    }
}


7.2.3 自定義類(lèi):MyConsumer


/**
 * 
* @ClassName: MyConsumer 
* @Description: TODO
* @author Coder編程
* @date 2019年7月30日 下午23:11:55 
*
 */
public class MyConsumer extends DefaultConsumer {


    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

7.2.4 測(cè)試結(jié)果

運(yùn)行Consumer,查看管控臺(tái)
查看Exchanges

exchange

查看queue
queue

可以看到test_dlx_queue多了DLX的標(biāo)識(shí),表示當(dāng)隊(duì)列中出現(xiàn)死信的時(shí)候,會(huì)將消息發(fā)送到死信隊(duì)列dlx_queue中

關(guān)閉Consumer,只運(yùn)行Producer

查看queue10秒鐘前

過(guò)10秒鐘后,消息過(guò)期


查看queue10秒鐘后

在我們工作中,死信隊(duì)列非常重要,用于消息沒(méi)有消費(fèi)者,處于死信狀態(tài)。我們可以才用補(bǔ)償機(jī)制。

小結(jié)

本次主要介紹了RabbitMQ的高級(jí)特性,首先介紹了互聯(lián)網(wǎng)大廠在實(shí)際使用中如何保障100%的消息投遞成功和冪等性的,以及對(duì)RabbitMQ的確認(rèn)消息、返回消息、ACK與重回隊(duì)列、消息的限流,以及對(duì)超時(shí)時(shí)間、死信隊(duì)列的使用

文末

歡迎關(guān)注個(gè)人微信公眾號(hào):Coder編程
獲取最新原創(chuàng)技術(shù)文章和免費(fèi)學(xué)習(xí)資料,更有大量精品思維導(dǎo)圖、面試資料、PMP備考資料等你來(lái)領(lǐng),方便你隨時(shí)隨地學(xué)習(xí)技術(shù)知識(shí)!
新建了一個(gè)qq群:315211365,歡迎大家進(jìn)群交流一起學(xué)習(xí)。謝謝了!也可以介紹給身邊有需要的朋友。

文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關(guān)注并star~

微信公眾號(hào)

參考文章:

《RabbitMQ消息中間件精講》

推薦文章:

消息中間件——RabbitMQ(五)快速入門(mén)生產(chǎn)者與消費(fèi)者,SpringBoot整合RabbitMQ!

消息中間件——RabbitMQ(六)理解Exchange交換機(jī)核心概念!

消息中間件——RabbitMQ(七)高級(jí)特性全在這里!(上)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容