1、有哪幾種實現(xiàn)生產(chǎn)者消費者模式的方法?
1.1 生產(chǎn)者消費者模式
先來看看什么是生產(chǎn)者消費者模式,生產(chǎn)者消費者模式是程序設(shè)計中非常常見的一種設(shè)計模式,被廣泛運用在解耦、消息隊列等場景。在現(xiàn)實世界中,我們把生產(chǎn)商品的一方稱為生產(chǎn)者,把消費商品的一方稱為消費者,有時生產(chǎn)者的生產(chǎn)速度特別快,但消費者的消費速度跟不上,俗稱“產(chǎn)能過?!?,又或是多個生產(chǎn)者對應(yīng)多個消費者時,大家可能會手忙腳亂。如何才能讓大家更好地配合呢?這時在生產(chǎn)者和消費者之間就需要一個中介來進行調(diào)度,于是便誕生了生產(chǎn)者消費者模式。

使用生產(chǎn)者消費者模式通常需要在兩者之間增加一個阻塞隊列作為媒介,有了媒介之后就相當于有了一個緩沖,平衡了兩者的能力,整體的設(shè)計如圖所示,最上面是阻塞隊列,右側(cè)的 1 是生產(chǎn)者線程,生產(chǎn)者在生產(chǎn)數(shù)據(jù)后將數(shù)據(jù)存放在阻塞隊列中,左側(cè)的 2 是消費者線程,消費者獲取阻塞隊列中的數(shù)據(jù)。而中間的 3 和 4 分別代表生產(chǎn)者消費者之間互相通信的過程,因為無論阻塞隊列是滿還是空都可能會產(chǎn)生阻塞,阻塞之后就需要在合適的時機去喚醒被阻塞的線程。
那么什么時候阻塞線程需要被喚醒呢?有兩種情況。第一種情況是當消費者看到阻塞隊列為空時,開始進入等待,這時生產(chǎn)者一旦往隊列中放入數(shù)據(jù),就會通知所有的消費者,喚醒阻塞的消費者線程。另一種情況是如果生產(chǎn)者發(fā)現(xiàn)隊列已經(jīng)滿了,也會被阻塞,而一旦消費者獲取數(shù)據(jù)之后就相當于隊列空了一個位置,這時消費者就會通知所有正在阻塞的生產(chǎn)者進行生產(chǎn),這便是對生產(chǎn)者消費者模式的簡單介紹。
1.2 如何用 BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式
接下來看如何用 wait/notify/Condition/BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式,先從最簡單的 BlockingQueue 開始講起:
public static void main(String[] args) {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
Runnable producer = () -> {
while (true) {
queue.put(new Object());
}
};
new Thread(producer).start();
new Thread(producer).start();
Runnable consumer = () -> {
while (true) {
queue.take();
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
如代碼所示,首先,創(chuàng)建了一個 ArrayBlockingQueue 類型的 BlockingQueue,命名為 queue 并將它的容量設(shè)置為 10;其次,創(chuàng)建一個簡單的生產(chǎn)者,while(true) 循環(huán)體中的queue.put() 負責(zé)往隊列添加數(shù)據(jù);然后,創(chuàng)建兩個生產(chǎn)者線程并啟動;同樣消費者也非常簡單,while(true) 循環(huán)體中的 queue.take() 負責(zé)消費數(shù)據(jù),同時創(chuàng)建兩個消費者線程并啟動。為了代碼簡潔并突出設(shè)計思想,代碼里省略了 try/catch 檢測,我們不糾結(jié)一些語法細節(jié)。以上便是利用 BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式的代碼。雖然代碼非常簡單,但實際上 ArrayBlockingQueue 已經(jīng)在背后完成了很多工作,比如隊列滿了就去阻塞生產(chǎn)者線程,隊列有空就去喚醒生產(chǎn)者線程等。
1.3 如何用 Condition 實現(xiàn)生產(chǎn)者消費者模式
BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式看似簡單,背后卻暗藏玄機,在掌握這種方法的基礎(chǔ)上仍需要掌握更復(fù)雜的實現(xiàn)方法。接下來看如何在掌握了 BlockingQueue 的基礎(chǔ)上利用 Condition 實現(xiàn)生產(chǎn)者消費者模式,它們背后的實現(xiàn)原理非常相似,相當于自己實現(xiàn)一個簡易版的 BlockingQueue:
public class MyBlockingQueueForCondition {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size) {
this.max = size;
queue = new LinkedList();
}
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
queue.add(o);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
Object item = queue.remove();
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}
如代碼所示,首先,定義了一個隊列變量 queue 并設(shè)置最大容量為 16;其次,定義了一個 ReentrantLock 類型的 Lock 鎖,并在 Lock 鎖的基礎(chǔ)上創(chuàng)建兩個 Condition,一個是 notEmpty,另一個是 notFull,分別代表隊列沒有空和沒有滿的條件;最后,聲明了 put 和 take 這兩個核心方法。
因為生產(chǎn)者消費者模式通常是面對多線程的場景,需要一定的同步措施保障線程安全,所以在 put 方法中先將 Lock 鎖上,然后,在 while 的條件里檢測 queue 是不是已經(jīng)滿了,如果已經(jīng)滿了,則調(diào)用 notFull 的 await() 阻塞生產(chǎn)者線程并釋放 Lock,如果沒有滿,則往隊列放入數(shù)據(jù)并利用 notEmpty.signalAll() 通知正在等待的所有消費者并喚醒它們。最后在 finally 中利用 lock.unlock() 方法解鎖,把 unlock 方法放在 finally 中是一個基本原則,否則可能會產(chǎn)生無法釋放鎖的情況。
下面再來看 take 方法,take 方法實際上是與 put 方法相互對應(yīng)的,同樣是通過 while 檢查隊列是否為空,如果為空,消費者開始等待,如果不為空則從隊列中獲取數(shù)據(jù)并通知生產(chǎn)者隊列有空余位置,最后在 finally 中解鎖。
這里需要注意,在 take() 方法中使用 while( queue.size() == 0 ) 檢查隊列狀態(tài),而不能用 if( queue.size() == 0 )。為什么呢?思考這樣一種情況,因為生產(chǎn)者消費者往往是多線程的,假設(shè)有兩個消費者,第一個消費者線程獲取數(shù)據(jù)時,發(fā)現(xiàn)隊列為空,便進入等待狀態(tài);因為第一個線程在等待時會釋放 Lock 鎖,所以第二個消費者可以進入并執(zhí)行 if( queue.size() == 0 ),也發(fā)現(xiàn)隊列為空,于是第二個線程也進入等待;而此時,如果生產(chǎn)者生產(chǎn)了一個數(shù)據(jù),便會喚醒兩個消費者線程,而兩個線程中只有一個線程可以拿到鎖,并執(zhí)行 queue.remove 操作,另外一個線程因為沒有拿到鎖而卡在被喚醒的地方,而第一個線程執(zhí)行完操作后會在 finally 中通過 unlock 解鎖,而此時第二個線程便可以拿到被第一個線程釋放的鎖,繼續(xù)執(zhí)行操作,也會去調(diào)用 queue.remove 操作,然而這個時候隊列已經(jīng)為空了,所以會拋出 NoSuchElementException 異常,這不符合邏輯。而如果用 while 做檢查,當?shù)谝粋€消費者被喚醒得到鎖并移除數(shù)據(jù)之后,第二個線程在執(zhí)行 remove 前仍會進行 while 檢查,發(fā)現(xiàn)此時依然滿足 queue.size() == 0 的條件,就會繼續(xù)執(zhí)行 await 方法,避免了獲取的數(shù)據(jù)為 null 或拋出異常的情況。
1.4 如何用 wait/notify 實現(xiàn)生產(chǎn)者消費者模式
再來看看使用 wait/notify 實現(xiàn)生產(chǎn)者消費者模式的方法,實際上實現(xiàn)原理和Condition 是非常類似的,它們是兄弟關(guān)系:
class MyBlockingQueue {
private int maxSize;
private LinkedList<Object> storage;
public MyBlockingQueue(int size) {
this.maxSize = size;
storage = new LinkedList<>();
}
public synchronized void put() throws InterruptedException {
while (storage.size() == maxSize) {
wait();
}
storage.add(new Object());
notifyAll();
}
public synchronized void take() throws InterruptedException {
while (storage.size() == 0) {
wait();
}
System.out.println(storage.remove());
notifyAll();
}
}
如代碼所示,最主要的部分仍是 take 與 put 方法,我們先來看 put 方法,put 方法被 synchronized 保護,while 檢查隊列是否為滿,如果不滿就往里放入數(shù)據(jù)并通過 notifyAll() 喚醒其他線程。同樣,take 方法也被 synchronized 修飾,while 檢查隊列是否為空,如果不為空就獲取數(shù)據(jù)并喚醒其他線程。使用這個 MyBlockingQueue 實現(xiàn)的生產(chǎn)者消費者代碼如下:
/**
* 描述: wait形式實現(xiàn)生產(chǎn)者消費者模式
*/
public class WaitStyle {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
Producer producer = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private MyBlockingQueue storage;
public Producer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
storage.put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private MyBlockingQueue storage;
public Consumer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
storage.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
以上就是三種實現(xiàn)生產(chǎn)者消費者模式的方式,其中,第一種 BlockingQueue 模式實現(xiàn)比較簡單,但其背后的實現(xiàn)原理在第二種、第三種實現(xiàn)方法中得以體現(xiàn),第二種、第三種實現(xiàn)方法本質(zhì)上是我們自己實現(xiàn)了 BlockingQueue 的一些核心邏輯,供生產(chǎn)者與消費者使用。