志梳理下,生產(chǎn)者消費(fèi)者模式
簡(jiǎn)單的模型
先從一個(gè)例子開(kāi)始吧,有一些角色我先聲明如下:
- 餐廳(Restaurant)--->載體
- 廚師(Chef) --->生產(chǎn)者
- 服務(wù)員(WaiterPerson) --->消費(fèi)者
- 食物(Meaf)--->被消費(fèi)
我梳理一下它們的工作流程:
故事的地點(diǎn)發(fā)生在餐廳,它是載體,包括了廚師、服務(wù)員、食物。
廚師在餐廳做飯,做完飯,飯放在櫥窗,通知服務(wù)員端走,送給客人吃完;期間,廚師會(huì)不斷地監(jiān)控櫥窗的食物是否被端走,如果端走則繼續(xù)做新的食物,否則等待。
服務(wù)員也不能閑著,它時(shí)刻留心著櫥窗是否有食物上架,如果沒(méi)有則繼續(xù)等待。如果有食物則端走,并通知廚師,我端走食物了,你可以做新食物了。
那么按照上面的步驟,首先我們看看生產(chǎn)者Chef的基本代碼
synchronized(this){
while (restaurant.meal != null) {
wait();
}
}
上面代碼表示,倘若食物已經(jīng)做好一份了,廚師不斷監(jiān)控櫥窗上面的食物,如果沒(méi)有被服務(wù)員端走(消費(fèi)),那我廚師就繼續(xù)等待,多休息一會(huì)。注意這里用while而不是if是因?yàn)榉乐苟鄠€(gè)消費(fèi)者產(chǎn)生競(jìng)爭(zhēng)引起并發(fā)問(wèn)題。
System.out.println("飯做好了,訂單生成...")。
synchronized(restaurant.waiter){
restaurant.meal=new Meal();
restaurant.waiter.notifyAll();
}
上面代碼表示 ,廚師沒(méi)有等待了,他開(kāi)始做飯(生產(chǎn)),完成生產(chǎn)食物后,廚師通知(notifyAll)正在櫥窗等待食物的服務(wù)員,叫他去端菜(消費(fèi))。
那消費(fèi)者Waiter的流程呢?我想過(guò)程應(yīng)該是和生產(chǎn)者恰好是對(duì)立的。
synchronized(this){
while(restaurant.meal==null){
wait();
}
}
上面代碼表示,服務(wù)員不斷監(jiān)控櫥窗上面的食物有沒(méi)有做好,如果沒(méi)有做好,那我服務(wù)員就繼續(xù)等待。 是吧?和前面的生產(chǎn)者的判斷條件剛好對(duì)立。
System.out.println("我服務(wù)員把飯端走了...")。
synchronized(restaurant.chef){
restaurant.meal=null;
restaurant.chef.notifyAll();
}
上面代碼表示 ,服務(wù)員被廚師通知端飯(消費(fèi))了,于是他開(kāi)始端飯送個(gè)客人,導(dǎo)致櫥窗上沒(méi)有飯了,之后,服務(wù)員通知(notifyAll)櫥窗口正在等待的廚師去做下一道菜(生產(chǎn))。
通過(guò)上面的例子,我們可以初步了解生產(chǎn)者與消費(fèi)者的工作模式。但是實(shí)際開(kāi)發(fā)場(chǎng)景中,應(yīng)該有不止一個(gè)生產(chǎn)者或者消費(fèi)者,而且食物應(yīng)該很多,那么這個(gè)時(shí)候我們應(yīng)該引入隊(duì)列(Queque)這個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)管理它們了。
利用隊(duì)列管理生產(chǎn)者與消費(fèi)者
我們可以設(shè)想一下,在餐廳中的業(yè)務(wù)場(chǎng)景,廚師chef應(yīng)該作為Runable角色可以有多個(gè),我們可以用Excutor.submit(r)提交很多個(gè)廚師,讓其工作, 而服務(wù)員我們也可以有多個(gè),同理,我們也把他放入線程池去運(yùn)行。 而食物Meal也有多個(gè),并且我們要用一個(gè)數(shù)據(jù)結(jié)構(gòu)存取它,讓它作為廚師和服務(wù)員兩者共同占有的資源又能做好同步處理。在上面的例子中,我們用wait(),notifyAll(),synchronized等方法進(jìn)行食物的同步與通信。它們有一個(gè)明顯的缺點(diǎn),我們發(fā)現(xiàn)代碼很是耦合,晦澀難懂,暫且不談性能。
讓開(kāi)發(fā)者欣慰的事,JDK中提供了BlockQueque接口來(lái)存取“食物”。它是一個(gè)阻塞隊(duì)列的數(shù)據(jù)結(jié)構(gòu)。在這里,我們需要了解兩點(diǎn);
在開(kāi)發(fā)過(guò)程中,"食物"常常指是的IO流。如網(wǎng)絡(luò)編程中,服務(wù)端與客戶端發(fā)送字節(jié)流相互通信?,F(xiàn)在有netty或者nio等異步非阻塞IO的框架,讓并發(fā)性能更佳。
-
阻塞是為了保證生產(chǎn)者與消費(fèi)者步調(diào)一致,不要產(chǎn)生大量浪費(fèi)的食物,消費(fèi)者吃不完,導(dǎo)致資源耗盡。亦或者消費(fèi)者盲目的去找生產(chǎn)者要食物,太多消費(fèi)者擁擠,也會(huì)消耗資源。所以在剛剛開(kāi)始的時(shí)候,jdk做了這個(gè)BlockQueque來(lái)管理食物和生產(chǎn)者和消費(fèi)者通信。 生產(chǎn)者要生產(chǎn)食物如下面的代碼:
@Override public void run() { try { while (!Thread.interrupted()) { Meal meal = new Meal(++count); mBlockQueque.put(meal);// 如果mBlockQueque容量不為empty則阻塞等待。 TimeUnit.SECONDS.sleep(2);//模擬生產(chǎn)耗時(shí)任務(wù)。 } } catch (InterruptedException e) { System.out.println("Chef sleep end interrupted..."); e.printStackTrace(); } }
上面mBlockQueque.put()為阻塞方法(如果櫥窗(隊(duì)列)還有食物未被領(lǐng)取,則等待不生產(chǎn)食物,否則生產(chǎn)食物并添加至櫥窗),如注釋上的說(shuō)明,它的作用類(lèi)似wait()/add();我們跟蹤下源碼:
/**
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock(); // ---(1)
try {
while (!linkFirst(node))
notFull.await(); ---(2)
} finally {
lock.unlock();
}
}
我解析下上面的代碼:put()是一個(gè)接口方法,它具體的實(shí)現(xiàn)方法之一是putFirst,給鏈表首位添加一個(gè)元素。
- (1)此處有l(wèi)ock-finally-unlock組成的臨界區(qū)。它的作用類(lèi)似synchronized,用來(lái)同步。它們之間不同的地方是:
一、用synchronized聲明鎖時(shí),任務(wù)A和任務(wù)B,都要獲取鎖O,如果A首先獲得鎖O,B則一直等待直到A釋放鎖,B一直阻塞著不能被中斷。
二、用lock-finally-unlock聲明鎖時(shí),任務(wù)A和任務(wù)B,都要獲取鎖O,如果A首先獲得鎖O,B可以等待一段時(shí)間,不想等待了,可以自行中斷。A如果想釋放鎖必須在finally后調(diào)用unlock。所以說(shuō)我覺(jué)得lock更加靈活。
但是在大多數(shù)資源競(jìng)爭(zhēng)不太激烈的情況下,我們還是用synchronized足夠了。
- (2)此處notFull是Condition的實(shí)例。它提供更好的性能,通過(guò)await()/signal()方法扮演之前的wait()/notify()的角色。 這里代碼是指while判斷鏈表是否超過(guò)容量,返回false時(shí),則調(diào)用await()阻塞等待當(dāng)前任務(wù)線程。
我們分析完生產(chǎn)者chef,我們來(lái)看看消費(fèi)者waiter的改造后的代碼:
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Meal meal = mBlockQueque.take();//從隊(duì)列中remove出一個(gè)食物,沒(méi)有食物則阻塞等待
meal.run();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上述代碼中,mBlockQueque.take()是一個(gè)可阻塞方法。它試圖從櫥窗隊(duì)列上取食物,如果發(fā)現(xiàn)沒(méi)有食物就阻塞消費(fèi)者線程??纯磘ake()的具體實(shí)現(xiàn)的源碼:
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null) // ---(1)
notEmpty.await(); ---(2)
return x;
} finally {
lock.unlock();
}
}
(1)takeFisrt()方法中會(huì)有去調(diào)用unlinkFirst()去隊(duì)列返回一個(gè)食物,如果有食物,就返回,并調(diào)用notFull.signal()喚醒正在阻塞的生產(chǎn)者線程。
(2) notEmpty是另外一個(gè)Condition實(shí)例,它用來(lái)和消費(fèi)者線程通信。如果發(fā)現(xiàn)返回的食物為空,則notEmpty.await()讓消費(fèi)者線程阻塞等待。
至此。我們看到我們把具體的通信交互過(guò)程封裝到了阻塞隊(duì)列BlockQueue里。 生產(chǎn)者只需要調(diào)用take通信,消費(fèi)者只需調(diào)用put通信。如下圖:

寫(xiě)到這里了,那生產(chǎn)者與消費(fèi)者模式有哪些實(shí)際應(yīng)用呢? 我想線程池應(yīng)該是應(yīng)用最廣泛的地方。下一篇我將詳細(xì)介紹線程池的原理。
注:部分參考自《Java 編程思想》