魔鬼在細節(jié),理解Java并發(fā)底層之AQS實現(xiàn)

jdk的JUC包(java.util.concurrent)提供大量Java并發(fā)工具提供使用,基本由Doug Lea編寫,很多地方值得學(xué)習(xí)和借鑒,是進階升級必經(jīng)之路

本文從JUC包中常用的對象鎖、并發(fā)工具的使用和功能特性入手,帶著問題,由淺到深,一步步剖析并發(fā)底層AQS抽象類具體實現(xiàn)

名詞解釋

1 AQS

AQS是一個抽象類,類全路徑j(luò)ava.util.concurrent.locks.AbstractQueuedSynchronizer,抽象隊列同步器,是基于模板模式開發(fā)的并發(fā)工具抽象類,有如下并發(fā)類基于AQS實現(xiàn):

2 CAS

CAS是Conmpare And Swap(比較和交換)的縮寫,是一個原子操作指令

CAS機制當(dāng)中使用了3個基本操作數(shù):內(nèi)存地址addr,預(yù)期舊的值oldVal,要修改的新值newVal
更新一個變量的時候,只有當(dāng)變量的預(yù)期值oldVal和內(nèi)存地址addr當(dāng)中的實際值相同時,才會將內(nèi)存地址addr對應(yīng)的值修改為newVal

基于樂觀鎖的思路,通過CAS再不斷嘗試和比較,可以對變量值線程安全地更新

3 線程中斷

線程中斷是一種線程協(xié)作機制,用于協(xié)作其他線程中斷任務(wù)的執(zhí)行

當(dāng)線程處于阻塞等待狀態(tài),例如調(diào)用了wait()、join()、sleep()方法之后,調(diào)用線程的interrupt()方法之后,線程會馬上退出阻塞并收到InterruptedException;

當(dāng)線程處于運行狀態(tài),調(diào)用線程的interrupt()方法之后,線程并不會馬上中斷執(zhí)行,需要在線程的具體任務(wù)執(zhí)行邏輯中通過調(diào)用isInterrupted() 方法檢測線程中斷標志位,然后主動響應(yīng)中斷,通常是拋出InterruptedException

對象鎖特性

下面先介紹對象鎖、并發(fā)工具有哪些基本特性,后面再逐步展開這些特性如何實現(xiàn)

1 顯式獲取

以ReentrantLock鎖為例,主要支持以下4種方式顯式獲取鎖

  • (1) 阻塞等待獲取
ReentrantLock lock = new ReentrantLock();
// 一直阻塞等待,直到獲取成功
lock.lock();
  • (2) 無阻塞嘗試獲取
ReentrantLock lock = new ReentrantLock();
// 嘗試獲取鎖,如果鎖已被其他線程占用,則不阻塞等待直接返回false
// 返回true - 鎖是空閑的且被本線程獲取,或者已經(jīng)被本線程持有
// 返回false - 獲取鎖失敗
boolean isGetLock = lock.tryLock();
  • (3) 指定時間內(nèi)阻塞等待獲取
ReentrantLock lock = new ReentrantLock();
try {
    // 嘗試在指定時間內(nèi)獲取鎖
    // 返回true - 鎖是空閑的且被本線程獲取,或者已經(jīng)被本線程持有
    // 返回false - 指定時間內(nèi)未獲取到鎖
    lock.tryLock(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    // 內(nèi)部調(diào)用isInterrupted() 方法檢測線程中斷標志位,主動響應(yīng)中斷
    e.printStackTrace();
}
  • (4) 響應(yīng)中斷獲取
ReentrantLock lock = new ReentrantLock();
try {
    // 響應(yīng)中斷獲取鎖
    // 如果調(diào)用線程的thread.interrupt()方法設(shè)置線程中斷,線程退出阻塞等待并拋出中斷異常
    lock.lockInterruptibly();
} catch (InterruptedException e) {
    e.printStackTrace();
}

2 顯式釋放

ReentrantLock lock = new ReentrantLock();
lock.lock();
// ... 各種業(yè)務(wù)操作
// 顯式釋放鎖
lock.unlock();

3 可重入

已經(jīng)獲取到鎖的線程,再次請求該鎖可以直接獲得

4 可共享

指同一個資源允許多個線程共享,例如讀寫鎖的讀鎖允許多個線程共享,共享鎖可以讓多個線程并發(fā)安全地訪問數(shù)據(jù),提高程序執(zhí)效率

5 公平、非公平

公平鎖:多個線程采用先到先得的公平方式競爭鎖。每次加鎖前都會檢查等待隊列里面有沒有線程排隊,沒有才會嘗試獲取鎖。
非公平鎖:當(dāng)一個線程采用非公平的方式獲取鎖時,該線程會首先去嘗試獲取鎖而不是等待。如果沒有獲取成功,才會進入等待隊列

因為非公平鎖方式可以使后來的線程有一定幾率直接獲取鎖,減少了線程掛起等待的幾率,性能優(yōu)于公平鎖

AQS實現(xiàn)原理

1 基本概念

(1) Condition接口

類似Object的wait()、wait(long timeout)、notify()以及notifyAll()的方法結(jié)合synchronized內(nèi)置鎖可以實現(xiàn)可以實現(xiàn)等待/通知模式,實現(xiàn)Lock接口的ReentrantLock、ReentrantReadWriteLock等對象鎖也有類似功能:

Condition接口定義了await()、awaitNanos(long)、signal()、signalAll()等方法,配合對象鎖實例實現(xiàn)等待/通知功能,原理是基于AQS內(nèi)部類ConditionObject實現(xiàn)Condition接口,線程await后阻塞并進入CLH隊列(下面提到),等待其他線程調(diào)用signal方法后被喚醒

(2) CLH隊列

CLH隊列,CLH是算法提出者Craig, Landin, Hagersten的名字簡稱

AQS內(nèi)部維護著一個雙向FIFO的CLH隊列,AQS依賴它來管理等待中的線程,如果線程獲取同步競爭資源失敗時,會將線程阻塞,并加入到CLH同步隊列;當(dāng)競爭資源空閑時,基于CLH隊列阻塞線程并分配資源

CLH的head節(jié)點保存當(dāng)前占用資源的線程,或者是沒有線程信息,其他節(jié)點保存排隊線程信息

CLH

CLH中每一個節(jié)點的狀態(tài)(waitStatus)取值如下:

  • CANCELLED(1):表示當(dāng)前節(jié)點已取消調(diào)度。當(dāng)timeout或被中斷(響應(yīng)中斷的情況下),會觸發(fā)變更為此狀態(tài),進入該狀態(tài)后的節(jié)點將不會再變化
  • SIGNAL(-1):表示后繼節(jié)點在等待當(dāng)前節(jié)點喚醒。后繼節(jié)點入隊后進入休眠狀態(tài)之前,會將前驅(qū)節(jié)點的狀態(tài)更新為SIGNAL
  • CONDITION(-2):表示節(jié)點等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的節(jié)點將從等待隊列轉(zhuǎn)移到同步隊列中,等待獲取同步鎖
  • PROPAGATE(-3):共享模式下,前驅(qū)節(jié)點不僅會喚醒其后繼節(jié)點,同時也可能會喚醒后繼的后繼節(jié)點
  • 0:新節(jié)點入隊時的默認狀態(tài)

(3) 資源共享方式

AQS定義兩種資源共享方式:
Exclusive 獨占,只有一個線程能執(zhí)行,如ReentrantLock
Share 共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch

(4) 阻塞/喚醒線程的方式

AQS 基于sun.misc.Unsafe類提供的park方法阻塞線程,unpark方法喚醒線程,被park方法阻塞的線程能響應(yīng)interrupt()中斷請求退出阻塞

2 基本設(shè)計

核心設(shè)計思路:AQS提供一個框架,用于實現(xiàn)依賴于CLH隊列的阻塞鎖和相關(guān)的并發(fā)同步器。子類通過實現(xiàn)判定是否能獲取/釋放資源的protect方法,AQS基于這些protect方法實現(xiàn)對線程的排隊、喚醒的線程調(diào)度策略

AQS還提供一個支持線程安全原子更新的int類型變量作為同步狀態(tài)值(state),子類可以根據(jù)實際需求,靈活定義該變量代表的意義進行更新

通過子類重新定義的系列protect方法如下:

  • boolean tryAcquire(int) 獨占方式嘗試獲取資源,成功則返回true,失敗則返回false
  • boolean tryRelease(int) 獨占方式嘗試釋放資源,成功則返回true,失敗則返回false
  • int tryAcquireShared(int) 共享方式嘗試獲取資源。負數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源
  • boolean tryReleaseShared(int) 共享方式嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待節(jié)點返回true,否則返回false

這些方法始終由需要需要調(diào)度協(xié)作的線程來調(diào)用,子類須以非阻塞的方式重新定義這些方法

AQS基于上述tryXXX方法,對外提供下列方法來獲取/釋放資源:

  • void acquire(int) 獨占方式獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響
  • boolean release(int) 獨占方式下線程釋放資源,先釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源
  • void acquireShared(int) 共享方式獲取資源
  • boolean releaseShared(int) 共享方式釋放資源

以獨占模式為例:獲取/釋放資源的核心的實現(xiàn)如下:

 Acquire:
     while (!tryAcquire(arg)) {
        如果線程尚未排隊,則將其加入隊列;
     }

 Release:
     if (tryRelease(arg))
        喚醒CLH中第一個排隊線程

到這里,有點繞,下面一張圖把上面介紹到的設(shè)計思路再重新捋一捋:

AQS基本設(shè)計

特性實現(xiàn)

下面介紹基于AQS的對象鎖、并發(fā)工具的一系列功能特性的實現(xiàn)原理

1 顯式獲取

該特性還是以ReentrantLock鎖為例,ReentrantLock是可重入對象鎖,線程每次請求獲取成功一次鎖,同步狀態(tài)值state加1,釋放鎖state減1,state為0代表沒有任何線程持有鎖

ReentrantLock鎖支持公平/非公平特性,下面的顯式獲取特性以公平鎖為例

(1) 阻塞等待獲取

基本實現(xiàn)如下:

  • 1、ReentrantLock實現(xiàn)AQS的tryAcquire(int)方法,先判斷:如果沒有任何線程持有鎖,或者當(dāng)前線程已經(jīng)持有鎖,則返回true,否則返回false
  • 2、AQS的acquire(int)方法判斷當(dāng)前節(jié)點是否為head且基于tryAcquire(int)能否獲得資源,如果不能獲得,則加入CLH隊列排隊阻塞等待
  • 3、ReentrantLock的lock()方法基于AQS的acquire(int)方法阻塞等待獲取鎖

ReentrantLock中的tryAcquire(int)方法實現(xiàn):

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 沒有任何線程持有鎖
    if (c == 0) {
        // 通過CLH隊列的head判斷沒有別的線程在比當(dāng)前更早acquires
        // 且基于CAS設(shè)置state成功(期望的state舊值為0)
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 設(shè)置持有鎖的線程為當(dāng)前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 持有鎖的線程為當(dāng)前線程
    else if (current == getExclusiveOwnerThread()) {
        // 僅僅在當(dāng)前線程,單線程,不用基于CAS更新
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 其他線程已經(jīng)持有鎖
    return false;
}

AQS的acquire(int)方法實現(xiàn)

public final void acquire(int arg) {
        // tryAcquire檢查釋放能獲取成功
        // addWaiter 構(gòu)建CLH的節(jié)點對象并入隊
        // acquireQueued線程阻塞等待
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // acquireQueued返回true,代表線程在獲取資源的過程中被中斷
        // 則調(diào)用該方法將線程中斷標志位設(shè)置為true
        selfInterrupt();
}


final boolean acquireQueued(final Node node, int arg) {
    // 標記是否成功拿到資源
    boolean failed = true;
    try {
        // 標記等待過程中是否被中斷過
        boolean interrupted = false;
        // 循環(huán)直到資源釋放
        for (;;) {
            // 拿到前驅(qū)節(jié)點
            final Node p = node.predecessor();
            
            // 如果前驅(qū)是head,即本節(jié)點是第二個節(jié)點,才有資格去嘗試獲取資源
            // 可能是head釋放完資源喚醒本節(jié)點,也可能被interrupt()
            if (p == head && tryAcquire(arg)) {
                // 成功獲取資源
                setHead(node);
                // help GC
                p.next = null; 
                failed = false;
                return interrupted;
            }
            
            // 需要排隊阻塞等待
            // 如果在過程中線程中斷,不響應(yīng)中斷
            // 且繼續(xù)排隊獲取資源,設(shè)置interrupted變量為true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

(2) 無阻塞嘗試獲取

ReentrantLock中的tryLock()的實現(xiàn)僅僅是非公平鎖實現(xiàn),實現(xiàn)邏輯基本與tryAcquire一致,不同的是沒有通過hasQueuedPredecessors()檢查CLH隊列的head是否有其他線程在等待,這樣當(dāng)資源釋放時,有線程請求資源能插隊優(yōu)先獲取

ReentrantLock中tryLock()具體實現(xiàn)如下:

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 沒有任何線程持有鎖
    if (c == 0) {
        // 基于CAS設(shè)置state成功(期望的state舊值為0)
        // 沒有檢查CLH隊列中是否有線程在等待
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 持有鎖的線程為當(dāng)前線程
    else if (current == getExclusiveOwnerThread()) {
        // 僅僅在當(dāng)前線程,單線程,不用基于CAS更新
        int nextc = c + acquires;
        if (nextc < 0) // overflow,整數(shù)溢出
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 其他線程已經(jīng)持有鎖
    return false;
}

(3) 指定時間內(nèi)阻塞等待獲取

基本實現(xiàn)如下:

  • 1、ReentrantLock的tryLock(long, TimeUnit)調(diào)用AQS的tryAcquireNanos(int, long)方法
  • 2、AQS的tryAcquireNanos先調(diào)用tryAcquire(int)嘗試獲取,獲取不到再調(diào)用doAcquireNanos(int, long)方法
  • 3、AQS的doAcquireNanos判斷當(dāng)前節(jié)點是否為head且基于tryAcquire(int)能否獲得資源,如果不能獲得且超時時間大于1微秒,則休眠一段時間后再嘗試獲取

ReentrantLock中的實現(xiàn)如下:

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果線程已經(jīng)被interrupt()方法設(shè)置中斷 
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先tryAcquire嘗試獲取鎖 
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

AQS中的實現(xiàn)如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 獲取到資源的截止時間   
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    // 標記是否成功拿到資源
    boolean failed = true;
    try {
        for (;;) {
            // 拿到前驅(qū)節(jié)點
            final Node p = node.predecessor();
            // 如果前驅(qū)是head,即本節(jié)點是第二個節(jié)點,才有資格去嘗試獲取資源
            // 可能是head釋放完資源喚醒本節(jié)點,也可能被interrupt()
            if (p == head && tryAcquire(arg)) {
                // 成功獲取資源
                setHead(node);
                // help GC
                p.next = null; 
                failed = false;
                return true;
            }
            // 更新剩余超時時間
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 排隊是否需要排隊阻塞等待 
            // 且超時時間大于1微秒,則線程休眠到超時時間到了再嘗試獲取
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);

            // 如果線程已經(jīng)被interrupt()方法設(shè)置中斷
            // 則不再排隊,直接退出   
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

(4) 響應(yīng)中斷獲取

ReentrantLock響應(yīng)中斷獲取鎖的方式是:當(dāng)線程在park方法休眠中響應(yīng)thead.interrupt()方法中斷喚醒時,檢查到線程中斷標志位為true,主動拋出異常,核心實現(xiàn)在AQS的doAcquireInterruptibly(int)方法中

基本實現(xiàn)與阻塞等待獲取類似,只是調(diào)用從AQS的acquire(int)方法,改為調(diào)用AQS的doAcquireInterruptibly(int)方法

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    // 標記是否成功拿到資源
    boolean failed = true;
    try {
        for (;;) {
            // 拿到前驅(qū)節(jié)點
            final Node p = node.predecessor();
            
            // 如果前驅(qū)是head,即本節(jié)點是第二個節(jié)點,才有資格去嘗試獲取資源
            // 可能是head釋放完資源喚醒本節(jié)點,也可能被interrupt()
            if (p == head && tryAcquire(arg)) {
                // 成功獲取資源
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            
            // 需要排隊阻塞等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 從排隊阻塞中喚醒,如果檢查到中斷標志位為true
                parkAndCheckInterrupt())
                // 主動響應(yīng)中斷
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2 顯式釋放

AQS資源共享方式分為獨占式和共享式,這里先以ReentrantLock為例介紹獨占式資源的顯式釋放,共享式后面會介紹到

與顯式獲取有類似之處,ReentrantLock顯式釋放基本實現(xiàn)如下:

  • 1、ReentrantLock實現(xiàn)AQS的tryRelease(int)方法,方法將state變量減1,如果state變成0代表沒有任何線程持有鎖,返回true,否則返回false
  • 2、AQS的release(int)方法基于tryRelease(int)排隊是否有任何線程持有資源,如果沒有,則喚醒CLH隊列中頭節(jié)點的線程
  • 3、被喚醒后的線程繼續(xù)執(zhí)行acquireQueued(Node,int)或者doAcquireNanos(int, long)或者doAcquireInterruptibly(int)中for(;;)中的邏輯,繼續(xù)嘗試獲取資源

ReentrantLock中tryRelease(int)方法實現(xiàn)如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 只有持有鎖的線程才有資格釋放鎖
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
        
    // 標識是否沒有任何線程持有鎖    
    boolean free = false;
    
    // 沒有任何線程持有鎖
    // 可重入鎖每lock一次都需要對應(yīng)一次unlock
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

AQS中的release(int)方法實現(xiàn)如下:

public final boolean release(int arg) {
    // 嘗試釋放資源
    if (tryRelease(arg)) {
        Node h = head;
        // 頭節(jié)點不為空
        // 后繼節(jié)點入隊后進入休眠狀態(tài)之前,會將前驅(qū)節(jié)點的狀態(tài)更新為SIGNAL(-1)
        // 頭節(jié)點狀態(tài)為0,代表沒有后繼的等待節(jié)點
        if (h != null && h.waitStatus != 0)
            // 喚醒第二個節(jié)點
            // 頭節(jié)點是占用資源的線程,第二個節(jié)點才是首個等待資源的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3 可重入

可重入的實現(xiàn)比較簡單,以ReentrantLock為例,主要是在tryAcquire(int)方法中實現(xiàn),持有鎖的線程是不是當(dāng)前線程,如果是,更新同步狀態(tài)值state,并返回true,代表能獲取鎖

4 可共享

可共享資源以ReentrantReadWriteLock為例,跟獨占鎖ReentrantLock的區(qū)別主要在于,獲取的時候,多個線程允許共享讀鎖,當(dāng)寫鎖釋放時,多個阻塞等待讀鎖的線程能同時獲取到

ReentrantReadWriteLock類中將AQS的state同步狀態(tài)值定義為,高16位為讀鎖持有數(shù),低16位為寫鎖持有鎖

ReentrantReadWriteLock中tryAcquireShared(int)、tryReleaseShared(int)實現(xiàn)的邏輯較長,主要涉及讀寫互斥、可重入判斷、讀鎖對寫鎖的讓步,篇幅所限,這里就不展開了

獲取讀鎖(ReadLock.lock())主要實現(xiàn)如下

  • 1、ReentrantReadWriteLock實現(xiàn)AQS的tryAcquireShared(int)方法,判斷當(dāng)前線程能否獲得讀鎖
  • 2、AQS的acquireShared(int)先基于tryAcquireShared(int)嘗試獲取資源,如果獲取失敗,則加入CLH隊列排隊阻塞等待
  • 3、ReentrantReadWriteLock的ReadLock.lock()方法基于AQS的acquireShared(int)方法阻塞等待獲取鎖

AQS共享模式獲取資源的具體實現(xiàn)如下:

public final void acquireShared(int arg) {
    // tryAcquireShared返回負數(shù)代表獲取共享資源失敗
    // 則通過進入等待隊列,直到獲取到資源為止才返回
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 與前面介紹到的acquireQueued邏輯基本一致
// 不同的是將tryAcquire改為tryAcquireShared
// 還有資源獲取成功后將傳播給CLH隊列上等待該資源的節(jié)點
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
     // 標記是否成功拿到資源
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 資源獲取成功
                if (r >= 0) {
                    // 傳播給CLH隊列上等待該資源的節(jié)點                             
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 需要排隊阻塞等待
            // 如果在過程中線程中斷,不響應(yīng)中斷
            // 且繼續(xù)排隊獲取資源,設(shè)置interrupted變量為true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

//  資源傳播給CLH隊列上等待該資源的節(jié)點 
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 釋放共享資源
            doReleaseShared();
    }
}

釋放讀鎖(ReadLock.unlock())主要實現(xiàn)如下
ReentrantReadWriteLock共享資源的釋放主要實現(xiàn)如下:

  • 1、ReentrantReadWriteLock實現(xiàn)AQS的tryReleaseShared(int)方法,判斷讀鎖釋放后是否還有線程持有讀鎖
  • 2、AQS的releaseShared(int)基于tryReleaseShared(int)判斷是否需要CLH隊列中的休眠線程,如果需要就執(zhí)行doReleaseShared()
  • 3、ReentrantReadWriteLock的ReadLock.unlock()方法基于AQS的releaseShared(int)方法釋放鎖

AQS共享模式釋放資源具體實現(xiàn)如下:

public final boolean releaseShared(int arg) {
    // 允許喚醒CLH中的休眠線程
    if (tryReleaseShared(arg)) {
        // 執(zhí)行資源釋放
        doReleaseShared();
        return true;
    }
    return false;
}
    
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 當(dāng)前節(jié)點正在等待資源
            if (ws == Node.SIGNAL) {
                // 當(dāng)前節(jié)點被其他線程喚醒了
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                unparkSuccessor(h);
            }
            // 進入else的條件是,當(dāng)前節(jié)點剛剛成為頭節(jié)點
            // 尾節(jié)點剛剛加入CLH隊列,還沒在休眠前將前驅(qū)節(jié)點狀態(tài)改為SIGNAL
            // CAS失敗是尾節(jié)點已經(jīng)在休眠前將前驅(qū)節(jié)點狀態(tài)改為SIGNAL
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        // 每次喚醒后驅(qū)節(jié)點后,線程進入doAcquireShared方法,然后更新head
        // 如果h變量在本輪循環(huán)中沒有被改變,說明head == tail,隊列中節(jié)點全部被喚醒
        if (h == head)                 
            break;
    }
}

5 公平、非公平

這個特性實現(xiàn)比較簡單,以ReentrantLock鎖為例,公平鎖直接基于AQS的acquire(int)獲取資源,而非公平鎖先嘗試插隊:基于CAS,期望state同步變量值為0(沒有任何線程持有鎖),更新為1,如果全部CAS更新失敗再進行排隊

// 公平鎖實現(xiàn)
final void lock() {
    acquire(1);
}

// 非公平鎖實現(xiàn)
final void lock() {
    // 第1次CAS
    // state值為0代表沒有任何線程持有鎖,直接插隊獲得鎖
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// 在nonfairTryAcquire方法中再次CAS嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 第2次CAS嘗試獲取鎖
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 已經(jīng)獲得鎖
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

總結(jié)

AQS的state變量值的含義不一定代表資源,不同的AQS的繼承類可以對state變量值有不同的定義

例如在countDownLatch類中,state變量值代表還需釋放的latch計數(shù)(可以理解為需要打開的門閂數(shù)),需要每個門閂都打開,門才能打開,所有等待線程才會開始執(zhí)行,每次countDown()就會對state變量減1,如果state變量減為0,則喚醒CLH隊列中的休眠線程

學(xué)習(xí)類似底層源碼建議先定幾個問題,帶著問題學(xué)習(xí);通俗學(xué)習(xí)前建議先理解透徹整體設(shè)計,整體原理(可以先閱讀相關(guān)文檔資料),再研究和源碼細節(jié),避免一開始就扎進去源碼,容易無功而返

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容