【死磕Java并發(fā)】-----J.U.C之AQS:同步狀態(tài)的獲取與釋放

此篇博客所有源碼均來自JDK 1.8

在前面提到過,AQS是構(gòu)建Java同步組件的基礎(chǔ),我們期待它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。AQS的設(shè)計模式采用的模板方法模式,子類通過繼承的方式,實現(xiàn)它的抽象方法來管理同步狀態(tài),對于子類而言它并沒有太多的活要做,AQS提供了大量的模板方法來實現(xiàn)同步,主要是分為三類:獨占式獲取和釋放同步狀態(tài)、共享式獲取和釋放同步狀態(tài)、查詢同步隊列中的等待線程情況。自定義子類使用AQS提供的模板方法就可以實現(xiàn)自己的同步語義。

獨占式

獨占式,同一時刻僅有一個線程持有同步狀態(tài)。

獨占式同步狀態(tài)獲取

acquire(int arg)方法為AQS提供的模板方法,該方法為獨占式獲取同步狀態(tài),但是該方法對中斷不敏感,也就是說由于線程獲取同步狀態(tài)失敗加入到CLH同步隊列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步隊列中移除。代碼如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

各個方法定義如下:

  1. tryAcquire:去嘗試獲取鎖,獲取成功則設(shè)置鎖狀態(tài)并返回true,否則返回false。該方法自定義同步組件自己實現(xiàn),該方法必須要保證線程安全的獲取同步狀態(tài)。
  2. addWaiter:如果tryAcquire返回FALSE(獲取同步狀態(tài)失敗),則調(diào)用該方法將當(dāng)前線程加入到CLH同步隊列尾部。
  3. acquireQueued:當(dāng)前線程會根據(jù)公平性原則來進(jìn)行阻塞等待(自旋),直到獲取鎖為止;并且返回當(dāng)前線程在等待過程中有沒有中斷過。
  4. selfInterrupt:產(chǎn)生一個中斷。

acquireQueued方法為一個自旋的過程,也就是說當(dāng)前線程(Node)進(jìn)入同步隊列后,就會進(jìn)入一個自旋的過程,每個節(jié)點都會自省地觀察,當(dāng)條件滿足,獲取到同步狀態(tài)后,就可以從這個自旋過程中退出,否則會一直執(zhí)行下去。如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中斷標(biāo)志
            boolean interrupted = false;
            /*
             * 自旋過程,其實就是一個死循環(huán)而已
             */
            for (;;) {
                //當(dāng)前線程的前驅(qū)節(jié)點
                final Node p = node.predecessor();
                //當(dāng)前線程的前驅(qū)節(jié)點是頭結(jié)點,且同步狀態(tài)成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //獲取失敗,線程等待--具體后面介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

從上面代碼中可以看到,當(dāng)前線程會一直嘗試獲取同步狀態(tài),當(dāng)然前提是只有其前驅(qū)節(jié)點為頭結(jié)點才能夠嘗試獲取同步狀態(tài),理由:

  1. 保持FIFO同步隊列原則。
  2. 頭節(jié)點釋放同步狀態(tài)后,將會喚醒其后繼節(jié)點,后繼節(jié)點被喚醒后需要檢查自己是否為頭節(jié)點。

acquire(int arg)方法流程圖如下:

獲取同步狀態(tài).png

獨占式獲取響應(yīng)中斷

AQS提供了acquire(int arg)方法以供獨占式獲取同步狀態(tài),但是該方法對中斷不響應(yīng),對線程進(jìn)行中斷操作后,該線程會依然位于CLH同步隊列中等待著獲取同步狀態(tài)。為了響應(yīng)中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態(tài)時,如果當(dāng)前線程被中斷了,會立刻響應(yīng)中斷拋出異常InterruptedException。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

首先校驗該線程是否已經(jīng)中斷了,如果是則拋出InterruptedException,否則執(zhí)行tryAcquire(int arg)方法獲取同步狀態(tài),如果獲取成功,則直接返回,否則執(zhí)行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。1.方法聲明拋出InterruptedException異常,2.在中斷方法處不再是使用interrupted標(biāo)志,而是直接拋出InterruptedException異常。

獨占式超時獲取

AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進(jìn)一步增強,它除了響應(yīng)中斷外,還有超時控制。即如果當(dāng)前線程沒有在指定時間內(nèi)獲取同步狀態(tài),則會返回false,否則返回true。如下:

   public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現(xiàn)的,如下:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //nanosTimeout <= 0
        if (nanosTimeout <= 0L)
            return false;
        //超時時間
        final long deadline = System.nanoTime() + nanosTimeout;
        //新增Node節(jié)點
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            //自旋
            for (;;) {
                final Node p = node.predecessor();
                //獲取同步狀態(tài)成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                /*
                 * 獲取失敗,做超時、中斷判斷
                 */
                //重新計算需要休眠的時間
                nanosTimeout = deadline - System.nanoTime();
                //已經(jīng)超時,返回false
                if (nanosTimeout <= 0L)
                    return false;
                //如果沒有超時,則等待nanosTimeout納秒
                //注:該線程會直接從LockSupport.parkNanos中返回,
                //LockSupport為JUC提供的一個阻塞和喚醒的工具類,后面做詳細(xì)介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //線程是否已經(jīng)中斷了
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

針對超時控制,程序首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。如果獲取同步狀態(tài)失敗,則需要計算出需要休眠的時間間隔nanosTimeout(= deadline - System.nanoTime()),如果nanosTimeout <= 0 表示已經(jīng)超時了,返回false,如果大于spinForTimeoutThreshold(1000L)則需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接進(jìn)入快速自旋的過程。原因在于 spinForTimeoutThreshold 已經(jīng)非常小了,非常短的時間等待無法做到十分精確,如果這時再次進(jìn)行超時等待,相反會讓nanosTimeout 的超時從整體上面表現(xiàn)得不是那么精確,所以在超時非常短的場景中,AQS會進(jìn)行無條件的快速自旋。

整個流程如下:

超時獲取同步狀態(tài).png

獨占式同步狀態(tài)釋放

當(dāng)線程獲取同步狀態(tài)后,執(zhí)行完相應(yīng)邏輯后就需要釋放同步狀態(tài)。AQS提供了release(int arg)方法釋放同步狀態(tài):

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

該方法同樣是先調(diào)用自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態(tài),釋放成功后,會調(diào)用unparkSuccessor(Node node)方法喚醒后繼節(jié)點(如何喚醒LZ后面介紹)。

這里稍微總結(jié)下:

在AQS中維護(hù)著一個FIFO的同步隊列,當(dāng)線程獲取同步狀態(tài)失敗后,則會加入到這個CLH同步隊列的對尾并一直保持著自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅(qū)節(jié)點是否為首節(jié)點,如果為首節(jié)點則不斷嘗試獲取同步狀態(tài),獲取成功則退出CLH同步隊列。當(dāng)線程執(zhí)行完邏輯后,會釋放同步狀態(tài),釋放后會喚醒其后繼節(jié)點。

共享式

共享式與獨占式的最主要區(qū)別在于同一時刻獨占式只能有一個線程獲取同步狀態(tài),而共享式在同一時刻可以有多個線程獲取同步狀態(tài)。例如讀操作可以有多個線程同時進(jìn)行,而寫操作同一時刻只能有一個線程進(jìn)行寫操作,其他操作都會被阻塞。

共享式同步狀態(tài)獲取

AQS提供acquireShared(int arg)方法共享式獲取同步狀態(tài):

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //獲取失敗,自旋獲取同步狀態(tài)
            doAcquireShared(arg);
    }

從上面程序可以看出,方法首先是調(diào)用tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),如果獲取失敗則調(diào)用doAcquireShared(int arg)自旋方式獲取同步狀態(tài),共享式獲取同步狀態(tài)的標(biāo)志是返回 >= 0 的值表示獲取成功。自選式獲取同步狀態(tài)如下:

    private void doAcquireShared(int arg) {
        /共享式節(jié)點
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驅(qū)節(jié)點
                final Node p = node.predecessor();
                //如果其前驅(qū)節(jié)點,獲取同步狀態(tài)
                if (p == head) {
                    //嘗試獲取同步
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),返回值為int,當(dāng)其 >= 0 時,表示能夠獲取到同步狀態(tài),這個時候就可以從自旋過程中退出。

acquireShared(int arg)方法不響應(yīng)中斷,與獨占式相似,AQS也提供了響應(yīng)中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這里就不做解釋了。

共享式同步狀態(tài)釋放

獲取同步狀態(tài)后,需要調(diào)用release(int arg)方法釋放同步狀態(tài),方法如下:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

因為可能會存在多個線程同時進(jìn)行釋放同步狀態(tài)資源,所以需要確保同步狀態(tài)安全地成功釋放,一般都是通過CAS和循環(huán)來完成的。

參考資料

Doug Lea:《Java并發(fā)編程實戰(zhàn)》
方騰飛:《Java并發(fā)編程的藝術(shù)》


個人微信公眾號
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容