一、多線程下生產(chǎn)者消費者模式

生產(chǎn)者消費者模式

我們先來看看什么是生產(chǎn)者消費者模式,生產(chǎn)者消費者模式是程序設(shè)計中非常常見的一種設(shè)計模式,被廣泛運用在解耦、消息隊列等場景。在現(xiàn)實世界中,我們把生產(chǎn)商品的一方稱為生產(chǎn)者,把消費商品的一方稱為消費者,有時生產(chǎn)者的生產(chǎn)速度特別快,但消費者的消費速度跟不上,俗稱“產(chǎn)能過?!?,又或是多個生產(chǎn)者對應(yīng)多個消費者時,大家可能會手忙腳亂。如何才能讓大家更好地配合呢?這時在生產(chǎn)者和消費者之間就需要一個中介來進行調(diào)度,于是便誕生了生產(chǎn)者消費者模式。

image.png

使用生產(chǎn)者消費者模式通常需要在兩者之間增加一個阻塞隊列作為媒介,有了媒介之后就相當(dāng)于有了一個緩沖,平衡了兩者的能力,整體的設(shè)計如圖所示,最上面是阻塞隊列,右側(cè)的 1 是生產(chǎn)者線程,生產(chǎn)者在生產(chǎn)數(shù)據(jù)后將數(shù)據(jù)存放在阻塞隊列中,左側(cè)的 2 是消費者線程,消費者獲取阻塞隊列中的數(shù)據(jù)。而中間的 3 和 4 分別代表生產(chǎn)者消費者之間互相通信的過程,因為無論阻塞隊列是滿還是空都可能會產(chǎn)生阻塞,阻塞之后就需要在合適的時機去喚醒被阻塞的線程。

那么什么時候阻塞線程需要被喚醒呢?有兩種情況。第一種情況是當(dāng)消費者看到阻塞隊列為空時,開始進入等待,這時生產(chǎn)者一旦往隊列中放入數(shù)據(jù),就會通知所有的消費者,喚醒阻塞的消費者線程。另一種情況是如果生產(chǎn)者發(fā)現(xiàn)隊列已經(jīng)滿了,也會被阻塞,而一旦消費者獲取數(shù)據(jù)之后就相當(dāng)于隊列空了一個位置,這時消費者就會通知所有正在阻塞的生產(chǎn)者進行生產(chǎn),這便是對生產(chǎn)者消費者模式的簡單介紹。

BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式
package com.example.multithread.ProducerAndConsumer.GroupA;

import java.util.concurrent.BlockingQueue;

/**
 * @author liujy
 * @description 生產(chǎn)者A
 * @since 2020-12-29 10:18
 */
public class AProducer implements Runnable {
    private BlockingQueue<Object> blockingQueue;

    public AProducer(BlockingQueue<Object> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Object o = new Object();
                blockingQueue.put(o);
                System.out.println("produce:" + o);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.example.multithread.ProducerAndConsumer.GroupA;

import java.util.concurrent.BlockingQueue;

/**
 * @author liujy
 * @description 消費者A
 * @since 2020-12-29 10:18
 */
public class AConsumer implements Runnable {
    private BlockingQueue<Object> blockingQueue;

    public AConsumer(BlockingQueue<Object> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Object take = blockingQueue.take();
                System.out.println("consume:" + take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

package com.example.multithread.ProducerAndConsumer.GroupA;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author liujy
 * @description 用BlockingQueue實現(xiàn)生產(chǎn)者消費者模式
 * @since 2020-12-29 09:52
 */
public class ATest {
    public static void main(String[] args) {
        BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
        new Thread(new AProducer(blockingQueue)).start();
        new Thread(new AProducer(blockingQueue)).start();
        new Thread(new AConsumer(blockingQueue)).start();
        new Thread(new AConsumer(blockingQueue)).start();
    }
}
Condition 實現(xiàn)生產(chǎn)者消費者模式

BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式看似簡單,背后卻暗藏玄機,我們在掌握這種方法的基礎(chǔ)上仍需要掌握更復(fù)雜的實現(xiàn)方法。我們接下來看如何在掌握了 BlockingQueue 的基礎(chǔ)上利用 Condition 實現(xiàn)生產(chǎn)者消費者模式,它們背后的實現(xiàn)原理非常相似,相當(dāng)于我們自己實現(xiàn)一個簡易版的 BlockingQueue:

package com.example.multithread.ProducerAndConsumer.GroupB;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author liujy
 * @description 自定義BlockingQueue
 * @since 2020-12-29 14:41
 */
public class MyBlockingQueueForCondition {
    private Queue queue;
    private int maxCapacity;
    private ReentrantLock lock = new ReentrantLock();
    private Condition isEmpty = lock.newCondition();
    private Condition isFull = lock.newCondition();

    public MyBlockingQueueForCondition(int maxCapacity) {
        this.maxCapacity = maxCapacity;
        this.queue = new LinkedList();
    }

    // 添加元素
    public void put(int o) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == maxCapacity) {
                isFull.await();
            }
            queue.add(o);
            isFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // 移除元素
    public Object pop() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                isEmpty.await();
            }
            Object o = queue.poll();
            isEmpty.signalAll();
            return o;
        } finally {
            lock.unlock();
        }
    }
}
package com.example.multithread.ProducerAndConsumer.GroupB;

/**
 * @author liujy
 * @description 生產(chǎn)者B
 * @since 2020-12-29 14:37
 */
public class BProducer implements Runnable {
    private MyBlockingQueueForCondition myBlockingQueue;

    public BProducer(MyBlockingQueueForCondition myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println(Thread.currentThread().getName() + " " + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

package com.example.multithread.ProducerAndConsumer.GroupB;

/**
 * @author liujy
 * @description 消費者B
 * @since 2020-12-29 14:37
 */
public class BConsumer implements Runnable {
    private MyBlockingQueueForCondition myBlockingQueue;

    public BConsumer(MyBlockingQueueForCondition myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Object pop = myBlockingQueue.pop();
                System.out.println(Thread.currentThread().getName() + " " + pop);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.example.multithread.ProducerAndConsumer.GroupB;

/**
 * @author liujy
 * @description 用Condition實現(xiàn)生產(chǎn)者消費者模式
 * @since 2020-12-29 14:38
 */
public class BTest {
    public static void main(String[] args) {
        MyBlockingQueueForCondition myBlockingQueue = new MyBlockingQueueForCondition(10);
        new Thread(new BProducer(myBlockingQueue), "producer one").start();
        new Thread(new BConsumer(myBlockingQueue), "consumer one").start();
//        new Thread(new BProducer(myBlockingQueue), "producer two").start();
//        new Thread(new BConsumer(myBlockingQueue), "consumer two").start();
    }

}
wait/notify 實現(xiàn)生產(chǎn)者消費者模式

wait/notify 實現(xiàn)生產(chǎn)者消費者模式的方法,實際上實現(xiàn)原理和Condition 是非常類似的,它們是兄弟關(guān)系:

package com.example.multithread.ProducerAndConsumer.GroupC;

import java.util.LinkedList;
import java.util.Queue;

/**
 * @author liujy
 * @description 自定義BlockingQueue
 * @since 2020-12-29 16:32
 */
public class MyBlockingQueue {
    private Queue queue;
    private int capacity;

    public MyBlockingQueue(int capacity) {
        this.queue = new LinkedList();
        this.capacity = capacity;
    }

    public synchronized void put(int i) throws InterruptedException {
        while (queue.size() == capacity) {
            wait();
        }
        queue.add(i);
        notifyAll();
    }

    public synchronized int pop() throws InterruptedException {
        while (queue.size() == 0) {
            wait();
        }
        Integer i = (Integer) queue.poll();
        notifyAll();
        return i;
    }
}

package com.example.multithread.ProducerAndConsumer.GroupC;

/**
 * @author liujy
 * @description 生產(chǎn)者C
 * @since 2020-12-29 16:29
 */
public class CProducer implements Runnable {
    private MyBlockingQueue myBlockingQueue;

    public CProducer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                myBlockingQueue.put(i);
                System.out.println("producer " + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.example.multithread.ProducerAndConsumer.GroupC;

/**
 * @author liujy
 * @description 消費者C
 * @since 2020-12-29 16:29
 */
public class CConsumer implements Runnable {
    private MyBlockingQueue myBlockingQueue;

    public CConsumer(MyBlockingQueue myBlockingQueue) {
        this.myBlockingQueue = myBlockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                int pop = myBlockingQueue.pop();
                System.out.println("consumer " + pop);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.example.multithread.ProducerAndConsumer.GroupC;

/**
 * @author liujy
 * @description 用wait、notify實現(xiàn)生產(chǎn)者消費者模式
 * @since 2020-12-29 16:29
 */
public class CTest {
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
        new Thread(new CProducer(myBlockingQueue), "producer one").start();
        new Thread(new CConsumer(myBlockingQueue), "consumer one").start();
//        new Thread(new BProducer(myBlockingQueue), "producer two").start();
//        new Thread(new BConsumer(myBlockingQueue), "consumer two").start();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容