jdk源碼解讀-并發(fā)包-Lock-ReentrantLock(2)--await()與signal()方法走讀

ReentrantLock 的基本操作除了lock()和unlock()外,還有condition的await()和signal()。但是是通過(guò)調(diào)用AbstractQueuedSynchronizer的內(nèi)部類(lèi)CondtionObject來(lái)實(shí)現(xiàn)的。所以await()和singnal()的操作主要在CondtionObject類(lèi)里。

a1.png

如上圖可以看到,ConditonObject是AbstractQueuedSynchronizer的內(nèi)部類(lèi),同時(shí)上節(jié)講解lock()和unlock()提到的Node也是它的內(nèi)部類(lèi),以及其他方法組成了依賴(lài)FIFO waiting queue的阻塞鎖和相關(guān)同步器(semaphores,events,等待)基本框架。

現(xiàn)在主要看一下內(nèi)部類(lèi)ConditionObject:

/**
 * Condition implementation for a {@link
 * AbstractQueuedSynchronizer} serving as the basis of a {@link
 * Lock} implementation.
 *
 * <p>Method documentation for this class describes mechanics,
 * not behavioral specifications from the point of view of Lock
 * and Condition users. Exported versions of this class will in
 * general need to be accompanied by documentation describing
 * condition semantics that rely on those of the associated
 * {@code AbstractQueuedSynchronizer}.
 *
 * <p>This class is Serializable, but all fields are transient,
 * so deserialized conditions have no waiters.
 */
public class ConditionObject implements Condition, java.io.Serializable {

一. await操作的流程:

因?yàn)榫€程是先通過(guò)lock()獲得鎖資源,然后調(diào)用await()時(shí),先釋放鎖資源然后park。所以park前要先釋放鎖,讓別的線程獲得爭(zhēng)取鎖資源的權(quán)利。
調(diào)用流程圖:


a2.png

1.await()

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
    Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
    }
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

首先用addConditionWaiter()方法把這個(gè)線程包裝成Node并放入condition queue。

2.addConditionWaiter():加入condition等待隊(duì)列。

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
        t.nextWaiter = node;
lastWaiter = node;
return node;
}

新建包裝當(dāng)前線程的node。找到condition隊(duì)列中沒(méi)有被cancel的lastWaiter,然后把當(dāng)前Node設(shè)為lastWaiter。

3.fullyRelease(node):釋放當(dāng)前state值。并返回釋放前的值。

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
            failed = false;
return savedState;
        } else {
throw new IllegalMonitorStateException();
        }
    } finally {
if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

4.release(savedState):tryRelease(arg)調(diào)用釋放鎖資源:

/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
if (tryRelease(arg)) {
        Node h = head;
if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
return true;
    }
return false;
}

5.unparkSuccessor(h):叫醒syn queue中沉睡的線程:找到頭結(jié)點(diǎn)后第一個(gè)沒(méi)有被cancel的節(jié)點(diǎn),然后對(duì)這個(gè)節(jié)點(diǎn)包裝的線程執(zhí)行unpark()操作

 /**
* Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
/*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
if (s == null || s.waitStatus > 0) {
        s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
                s = t;
    }
if (s != null)
        LockSupport.unpark(s.thread);
}

await()操作總結(jié):

1.新建包裝當(dāng)前線程的node放到condition waiting queue中去。

2.釋放此節(jié)點(diǎn)的鎖資源。

3.找到syn queue中頭節(jié)點(diǎn)后第一個(gè)沒(méi)被cancel的節(jié)點(diǎn)并叫醒這個(gè)節(jié)點(diǎn)沉睡的線程。

4.對(duì)當(dāng)前線程進(jìn)行park();

二. signal():首先這個(gè)線程獲取到了鎖資源,然后執(zhí)行signal()操作,把condition queue 中的第一個(gè)節(jié)點(diǎn)刪除,并把這個(gè)節(jié)點(diǎn)放入syn queue

調(diào)用圖:

a3.png

1.signal():

/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

對(duì)condition queue 第一個(gè)節(jié)點(diǎn)進(jìn)行喚醒。

2.doSignal(first):從隊(duì)頭找,找到第一個(gè)非空節(jié)點(diǎn)進(jìn)行節(jié)點(diǎn)遷移。

/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

3.transferForSignal(Node node):把當(dāng)前節(jié)點(diǎn)從condition queue 遷移到sync queue 中。

final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                 return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                  LockSupport.unpark(node.thread);
        return true;
}

先調(diào)用enq(node);把節(jié)點(diǎn)放到sync queue 的隊(duì)尾,同時(shí)unpark 當(dāng)前線程,這時(shí)這個(gè)線程又獲得了競(jìng)爭(zhēng)鎖資源的資格。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容