背景
生產(chǎn)者和消費(fèi)者問(wèn)題是線(xiàn)程模型中的經(jīng)典問(wèn)題:生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)存儲(chǔ)空間,生產(chǎn)者往存儲(chǔ)空間中添加產(chǎn)品,消費(fèi)者從存儲(chǔ)空間中取走產(chǎn)品,當(dāng)存儲(chǔ)空間為空時(shí),消費(fèi)者阻塞,當(dāng)存儲(chǔ)空間滿(mǎn)時(shí),生產(chǎn)者阻塞。

解決方案
第一種解決方案 使用 synchronized 的 wait() 和 notify() 來(lái)實(shí)現(xiàn)
關(guān)鍵的思路就是通過(guò) 生產(chǎn)者 和 消費(fèi)者 的不斷循環(huán)來(lái)不斷運(yùn)行。
通過(guò) full 來(lái)表示緩沖區(qū)的大小,當(dāng)然此題中沒(méi)有往緩沖區(qū)里面放的過(guò)程,可以自行替換成list等,因?yàn)橐呀?jīng)有了 synchronized(LOCK) 來(lái)保護(hù),所以不需要線(xiàn)程安全的集合類(lèi)。
count用來(lái)表示緩沖區(qū)中的現(xiàn)有的項(xiàng)目已經(jīng)生產(chǎn)到了哪里。
沒(méi)有體現(xiàn)但很重要的是,wait/notify方法的調(diào)用必須處在該對(duì)象的鎖(Monitor)中,也即,在調(diào)用這些方法時(shí)首先需要獲得該對(duì)象的鎖。否則會(huì)拋出IllegalMonitorStateException異常。
這里需要注意的是sleep()不能放在synchronized代碼塊里面,因?yàn)槲覀冎纒leep()執(zhí)行之后是不會(huì)釋放鎖的,也就是說(shuō)當(dāng)前線(xiàn)程仍然持有對(duì)container對(duì)象的互斥鎖,這個(gè)時(shí)候當(dāng)前線(xiàn)程繼續(xù)判斷l(xiāng)ist.size是否等于capacity,不等于就繼續(xù)put,然后又sleep一會(huì),然后又繼續(xù),直到當(dāng)list.size == capacity,這個(gè)時(shí)候終于進(jìn)入wait()方法,我們知道wait()方法會(huì)釋放鎖,這個(gè)時(shí)候其他線(xiàn)程才有機(jī)會(huì)獲取到container的互斥鎖,
notifyAll()不能單獨(dú)放在producer類(lèi)里面,因?yàn)閚otifyAll()必須放在同步代碼塊里面
弊端:這里由于不能區(qū)分哪些是not empty或者not full或者is full/empty線(xiàn)程,所以需要喚醒所有其他等待的線(xiàn)程,但實(shí)際上我們需要的是喚醒那些not empty或者not full的線(xiàn)程就夠了
import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;
public class LeetCode215 {
private static Integer count =0;
private static final int full =10;
private static final String LOCK="lock";
public static void main(String[] args){
System.out.println("test");
LeetCode215 test1 = new LeetCode215();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new producer()).start();
}
//生產(chǎn)者
class Producer implements Runnable{
@Override
public void run() {
//進(jìn)行不斷的循環(huán),來(lái)保證一直生產(chǎn)
while(true){
//slepp一下線(xiàn)程,保證循環(huán)不要執(zhí)行的太快,浪費(fèi)性能
//sleep不能放在同步代碼塊里面,因?yàn)閟leep不會(huì)釋放鎖,
//當(dāng)前線(xiàn)程會(huì)一直占有produce線(xiàn)程,直到達(dá)到容量,調(diào)用wait()方法主動(dòng)釋放鎖
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
//加鎖到本方法上面
synchronized (LOCK){
//緩沖區(qū)已經(jīng)滿(mǎn)了,調(diào)用wait等待
while(count==full) {
try {
LOCK.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//沒(méi)有滿(mǎn),可以繼續(xù)生產(chǎn)
count++;
System.out.println("生產(chǎn)者produce了:"+Thread.currentThread().getName()+" 一共有:"+count);
//喚醒其他都處于wait()的線(xiàn)程,包括生產(chǎn)者和消費(fèi)者
LOCK.notifyAll();
}
}
}
}
class Consumer implements Runnable{
public void run(){
//不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
//利用鎖來(lái)控制 消費(fèi)者 和 生產(chǎn)者的訪(fǎng)問(wèn)
synchronized (LOCK){
//如果緩沖區(qū)里面沒(méi)有數(shù)據(jù)
while(count==0){
try{
LOCK.wait();
}catch (Exception e){
e.printStackTrace();
}
}
//緩沖區(qū)有資源了
count--;
System.out.println("消費(fèi)者consume了:"+Thread.currentThread().getName()+" 一共有:"+count);
LOCK.notifyAll();
}
}
}
}
}
第二種解決方案 利用重入鎖 ReentrantLock()的 await() 和 signalAll() 機(jī)制
這種方式和 synchroized 方式是基本上一樣的
參照 Object 的 wait() 和 notify/notifyAll() 方法,
Condition 也提供了同樣的 await() 和 signal/signalAll() 方法。
Condition的await()/signal()方法和Object的wait()/notify()方法
方法ConditionObject阻塞等待await()wait()喚醒其他線(xiàn)程signal()notify()/notifyall()使用的鎖互斥鎖/共享鎖,如Lock同步鎖:如synchronized一個(gè)鎖對(duì)應(yīng)可以創(chuàng)建多個(gè)condition對(duì)應(yīng)一個(gè)Object喚醒指定的線(xiàn)程明確的指定線(xiàn)程只能通過(guò)notifyAll喚醒所有線(xiàn)程;或者notify()隨機(jī)喚醒
lock和condition實(shí)現(xiàn)生產(chǎn)者消費(fèi)者
該實(shí)現(xiàn)方式相比較synchronized于object的wait()/notify()方法具有更加的靈活性,可以喚醒具體的消費(fèi)者線(xiàn)程或者生產(chǎn)者線(xiàn)程,達(dá)到當(dāng)緩沖區(qū)滿(mǎn)的時(shí)候,喚醒消費(fèi)者線(xiàn)程,此時(shí)生產(chǎn)者線(xiàn)程都將被阻塞,而不是向notifyall()那樣喚醒所有的線(xiàn)程。
import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LeetCode215 {
//使用 ReentrantLock變量來(lái)進(jìn)行
private final Lock lock = new ReentrantLock();
//條件變量 通知生產(chǎn)者
private final Condition isFull = lock.newCondition();
//條件變量 通知消費(fèi)者的
private final Condition isEmpty = lock.newCondition();
private static int count =0;
private static final int num = 10;
public static void main(String[] args){
System.out.println("test");
LeetCode215 test1 = new LeetCode215();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
}
//生產(chǎn)者
class Producer implements Runnable{
@Override
public void run() {
//進(jìn)行一個(gè)循環(huán)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
//進(jìn)行加鎖
lock.lock();
try {
//如果緩沖區(qū)滿(mǎn)了
while (count == num) {
try {
//刮起生產(chǎn)者的線(xiàn)程,暫時(shí)不能生產(chǎn)了。
isFull.await();
} catch (Exception e) {
e.printStackTrace();
}
}
//
count++;
System.out.println("生產(chǎn)者:"+Thread.currentThread().getName()+"產(chǎn)生數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
//就可以通知消費(fèi)者,可以開(kāi)始消費(fèi)了
isEmpty.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable{
public void run(){
//不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
//加鎖
lock.lock();
try{
//如果緩沖區(qū)是空的
while(count==0){
try{
//消費(fèi)者要開(kāi)始等待了
isEmpty.await();
}catch (Exception e){
e.printStackTrace();
}
}
//開(kāi)始消費(fèi)
count--;
System.out.println("消費(fèi)者:"+Thread.currentThread().getName()+"消費(fèi)數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
//通知所有的生產(chǎn)者線(xiàn)程開(kāi)始生產(chǎn)數(shù)據(jù)
isFull.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}
}
第三種方法 使用BlockingQueue 實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者
BlockingQueue即阻塞隊(duì)列,從阻塞這個(gè)詞可以看出,在某些情況下對(duì)阻塞隊(duì)列的訪(fǎng)問(wèn)可能會(huì)造成阻塞。被阻塞的情況主要有如下兩種:
- 當(dāng)隊(duì)列滿(mǎn)了的時(shí)候進(jìn)行入隊(duì)列操作
-
當(dāng)隊(duì)列空了的時(shí)候進(jìn)行出隊(duì)列操作
因此,當(dāng)一個(gè)線(xiàn)程對(duì)已經(jīng)滿(mǎn)了的阻塞隊(duì)列進(jìn)行入隊(duì)操作時(shí)會(huì)阻塞,除非有另外一個(gè)線(xiàn)程進(jìn)行了出隊(duì)操作,當(dāng)一個(gè)線(xiàn)程對(duì)一個(gè)空的阻塞隊(duì)列進(jìn)行出隊(duì)操作時(shí)也會(huì)阻塞,除非有另外一個(gè)線(xiàn)程進(jìn)行了入隊(duì)操作。
從上可知,阻塞隊(duì)列是線(xiàn)程安全的。
下面是BlockingQueue接口的一些方法:
image.png
這四類(lèi)方法分別對(duì)應(yīng)的是:
- ThrowsException:如果操作不能馬上進(jìn)行,則拋出異常
- SpecialValue:如果操作不能馬上進(jìn)行,將會(huì)返回一個(gè)特殊的值,一般是true或者false
- Blocks:如果操作不能馬上進(jìn)行,操作會(huì)被阻塞
- TimesOut:如果操作不能馬上進(jìn)行,操作會(huì)被阻塞指定的時(shí)間,如果指定時(shí)間沒(méi)執(zhí)行,則返回一個(gè)特殊值,一般是true或者false
下面來(lái)看由阻塞隊(duì)列實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者模型,這里我們使用take()和put()方法,這里生產(chǎn)者和生產(chǎn)者,消費(fèi)者和消費(fèi)者之間不存在同步,所以會(huì)出現(xiàn)連續(xù)生成和連續(xù)消費(fèi)的現(xiàn)象
ps:下面的代碼是生產(chǎn)者和消費(fèi)者之間的關(guān)系是沒(méi)有問(wèn)題的,但是count是不對(duì)的,因?yàn)閎lockingqueue的put和take操作是線(xiàn)程安全的,后面的num++ 和 num-- 不一定線(xiàn)程安全。
import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LeetCode215 {
private static BlockingQueue queue = new ArrayBlockingQueue<>(10);
private static int count = 0;
public static void main(String[] args){
LeetCode215 test1 = new LeetCode215();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
}
//生產(chǎn)者
class Producer implements Runnable{
@Override
public void run() {
//進(jìn)行一個(gè)循環(huán)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
try {
// 往隊(duì)列里面放入元素,put方法的好處就是如果這個(gè)時(shí)候blockingqueue已經(jīng)滿(mǎn)了,那么這個(gè)線(xiàn)程就會(huì)自動(dòng)阻塞,直到有空閑
queue.put(1);
//緩沖區(qū)數(shù)量的標(biāo)示位
count ++;
System.out.println("生產(chǎn)者:"+Thread.currentThread().getName()+"生產(chǎn)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
public void run(){
//不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
try{
//從blockingqueue里面拿元素,同樣如果隊(duì)列是空,就阻塞了
queue.take();
//緩沖區(qū)的數(shù)量剪1
count --;
System.out.println("消費(fèi)者:"+Thread.currentThread().getName()+"消費(fèi)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
第四種方法 使用Semaphore(信號(hào)量) 實(shí)現(xiàn)
Java并發(fā)工具類(lèi)(信號(hào)量Semaphore)
關(guān)鍵是理解信號(hào)量這個(gè)操作:
isFull信號(hào)量 初始值為10 表示還可以生產(chǎn)10個(gè),生產(chǎn)一個(gè)就 acquire 減1,同時(shí) isEmpty 執(zhí)行 release 加1。
isEmpty信號(hào)量 表示還可以消費(fèi)多少個(gè),初始值為0,表示沒(méi)有可消費(fèi)的,每消費(fèi)一個(gè),就要先 acquire 減 1,同時(shí) isFull 執(zhí)行 release 加1。
import java.rmi.server.ExportException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LeetCode215 {
private static int count = 0;
//isFull信號(hào)量 10 表示緩沖區(qū)的數(shù)量為10,表示還可以生產(chǎn)多少個(gè)
static Semaphore isFull = new Semaphore(10);
//isEmpty信號(hào)量 表示還可以消費(fèi)多少個(gè)
static Semaphore isEmpty = new Semaphore(0);
//互斥鎖
static Semaphore isUse = new Semaphore(1);
public static void main(String[] args){
LeetCode215 test1 = new LeetCode215();
new Thread(test1.new Producer()).start();
new Thread(test1.new Consumer()).start();
new Thread(test1.new Producer()).start();
}
//生產(chǎn)者
class Producer implements Runnable{
@Override
public void run() {
//進(jìn)行一個(gè)循環(huán)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
try {
//首先拿到 isFull 信號(hào)量,表示
isFull.acquire();
//拿到控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
isUse.acquire();
//緩沖區(qū)數(shù)量的標(biāo)示位
count ++;
System.out.println("生產(chǎn)者:"+Thread.currentThread().getName()+"生產(chǎn)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
}catch (Exception e){
e.printStackTrace();
}finally {
//釋放控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
isUse.release();
//isEmpty的信號(hào)量加1,表示緩沖區(qū)有數(shù)據(jù),可以在消費(fèi)一個(gè)
isEmpty.release();
}
}
}
}
class Consumer implements Runnable{
public void run(){
//不斷的執(zhí)行循環(huán),來(lái)進(jìn)行消費(fèi)數(shù)據(jù)
while(true){
//消費(fèi)時(shí)間的一個(gè)間隔,避免大量的時(shí)間用于執(zhí)行循環(huán)
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
try{
//默認(rèn)是0,緩沖區(qū)里面沒(méi)數(shù)據(jù),要消費(fèi)數(shù)據(jù)需要先申請(qǐng)
isEmpty.acquire();
//拿到控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
isUse.acquire();
//緩沖區(qū)的數(shù)量減1
count --;
System.out.println("消費(fèi)者:"+Thread.currentThread().getName()+"消費(fèi)了數(shù)據(jù),緩沖區(qū)的數(shù)量為:"+count);
}catch (Exception e){
e.printStackTrace();
}finally {
//釋放控制生產(chǎn)者 和 消費(fèi)者 的互斥信號(hào)量
isUse.release();
//isFull 加1,表示可以再生產(chǎn)1個(gè)
isFull.release();
}
}
}
}
}
