Besides lock() and unlock(), the basic operations of ReentrantLock include await() and signal() on a Condition. Both are implemented by ConditionObject, an inner class of AbstractQueuedSynchronizer, so most of the await() and signal() logic lives in ConditionObject.

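As the figure above shows, ConditionObject is an inner class of AbstractQueuedSynchronizer, and so is the Node class mentioned in the previous section on lock() and unlock(). Together with the other methods, they form the basic framework for blocking locks and related synchronizers (semaphores, events, etc.) that rely on a FIFO wait queue.
Now let's look at the inner class ConditionObject: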
/**
 * Condition implementation for a {@link
 * AbstractQueuedSynchronizer} serving as the basis of a {@link
 * Lock} implementation.
 *
 * <p>Method documentation for this class describes mechanics,
 * not behavioral specifications from the point of view of Lock
 * and Condition users. Exported versions of this class will in
 * general need to be accompanied by documentation describing
 * condition semantics that rely on those of the associated
 * {@code AbstractQueuedSynchronizer}.
 *
 * <p>This class is Serializable, but all fields are transient,
 * so deserialized conditions have no waiters.
 */
public class ConditionObject implements Condition, java.io.Serializable {
I. The await flow:
A thread first acquires the lock through lock(). When it then calls await(), it must release the lock before it parks, so that other threads get the chance to compete for the lock.
Call flow diagram:
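Seen from the caller's side, the lock-then-await pattern can be sketched as follows. This is only an illustration: the class Gate and its methods awaitOpen, open, and demo are hypothetical names made up for this example, and only ReentrantLock and Condition come from the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; Gate, awaitOpen, open and demo are illustrative names.
class Gate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private boolean isOpen = false; // guarded by lock

    // Blocks until open() has been called. Returns false if interrupted.
    boolean awaitOpen() {
        lock.lock();
        try {
            while (!isOpen) {    // re-check the predicate after every wakeup
                opened.await();  // releases the lock, parks, reacquires before returning
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Opens the gate and signals one waiting thread, if any.
    void open() {
        lock.lock();
        try {
            isOpen = true;
            opened.signal();
        } finally {
            lock.unlock();
        }
    }

    // Runs one waiter and one opener; returns true if the waiter got through.
    static boolean demo() {
        Gate gate = new Gate();
        boolean[] passed = new boolean[1];
        Thread waiter = new Thread(() -> passed[0] = gate.awaitOpen());
        waiter.start();
        gate.open();
        try {
            waiter.join(); // join makes the write to passed[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return passed[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The open() call can only take the lock while the waiter is parked inside await() (or before the waiter has locked at all), which is exactly why await() has to give the lock up before parking.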

1.await()
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
First, addConditionWaiter() wraps the current thread in a Node and appends it to the condition queue.
2.addConditionWaiter(): joins the condition wait queue.
/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
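If lastWaiter has been cancelled, unlinkCancelledWaiters() first purges the cancelled nodes from the condition queue. A new Node wrapping the current thread is then appended to the tail and becomes the new lastWaiter.
3.fullyRelease(node): releases the entire current state value and returns the value it had before the release. For a ReentrantLock the state is the hold count, so every reentrant hold is given up in one step.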
/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
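The effect of releasing the whole state and restoring it later can be observed from outside through getHoldCount(). The sketch below is an illustration, not part of the JDK: FullyReleaseDemo and its methods are hypothetical names, and it uses awaitUninterruptibly() instead of await() only to avoid the checked exception (in the JDK 8 sources quoted here it also starts with addConditionWaiter() and fullyRelease(node)).

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and Condition are JDK types.
class FullyReleaseDemo {

    // Returns {hold count before the wait, hold count after the wait,
    //          1 if the other thread obtained the lock during the wait, else 0}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = new boolean[1]; // guarded by lock

        Thread other = new Thread(() -> {
            lock.lock(); // succeeds only after the main thread gives up both holds
            try {
                signalled[0] = true;
                cond.signal();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        lock.lock(); // reentrant acquire: hold count is now 2
        try {
            int before = lock.getHoldCount();
            other.start();
            while (!signalled[0]) {
                cond.awaitUninterruptibly(); // gives up both holds, restores them on return
            }
            int after = lock.getHoldCount();
            return new int[] {before, after, signalled[0] ? 1 : 0};
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    // Waiting on a condition without holding its lock is rejected.
    static boolean awaitWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.awaitUninterruptibly();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
        System.out.println(awaitWithoutLockThrows());
    }
}
```

The second method exercises the failure path: a thread that does not own the lock cannot release it, so the wait ends with IllegalMonitorStateException.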
4.release(savedState): calls tryRelease(arg) to release the lock:
/**
 * Releases in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
5.unparkSuccessor(h): wakes a thread sleeping in the sync queue. It finds the first non-cancelled node after the head and calls unpark() on the thread that node wraps.
/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
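The park and unpark calls that appear in await() and unparkSuccessor() can be tried in isolation. The following is a minimal sketch with hypothetical names (ParkDemo, run); it only shows the LockSupport primitive and does not involve any AQS queue.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class showing the primitive behind await() and unparkSuccessor().
class ParkDemo {

    // Returns true if the sleeper thread was woken and finished.
    static boolean run() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);

        Thread sleeper = new Thread(() -> {
            // park() may return without a matching unpark, so the flag is
            // re-checked in a loop, much like await() re-checks isOnSyncQueue(node).
            while (!released.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        sleeper.start();

        released.set(true);
        // If the sleeper has not parked yet, the permit is stored and the
        // next park() returns immediately.
        LockSupport.unpark(sleeper);

        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because unpark() leaves a permit behind when the target is not parked yet, the order in which the two threads reach park() and unpark() does not matter here.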
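Summary of await():
1.Wrap the current thread in a new node and add it to the condition wait queue.
2.Fully release the lock held by the current thread.
3.Find the first non-cancelled node after the head of the sync queue and wake its sleeping thread.
4.park() the current thread.
5.Once the node has been moved to the sync queue by a signal (or the wait has been interrupted), reacquire the lock through acquireQueued(node, savedState).
II. signal(): the calling thread must already hold the lock. signal() removes the first node from the condition queue and moves it to the sync queue.
Call diagram: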

1.signal():
/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
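After checking that the caller holds the lock exclusively, signal() hands the first node on the condition queue, if there is one, to doSignal().
2.doSignal(first): starting from the head of the condition queue, transfers the first node that has not been cancelled.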
/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
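To see how this loop skips cancelled nodes, here is a deliberately simplified, single-threaded toy model. It is not the JDK implementation: ToyConditionQueue, add, transfer, and the string-based syncQueue are hypothetical stand-ins, there is no CAS and no parking, and transfer() only imitates the CONDITION check at the start of transferForSignal().

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded model of the condition queue (NOT the JDK code).
class ToyConditionQueue {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;
    // Stand-in for the sync queue: just records names in arrival order.
    final List<String> syncQueue = new ArrayList<>();

    // Simplified addConditionWaiter(): append to the tail (no cancelled-node cleanup).
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null)
            firstWaiter = node;
        else
            lastWaiter.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Stand-in for transferForSignal(): fails for nodes that are no longer CONDITION.
    boolean transfer(Node node) {
        if (node.waitStatus != CONDITION)
            return false;
        node.waitStatus = 0;
        syncQueue.add(node.name);
        return true;
    }

    // Same loop shape as doSignal(): unlink the head, retry while the transfer fails.
    void signal() {
        Node first = firstWaiter;
        if (first == null)
            return;
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transfer(first) && (first = firstWaiter) != null);
    }

    public static void main(String[] args) {
        ToyConditionQueue q = new ToyConditionQueue();
        Node a = q.add("A");
        q.add("B");
        q.add("C");
        a.waitStatus = CANCELLED;        // pretend A timed out or was interrupted
        q.signal();
        System.out.println(q.syncQueue); // [B]
        q.signal();
        System.out.println(q.syncQueue); // [B, C]
    }
}
```

With A cancelled, the first signal() drops A and transfers B in the same call, and the second signal() transfers C and leaves both firstWaiter and lastWaiter null.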
3.transferForSignal(Node node): moves the node from the condition queue to the sync queue.
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
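transferForSignal() first tries to change the node's waitStatus from CONDITION to 0 with a CAS; if that fails, the node has been cancelled and the method returns false. Otherwise enq(node) appends the node to the tail of the sync queue and returns its predecessor. The node's thread is unparked immediately only when the predecessor is cancelled or its waitStatus cannot be set to SIGNAL; in the normal case the thread stays parked until a later lock release wakes it through the sync queue. Either way, the waiting thread is once again eligible to compete for the lock.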
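The move from the condition queue to the sync queue can be observed with ReentrantLock's monitoring methods. The sketch below is an illustration under assumptions: TransferDemo and run are hypothetical names, awaitUninterruptibly() is used only to avoid the checked exception, and getWaitQueueLength() is documented as an estimate, which is exact here only because no timeouts or interrupts occur.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; it only uses public ReentrantLock monitoring methods.
class TransferDemo {

    // Returns {condition waiters before signal, condition waiters after signal,
    //          1 if the waiter is in the lock's queue right after signal, else 0}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = new boolean[1]; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter sits in the condition queue; keep the lock once it does.
        while (true) {
            lock.lock();
            if (lock.hasWaiters(cond))
                break;
            lock.unlock();
            LockSupport.parkNanos(1_000_000L); // back off for about 1 ms
        }

        int before, after;
        boolean queued;
        try {
            before = lock.getWaitQueueLength(cond);
            go[0] = true;
            cond.signal();
            // The lock is still held, so the waiter cannot have reacquired it yet.
            after = lock.getWaitQueueLength(cond);
            queued = lock.hasQueuedThread(waiter);
        } finally {
            lock.unlock();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {before, after, queued ? 1 : 0};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

The checks are made while the signalling thread still holds the lock, so the waiter is already queued for the lock but cannot yet win it; it proceeds only after the final unlock().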