在前一章節(jié)中,我們簡(jiǎn)單分析過(guò)aqs中加鎖以及阻塞的流程,這一章我們來(lái)分析一下condition條件阻塞工具的實(shí)現(xiàn)
## 什么是condition
condition是作為條件阻塞器,通過(guò)調(diào)用await,signal和signalAll方法來(lái)阻塞和喚醒線程,可以橫向?qū)Ρ鹊氖荗bject對(duì)象的wait,notify以及notifyAll方法,值得注意的是,與Object的wait需要跟synchronized結(jié)合使用一樣,condition也需要跟鎖結(jié)合使用,比如ReenTrantLock中的newCondition方法就是創(chuàng)建一個(gè)全新的條件阻塞器,而調(diào)用await方法也需要通過(guò)lock進(jìn)行加鎖才可以正常使用.
<!--more-->
## condition.await與Object.wait的區(qū)別
* 首先Object相關(guān)的阻塞方法都是通過(guò)本地方法實(shí)現(xiàn)的,而condition的阻塞和喚醒方法都是通過(guò)java調(diào)用來(lái)實(shí)現(xiàn)的,其次就是每個(gè)Object只能綁定一個(gè)阻塞器,即synchronized所綁定的對(duì)象,只有通過(guò)調(diào)用該對(duì)象的wait和notify方法才能實(shí)現(xiàn)阻塞以及喚醒,并且notify會(huì)在調(diào)用wait方法的線程中隨機(jī)挑選一個(gè)喚醒
* 而一個(gè)lock可以創(chuàng)建多個(gè)condition,例如ReentrantLock中的newCondition方法每次調(diào)用都會(huì)返回一個(gè)新的條件阻塞器,這樣做的好處是,調(diào)用condition方法的signal只會(huì)喚醒當(dāng)前condition調(diào)用await方法阻塞的線程,利用這種模式可以實(shí)現(xiàn)阻塞隊(duì)列,如經(jīng)典的ArrayBlockingQueue就是利用ReenTrantLock創(chuàng)建了兩個(gè)condition控制隊(duì)列空時(shí)的阻塞以及隊(duì)列滿時(shí)的阻塞
## await方法的實(shí)現(xiàn)
老規(guī)矩先上源碼
```java
public final void await() throws InterruptedException {
? ? ????????//檢測(cè)線程是否中斷
? ? ? ? ? ? if (Thread.interrupted())
? ? ? ? ? ? ? ? throw new InterruptedException();
? ? ????????//添加一個(gè)condition的waiter
? ? ? ? ? ? Node node = addConditionWaiter();
? ? ????????//釋放鎖
? ? ? ? ? ? int savedState = fullyRelease(node);
? ? ????????//打斷模式
? ? ? ? ? ? int interruptMode = 0;
? ? ? ? ? ? while (!isOnSyncQueue(node)) {
? ? ????????//進(jìn)行循環(huán)如果當(dāng)前線程不在同步隊(duì)列中,則阻塞線程
? ? ? ? ? ? ? ? LockSupport.park(this);
? ? ? ? ? ? ? ? if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ????????//如果在同步隊(duì)列中,則進(jìn)行獲取阻塞操作
? ? ? ? ? ? if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
? ? ? ? ? ? ? ? interruptMode = REINTERRUPT;
? ? ? ? ? ? if (node.nextWaiter != null) // clean up if cancelled
? ? ? ? ? ? ? ? //取消后續(xù)已經(jīng)被取消的等待線程
? ? ? ? ? ? ? ? unlinkCancelledWaiters();
? ? ? ? ? ? if (interruptMode != 0)
? ? ? ? ? ? ? ? reportInterruptAfterWait(interruptMode);
? ? ? ? }
```
await的實(shí)現(xiàn)并不難理解,而操作的Node節(jié)點(diǎn)與AQS中的node節(jié)點(diǎn)是一個(gè)對(duì)象,通過(guò)標(biāo)記node的waitStatus變量來(lái)判斷當(dāng)前node的狀態(tài),我們?cè)賮?lái)看看addConditionWaiter的實(shí)現(xiàn)
```java
private Node addConditionWaiter() {
? ? ? ? Node t = lastWaiter;
? ? ? ? //如果尾部的等待node被取消了,則遍歷取消所有的被取消的節(jié)點(diǎn)
? ? ? ? if (t != null && t.waitStatus != Node.CONDITION) {
? ? ? ? ? ? unlinkCancelledWaiters();
? ? ? ? ? ? t = lastWaiter;
? ? ? ? }
? ? ????//創(chuàng)建一個(gè)condition狀態(tài)的node節(jié)點(diǎn)
? ? ? ? Node node = new Node(Thread.currentThread(), Node.CONDITION);
? ? ????//如果尾結(jié)點(diǎn)是空證明是一個(gè)空隊(duì)列,將頭結(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn),否則將當(dāng)前節(jié)點(diǎn)插入當(dāng)前尾節(jié)點(diǎn)的后面
? ? ? ? if (t == null)
? ? ? ? ? ? firstWaiter = node;
? ? ? ? else
? ? ? ? ? ? t.nextWaiter = node;
? ? ? ? lastWaiter = node;
? ? ? ? return node;
}
private void unlinkCancelledWaiters() {
? ? ? ? ? ? Node t = firstWaiter;
? ? ? ? ? ? Node trail = null;
? ? ????????//遍歷取消所有節(jié)點(diǎn)狀態(tài)不是condition的節(jié)點(diǎn)
? ? ? ? ? ? while (t != null) {
? ? ? ? ? ? ? ? Node next = t.nextWaiter;
? ? ? ? ? ? ? ? if (t.waitStatus != Node.CONDITION) {
? ? ? ? ? ? ? ? ? ? t.nextWaiter = null;
? ? ? ? ? ? ? ? ? ? if (trail == null)
? ? ? ? ? ? ? ? ? ? ? ? firstWaiter = next;
? ? ? ? ? ? ? ? ? ? else
? ? ? ? ? ? ? ? ? ? ? ? trail.nextWaiter = next;
? ? ? ? ? ? ? ? ? ? if (next == null)
? ? ? ? ? ? ? ? ? ? ? ? lastWaiter = trail;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? else
? ? ? ? ? ? ? ? ? ? trail = t;
? ? ? ? ? ? ? ? t = next;
? ? ? ? ? ? }
? ? ? ? }
```
在這個(gè)方法中,值得注意的是通過(guò)condition維護(hù)的隊(duì)列,與aqs中排隊(duì)的隊(duì)列是兩個(gè)完全不同的隊(duì)列,condition的隊(duì)列維護(hù)在condition對(duì)象中,通過(guò)firstWaiter和lastWaiter變量來(lái)維護(hù)隊(duì)列的頭與尾,我們繼續(xù)往下看fullyRelease方法
```java
? ? final int fullyRelease(Node node) {
? ? ? ? boolean failed = true;
? ? ? ? try {
? ? ? ? ? ? int savedState = getState();
? ? ? ? ? ? if (release(savedState)) {
? ? ? ? ? ? ? ? failed = false;
? ? ? ? ? ? ? ? return savedState;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (failed)
? ? ? ? ? ? ? ? node.waitStatus = Node.CANCELLED;
? ? ? ? }
? ? }
```
fullyRelease方法可見(jiàn)是直接釋放當(dāng)前獨(dú)占鎖,在java中目前只有ReenTrantLock以及ReentrantReadWriteLock,實(shí)現(xiàn)了newCondition方法,所以共享鎖是不允許condition阻塞的,
繼續(xù)向下看isOnSyncQueue方法,顧名思義該方法是判斷當(dāng)前node是否在同步隊(duì)列總
```java
final boolean isOnSyncQueue(Node node) {
? ? ? ? if (node.waitStatus == Node.CONDITION || node.prev == null)
? ? ? ? ? ? return false;
? ? ? ? if (node.next != null) // If has successor, it must be on queue
? ? ? ? ? ? return true;
? ? ? ? return findNodeFromTail(node);
? ? }
```
首先是校驗(yàn)當(dāng)前節(jié)點(diǎn)的狀態(tài),如果節(jié)點(diǎn)狀態(tài)還是condition那么一定沒(méi)有插入隊(duì)列中,而同樣node.prev前面節(jié)點(diǎn)如果為空自然是也沒(méi)有插入隊(duì)列的,后續(xù)判斷node.next同樣是判斷后續(xù)有沒(méi)有等待節(jié)點(diǎn),這里值得注意的是,node.next是同步隊(duì)列節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),而condition阻塞隊(duì)列的節(jié)點(diǎn)為nextWaiter不要弄混了,
如果這兩步判斷沒(méi)有成功的話,說(shuō)明當(dāng)前節(jié)點(diǎn)的prev節(jié)點(diǎn)不為空,而next節(jié)點(diǎn)為空,而node.prev節(jié)點(diǎn)不為空,,但是還沒(méi)有在隊(duì)列上,因?yàn)橛锌赡躢as失敗,所以要從尾部遍歷一遍確定在沒(méi)在節(jié)點(diǎn)中.
如果在同步隊(duì)列中則調(diào)用acquireQueued嘗試獲取鎖或者排隊(duì),接下來(lái)就是判斷是否打斷等流程,后續(xù)不在贅述,接下來(lái)我們康康signal方法
```java
public final void signal() {
? ? ????????//判斷當(dāng)前是否是獨(dú)占模式獲取鎖,如果以非獨(dú)占模式獲取鎖則拋出異常
? ? ? ? ? ? if (!isHeldExclusively())
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? Node first = firstWaiter;
? ? ? ? ? ? if (first != null)
? ? ? ? ? ? ? ? doSignal(first);
? ? ? ? }
private void doSignal(Node first) {
? ? ? ? ? ? do {
? ? ? ? ? ? ? ? //把頭結(jié)點(diǎn)后移并且判斷是否為空,如果為空則將尾節(jié)點(diǎn)為空
? ? ? ? ? ? ? ? if ( (firstWaiter = first.nextWaiter) == null)
? ? ? ? ? ? ? ? ? ? lastWaiter = null;
? ? ? ? ? ? ? ? first.nextWaiter = null;
? ? ? ? ? ? ? ? //將當(dāng)前節(jié)點(diǎn)插入阻塞隊(duì)列中
? ? ? ? ? ? } while (!transferForSignal(first) &&
? ? ? ? ? ? ? ? ? ? (first = firstWaiter) != null);
? ? ? ? }
final boolean transferForSignal(Node node) {
? ? ? ? //如果換失敗,則只有可能是被取消了
? ? ? ? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
? ? ? ? ? ? return false;
????????//將當(dāng)前線程插入隊(duì)列,并返回node節(jié)點(diǎn)前面的節(jié)點(diǎn),
? ? ? ? Node p = enq(node);
? ? ? ? int ws = p.waitStatus;
? ? ????//修改前一個(gè)節(jié)點(diǎn)的狀態(tài)為signle以便被喚醒
? ? ? ? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
? ? ? ? ? ? //如果線程被取消了,或者將waitstatus修改失敗的話,說(shuō)明當(dāng)前線程已經(jīng)被取消了
? ? ? ? ? ? LockSupport.unpark(node.thread);
? ? ? ? return true;
? ? }
```
首先調(diào)用signal時(shí)一定是要以獨(dú)占鎖的模式調(diào)用,否則會(huì)拋出異常,然后將當(dāng)前等待節(jié)點(diǎn)后移,并且將當(dāng)前的節(jié)點(diǎn)插入阻塞隊(duì)列中,不需要喚醒線程因?yàn)檎{(diào)用signal時(shí)一定是已經(jīng)被某一線程獲取了鎖,而當(dāng)調(diào)用release時(shí)會(huì)釋放鎖并且自動(dòng)調(diào)用后續(xù)的鎖
那么這里有一個(gè)問(wèn)題就是為什么會(huì)在這里調(diào)用一次unpark,就算不調(diào)用,等到下次喚醒的時(shí)候,也會(huì)清除掉被取消的節(jié)點(diǎn),這里我查閱資料發(fā)現(xiàn),這次喚醒主要是提升性能,在這里喚醒一次,將前面取消的節(jié)點(diǎn)都刪除,以便下次喚醒不需要在刪除節(jié)點(diǎn).這里加不加這個(gè)喚醒邏輯上是一樣的
我們?cè)賮?lái)看看signalAll方法
```java
? ? ? public final void signalAll() {
? ? ? ? ? ? if (!isHeldExclusively())
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? Node first = firstWaiter;
? ? ? ? ? ? if (first != null)
? ? ? ? ? ? ? ? doSignalAll(first);
? ? ? ? }
? ? ? private void doSignalAll(Node first) {
? ? ? ? ? ? lastWaiter = firstWaiter = null;
? ? ? ? ? ? do {
? ? ? ? ? ? ? ? Node next = first.nextWaiter;
? ? ? ? ? ? ? ? first.nextWaiter = null;
? ? ? ? ? ? ? ? transferForSignal(first);
? ? ? ? ? ? ? ? first = next;
? ? ? ? ? ? } while (first != null);
? ? ? ? }
```
這里實(shí)現(xiàn)與signal幾乎相同,只不過(guò)一個(gè)是將first節(jié)點(diǎn)插入隊(duì)列,而signalAll方法則是將后續(xù)隊(duì)列全部插入同步隊(duì)列中
到這里我們就已經(jīng)將condition的實(shí)現(xiàn)完全理清了,后續(xù)我們也會(huì)再分析利用condition來(lái)實(shí)現(xiàn)的同步阻塞隊(duì)列ArrayBlockingQueue