我在前段時(shí)間寫了一篇關(guān)于AQS的文章,在文章里邊我說幾乎所有在JUC包中的所有多線程相關(guān)的類都和AQS相關(guān),今天我就在這里總結(jié)一下另一個(gè)依賴于AQS來實(shí)現(xiàn)的同步工具類:BlockingQueue。我們主要以ArrayBlockingQueue為主來分析相關(guān)的源碼。
阻塞隊(duì)列
相信大多數(shù)同學(xué)都是在學(xué)習(xí)線程池相關(guān)知識時(shí)了解到阻塞隊(duì)列的概念的。知道各種類型的阻塞隊(duì)列對線程池初始化時(shí)的影響。在java doc中這樣定義阻塞隊(duì)列。當(dāng)從阻塞隊(duì)列獲取元素但是隊(duì)列為空時(shí),當(dāng)前線程會(huì)阻塞直到另一個(gè)線程向阻塞隊(duì)列中添加一個(gè)元素;類似的,當(dāng)向一個(gè)阻塞隊(duì)列加入元素時(shí),如果隊(duì)列已經(jīng)滿了,當(dāng)前線程也會(huì)阻塞知道另外一個(gè)線程從隊(duì)列中讀取一個(gè)元素。阻塞隊(duì)列一般都是FIFO,用來實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模式。阻塞隊(duì)列的方法通過四種不同的方式來處理操作無法被立即完成的情況,這四種情況分別為拋出異常,返回特殊值(null或在是false),阻塞當(dāng)前線程直到執(zhí)行結(jié)束,最后一種是只阻塞固定時(shí)間,然后還未執(zhí)行成功就放棄操作。這些方法都總結(jié)在下邊這種表中了。

我們就只分析put和take方法。
put和take函數(shù)
我們都知道,使用同步隊(duì)列可以很輕松的實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式,其實(shí),同步隊(duì)列就是按照生產(chǎn)者-消費(fèi)者的模式來實(shí)現(xiàn)的,我們可以將put函數(shù)看作生產(chǎn)者的操作,take是消費(fèi)者的操作。
?put函數(shù)會(huì)在隊(duì)列末尾添加元素,如果隊(duì)列已經(jīng)滿了,無法添加元素的話,就一直阻塞等待到可以加入為止。函數(shù)的源碼如下所示。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //先獲得鎖
try {
while (count == items.length)
//如果隊(duì)列滿了,就NotFull這個(gè)condition對象上進(jìn)行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//這里可以注意的是ArrayBlockingList實(shí)際上使用Array實(shí)現(xiàn)了一個(gè)環(huán)形數(shù)組,
//當(dāng)putIndex達(dá)到最大時(shí),就返回到起點(diǎn),繼續(xù)插入,
//當(dāng)然,如果此時(shí)0位置的元素還沒有被取走,
//下次put時(shí),就會(huì)因?yàn)閏out == item.length未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
//因?yàn)椴迦肓嗽?,通知等待notEmpty事件的線程。
notEmpty.signal();
}
我們會(huì)發(fā)現(xiàn)put函數(shù)也是使用了wait/notify的機(jī)制。與一般生產(chǎn)者-消費(fèi)者的實(shí)現(xiàn)方式不同,同步隊(duì)列使用ReentrantLock和Condition相結(jié)合的先獲得鎖,再等待的機(jī)制;而不是synchronized和Object.wait的機(jī)制。這里的區(qū)別我們下一節(jié)再詳細(xì)講解。
?看完了生產(chǎn)者相關(guān)的put函數(shù),我們再來看一下消費(fèi)者調(diào)用的take函數(shù)。take函數(shù)在隊(duì)列為空時(shí)會(huì)被阻塞,一直到阻塞隊(duì)列加入了新的元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//如果隊(duì)列為空,那么在notEmpty對象上等待,
//當(dāng)put函數(shù)調(diào)用時(shí),會(huì)調(diào)用notEmpty的notify進(jìn)行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; //取出takeIndex位置的元素
if (++takeIndex == items.length)
//如果到了尾部,將指針重新調(diào)整到頭部
takeIndex = 0;
count--;
....
//通知notFull對象上等待的線程
notFull.signal();
return x;
}
Condition.await和Object.wait
我們發(fā)現(xiàn)ArrayBlockingList并沒有使用Object.wait,而是使用的Condition.await,這是為什么呢?其中又有哪些原因呢?
?Condition對象可以提供和Object的wait和notify一樣的行為,但是后者必須使用synchronized這個(gè)內(nèi)置的monitor鎖,而Condition使用的是RenentranceLock。這兩種方式在阻塞等待時(shí)都會(huì)將相應(yīng)的鎖釋放掉,但是Condition的等待可以中斷,這是二者唯一的區(qū)別。
 Condition的流程大致如下邊兩張圖所示.


我們首先來看一下await函數(shù)的實(shí)現(xiàn),詳細(xì)的講解都在代碼中.
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在condition wait隊(duì)列上添加新的節(jié)點(diǎn)
Node node = addConditionWaiter();
//釋放當(dāng)前持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//由于node在之前是添加到condition wait queue上的,現(xiàn)在判斷這個(gè)node
//是否被添加到Sync的獲得鎖的等待隊(duì)列上。
//node在condition queue上說明還在等待事件的notify,
//notify函數(shù)會(huì)將condition queue 上的node轉(zhuǎn)化到Sync的隊(duì)列上。
while (!isOnSyncQueue(node)) {
//node還沒有被添加到Sync Queue上,說明還在等待事件通知
//所以調(diào)用park函數(shù)來停止線程執(zhí)行
LockSupport.park(this);
//判斷是否被中斷,線程從park函數(shù)返回有兩種情況,一種是
//其他線程調(diào)用了unpark,另外一種是線程被中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//代碼執(zhí)行到這里,已經(jīng)有其他線程調(diào)用notify函數(shù),或則被中斷,該線程可以繼續(xù)執(zhí)行,但是必須先
//再次獲得調(diào)用await函數(shù)時(shí)的鎖.a(chǎn)cquireQueued函數(shù)在AQS文章中做了介紹.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
....
}
final int fullyRelease(Node node) {
//AQS的方法,當(dāng)前已經(jīng)在鎖中了,所以直接操作
boolean failed = true;
try {
int savedState = getState();
//獲取state當(dāng)前的值,然后保存,以待以后恢復(fù)
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
//中斷可能發(fā)生在兩個(gè)階段中,一是在等待singla,另外一個(gè)是在獲得signal之后
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//這里要和下邊的transferForSignal對應(yīng)著看,這是線程中斷進(jìn)入的邏輯.那邊是signal的邏輯
//兩邊可能有并發(fā)沖突,但是成功的一方必須調(diào)用enq來進(jìn)入acquire lock queue中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//如果失敗了,說明transferForSignal那邊成功了,等待node 進(jìn)入acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
signal函數(shù)將等待事件最長時(shí)間的線程節(jié)點(diǎn)從等待condition的隊(duì)列移動(dòng)到獲得lock的等待隊(duì)列中.
public final void signal() {
//
if (!isHeldExclusively())
//如果當(dāng)前線程沒有獲得鎖,拋出異常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//將Condition wait queue中的第一個(gè)node轉(zhuǎn)移到acquire lock queue中.
doSignal(first);
}
private void doSignal(Node first) {
do {
//由于生產(chǎn)者的signal在有消費(fèi)者等待的情況下,必須要通知
//一個(gè)消費(fèi)者,所以這里有一個(gè)循環(huán),直到隊(duì)列為空
//把first 這個(gè)node從condition queue中刪除掉
//condition queue的頭指針指向node的后繼節(jié)點(diǎn),如果node后續(xù)節(jié)點(diǎn)為null,那么也將尾指針也置為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal將node轉(zhuǎn)而添加到Sync的acquire lock 隊(duì)列
}
final boolean transferForSignal(Node node) {
//如果設(shè)置失敗,說明該node已經(jīng)被取消了,所以返回false,讓doSignal繼續(xù)向下通知其他未被取消的node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將node添加到acquire lock queue中.
Node p = enq(node);
int ws = p.waitStatus;
//需要注意的是這里的node進(jìn)行了轉(zhuǎn)化
//ws>0代表canceled的含義所以直接unpark線程
//如果compareAndSetWaitStatus失敗,所以直接unpark,讓線程繼續(xù)執(zhí)行await中的
//進(jìn)行isOnSyncQueue判斷的while循環(huán),然后進(jìn)入acquireQueue函數(shù).
//這里失敗的原因可能是Lock其他線程釋放掉了鎖,同步設(shè)置p的waitStatus
//如果compareAndSetWaitStatus成功了呢?那么該node就一直在acquire lock queue中
//等待鎖被釋放掉再次搶奪鎖,然后再unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
后記
?后邊一篇文章主要講解如何自己使用AQS來創(chuàng)建符合自己業(yè)務(wù)需求的鎖,請大家繼續(xù)關(guān)注我的文章啦.一起進(jìn)步偶.