六、線程基礎(chǔ)(五)實現(xiàn)生產(chǎn)者消費者模式的方法

1、有哪幾種實現(xiàn)生產(chǎn)者消費者模式的方法?

1.1 生產(chǎn)者消費者模式

先來看看什么是生產(chǎn)者消費者模式,生產(chǎn)者消費者模式是程序設(shè)計中非常常見的一種設(shè)計模式,被廣泛運用在解耦、消息隊列等場景。在現(xiàn)實世界中,我們把生產(chǎn)商品的一方稱為生產(chǎn)者,把消費商品的一方稱為消費者,有時生產(chǎn)者的生產(chǎn)速度特別快,但消費者的消費速度跟不上,俗稱“產(chǎn)能過?!?,又或是多個生產(chǎn)者對應(yīng)多個消費者時,大家可能會手忙腳亂。如何才能讓大家更好地配合呢?這時在生產(chǎn)者和消費者之間就需要一個中介來進行調(diào)度,于是便誕生了生產(chǎn)者消費者模式。

使用生產(chǎn)者消費者模式通常需要在兩者之間增加一個阻塞隊列作為媒介,有了媒介之后就相當于有了一個緩沖,平衡了兩者的能力,整體的設(shè)計如圖所示,最上面是阻塞隊列,右側(cè)的 1 是生產(chǎn)者線程,生產(chǎn)者在生產(chǎn)數(shù)據(jù)后將數(shù)據(jù)存放在阻塞隊列中,左側(cè)的 2 是消費者線程,消費者獲取阻塞隊列中的數(shù)據(jù)。而中間的 3 和 4 分別代表生產(chǎn)者消費者之間互相通信的過程,因為無論阻塞隊列是滿還是空都可能會產(chǎn)生阻塞,阻塞之后就需要在合適的時機去喚醒被阻塞的線程。

那么什么時候阻塞線程需要被喚醒呢?有兩種情況。第一種情況是當消費者看到阻塞隊列為空時,開始進入等待,這時生產(chǎn)者一旦往隊列中放入數(shù)據(jù),就會通知所有的消費者,喚醒阻塞的消費者線程。另一種情況是如果生產(chǎn)者發(fā)現(xiàn)隊列已經(jīng)滿了,也會被阻塞,而一旦消費者獲取數(shù)據(jù)之后就相當于隊列空了一個位置,這時消費者就會通知所有正在阻塞的生產(chǎn)者進行生產(chǎn),這便是對生產(chǎn)者消費者模式的簡單介紹。

1.2 如何用 BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式

接下來看如何用 wait/notify/Condition/BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式,先從最簡單的 BlockingQueue 開始講起:

public static void main(String[] args) {
 
  BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
 
 Runnable producer = () -> {
    while (true) {
          queue.put(new Object());
  }
   };
 
new Thread(producer).start();
new Thread(producer).start();
 
Runnable consumer = () -> {
      while (true) {
           queue.take();
}
   };
new Thread(consumer).start();
new Thread(consumer).start();
}

如代碼所示,首先,創(chuàng)建了一個 ArrayBlockingQueue 類型的 BlockingQueue,命名為 queue 并將它的容量設(shè)置為 10;其次,創(chuàng)建一個簡單的生產(chǎn)者,while(true) 循環(huán)體中的queue.put() 負責(zé)往隊列添加數(shù)據(jù);然后,創(chuàng)建兩個生產(chǎn)者線程并啟動;同樣消費者也非常簡單,while(true) 循環(huán)體中的 queue.take() 負責(zé)消費數(shù)據(jù),同時創(chuàng)建兩個消費者線程并啟動。為了代碼簡潔并突出設(shè)計思想,代碼里省略了 try/catch 檢測,我們不糾結(jié)一些語法細節(jié)。以上便是利用 BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式的代碼。雖然代碼非常簡單,但實際上 ArrayBlockingQueue 已經(jīng)在背后完成了很多工作,比如隊列滿了就去阻塞生產(chǎn)者線程,隊列有空就去喚醒生產(chǎn)者線程等。

1.3 如何用 Condition 實現(xiàn)生產(chǎn)者消費者模式

BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式看似簡單,背后卻暗藏玄機,在掌握這種方法的基礎(chǔ)上仍需要掌握更復(fù)雜的實現(xiàn)方法。接下來看如何在掌握了 BlockingQueue 的基礎(chǔ)上利用 Condition 實現(xiàn)生產(chǎn)者消費者模式,它們背后的實現(xiàn)原理非常相似,相當于自己實現(xiàn)一個簡易版的 BlockingQueue:

public class MyBlockingQueueForCondition {
 
   private Queue queue;
   private int max = 16;
   private ReentrantLock lock = new ReentrantLock();
   private Condition notEmpty = lock.newCondition();
   private Condition notFull = lock.newCondition();
 
 
   public MyBlockingQueueForCondition(int size) {
       this.max = size;
       queue = new LinkedList();
   }
 
   public void put(Object o) throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == max) {
               notFull.await();
           }
           queue.add(o);
           notEmpty.signalAll();
       } finally {
           lock.unlock();
       }
   }
 
   public Object take() throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == 0) {
               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {
           lock.unlock();
       }
   }
}

如代碼所示,首先,定義了一個隊列變量 queue 并設(shè)置最大容量為 16;其次,定義了一個 ReentrantLock 類型的 Lock 鎖,并在 Lock 鎖的基礎(chǔ)上創(chuàng)建兩個 Condition,一個是 notEmpty,另一個是 notFull,分別代表隊列沒有空和沒有滿的條件;最后,聲明了 put 和 take 這兩個核心方法。

因為生產(chǎn)者消費者模式通常是面對多線程的場景,需要一定的同步措施保障線程安全,所以在 put 方法中先將 Lock 鎖上,然后,在 while 的條件里檢測 queue 是不是已經(jīng)滿了,如果已經(jīng)滿了,則調(diào)用 notFull 的 await() 阻塞生產(chǎn)者線程并釋放 Lock,如果沒有滿,則往隊列放入數(shù)據(jù)并利用 notEmpty.signalAll() 通知正在等待的所有消費者并喚醒它們。最后在 finally 中利用 lock.unlock() 方法解鎖,把 unlock 方法放在 finally 中是一個基本原則,否則可能會產(chǎn)生無法釋放鎖的情況。

下面再來看 take 方法,take 方法實際上是與 put 方法相互對應(yīng)的,同樣是通過 while 檢查隊列是否為空,如果為空,消費者開始等待,如果不為空則從隊列中獲取數(shù)據(jù)并通知生產(chǎn)者隊列有空余位置,最后在 finally 中解鎖。

這里需要注意,在 take() 方法中使用 while( queue.size() == 0 ) 檢查隊列狀態(tài),而不能用 if( queue.size() == 0 )。為什么呢?思考這樣一種情況,因為生產(chǎn)者消費者往往是多線程的,假設(shè)有兩個消費者,第一個消費者線程獲取數(shù)據(jù)時,發(fā)現(xiàn)隊列為空,便進入等待狀態(tài);因為第一個線程在等待時會釋放 Lock 鎖,所以第二個消費者可以進入并執(zhí)行 if( queue.size() == 0 ),也發(fā)現(xiàn)隊列為空,于是第二個線程也進入等待;而此時,如果生產(chǎn)者生產(chǎn)了一個數(shù)據(jù),便會喚醒兩個消費者線程,而兩個線程中只有一個線程可以拿到鎖,并執(zhí)行 queue.remove 操作,另外一個線程因為沒有拿到鎖而卡在被喚醒的地方,而第一個線程執(zhí)行完操作后會在 finally 中通過 unlock 解鎖,而此時第二個線程便可以拿到被第一個線程釋放的鎖,繼續(xù)執(zhí)行操作,也會去調(diào)用 queue.remove 操作,然而這個時候隊列已經(jīng)為空了,所以會拋出 NoSuchElementException 異常,這不符合邏輯。而如果用 while 做檢查,當?shù)谝粋€消費者被喚醒得到鎖并移除數(shù)據(jù)之后,第二個線程在執(zhí)行 remove 前仍會進行 while 檢查,發(fā)現(xiàn)此時依然滿足 queue.size() == 0 的條件,就會繼續(xù)執(zhí)行 await 方法,避免了獲取的數(shù)據(jù)為 null 或拋出異常的情況。

1.4 如何用 wait/notify 實現(xiàn)生產(chǎn)者消費者模式

再來看看使用 wait/notify 實現(xiàn)生產(chǎn)者消費者模式的方法,實際上實現(xiàn)原理和Condition 是非常類似的,它們是兄弟關(guān)系:

class MyBlockingQueue {
 
   private int maxSize;
   private LinkedList<Object> storage;
 
   public MyBlockingQueue(int size) {
       this.maxSize = size;
       storage = new LinkedList<>();
   }
 
   public synchronized void put() throws InterruptedException {
       while (storage.size() == maxSize) {
           wait();
       }
       storage.add(new Object());
       notifyAll();
   }
 
   public synchronized void take() throws InterruptedException {
       while (storage.size() == 0) {
           wait();
       }
       System.out.println(storage.remove());
       notifyAll();
   }
}

如代碼所示,最主要的部分仍是 take 與 put 方法,我們先來看 put 方法,put 方法被 synchronized 保護,while 檢查隊列是否為滿,如果不滿就往里放入數(shù)據(jù)并通過 notifyAll() 喚醒其他線程。同樣,take 方法也被 synchronized 修飾,while 檢查隊列是否為空,如果不為空就獲取數(shù)據(jù)并喚醒其他線程。使用這個 MyBlockingQueue 實現(xiàn)的生產(chǎn)者消費者代碼如下:

/**
* 描述:     wait形式實現(xiàn)生產(chǎn)者消費者模式
*/
public class WaitStyle {
 
   public static void main(String[] args) {
       MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
       Producer producer = new Producer(myBlockingQueue);
       Consumer consumer = new Consumer(myBlockingQueue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}
 
class Producer implements Runnable {
 
   private MyBlockingQueue storage;
 
   public Producer(MyBlockingQueue storage) {
       this.storage = storage;
   }
 
   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.put();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}
 
class Consumer implements Runnable {
 
   private MyBlockingQueue storage;
 
   public Consumer(MyBlockingQueue storage) {
       this.storage = storage;
   }
 
   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.take();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}

以上就是三種實現(xiàn)生產(chǎn)者消費者模式的方式,其中,第一種 BlockingQueue 模式實現(xiàn)比較簡單,但其背后的實現(xiàn)原理在第二種、第三種實現(xiàn)方法中得以體現(xiàn),第二種、第三種實現(xiàn)方法本質(zhì)上是我們自己實現(xiàn)了 BlockingQueue 的一些核心邏輯,供生產(chǎn)者與消費者使用。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容