在實際的軟件開發(fā)過程中,經(jīng)常會碰到如下場景:某個模塊負責產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數(shù)、線程、進程等)。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費者;
生產(chǎn)者和消費者之間通過緩沖區(qū)(通常是一個阻塞隊列)實現(xiàn)通訊, 生產(chǎn)者將產(chǎn)生的數(shù)據(jù)放入緩沖區(qū),消費者從緩沖區(qū)中獲取數(shù)據(jù)。

舉個栗子
去食堂吃飯,食堂的叔叔阿姨會先將飯做好,放到食堂窗口,同學(xué)們會去食堂打飯。
生產(chǎn)者(食堂的叔叔阿姨) -> 生產(chǎn)數(shù)據(jù)(做飯) -> 緩沖區(qū)(食堂窗口) -> 消費數(shù)據(jù)(打飯) -> 消費者(同學(xué))
生產(chǎn)者消費者實現(xiàn)思路

生產(chǎn)者和消費者的任務(wù)很明確,生產(chǎn)者只管生產(chǎn)數(shù)據(jù),然后添加到緩沖隊列。而消費者只管從緩沖隊列中獲取數(shù)據(jù)
可以說生產(chǎn)者消費者都很無腦,而緩沖隊列則要忙一些,他起到了一個平衡生產(chǎn)者和消費者的作用。
如果生產(chǎn)者生產(chǎn)速度過快,消費者消費的很慢,并且緩沖隊列達到了最大長度時。緩沖隊列會阻塞生產(chǎn)者,讓生產(chǎn)者停止生產(chǎn),等待消費者消費了數(shù)據(jù)后,再喚醒生產(chǎn)者
同理,當消費者消費速度過快時,隊列為空時。緩沖隊列則會阻塞消費者,待生產(chǎn)者向隊列添加數(shù)據(jù)后,再喚醒消費者
實現(xiàn)
通過上述的分析后,我們來用最基本的Java代碼實現(xiàn)一下
我們先來定義一下Consumer 和Producer ,他們的邏輯比較簡單,這里我們只循環(huán)十次模擬一下生產(chǎn)消費的場景。Buffer 為緩沖區(qū),我們待會再看
/**
* 消費者
*/
class Consumer extends Thread {
private Buffer buffer;
private int number;
public Consumer(Buffer b, int number) {
buffer = b;
this.number = number;
}
public void run() {
int value;
for (int i = 0; i < 10; i++) {
// 從緩沖區(qū)中獲取數(shù)據(jù)
value = buffer.get();
try {
// 模擬消費數(shù)據(jù)
sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("消費者 #" + this.number + " got: " + value);
}
}
}
/**
* 生產(chǎn)者
*/
class Producer extends Thread {
private Buffer buffer;
private int number;
public Producer(Buffer b, int number) {
buffer = b;
this.number = number;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
// 模擬生產(chǎn)數(shù)據(jù)
sleep(500);
} catch (InterruptedException e) {
}
// 將數(shù)據(jù)放入緩沖區(qū)
buffer.put(i);
System.out.println("生產(chǎn)者 #" + this.number + " put: " + i);
}
}
}
可以看到 Consumer 和Producer 沒有什么邏輯,只是對緩沖區(qū)的讀寫操作,下面我們來重點看一下 Buffer的實現(xiàn)
/**
* 緩沖區(qū)
*/
class Buffer {
private List<Integer> data = new ArrayList<>();
private static final int MAX = 10;
private static final int MIN = 0;
public synchronized int get() {
while (MIN == data.size()) {
try {
wait();
} catch (InterruptedException e) {
}
}
Integer i = data.remove(0);
notifyAll();
return i;
}
public synchronized void put(int value) {
while (MAX == data.size()) {
try {
wait();
} catch (InterruptedException e) {
}
}
data.add(value);
notifyAll();
}
}
-
分析
put(): 生產(chǎn)者向緩沖區(qū)寫入數(shù)據(jù)的操作,當MAX == data.size()時,就是我們剛剛所說的生產(chǎn)者速度過快,消費者速度過慢的情況,這個時候身為 "緩沖區(qū)" 要平衡一下,調(diào)用 Object.wait(),讓當前生產(chǎn)者線程進入掛起狀態(tài),等待消費者消費數(shù)據(jù)后將其喚醒當
MAX > data.size()時,向ArrayList中添加數(shù)據(jù),并嘗試喚醒正在等待的消費者(這一步是必須的)get(): 消費者向緩沖區(qū)讀數(shù)據(jù)的操作,和上述邏輯相反,當
MIN == data.size()時,這時消費者速度太快,生產(chǎn)者太慢,隊列中已經(jīng)沒有數(shù)據(jù)了。"緩沖區(qū)" 再一次站了出來,通過wait(),讓當前消費者線程進入掛起狀態(tài),等待生產(chǎn)者生產(chǎn)數(shù)據(jù)后將其喚醒當
MIN < data.size()時,取出ArrayList中第一條數(shù)據(jù),并嘗試喚醒正在等待的生產(chǎn)者
上述案例完整代碼 ProducerConsumer.java
上述使用最基本的Java代碼實現(xiàn)生產(chǎn)者消費者模式,實際開發(fā)中我們可能會使用BlockingQueue、ReentrantLock、ThreadPoolExecutor這些更成熟的輪子,但是一通百通
關(guān)于上述案例的思考
為什么緩沖區(qū)的判斷條件是
while(condition)而不是if(condition)?
答:防止線程被錯誤的喚醒
舉例:當有兩個消費者線程wait() 時,此時生產(chǎn)者在隊列里放入了一條數(shù)據(jù),并調(diào)用notifyAll(), 兩個消費者線程被喚醒,第一個消費者成功取出隊列中數(shù)據(jù),而第二個消費者此時就是被錯誤的喚醒了,程序拋出異常,所以此處使用 while(condition)循環(huán)檢查Java中要求wait()方法為什么要放在同步塊中?
答:防止出現(xiàn)Lost Wake-Up
舉例:如果隊列沒有同步限制,消費者和生產(chǎn)者并發(fā)執(zhí)行,很可能出現(xiàn)這種情況,消費者這時候檢查了條件正準備wait(),這時候上下文切換到了生產(chǎn)者,生產(chǎn)者咔咔一頓操作向隊列中添加了數(shù)據(jù),并喚醒了消費者,而此時消費者并沒有wait(),這個通知就丟掉了,然后消費者wait() 就這樣睡去了...為什么緩沖區(qū)一定要使用阻塞隊列實現(xiàn)?
同理就是為了防止出現(xiàn)Lost Wake-Up
為什么要使用生產(chǎn)者消費者模式
順序執(zhí)行不就可以了嗎?生產(chǎn)者消費者到底有什么意義?
并發(fā) (異步)
生產(chǎn)者直接調(diào)用消費者,兩者是同步(阻塞)的,如果消費者吞吐數(shù)據(jù)很慢,這時候生產(chǎn)者白白浪費大好時光。而使用這種模式之后,生產(chǎn)者將數(shù)據(jù)丟到緩沖區(qū),繼續(xù)生產(chǎn),完全不依賴消費者,程序執(zhí)行效率會大大提高。解耦
生產(chǎn)者和消費者之間不直接依賴,通過緩沖區(qū)通訊,將兩個類之間的耦合度降到最低。