本文從源碼角度,以AbstractQueuedSynchronizer.ConditionObject的await、signal、signalAll為入口,對基于ReentrantLock的線程間通信的原理進行介紹,其中涉及到的與ReentrantLock相關(guān)的方法,在ReentrantLock一文中已經(jīng)介紹過,這里不再說明。
源碼注釋中同步隊列指ReentrantLock維護的隊列,條件隊列指ConditionObject維護的隊列,對于內(nèi)部數(shù)據(jù)結(jié)構(gòu)的組織形式,這里不進行介紹。
由于是多線程并發(fā),所以會有多種不同的情況,源碼中針對每個條件,僅給出一兩種情形,留作參考。
類名簡寫說明:
- CO:ConditionObject
- AQS:AbstractQueuedSynchronizer
1. AQS.CO.await()
(1)AQS.CO.await()
public final void await() throws InterruptedException {
// 先判斷當前線程是否被中斷,被中斷則拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 將當前線程對應(yīng)的節(jié)點加入條件隊列
Node node = addConditionWaiter();
// 完全釋放鎖,因為可重入,所以saveState可能大于1
int savedState = fullyRelease(node);
int interruptMode = 0;
// 若不在同步隊列中,被喚醒后會再次進入while中被park
while (!isOnSyncQueue(node)) {
// 掛起當前線程
LockSupport.park(this);
// checkInterruptWhileWaiting中會判斷當前線程是否是被中斷喚醒的
// 返回值非0表示是被中斷喚醒的,會通過break跳出while
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 條件1:
// 為true的情形:
// 當前線程在上面while中park時被中斷,在acquireQueued中park時再次被中斷時條件1為true
// 條件2:
// 條件2為true,即interruptMode是0或REINTERRUPT,為
// REINTERRUPT的情況詳見下面(5)處,為0的情況尚不明確
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 從上面舉的情形中來看,到這里時interruptMode就是REINTERRUPT,
// 這里難道是Doug Lea老先生的一個編碼缺陷?
interruptMode = REINTERRUPT;
// 該條件成立的情形:
// 假設(shè)有三個線程thread0、thread1、thread2,三個線程依次調(diào)用AQS.CO.await(),之后三者均
// 在上面while中被park,此時,外部線程中斷thread0,thread0會執(zhí)行到這里,此時條件為true
if (node.nextWaiter != null)
// 斷開條件隊列中所有不是Node.CONDITION狀態(tài)的節(jié)點(代碼不再貼出)
unlinkCancelledWaiters();
// 該條件成立時,說明當前線程被中斷,interruptMode為THROW_IE或REINTERRUPT
if (interruptMode != 0)
// 方法中會根據(jù)interruptMode的值做相應(yīng)處理(代碼不再貼出)
reportInterruptAfterWait(interruptMode);
}
(2)AQS.CO.addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// 條件2:
// 為true時的情形:
// 情形1:假設(shè)有兩個線程thread0、thread1,初始時,thread0在未持有鎖的情況下調(diào)用
// AQS.CO.await(),當thread0執(zhí)行到AQS.fullyRelease中時會將其對應(yīng)節(jié)點
// 的waitStatus字段設(shè)置為取消狀態(tài)(見下面(4)處),之后持有鎖的thread1調(diào)用
// AQS.CO.await(),會執(zhí)行到這里,這里t.waitStatus為1
// 情形2:假設(shè)有兩個線程thread0、thread1,初始時,thread0持有鎖,之后調(diào)用AQS.CO.await()
// 釋放鎖并阻塞在LockSupport.park(this)處,之后外部線程中斷thread0,thread0被喚
// 醒后會執(zhí)行到AQS.transferAfterCancelledWait里的if處將t.waitStatus設(shè)置為0,之
// 后thread1獲取到鎖,執(zhí)行到這里時,t.waitStatus為0,條件2為true
if (t != null && t.waitStatus != Node.CONDITION) {
// 斷開條件隊列中所有不是Node.CONDITION狀態(tài)的節(jié)點(代碼不再貼出)
unlinkCancelledWaiters();
// 更新t。經(jīng)過unlinkCancelledWaiters,lastWaiter可能已經(jīng)改變
t = lastWaiter;
}
// 創(chuàng)建新的狀態(tài)為Node.CONDITION的節(jié)點,并連接到同步隊列中
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
(3)AQS.fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取AQS.state
// 因為鎖可重入,所以saveState可能大于1
int savedState = getState();
// 傳入saveState而不是1,因此該方法名叫"fully"Release
// 該方法在ReentrantLock的1.2(2)處已經(jīng)說明,這里不再介紹
if (release(savedState)) {
failed = false;
// 返回saveState,再次喚醒該線程時會用到
return savedState;
} else { // 可以說不會執(zhí)行到該分支
throw new IllegalMonitorStateException();
}
} finally {
// 當未持有鎖的線程調(diào)用await方法時,release(savedState)中會拋異常,此時failed為true
if (failed)
// 設(shè)置為取消狀態(tài)
node.waitStatus = Node.CANCELLED;
}
}
(4)AQS.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
// 一般情況下,初次進入該方法,條件1為true,該方法返回false,
// 下面對初次進入該方法時的一些特殊情形進行介紹:
// a.情形:
// 假設(shè)有兩個線程thread0、thread1,初始時,thread0持有鎖,調(diào)用了AQS.CO.await(),從
// while (!isOnSyncQueue(node))處到這里(還未執(zhí)行),thread1獲取到鎖,調(diào)用了AQS.CO.signal,
// 一直調(diào)到AQS.transferForSignal,在方法里的第一個if執(zhí)行后,會將node.waitStatus設(shè)置為0,
// 使得這里的條件1為false,因為thread1還未執(zhí)行AQS.transferForSignal的Node p=enq(node),
// 因此這里node.prev為null,條件2為true。
// b.接著上面的情形繼續(xù)延伸:
// 若thread0仍未執(zhí)行該if語句,thread1執(zhí)行完AQS.transferForSignal
// 的Node p=enq(node),則條件2變?yōu)閒alse
// c.繼續(xù)延伸:
// 若thread0仍未執(zhí)行該if語句,線程thread2與來競爭鎖,因為thread0此時仍持有鎖,
// 因此,thread2會被加入到同步隊列中,經(jīng)過一些列步驟后,會被park,期間會將
// thread1對應(yīng)節(jié)點的waitStatus設(shè)置為-1,條件1仍為false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false; // 返回false,表示node不在同步隊列中
// 條件為false的情形:
// 上面a、b情形不變,將c處的情形改為:thread2執(zhí)行到到AQS.addWaiter的pred.next = node;
// 處時(該語句還未執(zhí)行),thread0執(zhí)行到這里,此時該條件為false
// 條件為true的情形:
// thread0執(zhí)行到這里時,thread2已經(jīng)執(zhí)行完AQS.addWaiter的pred.next = node;處,此時該條件成立
if (node.next != null) // If has successor, it must be on queue
return true;
// 到這里說明node.waitStatus不為Node.CONDITION且node.prev不為null
// findNodeFromTail中會從tail向前遍歷同步隊列,尋找node是否已在隊列中,存在則返回true,否則返回false。
// 情形:
// 上面a情形不變,將b處的情形改為:thread1執(zhí)行到AQS.enq的if (compareAndSetTail(t, node))
// (該語句還未執(zhí)行),或執(zhí)行了但由于其他線程的競爭導(dǎo)致執(zhí)行失敗了,findNodeFromTail
// 中會返回false;若執(zhí)行了且成功了,則findNodeFromTail會返回true
// (多數(shù)情況下node都在同步隊列的尾部附近)
return findNodeFromTail(node);
}
(5)AQS.CO.checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// 被中斷
// 返回THROW_IE表示當前線程在其他線程調(diào)用signal前被中斷
// 返回REINTERRUPT表示當前線程在其他線程調(diào)用signal后被中斷
// 具體的分界點就是node.waitStatus的值,若其值為Node.CONDITION,
// 則是signal前被中斷,否則在signal后被中斷
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
(6)AQS.transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
// 該條件為true的情形:
// 假設(shè)僅有thread0,thread0持有鎖后調(diào)用AQS.CO.await()被park,
// 之后被外部線程中斷,會執(zhí)行到這里,此時條件為true
// 該條件為false的情形:
// 假設(shè)有兩個線程thread0、thread1,thread0,thread0持有鎖后調(diào)用AQS.CO.await()被park,thread1
// 獲取到鎖后調(diào)用AQS.CO.signal,之后會執(zhí)行到AQS.transferForSignal的第一個if處,該if語句執(zhí)行
// 完后,這里node.waitStatus被修改為0,所以thread0執(zhí)行這里的if語句時會失敗
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 入隊
return true;
}
// 到這里說明其他線程調(diào)用了AQS.CO.signal,將當前線程對應(yīng)的節(jié)點加入同步隊列,這里自選等待入隊完成
while (!isOnSyncQueue(node))
Thread.yield(); // 主動讓出當前線程的CPU時間片
return false;
}
2. AQS.CO.signal
(1)AQS.CO.signal
public final void signal() {
// 持有鎖的線程不是當前線程(即外部某個線程調(diào)用了該方法)則拋異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取條件隊列中的第一個節(jié)點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
(2)AQS.CO.doSignal
private void doSignal(Node first) {
// 遍歷條件隊列,僅喚醒第一個符合條件的線程
do {
// 更新firstWaiter
// firstWaiter為null,說明已到隊列尾部
if ( (firstWaiter = first.nextWaiter) == null)
// 將lastWaiter設(shè)置為null
lastWaiter = null;
// 將first與其后繼節(jié)點斷開
first.nextWaiter = null;
// 條件1:只要一個線程入隊成功,transferForSignal返回true,就會終止該循環(huán)
// 條件2:用于判斷是否到隊列尾部
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
(3)AQS.transferForSignal
final boolean transferForSignal(Node node) {
// 條件為true的情形(注意前面的"!"):
// 情形1:假設(shè)有兩個線程thread0、thread1,初始時,thread0在未持有鎖的情況下調(diào)用
// AQS.CO.await(),當thread0執(zhí)行到AQS.fullyRelease中時會將其對應(yīng)節(jié)點的
// waitStatus字段設(shè)置為取消狀態(tài)(見上面1.(4)處),之后持有鎖的thread1調(diào)用
// AQS.CO.signal就會執(zhí)行到這里,此時條件為true
// 情形2:假設(shè)有兩個線程thread0、thread1,初始時,持有鎖的thread0調(diào)用AQS.CO.await(),
// 執(zhí)行到該方法的while里被park,之后外部線程中斷thread0,thread0被喚醒后會執(zhí)
// 行到AQS.transferAfterCancelledWait的第一個if處將node.waitStatus設(shè)置為0,
// 之后thread1獲取到鎖,調(diào)用AQS.CO.signal執(zhí)行到這里,條件就為true
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 入隊
// 注意p是node的前驅(qū)節(jié)點
Node p = enq(node);
int ws = p.waitStatus;
// 條件1:
// 為true時的情形:
// 假設(shè)有三個線程thread0、thread1、thread2,初始時,持有鎖的thread0調(diào)用AQS.CO.await(),執(zhí)行到該方法
// 的while里被park,之后thread1獲取到鎖,調(diào)用AQS.CO.signal執(zhí)行到上面Node p = enq(node);處(該行還未執(zhí)行),
// 之后thread2通過ReentrantLock.lockInterruptibly獲取鎖,因為thread1仍持有鎖,因此thread2最終會在
// AQS.parkAndCheckInterrupt中被park,之后外部線程中斷thread2,當thread2執(zhí)行完AQS.cancelAcquire的
// node.waitStatus = Node.CANCELLED;后,thread1從上面Node p = enq(node);處執(zhí)行到這里,此時ws為1
// 條件2:
// 為true時的情形(注意前面的"!"):
// 對于條件1為true時的情形,在thread2被park后,若thread1先執(zhí)行完ws > 0(條件2還未執(zhí)行)(此時ws為0,
// 條件1不成立),此時thread2,被中斷后在AQS.cancelAcquire的node.waitStatus = Node.CANCELLED;處
// 將p.waitStatus設(shè)置為1,此時條件2就為true
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// node.thread會從AQS.CO.await里的park處被喚醒,因為已經(jīng)在同步隊列中了,因此會跳出while,進
// 入AQS.acquireQueued中,若此時p前面仍有節(jié)點等待獲取鎖或p前面沒有節(jié)點了但調(diào)用AQS.CO.signal
// 的線程仍未釋放鎖,node.thread在嘗試幾次后最終仍會在AQS.parkAndCheckInterrupt中被park
LockSupport.unpark(node.thread);
return true;
}
3. AQS.CO.signalAll
AQS.CO.signalAll與AQS.CO.signal類似,區(qū)別是signalAll會將所有節(jié)點加入同步隊列,下面僅對AQS.CO.doSignalAll方法進行介紹:
private void doSignalAll(Node first) {
// 將條件隊列的頭節(jié)點和尾節(jié)點都置為null
lastWaiter = firstWaiter = null;
// 遍歷條件隊列,依次喚醒所有線程
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}