AQS原理及源碼分析

AQS是一個(gè)用來(lái)構(gòu)建鎖和同步器的框架,使用AQS能簡(jiǎn)單且高效地構(gòu)造出應(yīng)用廣泛的大量的同步器,比如ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的。

1. AQS核心思想

AQS核心思想是,如果被請(qǐng)求的共享資源空閑,則將當(dāng)前請(qǐng)求資源的線程設(shè)置為有效的工作線程,并且將共享資源設(shè)置為鎖定狀態(tài)。如果被請(qǐng)求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時(shí)鎖分配的機(jī)制,這個(gè)機(jī)制AQS是用CLH隊(duì)列鎖實(shí)現(xiàn)的,即將暫時(shí)獲取不到鎖的線程加入到隊(duì)列中。

CLH(Craig,Landin,and Hagersten)隊(duì)列是一個(gè)虛擬的雙向隊(duì)列(虛擬的雙向隊(duì)列即不存在隊(duì)列實(shí)例,僅存在結(jié)點(diǎn)之間的關(guān)聯(lián)關(guān)系)。AQS是將每條請(qǐng)求共享資源的線程封裝成一個(gè)CLH鎖隊(duì)列的一個(gè)結(jié)點(diǎn)(Node)來(lái)實(shí)現(xiàn)鎖的分配。其中Sync queue,即同步隊(duì)列,是雙向鏈表,包括head結(jié)點(diǎn)和tail結(jié)點(diǎn),head結(jié)點(diǎn)主要用作后續(xù)的調(diào)度。而Condition queue不是必須的,其是一個(gè)單向鏈表,只有當(dāng)使用Condition時(shí),才會(huì)存在此單向鏈表。并且可能會(huì)有多個(gè)Condition queue。


AQS使用一個(gè)int成員變量來(lái)表示同步狀態(tài),通過(guò)內(nèi)置的FIFO隊(duì)列來(lái)完成獲取資源線程的排隊(duì)工作。AQS使用CAS對(duì)該同步狀態(tài)進(jìn)行原子操作實(shí)現(xiàn)對(duì)其值的修改。

private volatile int state; // 共享變量,使用volatile修飾保證線程可見(jiàn)性

狀態(tài)信息通過(guò)procted類型的getState,setState,compareAndSetState進(jìn)行操作

//  返回同步狀態(tài)的當(dāng)前值
protected final int getState() {  
        return state;
}
 // 設(shè)置同步狀態(tài)的值
protected final void setState(int newState) { 
        state = newState;
}
//原子地(CAS操作)將同步狀態(tài)值設(shè)置為給定值update如果當(dāng)前同步狀態(tài)的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2. AQS對(duì)資源共享的方式

AQS定義兩種資源共享方式

  1. Exclusive(獨(dú)占):只有一個(gè)線程能執(zhí)行,如ReentrantLock。又可分為公平鎖和非公平鎖:
    • 公平鎖:按照線程在隊(duì)列中的排隊(duì)順序,先到者先拿到鎖
    • 非公平鎖:當(dāng)線程要獲取鎖時(shí),無(wú)視隊(duì)列順序直接去搶鎖,誰(shuí)搶到就是誰(shuí)的
  2. Share(共享):多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 。

ReentrantReadWriteLock 可以看成是組合式,因?yàn)镽eentrantReadWriteLock也就是讀寫(xiě)鎖允許多個(gè)線程同時(shí)對(duì)某一資源進(jìn)行讀。

不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源 state 的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在上層已經(jīng)實(shí)現(xiàn)好了。

3. AQS使用的設(shè)計(jì)模式

同步器的設(shè)計(jì)是基于模板方法模式的,使用者繼承AbstractQueuedSynchronizer并重寫(xiě)指定的方法。(這些重寫(xiě)方法很簡(jiǎn)單,無(wú)非是對(duì)于共享資源state的獲取和釋放) 將AQS組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用其模板方法,而這些模板方法會(huì)調(diào)用使用者重寫(xiě)的方法。以下是AQS提供的可重寫(xiě)的方法:

protected boolean isHeldExclusively();//該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
protected boolean tryAcquire(int);//獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int);//獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
protected int tryAcquireShared(int);//共享方式。嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
protected boolean tryReleaseShared(int);//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。

4. Node類

該類為AbstractQueuedSynchronizer的一個(gè)靜態(tài)內(nèi)部類,每個(gè)被阻塞的線程都會(huì)被封裝成一個(gè)Node結(jié)點(diǎn),放入隊(duì)列。

static final class Node {
    // 模式,分為共享與獨(dú)占
    // 共享模式
    static final Node SHARED = new Node();
    // 獨(dú)占模式
    static final Node EXCLUSIVE = null;        
    // 結(jié)點(diǎn)狀態(tài)常量
    static final int CANCELLED =  1; // 表示當(dāng)前的線程被取消,線程被中斷的情況
    static final int SIGNAL    = -1; //表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是需要去unpark后面的節(jié)點(diǎn)
    static final int CONDITION = -2; //表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中
    static final int PROPAGATE = -3; //表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行

    // 結(jié)點(diǎn)狀態(tài)
    volatile int waitStatus;   //  值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖,后面已經(jīng)沒(méi)有其他節(jié)點(diǎn)了,就不用去unPark()
    // 前驅(qū)結(jié)點(diǎn)
    volatile Node prev;    
    // 后繼結(jié)點(diǎn)
    volatile Node next;        
    // 結(jié)點(diǎn)所對(duì)應(yīng)的線程
    volatile Thread thread;        
    // 下一個(gè)等待者
    Node nextWaiter;
    
    // 結(jié)點(diǎn)是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 獲取前驅(qū)結(jié)點(diǎn),若前驅(qū)結(jié)點(diǎn)為空,拋出異常
    final Node predecessor() throws NullPointerException {
        // 保存前驅(qū)結(jié)點(diǎn)
        Node p = prev; 
        if (p == null) // 前驅(qū)結(jié)點(diǎn)為空,拋出異常
            throw new NullPointerException();
        else // 前驅(qū)結(jié)點(diǎn)不為空,返回
            return p;
    }
    
    // 無(wú)參構(gòu)造方法
    Node() {    // 由addWaiter使用用于建立初始標(biāo)頭或SHARED標(biāo)記
    }
    
    // 構(gòu)造方法
        Node(Thread thread, Node mode) {    // 由addWaiter使用
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 構(gòu)造方法
    Node(Thread thread, int waitStatus) { // 由Condition使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

5. AbstractQueuedSynchronizer類屬性

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    
    // 版本號(hào)
    private static final long serialVersionUID = 7373984972572414691L;    
    // 頭節(jié)點(diǎn)
    private transient volatile Node head;    
    // 尾結(jié)點(diǎn)
    private transient volatile Node tail;    
    // 狀態(tài)
    private volatile int state;    
    // 自旋時(shí)間
    static final long spinForTimeoutThreshold = 1000L;
    
    // Unsafe類實(shí)例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // state內(nèi)存偏移地址
    private static final long stateOffset;
    // head內(nèi)存偏移地址
    private static final long headOffset;
    // state內(nèi)存偏移地址
    private static final long tailOffset;
    // tail內(nèi)存偏移地址
    private static final long waitStatusOffset;
    // next內(nèi)存偏移地址
    private static final long nextOffset;
    // 靜態(tài)初始化塊
    static {
        try {
            //unsafe.objectFieldOffset();用于獲取某個(gè)字段相對(duì)Java對(duì)象的“起始地址”的偏移量
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
}

6. 核心方法

6.1 acquire方法(加鎖)

該方法以獨(dú)占模式獲取(資源),忽略中斷,即線程在acquire過(guò)程中,中斷此線程是無(wú)效的。源碼如下:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

調(diào)用流程如下:


  1. 首先調(diào)用tryAcquire方法,調(diào)用此方法的線程會(huì)試圖在獨(dú)占模式下加鎖,如果加鎖成功什么都不做,如果加鎖失敗,那么!tryAcquire(arg)=true,說(shuō)明此時(shí)的鎖被其他線程所占有。在AbstractQueuedSynchronizer源碼中默認(rèn)會(huì)拋出一個(gè)異常,即需要子類去重寫(xiě)此方法完成自己的邏輯。
  2. 若tryAcquire失敗,則調(diào)用addWaiter方法,addWaiter方法完成的功能是將調(diào)用此方法的線程封裝成為一個(gè)結(jié)點(diǎn)并放入同步隊(duì)列。
  3. 調(diào)用acquireQueued方法,此方法完成的功能是同步隊(duì)列中的結(jié)點(diǎn)不斷嘗試獲取鎖,若成功,則返回true,否則,返回false。
  4. 如果if條件滿足就會(huì)被中斷。
addWaiter方法
// 添加等待者
private Node addWaiter(Node mode) {
    // 新生成一個(gè)結(jié)點(diǎn),默認(rèn)為獨(dú)占模式
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取尾節(jié)點(diǎn)
    Node pred = tail;
    if (pred != null) { // 尾結(jié)點(diǎn)不為空,即已經(jīng)被初始化,隊(duì)列中是有元素的
        // 將新結(jié)點(diǎn)的prev連接到尾結(jié)點(diǎn),也就是將新的節(jié)點(diǎn)加入到隊(duì)列的后面
        node.prev = pred; 
        if (compareAndSetTail(pred, node)) { //為了解決有多個(gè)線程同時(shí)進(jìn)入這個(gè)判斷邏輯生成了多個(gè)節(jié)點(diǎn),保證只能有一個(gè)節(jié)點(diǎn)成為隊(duì)列的尾部。
            // 
            pred.next = node;
            return node; // 返回新生成的結(jié)點(diǎn)
        }
    }
    enq(node); 
// 如果尾結(jié)點(diǎn)為空(隊(duì)列中沒(méi)有元素),或者是compareAndSetTail操作失敗,則入隊(duì)列
// 對(duì)于公平鎖而言,沒(méi)有競(jìng)爭(zhēng)到隊(duì)列的尾部,那么就一直去競(jìng)爭(zhēng)直到競(jìng)爭(zhēng)到。
    return node;
}
enq(node)

假設(shè)此時(shí)有兩個(gè)線程:線程1、線程2進(jìn)來(lái)。兩個(gè)線程都在上面代碼中判斷if (pred != null)都為空則進(jìn)入到enq方法。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 兩個(gè)線程進(jìn)來(lái)發(fā)現(xiàn)尾節(jié)點(diǎn)為空,那么就生成一個(gè)空的node節(jié)點(diǎn),再次循環(huán)線程1和線程2進(jìn)來(lái)判斷不為空走else邏輯
                if (compareAndSetHead(new Node()))
                    tail = head; //頭節(jié)點(diǎn)與尾結(jié)點(diǎn)都指向同一個(gè)新生結(jié)點(diǎn)(空節(jié)點(diǎn))
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { //兩個(gè)線程再進(jìn)來(lái)競(jìng)爭(zhēng)尾節(jié)點(diǎn),將屬性指向空的node節(jié)點(diǎn)。
                    t.next = node;
                    return t; //假設(shè)線程2競(jìng)爭(zhēng)到尾節(jié)點(diǎn)
                }
            }
        }
    }
acquireQueued方法

此時(shí)線程2進(jìn)到這個(gè)方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//獲取當(dāng)前節(jié)點(diǎn)的prev前一個(gè)節(jié)點(diǎn)
                if (p == head && tryAcquire(arg)) {//如果前一個(gè)節(jié)點(diǎn)等于head節(jié)點(diǎn),那么說(shuō)明它就是第一個(gè)排隊(duì)的線程
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
              //parkAndCheckInterrupt :阻塞
              // shouldParkAfterFailedAcquire: 修改上一個(gè)node節(jié)點(diǎn)的waitStatus為-1
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
tryAcquire方法

以ReentrantLock公平鎖為例子,下面是ReentrantLock內(nèi)部lock()方法邏輯:

 final void lock() {
            acquire(1);
        }
//嘗試獲取鎖
 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();//可重入條件
            if (c == 0) { //表示第一次獲取到鎖
              //compareAndSetState(0, acquires)
              //利用 cas 將state狀態(tài)改為 1
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current); // 設(shè)置鎖被當(dāng)前線程獨(dú)占,也就是真正的加鎖操作
                    return true;
                }
            }
            //重入邏輯
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
\color{red}{判斷是否有正在排隊(duì)的線程(公平鎖)}
 public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容