Java 理解生產(chǎn)者-消費者設(shè)計模式

在實際的軟件開發(fā)過程中,經(jīng)常會碰到如下場景:某個模塊負責產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數(shù)、線程、進程等)。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費者;

生產(chǎn)者和消費者之間通過緩沖區(qū)(通常是一個阻塞隊列)實現(xiàn)通訊, 生產(chǎn)者將產(chǎn)生的數(shù)據(jù)放入緩沖區(qū),消費者從緩沖區(qū)中獲取數(shù)據(jù)。

圖片來源網(wǎng)絡(luò)

舉個栗子

去食堂吃飯,食堂的叔叔阿姨會先將飯做好,放到食堂窗口,同學(xué)們會去食堂打飯。

生產(chǎn)者(食堂的叔叔阿姨) -> 生產(chǎn)數(shù)據(jù)(做飯) -> 緩沖區(qū)(食堂窗口) -> 消費數(shù)據(jù)(打飯) -> 消費者(同學(xué))

生產(chǎn)者消費者實現(xiàn)思路

圖片來源網(wǎng)絡(luò)

生產(chǎn)者和消費者的任務(wù)很明確,生產(chǎn)者只管生產(chǎn)數(shù)據(jù),然后添加到緩沖隊列。而消費者只管從緩沖隊列中獲取數(shù)據(jù)

可以說生產(chǎn)者消費者都很無腦,而緩沖隊列則要忙一些,他起到了一個平衡生產(chǎn)者和消費者的作用。

如果生產(chǎn)者生產(chǎn)速度過快,消費者消費的很慢,并且緩沖隊列達到了最大長度時。緩沖隊列會阻塞生產(chǎn)者,讓生產(chǎn)者停止生產(chǎn),等待消費者消費了數(shù)據(jù)后,再喚醒生產(chǎn)者

同理,當消費者消費速度過快時,隊列為空時。緩沖隊列則會阻塞消費者,待生產(chǎn)者向隊列添加數(shù)據(jù)后,再喚醒消費者

實現(xiàn)

通過上述的分析后,我們來用最基本的Java代碼實現(xiàn)一下

我們先來定義一下Consumer 和Producer ,他們的邏輯比較簡單,這里我們只循環(huán)十次模擬一下生產(chǎn)消費的場景。Buffer 為緩沖區(qū),我們待會再看

/**
 * 消費者
 */
class Consumer extends Thread {
    private Buffer buffer;
    private int number;

    public Consumer(Buffer b, int number) {
        buffer = b;
        this.number = number;
    }

    public void run() {
        int value;
        for (int i = 0; i < 10; i++) {
            // 從緩沖區(qū)中獲取數(shù)據(jù)
            value = buffer.get();
            try {
                // 模擬消費數(shù)據(jù)
                sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println("消費者 #" + this.number + " got: " + value);
        }
    }
}

/**
 * 生產(chǎn)者
 */
class Producer extends Thread {
    private Buffer buffer;
    private int number;

    public Producer(Buffer b, int number) {
        buffer = b;
        this.number = number;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                // 模擬生產(chǎn)數(shù)據(jù)
                sleep(500);
            } catch (InterruptedException e) {
            }
            
            // 將數(shù)據(jù)放入緩沖區(qū)
            buffer.put(i);
            System.out.println("生產(chǎn)者 #" + this.number + " put: " + i);
        }
    }
}

可以看到 Consumer 和Producer 沒有什么邏輯,只是對緩沖區(qū)的讀寫操作,下面我們來重點看一下 Buffer的實現(xiàn)

/**
 * 緩沖區(qū)
 */
class Buffer {
    private List<Integer> data = new ArrayList<>();
    private static final int MAX = 10;
    private static final int MIN = 0;

    public synchronized int get() {
        while (MIN == data.size()) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        Integer i = data.remove(0);
        notifyAll();
        return i;
    }

    public synchronized void put(int value) {
        while (MAX == data.size()) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        data.add(value);
        notifyAll();
    }
}
  • 分析
    put(): 生產(chǎn)者向緩沖區(qū)寫入數(shù)據(jù)的操作,當MAX == data.size()時,就是我們剛剛所說的生產(chǎn)者速度過快,消費者速度過慢的情況,這個時候身為 "緩沖區(qū)" 要平衡一下,調(diào)用 Object.wait(),讓當前生產(chǎn)者線程進入掛起狀態(tài),等待消費者消費數(shù)據(jù)后將其喚醒

    MAX > data.size()時,向ArrayList中添加數(shù)據(jù),并嘗試喚醒正在等待的消費者(這一步是必須的)

    get(): 消費者向緩沖區(qū)讀數(shù)據(jù)的操作,和上述邏輯相反,當MIN == data.size()時,這時消費者速度太快,生產(chǎn)者太慢,隊列中已經(jīng)沒有數(shù)據(jù)了。"緩沖區(qū)" 再一次站了出來,通過wait(),讓當前消費者線程進入掛起狀態(tài),等待生產(chǎn)者生產(chǎn)數(shù)據(jù)后將其喚醒

    MIN < data.size()時,取出ArrayList中第一條數(shù)據(jù),并嘗試喚醒正在等待的生產(chǎn)者

上述案例完整代碼 ProducerConsumer.java

上述使用最基本的Java代碼實現(xiàn)生產(chǎn)者消費者模式,實際開發(fā)中我們可能會使用BlockingQueue、ReentrantLockThreadPoolExecutor這些更成熟的輪子,但是一通百通

關(guān)于上述案例的思考

  1. 為什么緩沖區(qū)的判斷條件是 while(condition) 而不是 if(condition)
    答:防止線程被錯誤的喚醒
    舉例:當有兩個消費者線程wait() 時,此時生產(chǎn)者在隊列里放入了一條數(shù)據(jù),并調(diào)用notifyAll(), 兩個消費者線程被喚醒,第一個消費者成功取出隊列中數(shù)據(jù),而第二個消費者此時就是被錯誤的喚醒了,程序拋出異常,所以此處使用 while(condition)循環(huán)檢查

  2. Java中要求wait()方法為什么要放在同步塊中?
    答:防止出現(xiàn)Lost Wake-Up
    舉例:如果隊列沒有同步限制,消費者和生產(chǎn)者并發(fā)執(zhí)行,很可能出現(xiàn)這種情況,消費者這時候檢查了條件正準備wait(),這時候上下文切換到了生產(chǎn)者,生產(chǎn)者咔咔一頓操作向隊列中添加了數(shù)據(jù),并喚醒了消費者,而此時消費者并沒有wait(),這個通知就丟掉了,然后消費者wait() 就這樣睡去了...

  3. 為什么緩沖區(qū)一定要使用阻塞隊列實現(xiàn)?
    同理就是為了防止出現(xiàn)Lost Wake-Up

為什么要使用生產(chǎn)者消費者模式

順序執(zhí)行不就可以了嗎?生產(chǎn)者消費者到底有什么意義?

  • 并發(fā) (異步)
    生產(chǎn)者直接調(diào)用消費者,兩者是同步(阻塞)的,如果消費者吞吐數(shù)據(jù)很慢,這時候生產(chǎn)者白白浪費大好時光。而使用這種模式之后,生產(chǎn)者將數(shù)據(jù)丟到緩沖區(qū),繼續(xù)生產(chǎn),完全不依賴消費者,程序執(zhí)行效率會大大提高。

  • 解耦
    生產(chǎn)者和消費者之間不直接依賴,通過緩沖區(qū)通訊,將兩個類之間的耦合度降到最低。

參考

https://blog.csdn.net/u011109589/article/details/80519863

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容