(逐行注釋?zhuān)阌?種方法精通生產(chǎn)者——消費(fèi)者模式

背景

生產(chǎn)者和消費(fèi)者問(wèn)題是線(xiàn)程模型中的經(jīng)典問(wèn)題:生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)存儲(chǔ)空間,生產(chǎn)者往存儲(chǔ)空間中添加產(chǎn)品,消費(fèi)者從存儲(chǔ)空間中取走產(chǎn)品,當(dāng)存儲(chǔ)空間為空時(shí),消費(fèi)者阻塞,當(dāng)存儲(chǔ)空間滿(mǎn)時(shí),生產(chǎn)者阻塞。


image.png

解決方案

第一種解決方案 使用 synchronized 的 wait() 和 notify() 來(lái)實(shí)現(xiàn)

關(guān)鍵的思路就是通過(guò) 生產(chǎn)者 和 消費(fèi)者 的不斷循環(huán)來(lái)不斷運(yùn)行。

通過(guò) full 來(lái)表示緩沖區(qū)的大小,當(dāng)然此題中沒(méi)有往緩沖區(qū)里面放的過(guò)程,可以自行替換成list等,因?yàn)橐呀?jīng)有了 synchronized(LOCK) 來(lái)保護(hù),所以不需要線(xiàn)程安全的集合類(lèi)。

count用來(lái)表示緩沖區(qū)中的現(xiàn)有的項(xiàng)目已經(jīng)生產(chǎn)到了哪里。

沒(méi)有體現(xiàn)但很重要的是,wait/notify方法的調(diào)用必須處在該對(duì)象的鎖(Monitor)中,也即,在調(diào)用這些方法時(shí)首先需要獲得該對(duì)象的鎖。否則會(huì)拋出IllegalMonitorStateException異常。

這里需要注意的是sleep()不能放在synchronized代碼塊里面,因?yàn)槲覀冎纒leep()執(zhí)行之后是不會(huì)釋放鎖的,也就是說(shuō)當(dāng)前線(xiàn)程仍然持有對(duì)container對(duì)象的互斥鎖,這個(gè)時(shí)候當(dāng)前線(xiàn)程繼續(xù)判斷l(xiāng)ist.size是否等于capacity,不等于就繼續(xù)put,然后又sleep一會(huì),然后又繼續(xù),直到當(dāng)list.size == capacity,這個(gè)時(shí)候終于進(jìn)入wait()方法,我們知道wait()方法會(huì)釋放鎖,這個(gè)時(shí)候其他線(xiàn)程才有機(jī)會(huì)獲取到container的互斥鎖,

notifyAll()不能單獨(dú)放在producer類(lèi)里面,因?yàn)閚otifyAll()必須放在同步代碼塊里面
弊端:這里由于不能區(qū)分哪些是not empty或者not full或者is full/empty線(xiàn)程,所以需要喚醒所有其他等待的線(xiàn)程,但實(shí)際上我們需要的是喚醒那些not empty或者not full的線(xiàn)程就夠了

wait/notify 的機(jī)制


import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;

public class LeetCode215 {

    private static Integer count =0;
    private static final int full =10;
    private static final String LOCK="lock";


    public static void main(String[] args){

        System.out.println("test");
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new producer()).start();
    }

    //生產(chǎn)者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //進(jìn)行不斷的循環(huán),來(lái)保證一直生產(chǎn)
            while(true){
                //slepp一下線(xiàn)程,保證循環(huán)不要執(zhí)行的太快,浪費(fèi)性能
                //sleep不能放在同步代碼塊里面,因?yàn)閟leep不會(huì)釋放鎖,
                //當(dāng)前線(xiàn)程會(huì)一直占有produce線(xiàn)程,直到達(dá)到容量,調(diào)用wait()方法主動(dòng)釋放鎖
                try{
                    Thread.sleep(1000);
                }catch(Exception e){
                    e.printStackTrace();
                }

                //加鎖到本方法上面
                synchronized (LOCK){
                    //緩沖區(qū)已經(jīng)滿(mǎn)了,調(diào)用wait等待
                    while(count==full) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    //沒(méi)有滿(mǎn),可以繼續(xù)生產(chǎn)
                    count++;
                    System.out.println("生產(chǎn)者produce了:"+Thread.currentThread().getName()+" 一共有:"+count);

                    //喚醒其他都處于wait()的線(xiàn)程,包括生產(chǎn)者和消費(fèi)者
                    LOCK.notifyAll();


                }

            }


        }
    }


    class Consumer implements Runnable{

        public void run(){

            //不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
            while(true){

                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                //利用鎖來(lái)控制 消費(fèi)者 和 生產(chǎn)者的訪(fǎng)問(wèn)
                synchronized (LOCK){
                    //如果緩沖區(qū)里面沒(méi)有數(shù)據(jù)
                    while(count==0){
                        try{
                            LOCK.wait();
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }

                    //緩沖區(qū)有資源了
                    count--;
                    System.out.println("消費(fèi)者consume了:"+Thread.currentThread().getName()+" 一共有:"+count);
                    LOCK.notifyAll();
                }

            }


        }

    }


}

第二種解決方案 利用重入鎖 ReentrantLock()的 await() 和 signalAll() 機(jī)制

深入理解條件變量 Condition

ReentrantLock 實(shí)現(xiàn)原理

這種方式和 synchroized 方式是基本上一樣的

參照 Object 的 wait() 和 notify/notifyAll() 方法,
Condition 也提供了同樣的 await() 和 signal/signalAll() 方法。

Condition的await()/signal()方法和Object的wait()/notify()方法

方法ConditionObject阻塞等待await()wait()喚醒其他線(xiàn)程signal()notify()/notifyall()使用的鎖互斥鎖/共享鎖,如Lock同步鎖:如synchronized一個(gè)鎖對(duì)應(yīng)可以創(chuàng)建多個(gè)condition對(duì)應(yīng)一個(gè)Object喚醒指定的線(xiàn)程明確的指定線(xiàn)程只能通過(guò)notifyAll喚醒所有線(xiàn)程;或者notify()隨機(jī)喚醒

lock和condition實(shí)現(xiàn)生產(chǎn)者消費(fèi)者

該實(shí)現(xiàn)方式相比較synchronized于object的wait()/notify()方法具有更加的靈活性,可以喚醒具體的消費(fèi)者線(xiàn)程或者生產(chǎn)者線(xiàn)程,達(dá)到當(dāng)緩沖區(qū)滿(mǎn)的時(shí)候,喚醒消費(fèi)者線(xiàn)程,此時(shí)生產(chǎn)者線(xiàn)程都將被阻塞,而不是向notifyall()那樣喚醒所有的線(xiàn)程。

import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LeetCode215 {

    //使用 ReentrantLock變量來(lái)進(jìn)行
    private final Lock lock = new ReentrantLock();
    //條件變量 通知生產(chǎn)者
    private final Condition isFull = lock.newCondition();
    //條件變量 通知消費(fèi)者的
    private final Condition isEmpty = lock.newCondition();

    private static int count =0;
    private static final  int num = 10;

    public static void main(String[] args){
        System.out.println("test");
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();
        
    }

    //生產(chǎn)者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //進(jìn)行一個(gè)循環(huán)
            while(true){
                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                //進(jìn)行加鎖
                lock.lock();

                try {
                    //如果緩沖區(qū)滿(mǎn)了
                    while (count == num) {
                        try {
                            //刮起生產(chǎn)者的線(xiàn)程,暫時(shí)不能生產(chǎn)了。
                            isFull.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    //
                    count++;
                    System.out.println("生產(chǎn)者:"+Thread.currentThread().getName()+"產(chǎn)生數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
                    //就可以通知消費(fèi)者,可以開(kāi)始消費(fèi)了
                    isEmpty.signalAll();
                }catch (Exception  e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }


        }

    }


    class Consumer implements Runnable{

        public void run(){

            //不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
            while(true){

                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                //加鎖
                lock.lock();
                try{
                    //如果緩沖區(qū)是空的
                    while(count==0){
                        try{
                            //消費(fèi)者要開(kāi)始等待了
                            isEmpty.await();
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                    //開(kāi)始消費(fèi)
                    count--;
                    System.out.println("消費(fèi)者:"+Thread.currentThread().getName()+"消費(fèi)數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
                    //通知所有的生產(chǎn)者線(xiàn)程開(kāi)始生產(chǎn)數(shù)據(jù)
                    isFull.signalAll();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }

            }


        }

    }


}

第三種方法 使用BlockingQueue 實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者

BlockingQueue的原理

BlockingQueue即阻塞隊(duì)列,從阻塞這個(gè)詞可以看出,在某些情況下對(duì)阻塞隊(duì)列的訪(fǎng)問(wèn)可能會(huì)造成阻塞。被阻塞的情況主要有如下兩種:

  1. 當(dāng)隊(duì)列滿(mǎn)了的時(shí)候進(jìn)行入隊(duì)列操作
  2. 當(dāng)隊(duì)列空了的時(shí)候進(jìn)行出隊(duì)列操作
    因此,當(dāng)一個(gè)線(xiàn)程對(duì)已經(jīng)滿(mǎn)了的阻塞隊(duì)列進(jìn)行入隊(duì)操作時(shí)會(huì)阻塞,除非有另外一個(gè)線(xiàn)程進(jìn)行了出隊(duì)操作,當(dāng)一個(gè)線(xiàn)程對(duì)一個(gè)空的阻塞隊(duì)列進(jìn)行出隊(duì)操作時(shí)也會(huì)阻塞,除非有另外一個(gè)線(xiàn)程進(jìn)行了入隊(duì)操作。
    從上可知,阻塞隊(duì)列是線(xiàn)程安全的。
    下面是BlockingQueue接口的一些方法:


    image.png

這四類(lèi)方法分別對(duì)應(yīng)的是:

  1. ThrowsException:如果操作不能馬上進(jìn)行,則拋出異常
  2. SpecialValue:如果操作不能馬上進(jìn)行,將會(huì)返回一個(gè)特殊的值,一般是true或者false
  3. Blocks:如果操作不能馬上進(jìn)行,操作會(huì)被阻塞
  4. TimesOut:如果操作不能馬上進(jìn)行,操作會(huì)被阻塞指定的時(shí)間,如果指定時(shí)間沒(méi)執(zhí)行,則返回一個(gè)特殊值,一般是true或者false

下面來(lái)看由阻塞隊(duì)列實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型,這里我們使用take()和put()方法,這里生產(chǎn)者和生產(chǎn)者,消費(fèi)者和消費(fèi)者之間不存在同步,所以會(huì)出現(xiàn)連續(xù)生成和連續(xù)消費(fèi)的現(xiàn)象

ps:下面的代碼是生產(chǎn)者和消費(fèi)者之間的關(guān)系是沒(méi)有問(wèn)題的,但是count是不對(duì)的,因?yàn)閎lockingqueue的put和take操作是線(xiàn)程安全的,后面的num++ 和 num-- 不一定線(xiàn)程安全。

import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LeetCode215 {

    private static BlockingQueue queue = new ArrayBlockingQueue<>(10);
    private static int count  = 0;

    public static void main(String[] args){
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();

    }

    //生產(chǎn)者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //進(jìn)行一個(gè)循環(huán)
            while(true){
                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                try {
                    // 往隊(duì)列里面放入元素,put方法的好處就是如果這個(gè)時(shí)候blockingqueue已經(jīng)滿(mǎn)了,那么這個(gè)線(xiàn)程就會(huì)自動(dòng)阻塞,直到有空閑
                    queue.put(1);
                    //緩沖區(qū)數(shù)量的標(biāo)示位
                    count ++;
                    System.out.println("生產(chǎn)者:"+Thread.currentThread().getName()+"生產(chǎn)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
                }catch (Exception  e){
                    e.printStackTrace();
                }
            }


        }

    }


    class Consumer implements Runnable{

        public void run(){

            //不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
            while(true){

                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }


                try{
                    //從blockingqueue里面拿元素,同樣如果隊(duì)列是空,就阻塞了
                    queue.take();
                    //緩沖區(qū)的數(shù)量剪1
                    count --;
                    System.out.println("消費(fèi)者:"+Thread.currentThread().getName()+"消費(fèi)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
                }catch (Exception e){
                    e.printStackTrace();
                }

            }


        }

    }


}

第四種方法 使用Semaphore(信號(hào)量) 實(shí)現(xiàn)

Java并發(fā)工具類(lèi)(信號(hào)量Semaphore)

多線(xiàn)程之Semaphore

關(guān)鍵是理解信號(hào)量這個(gè)操作:

isFull信號(hào)量 初始值為10 表示還可以生產(chǎn)10個(gè),生產(chǎn)一個(gè)就 acquire 減1,同時(shí) isEmpty 執(zhí)行 release 加1。

isEmpty信號(hào)量 表示還可以消費(fèi)多少個(gè),初始值為0,表示沒(méi)有可消費(fèi)的,每消費(fèi)一個(gè),就要先 acquire 減 1,同時(shí) isFull 執(zhí)行 release 加1。

import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LeetCode215 {

    private static int count  = 0;
    //isFull信號(hào)量  10 表示緩沖區(qū)的數(shù)量為10,表示還可以生產(chǎn)多少個(gè)
    static Semaphore isFull = new Semaphore(10);
    //isEmpty信號(hào)量 表示還可以消費(fèi)多少個(gè)
    static Semaphore isEmpty = new Semaphore(0);
    //互斥鎖
    static Semaphore isUse = new Semaphore(1);


    public static void main(String[] args){
        LeetCode215 test1 = new LeetCode215();
        new Thread(test1.new Producer()).start();
        new Thread(test1.new Consumer()).start();
        new Thread(test1.new Producer()).start();

    }

    //生產(chǎn)者
    class Producer implements  Runnable{

        @Override
        public void run() {
            //進(jìn)行一個(gè)循環(huán)
            while(true){
                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }

                try {
                    //首先拿到 isFull 信號(hào)量,表示
                    isFull.acquire();
                    //拿到控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
                    isUse.acquire();
                    //緩沖區(qū)數(shù)量的標(biāo)示位
                    count ++;
                    System.out.println("生產(chǎn)者:"+Thread.currentThread().getName()+"生產(chǎn)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
                }catch (Exception  e){
                    e.printStackTrace();
                }finally {
                    //釋放控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
                    isUse.release();
                    //isEmpty的信號(hào)量加1,表示緩沖區(qū)有數(shù)據(jù),可以在消費(fèi)一個(gè)
                    isEmpty.release();
                }
            }


        }

    }


    class Consumer implements Runnable{

        public void run(){

            //不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
            while(true){

                //消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
                try{
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }


                try{
                    //默認(rèn)是0,緩沖區(qū)里面沒(méi)數(shù)據(jù),要消費(fèi)數(shù)據(jù)需要先申請(qǐng)
                    isEmpty.acquire();
                    //拿到控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
                    isUse.acquire();
                    //緩沖區(qū)的數(shù)量減1
                    count --;
                    System.out.println("消費(fèi)者:"+Thread.currentThread().getName()+"消費(fèi)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    //釋放控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
                    isUse.release();
                    //isFull 加1,表示可以再生產(chǎn)1個(gè)
                    isFull.release();
                }

            }


        }

    }


}

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容