AQS:Java 中悲觀鎖的底層實(shí)現(xiàn)機(jī)制

介紹 AQS

AQS(AbstractQueuedSynchronizer)是 Java 并發(fā)包中,實(shí)現(xiàn)各種同步組件的基礎(chǔ)。比如

  • 各種鎖:ReentrantLock、ReadWriteLock、StampedLock
  • 各種線程同步工具類:CountDownLatch、CyclicBarrier、Semaphore
  • 線程池中的 Worker

Lock 接口的實(shí)現(xiàn)基本都是通過聚合了一個 AQS 的子類來完成線程訪問控制的。


Doug Lea 曾經(jīng)介紹過 AQS 的設(shè)計初衷。從原理上,一種同步組件往往是可以利用其他的組件實(shí)現(xiàn)的,例如可以使用 Semaphore 實(shí)現(xiàn)互斥鎖。但是,對某種同步組件的傾向,會導(dǎo)致復(fù)雜、晦澀的實(shí)現(xiàn)邏輯,所以,他選擇了將基礎(chǔ)的同步相關(guān)操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 為我們構(gòu)建同步組件提供了范本。

如何使用 AQS

利用 AQS 實(shí)現(xiàn)一個同步組件,我們至少要實(shí)現(xiàn)兩類基本的方法,分別是:

  • 獲取資源,需要實(shí)現(xiàn) tryAcquire(int arg) 方法
  • 釋放資源,需要實(shí)現(xiàn) tryRelease(int arg) 方法

如果需要共享式獲取 / 釋放資源,需要實(shí)現(xiàn)對應(yīng)的 tryAcquireShared(int arg)、tryReleaseShared(int arg)


AQS 使用的是模板方法設(shè)計模式。AQS 方法的修飾符很有規(guī)律,其中,使用 protected 修飾的方法為抽象方法,通常需要子類去實(shí)現(xiàn),從而實(shí)現(xiàn)不同的同步組件;使用 public 修飾的方法基本可以認(rèn)為是模板方法,不建議子類直接覆蓋。

通過調(diào)用 AQS 的 acquire(int arg) 方法可以獲取資源,該方法會調(diào)用 protected 修飾的 tryAcquire(int arg) 方法,因此我們需要在 AQS 的子類中實(shí)現(xiàn) tryAcquire(int arg),tryAcquire(int arg) 方法的作用是:獲取資源。

當(dāng)前線程獲取資源并執(zhí)行了相應(yīng)邏輯之后,就需要釋放資源,使得后續(xù)節(jié)點(diǎn)能夠繼續(xù)獲取資源。通過調(diào)用 AQS 的 release(int arg) 方法可以釋放資源,該方法會調(diào)用 protected 修飾的 tryRelease(int arg) 方法,因此我們需要在 AQS 的子類中實(shí)現(xiàn) tryRelease(int arg),tryRelease(int arg) 方法的作用是:釋放資源。

AQS 的實(shí)現(xiàn)原理

從實(shí)現(xiàn)角度分析 AQS 是如何完成線程訪問控制。

AQS 的實(shí)現(xiàn)原理可以從 同步阻塞隊列、獲取資源時的執(zhí)行流程、釋放資源時的執(zhí)行流程 這 3 個方面介紹。

同步阻塞隊列

AQS 依賴內(nèi)部的同步阻塞隊列(一個 FIFO 雙向隊列)來完成資源的管理。

同步阻塞隊列的工作機(jī)制

  • 節(jié)點(diǎn):同步阻塞隊列中的節(jié)點(diǎn)(Node)用來保存獲取資源失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點(diǎn),沒有成功獲取資源的線程將會成為節(jié)點(diǎn)加入同步阻塞隊列的尾部,同時會阻塞當(dāng)前線程(Java 線程處于 WAITING 狀態(tài),釋放 CPU 的使用權(quán))。
  • 首節(jié)點(diǎn):同步阻塞隊列遵循 FIFO(先進(jìn)先出),首節(jié)點(diǎn)是獲取資源成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放資源時,將會喚醒后繼節(jié)點(diǎn),使其再次嘗試獲取資源,而后繼節(jié)點(diǎn)將會在獲取資源成功時將自己設(shè)置為首節(jié)點(diǎn)。
static final class Node {
    /**
     * Marker to indicate a node is waiting in shared mode
     */
    static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
    /**
     * Marker to indicate a node is waiting in exclusive mode
     */
    static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;

    /**
     * waitStatus value to indicate thread has cancelled
     */
    static final int CANCELLED = 1;
    /**
     * waitStatus value to indicate successor's thread needs unparking
     */
    static final int SIGNAL = -1;
    /**
     * waitStatus value to indicate thread is waiting on condition
     */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    // 等待狀態(tài)
    volatile int waitStatus;

    // 前驅(qū)節(jié)點(diǎn)
    volatile AbstractQueuedSynchronizer.Node prev;

    // 后繼節(jié)點(diǎn)
    volatile AbstractQueuedSynchronizer.Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    // 條件等待隊列的后繼節(jié)點(diǎn)
    AbstractQueuedSynchronizer.Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
        AbstractQueuedSynchronizer.Node p = prev;
        if (p == null) throw new NullPointerException();
        else return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

等待狀態(tài)
在節(jié)點(diǎn)中用 volatile int waitStatus 屬性表示節(jié)點(diǎn)的等待狀態(tài)。
節(jié)點(diǎn)有如下幾種等待狀態(tài):

  • CANCELLED,值為 1,由于在同步阻塞隊列中等待的線程等待超時或者被中斷,需要從同步阻塞隊列中取消等待,節(jié)點(diǎn)進(jìn)人該狀態(tài)將不會變化
  • SIGNAL,值為 -1,后繼節(jié)點(diǎn)的線程處于等待狀態(tài),而當(dāng)前節(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者被取消,將會通知后繼節(jié)點(diǎn),使后繼節(jié)點(diǎn)的線程得以運(yùn)行
  • CONDITION,值為 -2,節(jié)點(diǎn)在條件等待隊列中,節(jié)點(diǎn)線程等待在 Condition 上,當(dāng)其他線程對Condition 調(diào)用了 signal() 方法后,該節(jié)點(diǎn)將會從條件等待隊列轉(zhuǎn)移到同步阻塞隊列中,加入到對同步狀態(tài)的獲取中
  • PROPAGATE,值為 -3,表示下一次共享式同步狀態(tài)獲取將會無條件地被傳播下去
  • INITIAL,值為 0,初始狀態(tài)

獲取資源、釋放資源的執(zhí)行流程,結(jié)論先行:

  • 在獲取資源時,獲取資源失敗的線程都會被加入到同步阻塞隊列中,并在隊列中進(jìn)行自旋;移出隊列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了資源。
  • 在釋放資源時,AQS 調(diào)用 tryRelease(int arg) 方法釋放資源,然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。

獲取資源

下面來介紹獲取資源時的執(zhí)行流程。

調(diào)用 AQS 的 acquire(int arg) 方法可以獲取資源。

acquire(int arg) 方法是獨(dú)占式獲取資源,它調(diào)用流程如下圖所示。

1663401341056-7dde7eea-ba39-4dc7-84ed-0b2fc6a5a0c2.png

用文字描述 acquire(int arg) 方法的調(diào)用流程:首先調(diào)用自定義 AQS 實(shí)現(xiàn)的 tryAcquire(int arg) 方法,該方法的作用是嘗試獲取資源:

  • 如果獲取資源成功,則直接從 acquire(int arg) 方法返回

  • 如果獲取資源失敗,則構(gòu)造節(jié)點(diǎn),并將該節(jié)點(diǎn)加入到同步阻塞隊列的尾部,最后調(diào)用 acquireQueued(Node node,int arg) 方法,使得該節(jié)點(diǎn)以“死循環(huán)”的方式嘗試獲取資源。只有當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),才能嘗試獲取資源。

    • 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),并且獲取資源成功,則設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn),并從 acquireQueued(Node node,int arg) 方法返回
    • 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn) 或者 獲取資源失敗,則阻塞當(dāng)前線程,線程被喚醒后繼續(xù)執(zhí)行該循環(huán)操作

acquireQueued(Node node,int arg) 方法的調(diào)用過程也被稱為“自旋過程”。

自旋是什么意思是呢?我的理解就是:自旋就是一個死循環(huán),循環(huán)執(zhí)行某個操作序列,直到滿足某個條件才退出循環(huán)。


/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire(int arg) 的主要邏輯是:

首先調(diào)用自定義 AQS 實(shí)現(xiàn)的 tryAcquire(int arg) 方法,該方法保證線程安全的獲取資源:

  • 如果獲取資源成功,則直接從 acquire(int arg) 方法返回
  • 如果獲取資源失敗,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式 Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取資源)并通過 addWaiter(Node node) 方法將該節(jié)點(diǎn)加入到同步阻塞隊列的尾部,最后調(diào)用 acquireQueued(Node node,int arg) 方法,使得該節(jié)點(diǎn)以“死循環(huán)”的方式獲取資源。如果獲取不到則阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠 前驅(qū)節(jié)點(diǎn)的出隊 或 阻塞線程被中斷 來實(shí)現(xiàn)。
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在 acquireQueued(final Node node,int arg) 方法中,當(dāng)前線程在“死循環(huán)”中嘗試獲取資源,而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取資源,這是為什么?原因有兩個,如下。

  • 第一,頭節(jié)點(diǎn)是成功獲取到資源的節(jié)點(diǎn),而頭節(jié)點(diǎn)的線程釋放了資源之后,將會喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)。
  • 第二,維護(hù)同步阻塞隊列的 FIFO 原則。

釋放資源

當(dāng)前線程獲取資源并執(zhí)行了相應(yīng)邏輯之后,就需要釋放資源,使得后續(xù)節(jié)點(diǎn)能夠繼續(xù)獲取資源。

下面來介紹釋放資源時的執(zhí)行流程。

通過調(diào)用 AQS 的 release(int arg) 方法可以釋放資源,該方法在釋放資源之后,會喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn),進(jìn)而使后繼節(jié)點(diǎn)重新嘗試獲取資源。


/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release(int arg) 方法執(zhí)行時,會喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程, unparkSuccessor(Node node) 方法使用 LockSupport#unpark() 方法來喚醒處于等待狀態(tài)的線程。

共享式 獲取 & 釋放 資源

上面講的是獨(dú)占式獲取 / 釋放 資源。

共享式獲取與獨(dú)占式獲取最主要的區(qū)別在于:同一時刻能否有多個線程同時獲取到資源。以文件的讀寫為例,如果一個程序在對文件進(jìn)行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進(jìn)行。寫操作要求對資源的獨(dú)占式訪問,而讀操作可以是共享式訪問。

  • 共享式訪問資源時,其他共享式的訪問均被允許,獨(dú)占式訪問被阻塞
  • 獨(dú)占式訪問資源時,同一時刻其他訪問均被阻塞

共享式獲取資源

調(diào)用 AQS 的 acquireShared(int arg) 方法可以共享式地獲取資源。

在 acquireShared(int arg) 方法中,AQS 調(diào)用 tryAcquireShared(int arg) 方法嘗試獲取資源, tryAcquireShared(int arg) 方法返回值為 int 類型,當(dāng)返回值 >= 0 時,表示能夠獲取到資源。

可以看到,在 doAcquireShared(int arg) 方法的自旋過程中,如果當(dāng)前節(jié)點(diǎn)的前驅(qū)為頭節(jié)點(diǎn)時,才能嘗試獲取資源,如果獲取資源成功(返回值 >= 0),則設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn),并從自旋過程中退出。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

共享式釋放資源
調(diào)用 releaseShared(int arg) 方法可以釋放資源。該方法在釋放資源之后,會喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn),進(jìn)而使后繼節(jié)點(diǎn)重新嘗試獲取資源。

對于能夠支持多個線程同時訪問的并發(fā)組件(比如 Semaphore),它和獨(dú)占式主要區(qū)別在于 tryReleaseShared(int arg) 方法必須確保資源安全釋放,因為釋放資源的操作會同時來自多個線程。 確保資源安全釋放一般是通過循環(huán)和 CAS 來保證的。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

獨(dú)占式超時獲取資源

調(diào)用 AQS 的 doAcquireNanos(int arg,long nanosTimeout) 方法可以超時獲取資源,即在指定的時間段內(nèi)獲取資源,如果獲取資源成功則返回 true,否則返回 false。

該方法提供了傳統(tǒng) Java 同步操作(比如 synchronized 關(guān)鍵字)所不具備的特性。


在分析該方法的實(shí)現(xiàn)前,先介紹一下響應(yīng)中斷的獲取資源過程。

  • 在 Java 5 之前,當(dāng)一個線程獲取不到鎖而被阻塞在 synchronized 之外時,對該線程進(jìn)行中斷操作,此時該線程的中斷標(biāo)志位會被修改,但線程依舊會阻塞在 synchronized 上,等待著獲取鎖。
  • 在 Java 5 中,AQS 提供了 acquireInterruptibly(int arg) 方法,這個方法在等待獲取資源時,如果當(dāng)前線程被中斷,會立刻返回,并拋出 InterruptedException。

acquire(int arg) 方法對中斷不敏感,也就是由于線程獲取資源失敗后進(jìn)入同步阻塞隊列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步阻塞隊列中移出。

超時獲取資源過程可以被視作響應(yīng)中斷獲取資源過程的“增強(qiáng)版”,doAcquireNanos(int arg,long nanosTimeout) 方法在支持響應(yīng)中斷的基礎(chǔ)上,增加了超時獲取的特性。

針對超時獲取,主要需要計算出需要睡眠的時間間隔 nanosTimeout,為了防止過早通知, nanosTimeout 計算公式為:nanosTimeout -= now - lastTime,其中 now 為當(dāng)前喚醒時間, lastTime 為上次喚醒時間,如果 nanosTimeout 大于 0 則表示超時時間未到,需要繼續(xù)睡眠 nanosTimeout 納秒,反之,表示已經(jīng)超時。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

/**
 * Acquires in exclusive timed mode.
 *
 * @param arg the acquire argument
 * @param nanosTimeout max wait time
 * @return {@code true} if acquired
 */
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

該方法在自旋過程中,當(dāng)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)時嘗試獲取資源,如果成功獲取資源則從該方法返回,這個過程和獨(dú)占式同步獲取的過程類似,但是在獲取資源失敗的處理上有所不同。

如果當(dāng)前線程獲取資源失敗,則判斷是否超時(nanosTimeout 小于等于 0 表示已經(jīng)超時),如果沒有超時,則重新計算超時間隔 nanosTimeout,然后使當(dāng)前線程等待 nanosTimeout 納秒(當(dāng)已到設(shè)置的超時時間,該線程會從 LockSupport.parkNanos(Object blocker,long nanos)方法返回)。

如果 nanosTimeout 小于等于 spinForTimeoutThreshold(1000 納秒)時,將不會使該線程進(jìn)行超時等待,而是進(jìn)入快速的自旋過程。原因在于,非常短的超時等待無法做到十分精確,如果這時再進(jìn)行超時等待,相反會讓 nanosTimeout 的超時從整體上表現(xiàn)得反而不精確。因此,在超時非常短的場景下,AQS 會進(jìn)入無條件的快速自旋。

獨(dú)占式超時獲取資源的流程如下所示。

1663404761814-6e8f338c-99da-441e-88f4-3e13fa4ceb00.png

從圖中可以看出,獨(dú)占式超時獲取資源 doAcquireNanos(int arg,long nanosTimeout) 和獨(dú)占式獲取資源 acquire(int args)在流程上非常相似,其主要區(qū)別在于:未獲取到資源時的處理邏輯。

acquire(int args) 在未獲取到資源時,將會使當(dāng)前線程一直處于等待狀態(tài),而 doAcquireNanos(int arg,long nanosTimeout) 會使當(dāng)前線程等待 nanosTimeout 納秒,如果當(dāng)前線程在 nanosTimeout 納秒內(nèi)沒有獲取到資源,將會從等待邏輯中自動返回。

Condition 的實(shí)現(xiàn)原理

技術(shù)是為了解決問題而生的,通過 Condition 我們可以實(shí)現(xiàn)等待 / 通知功能。

ConditionObject 是 AQS 的內(nèi)部類。每個 Condition 對象都包含著一個條件等待隊列,這個條件等待隊列是 Condition 對象實(shí)現(xiàn)等待 / 通知功能的關(guān)鍵。

下面我們分析 Condition 的實(shí)現(xiàn)原理,主要包括:條件等待隊列、等待 和 通知。

下面提到的 Condition 如果不加說明均指的是 ConditionObject。

條件等待隊列

Condition 依賴內(nèi)部的條件等待隊列(一個 FIFO 雙向隊列)來實(shí)現(xiàn)等待 / 通知功能。

條件等待隊列的工作機(jī)制

  • 節(jié)點(diǎn):條件等待隊列中的每個節(jié)點(diǎn)(Node)都包含一個線程引用,該線程就是在 Condition 對象上等待的線程,如果一個線程調(diào)用了 Condition.await()方法,那么該線程將會釋放資源、構(gòu)造成為節(jié)點(diǎn)加入條件等待隊列的尾部,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)。

事實(shí)上,條件等待隊列中的節(jié)點(diǎn)定義復(fù)用了 AQS 節(jié)點(diǎn)的定義,也就是說,同步阻塞隊列和條件等待隊列中節(jié)點(diǎn)類型都是 AQS 的靜態(tài)內(nèi)部類 AbstractQueuedSynchronizer.Node。

在 Object 的監(jiān)視器模型上,一個對象擁有一個同步阻塞隊列和一個條件等待隊列,而并發(fā)包中的 Lock(更確切地說是 AQS)擁有一個同步阻塞隊列和多個條件等待隊列。

等待

下面來介紹讓線程等待的執(zhí)行流程。

調(diào)用 Condition 的 await() 方法(或者以 await 開頭的方法),將會使當(dāng)前線程釋放資源、構(gòu)造成為節(jié)點(diǎn)加入條件等待隊列的尾部,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)。

如果從隊列(同步阻塞隊列和條件等待隊列)的角度看 await()方法,當(dāng)調(diào)用 await() 方法時,相當(dāng)于同步阻塞隊列的首節(jié)點(diǎn)(獲取到鎖的節(jié)點(diǎn))移動到 Condition 的條件等待隊列中。并且同步阻塞隊列的首節(jié)點(diǎn)并不會直接加入條件等待隊列,而是通過 addConditionWaiter() 方法把當(dāng)前線程構(gòu)造成一個新的節(jié)點(diǎn),將其加入條件等待隊列中。

/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

通知

下面來介紹喚醒等待線程的執(zhí)行流程。

調(diào)用 Condition 的 signal() 方法,將會喚醒在條件等待隊列中等待時間最長的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會將當(dāng)前節(jié)點(diǎn)從條件等待隊列移動到同步阻塞隊列中。

條件等待隊列中的節(jié)點(diǎn)被喚醒后,被喚醒的線程以“死循環(huán)”的方式嘗試獲取資源。成功獲取資源之后,被喚醒的線程將從先前調(diào)用的 await() 方法返回。

如果被喚醒的線程不是通過其他線程調(diào)用 Condition.signal() 方法喚醒,而是對等待線程進(jìn)行中斷,則會拋出InterruptedException。

被喚醒的線程,將從 await() 方法中的 while 循環(huán)中退出(isOnSyncQueue(Node node) 方法返回 true,節(jié)點(diǎn)已經(jīng)在同步阻塞隊列中),進(jìn)而調(diào)用 AQS 的 acquireQueued() 方法以“死循環(huán)”的方式嘗試獲取資源。成功獲取資源之后,被喚醒的線程將從先前調(diào)用的 await() 方法返回。


Condition 的 signalAll() 方法,相當(dāng)于對條件等待隊列中的每個節(jié)點(diǎn)均執(zhí)行一次 signal() 方法,效果就是將條件等待隊列中所有節(jié)點(diǎn)全部移動到同步阻塞隊列中,并喚醒每個節(jié)點(diǎn)的線程。

雖然是把每個節(jié)點(diǎn)的線程都喚醒了,這些線程需要嘗試獲取資源, 但是只有一個線程能夠成功獲取資源,然后從 await() 方法返回;其他獲取資源失敗的線程又都會被加入到同步阻塞隊列中,并在隊列中進(jìn)行自旋;移出隊列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了資源。

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

參考資料

《Java并發(fā)編程藝術(shù)》第5章:Java 中的鎖

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容