背景
????實(shí)質(zhì)上,很多后臺服務(wù)程序并發(fā)控制的基本原理都可以歸納為生產(chǎn)者/消費(fèi)者模式,而這是恰恰是在本科操作系統(tǒng)課堂上老師反復(fù)講解,而我們卻視而不見不以為然的。
????生產(chǎn)者消費(fèi)者問題是研究多線程程序時(shí)繞不開的經(jīng)典問題之一,它描述是有一塊緩沖區(qū)作為倉庫,生產(chǎn)者可以將產(chǎn)品放入倉庫,消費(fèi)者則可以從倉庫中取走產(chǎn)品。解決生產(chǎn)者/消費(fèi)者問題的方法可分為兩類:
(1)采用某種機(jī)制保護(hù)生產(chǎn)者和消費(fèi)者之間的同步;
(2)在生產(chǎn)者和消費(fèi)者之間建立一個(gè)管道。
????第一種方式有較高的效率,并且易于實(shí)現(xiàn),代碼的可控制性較好,屬于常用的模式。第二種管道緩沖區(qū)不易控制,被傳輸數(shù)據(jù)對象不易于封裝等,實(shí)用性不強(qiáng)。因此本文只介紹同步機(jī)制實(shí)現(xiàn)的生產(chǎn)者/消費(fèi)者問題。
同步問題核心在于:如何保證同一資源被多個(gè)線程并發(fā)訪問時(shí)的完整性。常用的同步方法是采用信號或加鎖機(jī)制,保證資源在任意時(shí)刻至多被一個(gè)線程訪問。Java語言在多線程編程上實(shí)現(xiàn)了完全對象化,提供了對同步機(jī)制的良好支持。在Java中一共有四種方法支持同步,其中前三個(gè)是同步方法,一個(gè)是管道方法。
wait() / notify()方法
await() / signal()方法
BlockingQueue阻塞隊(duì)列方法
-
PipedInputStream / PipedOutputStream
本文只介紹最常用的前三種,第四種暫不做討論。
一. wait() / notify()方法
????wait() / nofity()方法是基類Object的兩個(gè)方法,也就意味著所有Java類都會擁有這兩個(gè)方法,這樣,我們就可以為任何對象實(shí)現(xiàn)同步機(jī)制。
wait()方法:當(dāng)緩沖區(qū)已滿/空時(shí),生產(chǎn)者/消費(fèi)者線程停止自己的執(zhí)行,放棄鎖,使自己處于等等狀態(tài),讓其他線程執(zhí)行。
notify()方法:當(dāng)生產(chǎn)者/消費(fèi)者向緩沖區(qū)放入/取出一個(gè)產(chǎn)品時(shí),向其他等待的線程發(fā)出可執(zhí)行的通知,同時(shí)放棄鎖,使自己處于等待狀態(tài)。
來段代碼就明白了:
public class Main {
public static void main(String[] args) {
// write your code here
Main main = new Main();
main.test();
}
public void test() {
// 倉庫對象
Storage storage = new Storage();
// 生產(chǎn)者對象
Producer p1 = new Producer(storage);
Producer p2 = new Producer(storage);
Producer p3 = new Producer(storage);
Producer p4 = new Producer(storage);
Producer p5 = new Producer(storage);
Producer p6 = new Producer(storage);
Producer p7 = new Producer(storage);
// 消費(fèi)者對象
Consumer c1 = new Consumer(storage);
Consumer c2 = new Consumer(storage);
Consumer c3 = new Consumer(storage);
// 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
p1.setNum(10);
p2.setNum(10);
p3.setNum(10);
p4.setNum(10);
p5.setNum(10);
p6.setNum(10);
p7.setNum(80);
// 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
c1.setNum(50);
c2.setNum(20);
c3.setNum(30);
// 線程開始執(zhí)行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
/**
* 倉庫類Storage實(shí)現(xiàn)緩沖區(qū)
*/
class Storage {
// 倉庫最大存儲量
private final int MAX_SIZE = 100;
// 倉庫存儲的載體
private LinkedList<Object> list = new LinkedList<>();
// 生產(chǎn)num個(gè)產(chǎn)品
public void produce(int num) {
synchronized (list) {
// 如果倉庫剩余容量不足
while (list.size() + num > MAX_SIZE) {
System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
// 由于條件不滿足,生產(chǎn)阻塞
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到這里說明,生產(chǎn)條件滿足情況,生產(chǎn)num個(gè)產(chǎn)品
for (int i = 1; i <= num; ++i) {
list.add(new Object());
}
System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
list.notifyAll();
}
}
// 消費(fèi)num個(gè)產(chǎn)品
public void consume(int num) {
// 同步代碼段
synchronized (list) {
// 如果倉庫存儲量不足
while (list.size() < num) {
System.out.println("【要消費(fèi)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行消費(fèi)任務(wù)!");
try {
// 由于條件不滿足,消費(fèi)阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到這里說明, 消費(fèi)條件滿足情況,消費(fèi)num個(gè)產(chǎn)品
for (int i = 1; i <= num; ++i) {
list.remove();
}
System.out.println("【已經(jīng)消費(fèi)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
list.notifyAll();
}
}
public LinkedList<Object> getList() {
return list;
}
public void setList(LinkedList<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
/**
* 生產(chǎn)者類Producer繼承線程類Thread
*/
public class Producer extends Thread {
// 每次生產(chǎn)的產(chǎn)品數(shù)量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構(gòu)造函數(shù),設(shè)置倉庫
public Producer(Storage storage) {
this.storage = storage;
}
// 線程run函數(shù)
@Override
public void run() {
produce(num);
}
// 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
/**
* 消費(fèi)者類Consumer繼承線程類Thread
*/
public class Consumer extends Thread {
// 每次消費(fèi)的產(chǎn)品數(shù)量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構(gòu)造函數(shù),設(shè)置倉庫
public Consumer(Storage storage) {
this.storage = storage;
}
// 線程run函數(shù)
@Override
public void run() {
consume(num);
}
// 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
public void consume(int num) {
storage.consume(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
}
二. await() / signal()方法
????在JDK5.0之后,Java提供了更加健壯的線程處理機(jī)制,包括同步、鎖定、線程池等,它們可以實(shí)現(xiàn)更細(xì)粒度的線程控制。await()和signal()就是其中用來做同步的兩種方法,它們的功能基本上和wait() / nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機(jī)制Lock直接掛鉤,具有更大的靈活性。通過在Lock對象上調(diào)用newCondition()方法,將條件變量和一個(gè)鎖對象進(jìn)行綁定,進(jìn)而控制并發(fā)程序訪問競爭資源的安全。
? await()方法會使得當(dāng)前線程等待,同時(shí)釋放鎖,當(dāng)其他線程使用sinal()方法或者sinalAll()方法時(shí),線程會重新獲得鎖并繼續(xù)執(zhí)行?;蛘弋?dāng)線程被中斷時(shí),也能跳出等待,和wait相似。
? awaitUninterruptibly()方法與await()基本一致,但是并不會在等待過程中響應(yīng)中斷。
? sinal()方法用于喚醒一個(gè)在等待中的線程,sinalAll()方法會喚醒所有在等待中的線程。這和Object.notify()很相似。
下面來看condition實(shí)現(xiàn)阻塞隊(duì)列代碼:
public class Main {
public static void main(String[] args) throws InterruptedException {
// write your code here
Main main = new Main();
main.test();
}
public void test() {
// 倉庫對象
Storage storage = new Storage();
// 生產(chǎn)者對象
Producer p1 = new Producer(storage);
Producer p2 = new Producer(storage);
Producer p3 = new Producer(storage);
Producer p4 = new Producer(storage);
Producer p5 = new Producer(storage);
Producer p6 = new Producer(storage);
Producer p7 = new Producer(storage);
// 消費(fèi)者對象
Consumer c1 = new Consumer(storage);
Consumer c2 = new Consumer(storage);
Consumer c3 = new Consumer(storage);
// 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
p1.setNum(10);
p2.setNum(10);
p3.setNum(10);
p4.setNum(10);
p5.setNum(10);
p6.setNum(10);
p7.setNum(80);
// 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
c1.setNum(50);
c2.setNum(20);
c3.setNum(30);
// 線程開始執(zhí)行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
/**
* 倉庫類Storage實(shí)現(xiàn)緩沖區(qū)
*/
class Storage {
// 倉庫最大存儲量
private final int MAX_SIZE = 100;
// 倉庫存儲的載體
private LinkedList<Object> list = new LinkedList<>();
// 鎖
private final Lock lock = new ReentrantLock(true);
// 倉庫滿的條件變量
private final Condition full = lock.newCondition();
// 倉庫空的條件變量
private final Condition empty = lock.newCondition();
// 生產(chǎn)num個(gè)產(chǎn)品
public void produce(int num) {
// 獲得鎖
lock.lock();
// 如果倉庫剩余容量不足
while (list.size() + num > MAX_SIZE) {
System.out.println("【要生產(chǎn)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
// 由于條件不滿足,生產(chǎn)阻塞
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到這里說明,生產(chǎn)條件滿足情況,生產(chǎn)num個(gè)產(chǎn)品
for (int i = 1; i <= num; ++i) {
list.add(new Object());
}
System.out.println("【已經(jīng)生產(chǎn)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
// 喚醒其他所有線程
empty.signalAll();
// 釋放鎖
lock.unlock();
}
// 消費(fèi)num個(gè)產(chǎn)品
public void consume(int num) {
// 同步代碼段
lock.lock();
// 如果倉庫存儲量不足
while (list.size() < num) {
System.out.println("【要消費(fèi)的產(chǎn)品數(shù)量】:" + num + "\t【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行消費(fèi)任務(wù)!");
try {
// 由于條件不滿足,消費(fèi)阻塞
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 到這里說明, 消費(fèi)條件滿足情況,消費(fèi)num個(gè)產(chǎn)品
for (int i = 1; i <= num; ++i) {
list.remove();
}
System.out.println("【已經(jīng)消費(fèi)產(chǎn)品數(shù)】:" + num + "\t\t【現(xiàn)倉儲量為】:" + list.size());
// 喚醒其他所有線程
full.signalAll();
// 釋放鎖
lock.unlock();
}
public LinkedList<Object> getList() {
return list;
}
public void setList(LinkedList<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
/**
* 生產(chǎn)者類Producer繼承線程類Thread
*/
public class Producer extends Thread {
// 每次生產(chǎn)的產(chǎn)品數(shù)量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構(gòu)造函數(shù),設(shè)置倉庫
public Producer(Storage storage) {
this.storage = storage;
}
// 線程run函數(shù)
@Override
public void run() {
produce(num);
}
// 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
/**
* 消費(fèi)者類Consumer繼承線程類Thread
*/
public class Consumer extends Thread {
// 每次消費(fèi)的產(chǎn)品數(shù)量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構(gòu)造函數(shù),設(shè)置倉庫
public Consumer(Storage storage) {
this.storage = storage;
}
// 線程run函數(shù)
@Override
public void run() {
consume(num);
}
// 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
public void consume(int num) {
storage.consume(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
}
三. BlockingQueue阻塞隊(duì)列方法
????BlockingQueue是JDK5.0的新增內(nèi)容,它是一個(gè)已經(jīng)在內(nèi)部實(shí)現(xiàn)了同步的隊(duì)列,實(shí)現(xiàn)方式采用的是 await() / signal()方法。它可以在生成對象時(shí)指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:類似于我們上面的生產(chǎn)者線程,容量達(dá)到最大時(shí),自動阻塞。
take()方法:類似于我們上面的消費(fèi)者線程,容量為0時(shí),自動阻塞。
關(guān)于BlockingQueue的內(nèi)容網(wǎng)上有很多,在這不多介紹。下面直接看代碼即可:
public class Main {
public static void main(String[] args) throws InterruptedException {
// write your code here
Main main = new Main();
main.test();
}
public void test() {
// 倉庫對象
Storage storage = new Storage();
// 生產(chǎn)者對象
Producer p1 = new Producer(storage);
Producer p2 = new Producer(storage);
Producer p3 = new Producer(storage);
Producer p4 = new Producer(storage);
Producer p5 = new Producer(storage);
Producer p6 = new Producer(storage);
Producer p7 = new Producer(storage);
// 消費(fèi)者對象
Consumer c1 = new Consumer(storage);
Consumer c2 = new Consumer(storage);
Consumer c3 = new Consumer(storage);
// 設(shè)置生產(chǎn)者產(chǎn)品生產(chǎn)數(shù)量
p1.setNum(10);
p2.setNum(10);
p3.setNum(10);
p4.setNum(10);
p5.setNum(10);
p6.setNum(10);
p7.setNum(80);
// 設(shè)置消費(fèi)者產(chǎn)品消費(fèi)數(shù)量
c1.setNum(50);
c2.setNum(20);
c3.setNum(30);
// 線程開始執(zhí)行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
/**
* 倉庫類Storage實(shí)現(xiàn)緩沖區(qū)
*/
class Storage {
// 倉庫最大存儲量
private final int MAX_SIZE = 100;
// 倉庫存儲的載體
private ArrayBlockingQueue<Object> list = new ArrayBlockingQueue<>(MAX_SIZE);
// 生產(chǎn)num個(gè)產(chǎn)品
public void produce(int num) {
// 如果倉庫剩余容量不足
while (list.size() >= MAX_SIZE) {
System.out.println("【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行生產(chǎn)任務(wù)!");
}
// 到這里說明,生產(chǎn)條件滿足情況,生產(chǎn)num個(gè)產(chǎn)品
for (int i = 1; i <= num; ++i) {
// 放入產(chǎn)品,自動阻塞
try {
list.put(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("【現(xiàn)倉儲量為】:" + list.size());
}
// 消費(fèi)num個(gè)產(chǎn)品
public void consume(int num) {
// 如果倉庫存儲量不足
while (list.size() <= 0) {
System.out.println("【庫存量】:" + list.size() + "\t\t暫時(shí)不能執(zhí)行消費(fèi)任務(wù)!");
}
// 到這里說明, 消費(fèi)條件滿足情況,消費(fèi)num個(gè)產(chǎn)品
for (int i = 1; i <= num; ++i) {
// 消費(fèi)產(chǎn)品,自動阻塞
try {
list.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("【現(xiàn)倉儲量為】:" + list.size());
}
public ArrayBlockingQueue<Object> getList() {
return list;
}
public void setList(ArrayBlockingQueue<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
/**
* 生產(chǎn)者類Producer繼承線程類Thread
*/
public class Producer extends Thread {
// 每次生產(chǎn)的產(chǎn)品數(shù)量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構(gòu)造函數(shù),設(shè)置倉庫
public Producer(Storage storage) {
this.storage = storage;
}
// 線程run函數(shù)
@Override
public void run() {
produce(num);
}
// 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
/**
* 消費(fèi)者類Consumer繼承線程類Thread
*/
public class Consumer extends Thread {
// 每次消費(fèi)的產(chǎn)品數(shù)量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構(gòu)造函數(shù),設(shè)置倉庫
public Consumer(Storage storage) {
this.storage = storage;
}
// 線程run函數(shù)
@Override
public void run() {
consume(num);
}
// 調(diào)用倉庫Storage的生產(chǎn)函數(shù)
public void consume(int num) {
storage.consume(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
}
????有時(shí)使用BlockingQueue可能會出現(xiàn)put()和System.out.println()輸出不匹配的情況,這是由于它們之間沒有同步造成的。當(dāng)緩沖區(qū)已滿,生產(chǎn)者在put()操作時(shí),put()內(nèi)部調(diào)用了await()方法,放棄了線程的執(zhí)行,然后消費(fèi)者線程執(zhí)行,調(diào)用take()方法,take()內(nèi)部調(diào)用了signal()方法,通知生產(chǎn)者線程可以執(zhí)行,致使在消費(fèi)者的println()還沒運(yùn)行的情況下生產(chǎn)者的println()先被執(zhí)行,所以有了輸出不匹配的情況。
對于BlockingQueue大家可以放心使用,這可不是它的問題,只是在它和別的對象之間的同步有問題。