生產(chǎn)者/消費(fèi)者問題的多種Java實(shí)現(xiàn)

背景
????實(shí)質(zhì)上,很多后臺服務(wù)程序并發(fā)控制的基本原理都可以歸納為生產(chǎn)者/消費(fèi)者模式,而這是恰恰是在本科操作系統(tǒng)課堂上老師反復(fù)講解,而我們卻視而不見不以為然的。


????生產(chǎn)者消費(fèi)者問題是研究多線程程序時(shí)繞不開的經(jīng)典問題之一,它描述是有一塊緩沖區(qū)作為倉庫,生產(chǎn)者可以將產(chǎn)品放入倉庫,消費(fèi)者則可以從倉庫中取走產(chǎn)品。解決生產(chǎn)者/消費(fèi)者問題的方法可分為兩類:
(1)采用某種機(jī)制保護(hù)生產(chǎn)者和消費(fèi)者之間的同步;
(2)在生產(chǎn)者和消費(fèi)者之間建立一個(gè)管道。


????第一種方式有較高的效率,并且易于實(shí)現(xiàn),代碼的可控制性較好,屬于常用的模式。第二種管道緩沖區(qū)不易控制,被傳輸數(shù)據(jù)對象不易于封裝等,實(shí)用性不強(qiáng)。因此本文只介紹同步機(jī)制實(shí)現(xiàn)的生產(chǎn)者/消費(fèi)者問題。
同步問題核心在于:如何保證同一資源被多個(gè)線程并發(fā)訪問時(shí)的完整性。常用的同步方法是采用信號或加鎖機(jī)制,保證資源在任意時(shí)刻至多被一個(gè)線程訪問。Java語言在多線程編程上實(shí)現(xiàn)了完全對象化,提供了對同步機(jī)制的良好支持。在Java中一共有四種方法支持同步,其中前三個(gè)是同步方法,一個(gè)是管道方法。

  1. wait() / notify()方法

  2. await() / signal()方法

  3. BlockingQueue阻塞隊(duì)列方法

  4. PipedInputStream / PipedOutputStream

    本文只介紹最常用的前三種,第四種暫不做討論。


    一. wait() / notify()方法
    ????wait() / nofity()方法是基類Object的兩個(gè)方法,也就意味著所有Java類都會擁有這兩個(gè)方法,這樣,我們就可以為任何對象實(shí)現(xiàn)同步機(jī)制。
    wait()方法:當(dāng)緩沖區(qū)已滿/空時(shí),生產(chǎn)者/消費(fèi)者線程停止自己的執(zhí)行,放棄鎖,使自己處于等等狀態(tài),讓其他線程執(zhí)行。
    notify()方法:當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí),向其他等待的線程發(fā)出可執(zhí)行的通知,同時(shí)放棄鎖,使自己處于等待狀態(tài)。
    來段代碼就明白了:

public class Main {
    public static void main(String[] args) {
        // write your code here
        Main main = new Main();
        main.test();
    }
    public void test() {
        // 倉庫對象
        Storage storage = new Storage();
        // 生產(chǎn)者對象
        Producer p1 = new Producer(storage);
        Producer p2 = new Producer(storage);
        Producer p3 = new Producer(storage);
        Producer p4 = new Producer(storage);
        Producer p5 = new Producer(storage);
        Producer p6 = new Producer(storage);
        Producer p7 = new Producer(storage);
        // 消費(fèi)者對象
        Consumer c1 = new Consumer(storage);
        Consumer c2 = new Consumer(storage);
        Consumer c3 = new Consumer(storage);
        // 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);
        // 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);
        // 線程開始執(zhí)行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
    /**
     * 倉庫類Storage實(shí)現(xiàn)緩沖區(qū)
     */
    class Storage {
        // 倉庫最大存儲量
        private final int MAX_SIZE = 100;
        // 倉庫存儲的載體
        private LinkedList<Object> list = new LinkedList<>();
        // 生產(chǎn)num個(gè)產(chǎn)品
        public void produce(int num) {
            synchronized (list) {
                // 如果倉庫剩余容量不足
                while (list.size() + num > MAX_SIZE) {
                    System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
                    // 由于條件不滿足,生產(chǎn)阻塞
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 到這里說明,生產(chǎn)條件滿足情況,生產(chǎn)num個(gè)產(chǎn)品
                for (int i = 1; i <= num; ++i) {
                    list.add(new Object());
                }
                System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
                list.notifyAll();
            }
        }
        // 消費(fèi)num個(gè)產(chǎn)品
        public void consume(int num) {
            // 同步代碼段
            synchronized (list) {
                // 如果倉庫存儲量不足
                while (list.size() < num) {
                    System.out.println("【要消費(fèi)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行消費(fèi)任務(wù)!");
                    try {
                        // 由于條件不滿足,消費(fèi)阻塞
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 到這里說明, 消費(fèi)條件滿足情況,消費(fèi)num個(gè)產(chǎn)品
                for (int i = 1; i <= num; ++i) {
                    list.remove();
                }
                System.out.println("【已經(jīng)消費(fèi)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
                list.notifyAll();
            }
        }
        public LinkedList<Object> getList() {
            return list;
        }
        public void setList(LinkedList<Object> list) {
            this.list = list;
        }
        public int getMAX_SIZE() {
            return MAX_SIZE;
        }
    }
    /**
     * 生產(chǎn)者類Producer繼承線程類Thread
     */
    public class Producer extends Thread {
        // 每次生產(chǎn)的產(chǎn)品數(shù)量
        private int num;
        // 所在放置的倉庫
        private Storage storage;
        // 構(gòu)造函數(shù),設(shè)置倉庫
        public Producer(Storage storage) {
            this.storage = storage;
        }
        // 線程run函數(shù)
        @Override
        public void run() {
            produce(num);
        }
        // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
        public void produce(int num) {
            storage.produce(num);
        }
        // get/set方法
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        public Storage getStorage() {
            return storage;
        }
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
    /**
     * 消費(fèi)者類Consumer繼承線程類Thread
     */
    public class Consumer extends Thread {
        // 每次消費(fèi)的產(chǎn)品數(shù)量
        private int num;
        // 所在放置的倉庫
        private Storage storage;
        // 構(gòu)造函數(shù),設(shè)置倉庫
        public Consumer(Storage storage) {
            this.storage = storage;
        }
        // 線程run函數(shù)
        @Override
        public void run() {
            consume(num);
        }
        // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
        public void consume(int num) {
            storage.consume(num);
        }
        // get/set方法
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        public Storage getStorage() {
            return storage;
        }
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
}

截屏2021-04-19 22.10.46.png


二. await() / signal()方法
????在JDK5.0之后,Java提供了更加健壯的線程處理機(jī)制,包括同步、鎖定、線程池等,它們可以實(shí)現(xiàn)更細(xì)粒度的線程控制。await()和signal()就是其中用來做同步的兩種方法,它們的功能基本上和wait() / nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機(jī)制Lock直接掛鉤,具有更大的靈活性。通過在Lock對象上調(diào)用newCondition()方法,將條件變量和一個(gè)鎖對象進(jìn)行綁定,進(jìn)而控制并發(fā)程序訪問競爭資源的安全。


? await()方法會使得當(dāng)前線程等待,同時(shí)釋放鎖,當(dāng)其他線程使用sinal()方法或者sinalAll()方法時(shí),線程會重新獲得鎖并繼續(xù)執(zhí)行?;蛘弋?dāng)線程被中斷時(shí),也能跳出等待,和wait相似。
? awaitUninterruptibly()方法與await()基本一致,但是并不會在等待過程中響應(yīng)中斷。
? sinal()方法用于喚醒一個(gè)在等待中的線程,sinalAll()方法會喚醒所有在等待中的線程。這和Object.notify()很相似。

下面來看condition實(shí)現(xiàn)阻塞隊(duì)列代碼:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // write your code here
        Main main = new Main();
        main.test();
    }
    public void test() {
        // 倉庫對象
        Storage storage = new Storage();
        // 生產(chǎn)者對象
        Producer p1 = new Producer(storage);
        Producer p2 = new Producer(storage);
        Producer p3 = new Producer(storage);
        Producer p4 = new Producer(storage);
        Producer p5 = new Producer(storage);
        Producer p6 = new Producer(storage);
        Producer p7 = new Producer(storage);
        // 消費(fèi)者對象
        Consumer c1 = new Consumer(storage);
        Consumer c2 = new Consumer(storage);
        Consumer c3 = new Consumer(storage);
        // 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);
        // 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);
        // 線程開始執(zhí)行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
    /**
     * 倉庫類Storage實(shí)現(xiàn)緩沖區(qū)
     */
    class Storage {
        // 倉庫最大存儲量
        private final int MAX_SIZE = 100;
        // 倉庫存儲的載體
        private LinkedList<Object> list = new LinkedList<>();
        // 鎖
        private final Lock lock = new ReentrantLock(true);
        // 倉庫滿的條件變量
        private final Condition full = lock.newCondition();
        // 倉庫空的條件變量
        private final Condition empty = lock.newCondition();
        // 生產(chǎn)num個(gè)產(chǎn)品
        public void produce(int num) {
            // 獲得鎖
            lock.lock();
            // 如果倉庫剩余容量不足
            while (list.size() + num > MAX_SIZE) {
                System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
                // 由于條件不滿足,生產(chǎn)阻塞
                try {
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 到這里說明,生產(chǎn)條件滿足情況,生產(chǎn)num個(gè)產(chǎn)品
            for (int i = 1; i <= num; ++i) {
                list.add(new Object());
            }
            System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
            // 喚醒其他所有線程
            empty.signalAll();
            // 釋放鎖
            lock.unlock();
        }
        // 消費(fèi)num個(gè)產(chǎn)品
        public void consume(int num) {
            // 同步代碼段
            lock.lock();
            // 如果倉庫存儲量不足
            while (list.size() < num) {
                System.out.println("【要消費(fèi)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行消費(fèi)任務(wù)!");
                try {
                    // 由于條件不滿足,消費(fèi)阻塞
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 到這里說明, 消費(fèi)條件滿足情況,消費(fèi)num個(gè)產(chǎn)品
            for (int i = 1; i <= num; ++i) {
                list.remove();
            }
            System.out.println("【已經(jīng)消費(fèi)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
            // 喚醒其他所有線程
            full.signalAll();
            // 釋放鎖
            lock.unlock();
        }
        public LinkedList<Object> getList() {
            return list;
        }
        public void setList(LinkedList<Object> list) {
            this.list = list;
        }
        public int getMAX_SIZE() {
            return MAX_SIZE;
        }
    }
    /**
     * 生產(chǎn)者類Producer繼承線程類Thread
     */
    public class Producer extends Thread {
        // 每次生產(chǎn)的產(chǎn)品數(shù)量
        private int num;
        // 所在放置的倉庫
        private Storage storage;
        // 構(gòu)造函數(shù),設(shè)置倉庫
        public Producer(Storage storage) {
            this.storage = storage;
        }
        // 線程run函數(shù)
        @Override
        public void run() {
            produce(num);
        }
        // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
        public void produce(int num) {
            storage.produce(num);
        }
        // get/set方法
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        public Storage getStorage() {
            return storage;
        }
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
    /**
     * 消費(fèi)者類Consumer繼承線程類Thread
     */
    public class Consumer extends Thread {
        // 每次消費(fèi)的產(chǎn)品數(shù)量
        private int num;
        // 所在放置的倉庫
        private Storage storage;
        // 構(gòu)造函數(shù),設(shè)置倉庫
        public Consumer(Storage storage) {
            this.storage = storage;
        }
        // 線程run函數(shù)
        @Override
        public void run() {
            consume(num);
        }
        // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
        public void consume(int num) {
            storage.consume(num);
        }
        // get/set方法
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        public Storage getStorage() {
            return storage;
        }
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
}
截屏2021-04-19 22.10.46.png


三. BlockingQueue阻塞隊(duì)列方法
????BlockingQueue是JDK5.0的新增內(nèi)容,它是一個(gè)已經(jīng)在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列,實(shí)現(xiàn)方式采用的是 await() / signal()方法。它可以在生成對象時(shí)指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:類似于我們上面的生產(chǎn)者線程,容量達(dá)到最大時(shí),自動阻塞。
take()方法:類似于我們上面的消費(fèi)者線程,容量為0時(shí),自動阻塞。
關(guān)于BlockingQueue的內(nèi)容網(wǎng)上有很多,在這不多介紹。下面直接看代碼即可:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // write your code here
        Main main = new Main();
        main.test();
    }
    public void test() {
        // 倉庫對象
        Storage storage = new Storage();
        // 生產(chǎn)者對象
        Producer p1 = new Producer(storage);
        Producer p2 = new Producer(storage);
        Producer p3 = new Producer(storage);
        Producer p4 = new Producer(storage);
        Producer p5 = new Producer(storage);
        Producer p6 = new Producer(storage);
        Producer p7 = new Producer(storage);
        // 消費(fèi)者對象
        Consumer c1 = new Consumer(storage);
        Consumer c2 = new Consumer(storage);
        Consumer c3 = new Consumer(storage);
        // 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);
        // 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);
        // 線程開始執(zhí)行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
    /**
     * 倉庫類Storage實(shí)現(xiàn)緩沖區(qū)
     */
    class Storage {
        // 倉庫最大存儲量
        private final int MAX_SIZE = 100;
        // 倉庫存儲的載體
        private ArrayBlockingQueue<Object> list = new ArrayBlockingQueue<>(MAX_SIZE);
        // 生產(chǎn)num個(gè)產(chǎn)品
        public void produce(int num) {
            // 如果倉庫剩余容量不足
            while (list.size() >= MAX_SIZE) {
                System.out.println("【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
            }
            // 到這里說明,生產(chǎn)條件滿足情況,生產(chǎn)num個(gè)產(chǎn)品
            for (int i = 1; i <= num; ++i) {
                // 放入產(chǎn)品,自動阻塞
                try {
                    list.put(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("【現(xiàn)倉儲量為】:" + list.size());
        }
        // 消費(fèi)num個(gè)產(chǎn)品
        public void consume(int num) {
            // 如果倉庫存儲量不足
            while (list.size() <= 0) {
                System.out.println("【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行消費(fèi)任務(wù)!");
            }
            // 到這里說明, 消費(fèi)條件滿足情況,消費(fèi)num個(gè)產(chǎn)品
            for (int i = 1; i <= num; ++i) {
                // 消費(fèi)產(chǎn)品,自動阻塞
                try {
                    list.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("【現(xiàn)倉儲量為】:" + list.size());
        }
        public ArrayBlockingQueue<Object> getList() {
            return list;
        }
        public void setList(ArrayBlockingQueue<Object> list) {
            this.list = list;
        }
        public int getMAX_SIZE() {
            return MAX_SIZE;
        }
    }
    /**
     * 生產(chǎn)者類Producer繼承線程類Thread
     */
    public class Producer extends Thread {
        // 每次生產(chǎn)的產(chǎn)品數(shù)量
        private int num;
        // 所在放置的倉庫
        private Storage storage;
        // 構(gòu)造函數(shù),設(shè)置倉庫
        public Producer(Storage storage) {
            this.storage = storage;
        }
        // 線程run函數(shù)
        @Override
        public void run() {
            produce(num);
        }
        // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
        public void produce(int num) {
            storage.produce(num);
        }
        // get/set方法
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        public Storage getStorage() {
            return storage;
        }
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
    /**
     * 消費(fèi)者類Consumer繼承線程類Thread
     */
    public class Consumer extends Thread {
        // 每次消費(fèi)的產(chǎn)品數(shù)量
        private int num;
        // 所在放置的倉庫
        private Storage storage;
        // 構(gòu)造函數(shù),設(shè)置倉庫
        public Consumer(Storage storage) {
            this.storage = storage;
        }
        // 線程run函數(shù)
        @Override
        public void run() {
            consume(num);
        }
        // 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
        public void consume(int num) {
            storage.consume(num);
        }
        // get/set方法
        public int getNum() {
            return num;
        }
        public void setNum(int num) {
            this.num = num;
        }
        public Storage getStorage() {
            return storage;
        }
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
}
截屏2021-04-19 22.08.54.png

????有時(shí)使用BlockingQueue可能會出現(xiàn)put()和System.out.println()輸出不匹配的情況,這是由于它們之間沒有同步造成的。當(dāng)緩沖區(qū)已滿,生產(chǎn)者在put()操作時(shí),put()內(nèi)部調(diào)用了await()方法,放棄了線程的執(zhí)行,然后消費(fèi)者線程執(zhí)行,調(diào)用take()方法,take()內(nèi)部調(diào)用了signal()方法,通知生產(chǎn)者線程可以執(zhí)行,致使在消費(fèi)者的println()還沒運(yùn)行的情況下生產(chǎn)者的println()先被執(zhí)行,所以有了輸出不匹配的情況。
對于BlockingQueue大家可以放心使用,這可不是它的問題,只是在它和別的對象之間的同步有問題。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容