Java技術(shù)之AQS詳解

??AQS是AbstractQueuedSynchronizer的簡(jiǎn)稱。AQS提供了一種實(shí)現(xiàn)阻塞鎖和一系列依賴FIFO等待隊(duì)列的同步器的框架,如下圖所示。AQS為一系列同步器依賴于一個(gè)單獨(dú)的原子變量(state)的同步器提供了一個(gè)非常有用的基礎(chǔ)。子類們必須定義改變state變量的protected方法,這些方法定義了state是如何被獲取或釋放的。鑒于此,本類中的其他方法執(zhí)行所有的排隊(duì)和阻塞機(jī)制。子類也可以維護(hù)其他的state變量,但是為了保證同步,必須原子地操作這些變量。

AQS.png

?? AbstractQueuedSynchronizer中對(duì)state的操作是原子的,且不能被繼承。所有的同步機(jī)制的實(shí)現(xiàn)均依賴于對(duì)改變量的原子操作。為了實(shí)現(xiàn)不同的同步機(jī)制,我們需要?jiǎng)?chuàng)建一個(gè)非共有的(non-public internal)擴(kuò)展了AQS類的內(nèi)部輔助類來(lái)實(shí)現(xiàn)相應(yīng)的同步邏輯。AbstractQueuedSynchronizer并不實(shí)現(xiàn)任何同步接口,它提供了一些可以被具體實(shí)現(xiàn)類直接調(diào)用的一些原子操作方法來(lái)重寫(xiě)相應(yīng)的同步邏輯。AQS同時(shí)提供了互斥模式(exclusive)和共享模式(shared)兩種不同的同步邏輯。一般情況下,子類只需要根據(jù)需求實(shí)現(xiàn)其中一種模式,當(dāng)然也有同時(shí)實(shí)現(xiàn)兩種模式的同步類,如ReadWriteLock。接下來(lái)將詳細(xì)介紹AbstractQueuedSynchronizer的提供的一些具體實(shí)現(xiàn)方法。

state狀態(tài)

??AbstractQueuedSynchronizer維護(hù)了一個(gè)volatile int類型的變量,用戶表示當(dāng)前同步狀態(tài)。volatile雖然不能保證操作的原子性,但是保證了當(dāng)前變量state的可見(jiàn)性。至于volatile的具體語(yǔ)義,可以參考相關(guān)文章。state的訪問(wèn)方式有三種:

  • getState()
  • setState()
  • compareAndSetState()

??這三種叫做均是原子操作,其中compareAndSetState的實(shí)現(xiàn)依賴于Unsafe的compareAndSwapInt()方法。代碼實(shí)現(xiàn)如下:

    /**
     * The synchronization state.
     */
    private volatile int state;
  
    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

自定義資源共享方式

??AQS定義兩種資源共享方式:Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock)和Share(共享,多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch)。
??不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了。自定義同步器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:

  • isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。

源碼實(shí)現(xiàn)

??接下來(lái)我們開(kāi)始開(kāi)始講解AQS的源碼實(shí)現(xiàn)。依照acquire-release、acquireShared-releaseShared的次序來(lái)。

1. acquire(int)

? ? acquire是一種以獨(dú)占方式獲取資源,如果獲取到資源,線程直接返回,否則進(jìn)入等待隊(duì)列,直到獲取到資源為止,且整個(gè)過(guò)程忽略中斷的影響。該方法是獨(dú)占模式下線程獲取共享資源的頂層入口。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了。下面是acquire()的源碼:

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

??通過(guò)注釋我們知道,acquire方法是一種互斥模式,且忽略中斷。該方法至少執(zhí)行一次tryAcquire(int)方法,如果tryAcquire(int)方法返回true,則acquire直接返回,否則當(dāng)前線程需要進(jìn)入隊(duì)列進(jìn)行排隊(duì)。函數(shù)流程如下:

  1. tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  2. addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
  3. acquireQueued()使線程在等待隊(duì)列中獲取資源,一直獲取到資源后才返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),則返回true,否則返回false。
  4. 如果線程在等待過(guò)程中被中斷過(guò),它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。

接下來(lái)一次介紹相關(guān)方法。

1.1 tryAcquire(int)

?? tryAcquire嘗試以獨(dú)占的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用于實(shí)現(xiàn)Lock中的tryLock()方法。該方法的默認(rèn)實(shí)現(xiàn)是拋出UnsupportedOperationException,具體實(shí)現(xiàn)由自定義的擴(kuò)展了AQS的同步類來(lái)實(shí)現(xiàn)。AQS在這里只負(fù)責(zé)定義了一個(gè)公共的方法框架。這里之所以沒(méi)有定義成abstract,是因?yàn)楠?dú)占模式下只用實(shí)現(xiàn)tryAcquire-tryRelease,而共享模式下只用實(shí)現(xiàn)tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個(gè)模式也要去實(shí)現(xiàn)另一模式下的接口。

/**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
1.2 addWaiter(Node)

??該方法用于將當(dāng)前線程根據(jù)不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待隊(duì)列的隊(duì)尾,并返回當(dāng)前線程所在的結(jié)點(diǎn)。如果隊(duì)列不為空,則以通過(guò)compareAndSetTail方法以CAS的方式將當(dāng)前線程節(jié)點(diǎn)加入到等待隊(duì)列的末尾。否則,通過(guò)enq(node)方法初始化一個(gè)等待隊(duì)列,并返回當(dāng)前節(jié)點(diǎn)。源碼如下:

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
1.2.1 enq(node)

??enq(node)用于將當(dāng)前節(jié)點(diǎn)插入等待隊(duì)列,如果隊(duì)列為空,則初始化當(dāng)前隊(duì)列。整個(gè)過(guò)程以CAS自旋的方式進(jìn)行,直到成功加入隊(duì)尾為止。源碼如下:

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
1.3 acquireQueued(Node, int)

??acquireQueued()用于隊(duì)列中的線程自旋地以獨(dú)占且不可中斷的方式獲取同步狀態(tài)(acquire),直到拿到鎖之后再返回。該方法的實(shí)現(xiàn)分成兩部分:如果當(dāng)前節(jié)點(diǎn)已經(jīng)成為頭結(jié)點(diǎn),嘗試獲取鎖(tryAcquire)成功,然后返回;否則檢查當(dāng)前節(jié)點(diǎn)是否應(yīng)該被park,然后將該線程park并且檢查當(dāng)前線程是否被可以被中斷。

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        //標(biāo)記是否成功拿到資源,默認(rèn)false
        boolean failed = true;
        try {
            boolean interrupted = false;//標(biāo)記等待過(guò)程中是否被中斷過(guò)
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
1.3.1 shouldParkAfterFailedAcquire(Node, Node)

??shouldParkAfterFailedAcquire方法通過(guò)對(duì)當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的狀態(tài)進(jìn)行判斷,對(duì)當(dāng)前節(jié)點(diǎn)做出不同的操作,至于每個(gè)Node的狀態(tài)表示,可以參考接口文檔。

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
1.3.2 parkAndCheckInterrupt()

??該方法讓線程去休息,真正進(jìn)入等待狀態(tài)。park()會(huì)讓當(dāng)前線程進(jìn)入waiting狀態(tài)。在此狀態(tài)下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會(huì)清除當(dāng)前線程的中斷標(biāo)記位。

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

??我們?cè)倩氐絘cquireQueued(),總結(jié)下該函數(shù)的具體流程:

  1. 結(jié)點(diǎn)進(jìn)入隊(duì)尾后,檢查狀態(tài),找到安全休息點(diǎn);
  2. 調(diào)用park()進(jìn)入waiting狀態(tài),等待unpark()或interrupt()喚醒自己;
  3. 被喚醒后,看自己是不是有資格能拿到號(hào)。如果拿到,head指向當(dāng)前結(jié)點(diǎn),并返回從入隊(duì)到拿到號(hào)的整個(gè)過(guò)程中是否被中斷過(guò);如果沒(méi)拿到,繼續(xù)流程1。

最后,總結(jié)一下acquire()的流程:

  1. 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  2. 沒(méi)成功,則addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
  3. acquireQueued()使線程在等待隊(duì)列中休息,有機(jī)會(huì)時(shí)(輪到自己,會(huì)被unpark())會(huì)去嘗試獲取資源。獲取到資源后才返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),則返回true,否則返回false。
  4. 如果線程在等待過(guò)程中被中斷過(guò),它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。

2. release(int)

??release(int)方法是獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。這也正是unlock()的語(yǔ)義,當(dāng)然不僅僅只限于unlock()。下面是release()的源碼:

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

/**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

??與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨(dú)占模式的自定義同步器去實(shí)現(xiàn)的。正常來(lái)說(shuō),tryRelease()都會(huì)成功的,因?yàn)檫@是獨(dú)占模式,該線程來(lái)釋放資源,那么它肯定已經(jīng)拿到獨(dú)占資源了,直接減掉相應(yīng)量的資源即可(state-=arg),也不需要考慮線程安全的問(wèn)題。但要注意它的返回值,上面已經(jīng)提到了,release()是根據(jù)tryRelease()的返回值來(lái)判斷該線程是否已經(jīng)完成釋放掉資源了!所以自義定同步器在實(shí)現(xiàn)時(shí),如果已經(jīng)徹底釋放資源(state=0),要返回true,否則返回false。
??unparkSuccessor(Node)方法用于喚醒等待隊(duì)列中下一個(gè)線程。這里要注意的是,下一個(gè)線程并不一定是當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn),而是下一個(gè)可以用來(lái)喚醒的線程,如果這個(gè)節(jié)點(diǎn)存在,調(diào)用unpark()方法喚醒。
??總之,release()是獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。

3. acquireShared(int)

??acquireShared(int)方法是共享模式下線程獲取共享資源的頂層入口。它會(huì)獲取指定量的資源,獲取成功則直接返回,獲取失敗則進(jìn)入等待隊(duì)列,直到獲取到資源為止,整個(gè)過(guò)程忽略中斷。下面是acquireShared()的源碼:

/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }


3.1 doAcquireShared(int)

??將當(dāng)前線程加入等待隊(duì)列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回。源碼如下:

 /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

??跟獨(dú)占模式比,還有一點(diǎn)需要注意的是,這里只有線程是head.next時(shí)(“老二”),才會(huì)去嘗試獲取資源,有剩余的話還會(huì)喚醒之后的隊(duì)友。那么問(wèn)題就來(lái)了,假如老大用完后釋放了5個(gè)資源,而老二需要6個(gè),老三需要1個(gè),老四需要2個(gè)。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會(huì)繼續(xù)park()等待其他線程釋放資源,也更不會(huì)去喚醒老三和老四了。獨(dú)占模式,同一時(shí)刻只有一個(gè)線程去執(zhí)行,這樣做未嘗不可;但共享模式下,多個(gè)線程是可以同時(shí)執(zhí)行的,現(xiàn)在因?yàn)槔隙馁Y源需求量大,而把后面量小的老三和老四也都卡住了。當(dāng)然,這并不是問(wèn)題,只是AQS保證嚴(yán)格按照入隊(duì)順序喚醒罷了(保證公平,但降低了并發(fā))。實(shí)現(xiàn)如下:

/**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

??此方法在setHead()的基礎(chǔ)上多了一步,就是自己蘇醒的同時(shí),如果條件符合(比如還有剩余資源),還會(huì)去喚醒后繼結(jié)點(diǎn),畢竟是共享模式!至此,acquireShared()也要告一段落了。讓我們?cè)偈崂硪幌滤牧鞒蹋?/p>

  1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
  2. 失敗則通過(guò)doAcquireShared()進(jìn)入等待隊(duì)列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個(gè)等待過(guò)程也是忽略中斷的。

4. releaseShared(int)

??releaseShared(int)方法是共享模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。下面是releaseShared()的源碼:

/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

??此方法的流程也比較簡(jiǎn)單,一句話:釋放掉資源后,喚醒后繼。跟獨(dú)占模式下的release()相似,但有一點(diǎn)稍微需要注意:獨(dú)占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會(huì)返回true去喚醒其他線程,這主要是基于獨(dú)占下可重入的考量;而共享模式下的releaseShared()則沒(méi)有這種要求,共享模式實(shí)質(zhì)就是控制一定量的線程并發(fā)執(zhí)行,那么擁有資源的線程在釋放掉部分資源時(shí)就可以喚醒后繼等待結(jié)點(diǎn)。

/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

參考:https://www.cnblogs.com/waterystone/p/4920797.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容