生產(chǎn)者消費者模式
我們先來看看什么是生產(chǎn)者消費者模式,生產(chǎn)者消費者模式是程序設(shè)計中非常常見的一種設(shè)計模式,被廣泛運用在解耦、消息隊列等場景。在現(xiàn)實世界中,我們把生產(chǎn)商品的一方稱為生產(chǎn)者,把消費商品的一方稱為消費者,有時生產(chǎn)者的生產(chǎn)速度特別快,但消費者的消費速度跟不上,俗稱“產(chǎn)能過?!?,又或是多個生產(chǎn)者對應(yīng)多個消費者時,大家可能會手忙腳亂。如何才能讓大家更好地配合呢?這時在生產(chǎn)者和消費者之間就需要一個中介來進行調(diào)度,于是便誕生了生產(chǎn)者消費者模式。

使用生產(chǎn)者消費者模式通常需要在兩者之間增加一個阻塞隊列作為媒介,有了媒介之后就相當(dāng)于有了一個緩沖,平衡了兩者的能力,整體的設(shè)計如圖所示,最上面是阻塞隊列,右側(cè)的 1 是生產(chǎn)者線程,生產(chǎn)者在生產(chǎn)數(shù)據(jù)后將數(shù)據(jù)存放在阻塞隊列中,左側(cè)的 2 是消費者線程,消費者獲取阻塞隊列中的數(shù)據(jù)。而中間的 3 和 4 分別代表生產(chǎn)者消費者之間互相通信的過程,因為無論阻塞隊列是滿還是空都可能會產(chǎn)生阻塞,阻塞之后就需要在合適的時機去喚醒被阻塞的線程。
那么什么時候阻塞線程需要被喚醒呢?有兩種情況。第一種情況是當(dāng)消費者看到阻塞隊列為空時,開始進入等待,這時生產(chǎn)者一旦往隊列中放入數(shù)據(jù),就會通知所有的消費者,喚醒阻塞的消費者線程。另一種情況是如果生產(chǎn)者發(fā)現(xiàn)隊列已經(jīng)滿了,也會被阻塞,而一旦消費者獲取數(shù)據(jù)之后就相當(dāng)于隊列空了一個位置,這時消費者就會通知所有正在阻塞的生產(chǎn)者進行生產(chǎn),這便是對生產(chǎn)者消費者模式的簡單介紹。
BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式
package com.example.multithread.ProducerAndConsumer.GroupA;
import java.util.concurrent.BlockingQueue;
/**
* @author liujy
* @description 生產(chǎn)者A
* @since 2020-12-29 10:18
*/
public class AProducer implements Runnable {
private BlockingQueue<Object> blockingQueue;
public AProducer(BlockingQueue<Object> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Object o = new Object();
blockingQueue.put(o);
System.out.println("produce:" + o);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupA;
import java.util.concurrent.BlockingQueue;
/**
* @author liujy
* @description 消費者A
* @since 2020-12-29 10:18
*/
public class AConsumer implements Runnable {
private BlockingQueue<Object> blockingQueue;
public AConsumer(BlockingQueue<Object> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Object take = blockingQueue.take();
System.out.println("consume:" + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupA;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author liujy
* @description 用BlockingQueue實現(xiàn)生產(chǎn)者消費者模式
* @since 2020-12-29 09:52
*/
public class ATest {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
new Thread(new AProducer(blockingQueue)).start();
new Thread(new AProducer(blockingQueue)).start();
new Thread(new AConsumer(blockingQueue)).start();
new Thread(new AConsumer(blockingQueue)).start();
}
}
Condition 實現(xiàn)生產(chǎn)者消費者模式
BlockingQueue 實現(xiàn)生產(chǎn)者消費者模式看似簡單,背后卻暗藏玄機,我們在掌握這種方法的基礎(chǔ)上仍需要掌握更復(fù)雜的實現(xiàn)方法。我們接下來看如何在掌握了 BlockingQueue 的基礎(chǔ)上利用 Condition 實現(xiàn)生產(chǎn)者消費者模式,它們背后的實現(xiàn)原理非常相似,相當(dāng)于我們自己實現(xiàn)一個簡易版的 BlockingQueue:
package com.example.multithread.ProducerAndConsumer.GroupB;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author liujy
* @description 自定義BlockingQueue
* @since 2020-12-29 14:41
*/
public class MyBlockingQueueForCondition {
private Queue queue;
private int maxCapacity;
private ReentrantLock lock = new ReentrantLock();
private Condition isEmpty = lock.newCondition();
private Condition isFull = lock.newCondition();
public MyBlockingQueueForCondition(int maxCapacity) {
this.maxCapacity = maxCapacity;
this.queue = new LinkedList();
}
// 添加元素
public void put(int o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == maxCapacity) {
isFull.await();
}
queue.add(o);
isFull.signalAll();
} finally {
lock.unlock();
}
}
// 移除元素
public Object pop() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
isEmpty.await();
}
Object o = queue.poll();
isEmpty.signalAll();
return o;
} finally {
lock.unlock();
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupB;
/**
* @author liujy
* @description 生產(chǎn)者B
* @since 2020-12-29 14:37
*/
public class BProducer implements Runnable {
private MyBlockingQueueForCondition myBlockingQueue;
public BProducer(MyBlockingQueueForCondition myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
myBlockingQueue.put(i);
System.out.println(Thread.currentThread().getName() + " " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupB;
/**
* @author liujy
* @description 消費者B
* @since 2020-12-29 14:37
*/
public class BConsumer implements Runnable {
private MyBlockingQueueForCondition myBlockingQueue;
public BConsumer(MyBlockingQueueForCondition myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Object pop = myBlockingQueue.pop();
System.out.println(Thread.currentThread().getName() + " " + pop);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupB;
/**
* @author liujy
* @description 用Condition實現(xiàn)生產(chǎn)者消費者模式
* @since 2020-12-29 14:38
*/
public class BTest {
public static void main(String[] args) {
MyBlockingQueueForCondition myBlockingQueue = new MyBlockingQueueForCondition(10);
new Thread(new BProducer(myBlockingQueue), "producer one").start();
new Thread(new BConsumer(myBlockingQueue), "consumer one").start();
// new Thread(new BProducer(myBlockingQueue), "producer two").start();
// new Thread(new BConsumer(myBlockingQueue), "consumer two").start();
}
}
wait/notify 實現(xiàn)生產(chǎn)者消費者模式
wait/notify 實現(xiàn)生產(chǎn)者消費者模式的方法,實際上實現(xiàn)原理和Condition 是非常類似的,它們是兄弟關(guān)系:
package com.example.multithread.ProducerAndConsumer.GroupC;
import java.util.LinkedList;
import java.util.Queue;
/**
* @author liujy
* @description 自定義BlockingQueue
* @since 2020-12-29 16:32
*/
public class MyBlockingQueue {
private Queue queue;
private int capacity;
public MyBlockingQueue(int capacity) {
this.queue = new LinkedList();
this.capacity = capacity;
}
public synchronized void put(int i) throws InterruptedException {
while (queue.size() == capacity) {
wait();
}
queue.add(i);
notifyAll();
}
public synchronized int pop() throws InterruptedException {
while (queue.size() == 0) {
wait();
}
Integer i = (Integer) queue.poll();
notifyAll();
return i;
}
}
package com.example.multithread.ProducerAndConsumer.GroupC;
/**
* @author liujy
* @description 生產(chǎn)者C
* @since 2020-12-29 16:29
*/
public class CProducer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public CProducer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
myBlockingQueue.put(i);
System.out.println("producer " + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupC;
/**
* @author liujy
* @description 消費者C
* @since 2020-12-29 16:29
*/
public class CConsumer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public CConsumer(MyBlockingQueue myBlockingQueue) {
this.myBlockingQueue = myBlockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
int pop = myBlockingQueue.pop();
System.out.println("consumer " + pop);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.multithread.ProducerAndConsumer.GroupC;
/**
* @author liujy
* @description 用wait、notify實現(xiàn)生產(chǎn)者消費者模式
* @since 2020-12-29 16:29
*/
public class CTest {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
new Thread(new CProducer(myBlockingQueue), "producer one").start();
new Thread(new CConsumer(myBlockingQueue), "consumer one").start();
// new Thread(new BProducer(myBlockingQueue), "producer two").start();
// new Thread(new BConsumer(myBlockingQueue), "consumer two").start();
}
}