面試官:小伙子,說一下實(shí)現(xiàn)生產(chǎn)者消費(fèi)者有幾種方式?

前言

生產(chǎn)者-消費(fèi)者模式是一個(gè)十分經(jīng)典的多線程并發(fā)協(xié)作的模式,弄懂生產(chǎn)者-消費(fèi)者問題能夠讓我們對(duì)并發(fā)編程的理解加深。所謂生產(chǎn)者-消費(fèi)者問題,實(shí)際上主要是包含了兩類線程,一種是生產(chǎn)者線程用于生產(chǎn)數(shù)據(jù),另一種是消費(fèi)者線程用于消費(fèi)數(shù)據(jù),為了解耦生產(chǎn)者和消費(fèi)者的關(guān)系,通常會(huì)有一個(gè)共享的數(shù)據(jù)區(qū)域,就像是一個(gè)倉庫,生產(chǎn)者生產(chǎn)數(shù)據(jù)之后直接放置在共享數(shù)據(jù)區(qū)中,并不需要關(guān)心消費(fèi)者的行為;而消費(fèi)者只需要從共享數(shù)據(jù)區(qū)中去獲取數(shù)據(jù),就不再需要關(guān)心生產(chǎn)者的行為。但是,這個(gè)共享數(shù)據(jù)區(qū)域中應(yīng)該具備這樣的線程間并發(fā)協(xié)作的功能:

  1. 如果共享數(shù)據(jù)區(qū)已滿的話,阻塞生產(chǎn)者繼續(xù)生產(chǎn)數(shù)據(jù)放置入內(nèi);
  2. 如果共享數(shù)據(jù)區(qū)為空的話,阻塞消費(fèi)者繼續(xù)消費(fèi)數(shù)據(jù);

在實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題時(shí),可以采用三種方式:

  • 使用Object的wait/notify的消息通知機(jī)制;

  • 使用Lock的Condition的await/signal的消息通知機(jī)制;

  • 使用BlockingQueue實(shí)現(xiàn)。

本文主要將這三種實(shí)現(xiàn)方式進(jìn)行總結(jié)歸納。

wait/notify的消息通知機(jī)制

預(yù)備知識(shí)

Java 中,可以通過配合調(diào)用 Object 對(duì)象的 wait() 方法和 notify()方法或 notifyAll() 方法來實(shí)現(xiàn)線程間的通信。在線程中調(diào)用 wait() 方法,將阻塞當(dāng)前線程,直至等到其他線程調(diào)用了 notify() 方法或 notifyAll() 方法進(jìn)行通知之后,當(dāng)前線程才能從wait()方法出返回,繼續(xù)執(zhí)行下面的操作。

  1. wait

    該方法用來將當(dāng)前線程置入休眠狀態(tài),直到接到通知或被中斷為止。在調(diào)用 wait()之前,線程必須要獲得該對(duì)象的對(duì)象監(jiān)視器鎖,即只能在同步方法或同步塊中調(diào)用 wait()方法。調(diào)用wait()方法之后,當(dāng)前線程會(huì)釋放鎖。如果調(diào)用wait()方法時(shí),線程并未獲取到鎖的話,則會(huì)拋出IllegalMonitorStateException異常,這是一個(gè)RuntimeException。如果再次獲取到鎖的話,當(dāng)前線程才能從wait()方法處成功返回。

  2. notify

    該方法也要在同步方法或同步塊中調(diào)用,即在調(diào)用前,線程也必須要獲得該對(duì)象的對(duì)象級(jí)別鎖,如果調(diào)用 notify()時(shí)沒有持有適當(dāng)?shù)逆i,也會(huì)拋出 IllegalMonitorStateException。
    該方法任意從WAITTING狀態(tài)的線程中挑選一個(gè)進(jìn)行通知,使得調(diào)用wait()方法的線程從等待隊(duì)列移入到同步隊(duì)列中,等待有機(jī)會(huì)再一次獲取到鎖,從而使得調(diào)用wait()方法的線程能夠從wait()方法處退出。調(diào)用notify后,當(dāng)前線程不會(huì)馬上釋放該對(duì)象鎖,要等到程序退出同步塊后,當(dāng)前線程才會(huì)釋放鎖。

  3. notifyAll
    該方法與 notify ()方法的工作方式相同,重要的一點(diǎn)差異是:
    notifyAll 使所有原來在該對(duì)象上 wait 的線程統(tǒng)統(tǒng)退出WAITTING狀態(tài),使得他們?nèi)繌牡却?duì)列中移入到同步隊(duì)列中去,等待下一次能夠有機(jī)會(huì)獲取到對(duì)象監(jiān)視器鎖。

wait/notify消息通知潛在的一些問題

notify過早通知

notify 通知的遺漏很容易理解,即 threadA 還沒開始 wait 的時(shí)候,threadB 已經(jīng) notify 了,這樣,threadB 通知是沒有任何響應(yīng)的,當(dāng) threadB 退出 synchronized 代碼塊后,threadA 再開始 wait,便會(huì)一直阻塞等待,直到被別的線程打斷。比如在下面的示例代碼中,就模擬出notify早期通知帶來的問題:

public class EarlyNotifyDemo1 {

    private static String lockObject = "";

    public static void main(String[] args) {
        WaitThread waitThread = new WaitThread(lockObject);
        NotifyThread notifyThread = new NotifyThread(lockObject);
        notifyThread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitThread.start();
    }

    static class WaitThread extends Thread {
        private String lock;

        public WaitThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println(Thread.currentThread().getName() + "  進(jìn)去代碼塊");
                    System.out.println(Thread.currentThread().getName() + "  開始wait");
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() + "   結(jié)束wait");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class NotifyThread extends Thread {
        private String lock;

        public NotifyThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "  進(jìn)去代碼塊");
                System.out.println(Thread.currentThread().getName() + "  開始notify");
                lock.notify();
                System.out.println(Thread.currentThread().getName() + "   結(jié)束開始notify");
            }
        }
    }
}

輸出結(jié)果

Thread-1  進(jìn)去代碼塊
Thread-1  開始notify
Thread-1   結(jié)束開始notify
Thread-0  進(jìn)去代碼塊
Thread-0  開始wait

示例中開啟了兩個(gè)線程,一個(gè)是WaitThread,另一個(gè)是NotifyThread。NotifyThread會(huì)先啟動(dòng),先調(diào)用notify方法。然后WaitThread線程才啟動(dòng),調(diào)用wait方法,但是由于通知過了,wait方法就無法再獲取到相應(yīng)的通知,因此WaitThread會(huì)一直在wait方法出阻塞,這種現(xiàn)象就是通知過早的現(xiàn)象。針對(duì)這種現(xiàn)象,解決方法,一般是添加一個(gè)狀態(tài)標(biāo)志,讓waitThread調(diào)用wait方法前先判斷狀態(tài)是否已經(jīng)改變了沒,如果通知早已發(fā)出的話,WaitThread就不再去wait。對(duì)上面的代碼進(jìn)行更正:

public class EarlyNotifyDemo2 {

    private static String lockObject = "";
    private static boolean isWait = true;

    public static void main(String[] args) {
        WaitThread waitThread = new WaitThread(lockObject);
        NotifyThread notifyThread = new NotifyThread(lockObject);
        notifyThread.start();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        waitThread.start();
    }

    static class WaitThread extends Thread {
        private String lock;

        public WaitThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    while (isWait) {
                        System.out.println(Thread.currentThread().getName() + "  進(jìn)去代碼塊");
                        System.out.println(Thread.currentThread().getName() + "  開始wait");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "   結(jié)束wait");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class NotifyThread extends Thread {
        private String lock;

        public NotifyThread(String lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "  進(jìn)去代碼塊");
                System.out.println(Thread.currentThread().getName() + "  開始notify");
                lock.notifyAll();
                isWait = false;
                System.out.println(Thread.currentThread().getName() + "   結(jié)束開始notify");
            }
        }
    }
}

這段代碼只是增加了一個(gè)isWait狀態(tài)變量,NotifyThread調(diào)用notify方法后會(huì)對(duì)狀態(tài)變量進(jìn)行更新,在WaitThread中調(diào)用wait方法之前會(huì)先對(duì)狀態(tài)變量進(jìn)行判斷,在該示例中,調(diào)用notify后將狀態(tài)變量isWait改變?yōu)閒alse,因此,在WaitThread中while對(duì)isWait判斷后就不會(huì)執(zhí)行wait方法,從而避免了Notify過早通知造成遺漏的情況。

總結(jié):在使用線程的等待/通知機(jī)制時(shí),一般都要配合一個(gè) boolean 變量值(或者其他能夠判斷真假的條件),在 notify 之前改變?cè)?boolean 變量的值,讓 wait 返回后能夠退出 while 循環(huán)(一般都要在 wait 方法外圍加一層 while 循環(huán),以防止早期通知),或在通知被遺漏后,不會(huì)被阻塞在 wait 方法處。這樣便保證了程序的正確性

等待wait的條件發(fā)生變化

如果線程在等待時(shí)接受到了通知,但是之后等待的條件發(fā)生了變化,并沒有再次對(duì)等待條件進(jìn)行判斷,也會(huì)導(dǎo)致程序出現(xiàn)錯(cuò)誤。

下面用一個(gè)例子來說明這種情況

public class ConditionChangeDemo1 {

    private static List<String> lockObject = new ArrayList();

    public static void main(String[] args) {
        Consumer consumer1 = new Consumer(lockObject);
        Consumer consumer2 = new Consumer(lockObject);
        Productor productor = new Productor(lockObject);
        consumer1.start();
        consumer2.start();
        productor.start();
    }

    static class Consumer extends Thread {
        private List<String> lock;

        public Consumer(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    //這里使用if的話,就會(huì)存在wait條件變化造成程序錯(cuò)誤的問題
                    if (lock.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + " list為空");
                        System.out.println(Thread.currentThread().getName() + " 調(diào)用wait方法");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "  wait方法結(jié)束");
                    }
                    String element = lock.remove(0);
                    System.out.println(Thread.currentThread().getName() + " 取出第一個(gè)元素為:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class Productor extends Thread {
        private List<String> lock;

        public Productor(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 開始添加元素");
                lock.add(Thread.currentThread().getName());
                lock.notifyAll();
            }
        }

    }

}

輸出結(jié)果

Thread-0 list為空
Thread-0 調(diào)用wait方法
Thread-2 開始添加元素
Thread-1 取出第一個(gè)元素為:Thread-2
Thread-0  wait方法結(jié)束
Exception in thread "Thread-0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0

異常原因分析:在這個(gè)例子中一共開啟了3個(gè)線程,Consumer1,Consumer2以及Productor。首先Consumer1調(diào)用了wait方法后,線程處于了WAITTING狀態(tài),并且將對(duì)象鎖釋放出來。因此,Consumer2能夠獲取對(duì)象鎖,從而進(jìn)入到同步代塊中,當(dāng)執(zhí)行到wait方法時(shí),同樣的也會(huì)釋放對(duì)象鎖。因此,productor能夠獲取到對(duì)象鎖,進(jìn)入到同步代碼塊中,向list中插入數(shù)據(jù)后,通過notifyAll方法通知處于WAITING狀態(tài)的Consumer1和Consumer2線程。consumer1得到對(duì)象鎖后,從wait方法出退出,刪除了一個(gè)元素讓List為空,方法執(zhí)行結(jié)束,退出同步塊,釋放掉對(duì)象鎖。這個(gè)時(shí)候Consumer2獲取到對(duì)象鎖后,從wait方法退出,繼續(xù)往下執(zhí)行,這個(gè)時(shí)候Consumer2再執(zhí)行lock.remove(0);就會(huì)出錯(cuò),因?yàn)長ist由于Consumer1刪除一個(gè)元素之后已經(jīng)為空了。

解決方案:通過上面的分析,可以看出Consumer2報(bào)異常是因?yàn)榫€程從wait方法退出之后沒有再次對(duì)wait條件進(jìn)行判斷,因此,此時(shí)的wait條件已經(jīng)發(fā)生了變化。解決辦法就是,在wait退出之后再對(duì)條件進(jìn)行判斷即可。

public class ConditionChangeDemo2 {

    private static List<String> lockObject = new ArrayList();

    public static void main(String[] args) {
        Consumer consumer1 = new Consumer(lockObject);
        Consumer consumer2 = new Consumer(lockObject);
        Productor productor = new Productor(lockObject);
        consumer1.start();
        consumer2.start();
        productor.start();
    }

    static class Consumer extends Thread {

        private List<String> lock;

        public Consumer(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    //這里使用if的話,就會(huì)存在wait條件變化造成程序錯(cuò)誤的問題
                    while (lock.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + " list為空");
                        System.out.println(Thread.currentThread().getName() + " 調(diào)用wait方法");
                        lock.wait();
                        System.out.println(Thread.currentThread().getName() + "  wait方法結(jié)束");
                    }
                    String element = lock.remove(0);
                    System.out.println(Thread.currentThread().getName() + " 取出第一個(gè)元素為:" + element);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class Productor extends Thread {
        private List<String> lock;

        public Productor(List lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " 開始添加元素");
                lock.add(Thread.currentThread().getName());
                lock.notifyAll();
            }
        }

    }

}

輸出結(jié)果

Thread-0 list為空
Thread-0 調(diào)用wait方法
Thread-2 開始添加元素
Thread-1 取出第一個(gè)元素為:Thread-2
Thread-0  wait方法結(jié)束
Thread-0 list為空
Thread-0 調(diào)用wait方法

上面的代碼與之前的代碼僅僅只是將 wait 外圍的 if 語句改為 while 循環(huán)即可,這樣當(dāng) list 為空時(shí),線程便會(huì)繼續(xù)等待,而不會(huì)繼續(xù)去執(zhí)行刪除 list 中元素的代碼。

總結(jié):在使用線程的等待/通知機(jī)制時(shí),一般都要在 while 循環(huán)中調(diào)用 wait()方法,因此配合使用一個(gè) boolean 變量(或其他能判斷真假的條件,如本文中的 list.isEmpty()),滿足 while 循環(huán)的條件時(shí),進(jìn)入 while 循環(huán),執(zhí)行 wait()方法,不滿足 while 循環(huán)的條件時(shí),跳出循環(huán),執(zhí)行后面的代碼。

假死狀態(tài)

現(xiàn)象:如果是多消費(fèi)者和多生產(chǎn)者情況,如果使用notify方法可能會(huì)出現(xiàn)“假死”的情況,即喚醒的是同類線程。

原因分析:假設(shè)當(dāng)前多個(gè)生產(chǎn)者線程會(huì)調(diào)用wait方法阻塞等待,當(dāng)其中的生產(chǎn)者線程獲取到對(duì)象鎖之后使用notify通知處于WAITTING狀態(tài)的線程,如果喚醒的仍然是生產(chǎn)者線程,就會(huì)造成所有的生產(chǎn)者線程都處于等待狀態(tài)。

解決辦法:將notify方法替換成notifyAll方法,如果使用的是lock的話,就將signal方法替換成signalAll方法。

總結(jié)

在Object提供的消息通知機(jī)制應(yīng)該遵循如下這些條件:

  1. 永遠(yuǎn)在while循環(huán)中對(duì)條件進(jìn)行判斷而不是if語句中進(jìn)行wait條件的判斷;
  2. 使用NotifyAll而不是使用notify。

基本的使用范式如下:

// The standard idiom for calling the wait method in Java 
synchronized (sharedObject) { 
    while (condition) { 
    sharedObject.wait(); 
        // (Releases lock, and reacquires on wakeup) 
    } 
    // do action based upon condition e.g. take or put into queue 
}

wait/notifyAll實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者

利用wait/notifyAll實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者代碼如下:

public class ProductorConsumerDemo1 {

    public static void main(String[] args) {

        LinkedList linkedList = new LinkedList();
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(linkedList, 8));
        }

        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(linkedList));
        }

    }

    static class Productor implements Runnable {

        private List<Integer> list;
        private int maxLength;

        public Productor(List list, int maxLength) {
            this.list = list;
            this.maxLength = maxLength;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "  list以達(dá)到最大容量,進(jìn)行wait");
                            list.wait();
                            System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + " 生產(chǎn)數(shù)據(jù)" + i);
                        list.add(i);
                        list.notifyAll();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }
    }

    static class Consumer implements Runnable {

        private List<Integer> list;

        public Consumer(List list) {
            this.list = list;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "  list為空,進(jìn)行wait");
                            list.wait();
                            System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "  消費(fèi)數(shù)據(jù):" + element);
                        list.notifyAll();
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

}

輸出結(jié)果

生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-703210513
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-1025434820
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)70070412
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-598504371
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-716978999
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-1175198461
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-1212912406
生產(chǎn)者pool-1-thread-2 生產(chǎn)數(shù)據(jù)-332467186
生產(chǎn)者pool-1-thread-2  list以達(dá)到最大容量,進(jìn)行wait
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-703210513
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-1025434820
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):70070412
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-598504371
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-716978999
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-1175198461
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-1212912406
消費(fèi)者pool-1-thread-15  消費(fèi)數(shù)據(jù):-332467186
消費(fèi)者pool-1-thread-15  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-14  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-13  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-11  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-12  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-10  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-9  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-8  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-7  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-6  list為空,進(jìn)行wait
生產(chǎn)者pool-1-thread-5 生產(chǎn)數(shù)據(jù)84590545
生產(chǎn)者pool-1-thread-5 生產(chǎn)數(shù)據(jù)-1631754695

使用Lock中Condition的await/signalAll實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者

參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:

針對(duì)wait方法

void await() throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài),如果其他線程調(diào)用condition的signal或者signalAll方法并且當(dāng)前線程獲取Lock從await方法返回,如果在等待狀態(tài)中被中斷會(huì)拋出被中斷異常;

long awaitNanos(long nanosTimeout):當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知,中斷或者超時(shí);

boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支持自定義時(shí)間單位

boolean awaitUntil(Date deadline) throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知,中斷或者到了某個(gè)時(shí)間

針對(duì)notify方法

void signal():?jiǎn)拘岩粋€(gè)等待在condition上的線程,將該線程從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,如果在同步隊(duì)列中能夠競(jìng)爭(zhēng)到Lock則可以從等待方法中返回。

void signalAll():與signal的區(qū)別在于能夠喚醒所有等待在condition上的線程

也就是說wait—>await,notify---->Signal。

如果采用lock中Conditon的消息通知原理來實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者問題,原理同使用wait/notifyAll一樣。直接上代碼:

public class ProductorConsumerDemo2 {

    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    public static void main(String[] args) {
        LinkedList linkedList = new LinkedList();
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(linkedList, 8, lock));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(linkedList, lock));
        }

    }

    static class Productor implements Runnable {

        private List<Integer> list;
        private int maxLength;
        private Lock lock;

        public Productor(List list, int maxLength, Lock lock) {
            this.list = list;
            this.maxLength = maxLength;
            this.lock = lock;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.size() == maxLength) {
                        System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "  list以達(dá)到最大容量,進(jìn)行wait");
                        full.await();
                        System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + " 生產(chǎn)數(shù)據(jù)" + i);
                    list.add(i);
                    empty.signalAll();
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    static class Consumer implements Runnable {

        private List<Integer> list;
        private Lock lock;

        public Consumer(List list, Lock lock) {
            this.list = list;
            this.lock = lock;
        }

        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.isEmpty()) {
                        System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "  list為空,進(jìn)行wait");
                        empty.await();
                        System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "  消費(fèi)數(shù)據(jù):" + element);
                    full.signalAll();
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

輸出結(jié)果

生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)-1633842993
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)1337251950
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)1310879631
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)-214297115
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)738937512
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)13060041
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)-957049554
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)-1062017880
生產(chǎn)者pool-1-thread-1  list以達(dá)到最大容量,進(jìn)行wait
生產(chǎn)者pool-1-thread-2  list以達(dá)到最大容量,進(jìn)行wait
生產(chǎn)者pool-1-thread-3  list以達(dá)到最大容量,進(jìn)行wait
生產(chǎn)者pool-1-thread-4  list以達(dá)到最大容量,進(jìn)行wait
生產(chǎn)者pool-1-thread-5  list以達(dá)到最大容量,進(jìn)行wait
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):-1633842993
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):1337251950
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):1310879631
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):-214297115
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):738937512
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):13060041
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):-957049554
消費(fèi)者pool-1-thread-6  消費(fèi)數(shù)據(jù):-1062017880
消費(fèi)者pool-1-thread-6  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-7  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-8  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-9  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-10  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-11  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-12  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-13  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-14  list為空,進(jìn)行wait
消費(fèi)者pool-1-thread-15  list為空,進(jìn)行wait
生產(chǎn)者pool-1-thread-1  退出wait
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)1949864858
生產(chǎn)者pool-1-thread-1 生產(chǎn)數(shù)據(jù)-1693880970

使用BlockingQueue實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者

由于BlockingQueue內(nèi)部實(shí)現(xiàn)就附加了兩個(gè)阻塞操作。即當(dāng)隊(duì)列已滿時(shí),阻塞向隊(duì)列中插入數(shù)據(jù)的線程,直至隊(duì)列中未滿;當(dāng)隊(duì)列為空時(shí),阻塞從隊(duì)列中獲取數(shù)據(jù)的線程,直至隊(duì)列非空時(shí)為止??梢岳肂lockingQueue實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者為題,阻塞隊(duì)列完全可以充當(dāng)共享數(shù)據(jù)區(qū)域,就可以很好的完成生產(chǎn)者和消費(fèi)者線程之間的協(xié)作。

public class ProductorConsumerDmoe3 {

    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(queue));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(queue));
        }
    }

    static class Productor implements Runnable {
        private BlockingQueue queue;

        public Productor(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生產(chǎn)者" + Thread.currentThread().getName() + "生產(chǎn)數(shù)據(jù)" + i);
                    queue.put(i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer element = (Integer) queue.take();
                    System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "正在消費(fèi)數(shù)據(jù)" + element);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

輸出結(jié)果

生產(chǎn)者pool-1-thread-2生產(chǎn)數(shù)據(jù)-1056722868
生產(chǎn)者pool-1-thread-1生產(chǎn)數(shù)據(jù)-1217947426
生產(chǎn)者pool-1-thread-3生產(chǎn)數(shù)據(jù)590686437
生產(chǎn)者pool-1-thread-4生產(chǎn)數(shù)據(jù)1782376429
生產(chǎn)者pool-1-thread-5生產(chǎn)數(shù)據(jù)1558897279
消費(fèi)者pool-1-thread-6正在消費(fèi)數(shù)據(jù)-1056722868
消費(fèi)者pool-1-thread-7正在消費(fèi)數(shù)據(jù)-1217947426
消費(fèi)者pool-1-thread-8正在消費(fèi)數(shù)據(jù)590686437
消費(fèi)者pool-1-thread-9正在消費(fèi)數(shù)據(jù)1782376429
消費(fèi)者pool-1-thread-10正在消費(fèi)數(shù)據(jù)1558897279
生產(chǎn)者pool-1-thread-4生產(chǎn)數(shù)據(jù)1977644261
生產(chǎn)者pool-1-thread-3生產(chǎn)數(shù)據(jù)182370155
消費(fèi)者pool-1-thread-11正在消費(fèi)數(shù)據(jù)1977644261
生產(chǎn)者pool-1-thread-2生產(chǎn)數(shù)據(jù)949821636
生產(chǎn)者pool-1-thread-5生產(chǎn)數(shù)據(jù)1931032717
消費(fèi)者pool-1-thread-13正在消費(fèi)數(shù)據(jù)949821636
生產(chǎn)者pool-1-thread-1生產(chǎn)數(shù)據(jù)873417555
消費(fèi)者pool-1-thread-14正在消費(fèi)數(shù)據(jù)1931032717
消費(fèi)者pool-1-thread-12正在消費(fèi)數(shù)據(jù)182370155
消費(fèi)者pool-1-thread-15正在消費(fèi)數(shù)據(jù)873417555

可以看出,使用BlockingQueue來實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者很簡(jiǎn)潔,這正是利用了BlockingQueue插入和獲取數(shù)據(jù)附加阻塞操作的特性。

關(guān)于生產(chǎn)者-消費(fèi)者實(shí)現(xiàn)的三種方式,到這里就全部總結(jié)出來,如果覺得不錯(cuò)的話,請(qǐng)點(diǎn)贊轉(zhuǎn)發(fā),也算是給我的鼓勵(lì),在此表示感謝!歡迎關(guān)注公眾號(hào):前程有光,領(lǐng)取一線大廠Java面試題總結(jié)+各知識(shí)點(diǎn)學(xué)習(xí)思維導(dǎo)+一份300頁pdf文檔的Java核心知識(shí)點(diǎn)總結(jié)!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容