生產(chǎn)者消費(fèi)者模式-java原生、Disruptor實(shí)現(xiàn)方案

生產(chǎn)者消費(fèi)者模式介紹

生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。

這個(gè)阻塞隊(duì)列就是用來給生產(chǎn)者和消費(fèi)者解耦的。阻塞隊(duì)列如何實(shí)現(xiàn)高并發(fā)多線程安全也是生產(chǎn)者消費(fèi)者模式中的核心關(guān)鍵。

在日常開發(fā)過程中,我們常常會(huì)遇到一些高并發(fā)場景,例如很多秒殺場景,其實(shí)真實(shí)的秒殺場景會(huì)很復(fù)雜,這里只是簡單描述下秒殺場景下的生產(chǎn)者消費(fèi)者模式,在秒殺場景下生產(chǎn)者是普通參與秒殺的用戶,消費(fèi)者是秒殺系統(tǒng),通常來說這樣的場景下秒殺用戶是非常多的,如果系統(tǒng)采用常規(guī)的實(shí)時(shí)同步交易,那么勢必造成系統(tǒng)處理線程池被瞬間占滿,后續(xù)請求全部被丟棄,這樣造成的用戶體驗(yàn)是非常差的,而且系統(tǒng)可能會(huì)出現(xiàn)快速雪崩。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。秒殺場景是個(gè)非常適合生產(chǎn)者、消費(fèi)者的模式,通過中間的緩沖隊(duì)列解決秒殺請求接收和秒殺處理兩者之間的處理時(shí)間差,并且能夠在過程中通過反欺詐和公平算法來保證消費(fèi)者的公平利益。下面我們就來用java實(shí)現(xiàn)下生產(chǎn)者和消費(fèi)者模式,這里實(shí)現(xiàn)的是可以直接用于生產(chǎn)環(huán)境的架構(gòu),并不是簡單的使用Queue做個(gè)簡單的進(jìn)棧出棧,而是通過java.util.concurrent下的相關(guān)類實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式的多線程方案。

Java實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式

隊(duì)列的特性:先進(jìn)先出(FIFO)—先進(jìn)入隊(duì)列的元素先出隊(duì)列(可以理解為我們生活中的排隊(duì)情況,早辦完,早滾蛋)。生產(chǎn)者(Producer)往隊(duì)列里發(fā)布(publish)事件,消費(fèi)者(Consumer)獲得通知,消費(fèi)事件;如果隊(duì)列中沒有事件時(shí),消費(fèi)者堵塞,直到生產(chǎn)者發(fā)布了新事件。

說到隊(duì)列,那就不得不提到Java中的concurrent包,其主要實(shí)現(xiàn)包括ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue、LinkedTransferQueue。下面,簡單介紹下:

  • ArrayBlockingQueue:基于數(shù)組形式的隊(duì)列,通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;

  • LinkedBlockingQueue:基于鏈表形式的隊(duì)列,也通過加鎖的方式,來保證多線程情況下數(shù)據(jù)的安全;

  • ConcurrentLinkedQueue:基于鏈表形式的隊(duì)列,通過compare and swap(簡稱CAS)協(xié)議的方式,來保證多線程情況下數(shù)據(jù)的安全,不加鎖,主要使用了Java中的sun.misc.Unsafe類來實(shí)現(xiàn);

  • LinkedTransferQueue:同上;

因?yàn)長inkedBlockingQueue采用了樂觀鎖方案,所以性能是非常高的,下面我們就用LinkedBlockingQueue作為隊(duì)列緩沖區(qū)來實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式。

待處理數(shù)據(jù)類

首先我們需要實(shí)現(xiàn)一個(gè)緩沖隊(duì)列中的待處理類,這里例子中實(shí)現(xiàn)的比較簡單,只是設(shè)置了一個(gè)int類型的變量,重寫了構(gòu)造函數(shù)并定義了get方法,大家可以根據(jù)自己的需要定義相關(guān)的內(nèi)容。

public class PCData {
    private final int intData;

    public PCData(int intData) {
        this.intData = intData;
    }

    public int getIntData() {
        return intData;
    }

    @Override
    public String toString() {
        return "PCData{" +
                "intData=" + intData +
                '}';
    }
}

生產(chǎn)者類

下面我們定義生產(chǎn)者類,在生產(chǎn)者類中需要定義一個(gè)緩沖隊(duì)列,這里使用了剛才提到的BlockingDeque。

private BlockingDeque<PCData> queue;

生產(chǎn)者中還需要再定義一個(gè)靜態(tài)的AtomicInteger類型的對(duì)象,用于多線程中共享數(shù)據(jù),用于生成PCData,為什么使用AtomicInteger類型,是因?yàn)锳tomicInteger類型已經(jīng)實(shí)現(xiàn)了線程安全的自增功能,在實(shí)際項(xiàng)目使用過程中,這個(gè)值可能是UUID或者其他的全局唯一的數(shù)值。

private static AtomicInteger count = new AtomicInteger();

還需要重寫構(gòu)造方法,在生成生產(chǎn)者的時(shí)候使用同一個(gè)緩沖隊(duì)列,來保證生產(chǎn)者和開發(fā)者都使用一樣的隊(duì)列,在實(shí)際項(xiàng)目中也可以定一個(gè)全局的隊(duì)列,來保證所有的生產(chǎn)者和消費(fèi)者都使用同一個(gè)對(duì)列。

//定義入?yún)锽lockingQueue的構(gòu)造函數(shù)
    public Producer(BlockingDeque<PCData> queue){
        this.queue = queue;
    }

生產(chǎn)者的核心方法中主要實(shí)現(xiàn)了創(chuàng)建PCData類并將該待處理對(duì)象放入緩沖隊(duì)列中,這里為了模擬處理耗時(shí),sleep了1秒鐘,所有繼承子BlockingDeque的隊(duì)列類都實(shí)現(xiàn)了offer方法,該方法主要是將待處理對(duì)象放入緩沖隊(duì)列中,這樣生產(chǎn)者就完成了生產(chǎn)者的基本工作,創(chuàng)建待處理類對(duì)象,并將其放入隊(duì)列。

Thread.sleep(1000);
data = new PCData(count.incrementAndGet());
queue.offer(data);

下面是整個(gè)Producer的代碼:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: feiweiwei
 * @Description: 生產(chǎn)者
 * @Created Date: 17:14 17/9/10.
 * @Modify by:
 */
public class Producer implements Runnable {

    private volatile boolean isrunning = true;
    //內(nèi)存緩沖隊(duì)列
    private BlockingDeque<PCData> queue;
    private static AtomicInteger count = new AtomicInteger();

    //定義入?yún)锽lockingQueue的構(gòu)造函數(shù)
    public Producer(BlockingDeque<PCData> queue){
        this.queue = queue;
    }

    public void stop(){
        this.isrunning = false;
    }

    @Override
    public void run() {
        PCData data = null;
        System.out.println("producer id = " + Thread.currentThread().getId());

        while (isrunning) {
            try {
                Thread.sleep(1000);
                data = new PCData(count.incrementAndGet());
                queue.offer(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

消費(fèi)者類

消費(fèi)者類的核心工作就是將待處理數(shù)據(jù)從緩沖隊(duì)列中取出,并處理。

在消費(fèi)者類中同樣有個(gè)BlockingDeque<PCData>的對(duì)象,同樣也是在創(chuàng)建消費(fèi)者類的時(shí)候從外部傳入,這樣可以保證所有生產(chǎn)者和消費(fèi)者使用一樣的隊(duì)列。

在核心處理邏輯中通過BlockingDeque的take方法取出待處理對(duì)象,然后就可以對(duì)該對(duì)象進(jìn)行處理了,調(diào)用take方法后,該待處理對(duì)象也自動(dòng)從queue中彈出。

下面是消費(fèi)者實(shí)現(xiàn)代碼:

import java.util.concurrent.BlockingDeque;

/**
 * @Author: feiweiwei
 * @Description: 消費(fèi)者類
 * @Created Date: 17:26 17/9/10.
 * @Modify by:
 */
public class Customer implements Runnable {

    private BlockingDeque<PCData> queue;
    private volatile boolean isrunning = true;

    //定義入?yún)锽lockingQueue的構(gòu)造函數(shù)
    public Customer(BlockingDeque<PCData> queue){
        this.queue = queue;
    }

    public void stop(){
        this.isrunning = false;
    }

    @Override
    public void run() {
        System.out.println("customer id = " + Thread.currentThread().getId());

        while (isrunning){
            try {
                PCData data = queue.take();
                if ( null != data){
                    int re = data.getIntData() * data.getIntData();
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getId() + " data is " + re + "done!");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

主調(diào)用Main

在主調(diào)用Main中,我們先創(chuàng)建一個(gè)隊(duì)列長度為10的LinkedBlockingDeque對(duì)象作為緩沖隊(duì)列。

BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);

再分別創(chuàng)建10個(gè)生產(chǎn)者對(duì)象和2個(gè)消費(fèi)者,并將剛才創(chuàng)建的queue對(duì)象作為構(gòu)造函數(shù)入?yún)ⅰ?/p>

Producer[] producers = new Producer[10];
Customer[] customers = new Customer[2];

for(int i=0; i<10; i++){
    producers[i] = new Producer(queue);
}
for(int j=0; j<2; j++){
    customers[j] = new Customer(queue);
}

創(chuàng)建一個(gè)線程池將生產(chǎn)者和消費(fèi)者調(diào)用起來,這里的線程池大家可以使用自定義的線程池。

ExecutorService es = Executors.newCachedThreadPool();
for(Producer producer : producers){
    es.execute(producer);
}

for(Customer customer : customers){
    es.execute(customer);
}

下面是Main代碼:

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * @Author: feiweiwei
 * @Description:
 * @Created Date: 17:29 17/9/10.
 * @Modify by:
 */
public class Main {

    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<PCData> queue = new LinkedBlockingDeque<PCData>(10);
        Producer[] producers = new Producer[10];
        Customer[] customers = new Customer[2];

        for(int i=0; i<10; i++){
            producers[i] = new Producer(queue);
        }
        for(int j=0; j<2; j++){
            customers[j] = new Customer(queue);
        }
        ExecutorService es = Executors.newCachedThreadPool();
        for(Producer producer : producers){
            es.execute(producer);
        }

        for(Customer customer : customers){
            es.execute(customer);
        }
        Thread.sleep(10000);
        for(Producer producer : producers){
            producer.stop();
        }
        for(Customer customer : customers){
            customer.stop();
        }

        es.shutdown();
    }
}

Disruptor實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式

剛才那個(gè)是我們自己使用java.util.concurrent下的類實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模式,目前業(yè)界已經(jīng)有比較成熟的方案,這里向大家推薦LMAX公司開源的Disruptor框架,Disruptor是一個(gè)開源的框架,可以在無鎖的情況下對(duì)隊(duì)列進(jìn)行操作,那么這個(gè)隊(duì)列的設(shè)計(jì)就是Disruptor的核心所在。

Screenshot 2017-09-18 14.38.08.png

在Disruptor中,采用了RingBuffer來作為隊(duì)列的數(shù)據(jù)結(jié)構(gòu),RingBuffer就是一個(gè)環(huán)形的數(shù)組,既然是數(shù)組,我們便可對(duì)其設(shè)置大小。在這個(gè)ringBuffer中,除了數(shù)組之外,還有一個(gè)序列號(hào),是用來指向數(shù)組中的下一個(gè)可用元素,供生產(chǎn)者使用或者消費(fèi)者使用,也就是生產(chǎn)者可以生產(chǎn)的地方,或者消費(fèi)者可以消費(fèi)的地方。在Disruptor中使用的是位運(yùn)算,并且在Disruptor中數(shù)組內(nèi)的元素并不會(huì)被刪除,而是新數(shù)據(jù)來覆蓋原有數(shù)據(jù),所以整個(gè)環(huán)鏈的處理效率非常高。

下面我們使用Disruptor來實(shí)現(xiàn)剛才用jdk自帶庫實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者。

Disruptor主要類

  • Disruptor:Disruptor的入口,主要封裝了環(huán)形隊(duì)列RingBuffer、消費(fèi)者集合ConsumerRepository的引用;主要提供了獲取環(huán)形隊(duì)列、添加消費(fèi)者、生產(chǎn)者向RingBuffer中添加事件(可以理解為生產(chǎn)者生產(chǎn)數(shù)據(jù))的操作;

  • RingBuffer:Disruptor中隊(duì)列具體的實(shí)現(xiàn),底層封裝了Object[]數(shù)組;在初始化時(shí),會(huì)使用Event事件對(duì)數(shù)組進(jìn)行填充,填充的大小就是bufferSize設(shè)置的值;此外,該對(duì)象內(nèi)部還維護(hù)了Sequencer(序列生產(chǎn)器)具體的實(shí)現(xiàn);

  • Sequencer:序列生產(chǎn)器,分別有MultiProducerSequencer(多生產(chǎn)者序列生產(chǎn)器) 和 SingleProducerSequencer(單生產(chǎn)者序列生產(chǎn)器)兩個(gè)實(shí)現(xiàn)類。上面的例子中,使用的是SingleProducerSequencer;在Sequencer中,維護(hù)了消費(fèi)者的Sequence(序列對(duì)象)和生產(chǎn)者自己的Sequence(序列對(duì)象);以及維護(hù)了生產(chǎn)者與消費(fèi)者序列沖突時(shí)候的等待策略WaitStrategy;

  • Sequence:序列對(duì)象,內(nèi)部維護(hù)了一個(gè)long型的value,這個(gè)序列指向了RingBuffer中Object[]數(shù)組具體的角標(biāo)。生產(chǎn)者和消費(fèi)者各自維護(hù)自己的Sequence;但都是指向RingBuffer的Object[]數(shù)組;

  • Wait Strategy:等待策略。當(dāng)沒有可消費(fèi)的事件時(shí),消費(fèi)者根據(jù)特定的策略進(jìn)行等待;當(dāng)沒有可生產(chǎn)的地方時(shí),生產(chǎn)者根據(jù)特定的策略進(jìn)行等待;

  • Event:事件對(duì)象,就是我們Ringbuffer中存在的數(shù)據(jù),在Disruptor中用Event來定義數(shù)據(jù),并不存在Event類,它只是一個(gè)定義;

  • EventProcessor:事件處理器,單獨(dú)在一個(gè)線程內(nèi)執(zhí)行,判斷消費(fèi)者的序列和生產(chǎn)者序列關(guān)系,決定是否調(diào)用我們自定義的事件處理器,也就是是否可以進(jìn)行消費(fèi);

  • EventHandler:事件處理器,由用戶自定義實(shí)現(xiàn),也就是最終的事件消費(fèi)者,需要實(shí)現(xiàn)EventHandler接口;

  • Producer:事件生產(chǎn)者,也就是我們上面代碼中最后那部門的for循環(huán);

待處理類

Disruptor的待處理類和自己實(shí)現(xiàn)的待處理類沒有本質(zhì)的區(qū)別,可以按照自己要求進(jìn)行定義。

public class PCData {
    private int data;

    public int getData() {
        return data;
    }

    public void setData(int data) {
        this.data = data;
    }
}

待處理類工廠

這里需要實(shí)現(xiàn)disruptor的EventFactory接口,并且實(shí)現(xiàn)newInstance方法。這里我們實(shí)現(xiàn)的newInstance方法,其實(shí)就是創(chuàng)建待處理類的對(duì)象,該工廠類在創(chuàng)建Disruptor對(duì)象的時(shí)候會(huì)使用到。

import com.lmax.disruptor.EventFactory;

/**
 * @Author: feiweiwei
 * @Description: 待處理類工廠
 * @Created Date: 18:55 17/9/10.
 * @Modify by:
 */
public class PCDataFactory implements EventFactory<PCData> {
    @Override
    public PCData newInstance() {
        return new PCData();
    }
}

disruptor生產(chǎn)者類

同樣需要在生產(chǎn)者中定義一個(gè)RingBuffer<PCData>的環(huán)形隊(duì)列,還需要實(shí)現(xiàn)一個(gè)push的方法,通過ringBuffer.next()取到下一個(gè)待處理類序列號(hào),使用ringBuffer.get(sequence)獲取到這個(gè)序列號(hào)對(duì)應(yīng)的待處理類,并對(duì)待處理類進(jìn)行賦值為新的待處理類。
最后通過ringBuffer.publish(sequence)才會(huì)將待處理對(duì)象發(fā)布出來,消費(fèi)者才能看到。

import com.lmax.disruptor.RingBuffer;

/**
 * @Author: feiweiwei
 * @Description: disruptor生產(chǎn)者類
 * @Created Date: 18:56 17/9/10.
 * @Modify by:
 */
public class Producer {
    private final RingBuffer<PCData> ringBuffer;

    public Producer(RingBuffer<PCData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(int data){
        long sequence = ringBuffer.next();
        try{
            PCData event = ringBuffer.get(sequence);
            event.setData(data);
        }finally {
            ringBuffer.publish(sequence);
        }
    }
}

disruptor消費(fèi)者

disruptor的消費(fèi)者類需要實(shí)現(xiàn)WorkHandler接口,并實(shí)現(xiàn)onEvent方法來處理待處理類,例子中只是對(duì)待處理類中的值做了平方。

import com.lmax.disruptor.WorkHandler;

/**
 * @Author: feiweiwei
 * @Description: disruptor消費(fèi)者
 * @Created Date: 18:52 17/9/10.
 * @Modify by:
 */
public class Consumer implements WorkHandler<PCData> {
    @Override
    public void onEvent(PCData pcData) throws Exception {
        System.out.println(Thread.currentThread().getId() +
        "Event = " + pcData.getData()*pcData.getData());
    }
}

Main

待處理類、待處理工廠、生產(chǎn)者、消費(fèi)者都定義好之后就可以進(jìn)行使用了,定義一個(gè)緩行隊(duì)列為1024的disruptor對(duì)象,這里構(gòu)造函數(shù)入?yún)⒖疵志椭懒?,很簡單?/p>

PCDataFactory factory = new PCDataFactory();
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor,
ProducerType.MULTI,new BlockingWaitStrategy());

給disruptor對(duì)象定義消費(fèi)者,這里就簡單定義兩個(gè)consumer作為生產(chǎn)者。

disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer());

初始化Producer并且將ringBuffer作為構(gòu)造函數(shù)入?yún)?,并通過生產(chǎn)者循環(huán)100次將數(shù)據(jù)push入隊(duì)列,消費(fèi)者會(huì)自動(dòng)從隊(duì)列取值進(jìn)行處理。

RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);

for(int i=0; i<100; i++){
    producer.pushData(i);
    Thread.sleep(100);
    System.out.println("push data " + i);
}

以下為Main全部代碼:

package com.monkey01.producercustomer.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @Author: feiweiwei
 * @Description:
 * @Created Date: 18:59 17/9/10.
 * @Modify by:
 */
public class Main {
    public static void main(String args[]) throws InterruptedException {
        Executor executor = Executors.newCachedThreadPool();
        PCDataFactory factory = new PCDataFactory();
        int bufferSize = 1024;
        Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,bufferSize,executor,
        ProducerType.MULTI,new BlockingWaitStrategy());

        disruptor.handleEventsWithWorkerPool(new Consumer(),
                new Consumer());
        disruptor.start();

        RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);

        for(int i=0; i<100; i++){
            producer.pushData(i);
            Thread.sleep(100);
            System.out.println("push data " + i);
        }

        disruptor.shutdown();
    }
}

總結(jié)

大家看到這里也基本對(duì)生產(chǎn)者、消費(fèi)者模式有個(gè)比較深入的了解了,也可以按照文中的例子,在自己的項(xiàng)目中使用,這個(gè)模式在日常項(xiàng)目中還是比較常見的,希望大家能夠熟練使用該模式。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,533評(píng)論 19 139
  • 一、多線程 說明下線程的狀態(tài) java中的線程一共有 5 種狀態(tài)。 NEW:這種情況指的是,通過 New 關(guān)鍵字創(chuàng)...
    Java旅行者閱讀 4,857評(píng)論 0 44
  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司,掛了不少,但最終還是拿到小米、百度、阿里、京東、新浪、CVTE、樂視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,787評(píng)論 11 349
  • 傍晚到家,隔壁二大伯正和落炕(臥床不起)的父親大嚷白喝地說著話。他喝 醉了。 二大伯是鰥夫,性...
    董耀奎閱讀 282評(píng)論 0 0
  • 今天晚上西紅門后街又著火了:安全重于泰山。 明天整理安全隱患
    京心達(dá)張新波閱讀 223評(píng)論 0 0

友情鏈接更多精彩內(nèi)容