生產(chǎn)者與消費模式

前言

最近在看并發(fā)編程藝術(shù)這本書,對看書的一些筆記及個人工作中的總結(jié)。

什么是生產(chǎn)者和消費者模式?

在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序整體處理數(shù)據(jù)的速度。
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,
那么消費者就必須等待生產(chǎn)者。為了解決這種生產(chǎn)消費能力不均衡的問題,便有了生產(chǎn)者和消費者模式。

生產(chǎn)者和消費者模式是通過一個容器(一般使用阻塞隊列)來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通信,而是通過阻塞隊列來進行通信,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。

直接看demo:
生產(chǎn)者:

public class Provider implements Runnable{

    //共享緩存區(qū)
    private BlockingQueue<Data> queue;
    //多線程間是否啟動變量,有強制從主內(nèi)存中刷新的功能。即時返回線程的狀態(tài)
    private volatile boolean isRunning = true;
    //id生成器
    private static AtomicInteger count = new AtomicInteger();
    //隨機對象
    private static Random r = new Random();

    public Provider(BlockingQueue queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while(isRunning){
            try {
                //隨機休眠0 - 1000 毫秒 表示獲取數(shù)據(jù)(產(chǎn)生數(shù)據(jù)的耗時)
                Thread.sleep(r.nextInt(1000));
                //獲取的數(shù)據(jù)進行累計...
                int id = count.incrementAndGet();
                //比如通過一個getData方法獲取了
                Data data = new Data(Integer.toString(id), "數(shù)據(jù)" + id);
                System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + ", 獲取了數(shù)據(jù),id為:" + id + ", 將數(shù)據(jù)加入到阻塞隊列...");
                //將生產(chǎn)者生產(chǎn)的數(shù)據(jù)加入阻塞隊列中,如果2s加入沒有成功就失敗
                if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
                    System.out.println("提交緩沖區(qū)數(shù)據(jù)失敗....");
                    //do something... 比如重新提交
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stop(){
        this.isRunning = false;
    }

}

消費端:

public class Consumer implements Runnable{

    private BlockingQueue<Data> queue;

    public Consumer(BlockingQueue queue){
        this.queue = queue;
    }

    //隨機對象
    private static Random r = new Random();

    @Override
    public void run() {
        while(true){
            try {
                //獲取數(shù)據(jù)
                Data data = this.queue.take();
                //進行數(shù)據(jù)處理。休眠0 - 1000毫秒模擬耗時
                Thread.sleep(r.nextInt(1000));
                System.out.println("當(dāng)前消費線程:" + Thread.currentThread().getName() + ", 消費成功,消費數(shù)據(jù)為id: " + data.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public final class Data {

    private String id;
    private String name;

    public Data(String id, String name){
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString(){
        return "{id: " + id + ", name: " + name + "}";
    }

}

測試類:

public class Test {

    public static void main(String[] args) throws Exception {
        //內(nèi)存緩沖區(qū)
        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(10);
        //生產(chǎn)者
        Provider p1 = new Provider(queue);

        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);
        //消費者
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        //創(chuàng)建線程池運行,這是一個緩存的線程池,可以創(chuàng)建無窮大的線程,沒有任務(wù)的時候不創(chuàng)建線程??臻e線程存活時間為60s(默認(rèn)值)

        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(p1);
        cachePool.execute(p2);
        cachePool.execute(p3);
        cachePool.execute(c1);
        cachePool.execute(c2);
        cachePool.execute(c3);

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //3s之后停止生產(chǎn)數(shù)據(jù),消費者也就無法再繼續(xù)取到數(shù)據(jù)
        p1.stop();
        p2.stop();
        p3.stop();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

//      cachePool.shutdown();
//      cachePool.shutdownNow();


    }
}

線程池與生產(chǎn)消費者模式

之前的博客已經(jīng)提到過線程池的底層都使用了阻塞隊列(BlockingQueue)
Java中的線程池類其實就是一種生產(chǎn)者和消費者模式的實現(xiàn)方式,但是我覺得其實現(xiàn)方式更加高明。生產(chǎn)者把任務(wù)丟給線程池,線程池創(chuàng)建線程并處理任務(wù),如果將要運行的任務(wù)數(shù)大于線程池的基本線程數(shù)就把任務(wù)扔到阻塞隊列里,這種做法比只使用一個阻塞隊列來實現(xiàn)生產(chǎn)者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產(chǎn)者先存,消費者再取這種方式顯然慢一些。
比如:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

比如:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

我相信這種場景應(yīng)該非常多,特別是需要處理任務(wù)時間比較長的場景,比如上傳附件并處理,用戶把文件上傳到系統(tǒng)后,系統(tǒng)把文件丟到隊列里,然后立刻返回告訴用戶上傳成功,最后消費者再去隊列里取出文件處理。再如,調(diào)用一個遠(yuǎn)程接口查詢數(shù)據(jù),如果遠(yuǎn)程服務(wù)接口查詢時需要幾十秒的時間,那么它可以提供一個申請查詢的接口,這個接口把要申請查詢?nèi)蝿?wù)放數(shù)據(jù)庫中,然后該接口立刻返回。然后服務(wù)器端用線程輪詢并獲取申請任務(wù)進行處理,處理完之后發(fā)消息給調(diào)用方,讓調(diào)用方再來調(diào)用另外一個接口取數(shù)據(jù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容