AQS原理及源碼淺析

什么是AQS?

AQS 即 AbstractQueuedSynchronizer 抽象隊列同步器,它是Java并發(fā)用來構(gòu)建鎖和其他同步工具的基礎框架,是一個抽象類。AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現(xiàn)都依賴于它,如常用的ReentrantLock、Semaphore、CountDownLatch。

ReentrantLock是如何加鎖的?AQS是如何被使用的?

ReentrantLock 通過調(diào)用對象的lock()和unlock()方法來加鎖解鎖,下面分析一下源碼中加鎖的過程,在加鎖過程中使用了AQS類。

lock方法的內(nèi)部調(diào)用關系如圖:

Lock調(diào)用關系圖.png

首先看一下默認使用非公平鎖的時候的調(diào)用過程:

  1. ReentrantLock調(diào)用lock()方法的時候,它會調(diào)用Sync的acquire(1)方法;
  2. 因為 Sync 是 ReentrantLock 內(nèi)抽象的靜態(tài)內(nèi)部類,它繼承了AQS抽象類,Sync 內(nèi)又沒重寫 acquire(),所以直接調(diào)用的是AQS的 acquire(1);
  3. AQS的 acquire(1),會調(diào)AQS內(nèi)的 tryAcquire(1);
  4. AQS內(nèi)的 tryAcquire(1) 方法內(nèi)只是拋出了個異常,實際上調(diào)的是 ReentrantLock 中 NonfairSync 類重寫的 tryAcquire(1);如果采用的是公平鎖的話,這里會調(diào)用 FairSync類(繼承Sync) 重寫的 tryAcquire 方法
  5. NonfairSync 繼承了 Sync,tryAcquire(1) 調(diào)用的是父類 Sync 的nonfairTryAcquire(1)。(FairSync 直接實現(xiàn)了 tryAcquire 方法,無其它調(diào)用)

至此就是整個調(diào)用過程,下是源碼中的Sync和它的子類:


ReentrantLock部分源碼.png

AQS原理及源碼解讀

上面讀到nonfairTryAcquire(acquires),就必須要了解實現(xiàn)了。下面先分析一下AQS的原理。

AQS隊列結(jié)構(gòu)淺析

AQS內(nèi)部維護著一個 volatile int state 和一個FIFO(雙向隊列)線程等待隊列(多線程爭用資源被阻塞時會進入此隊列),即CLH隊列。這兩個是AQS的核心。

注:CLH 是論文《Building FIFO and Priority-Queuing Spin Locks from Atomic Swap》 三位作者的縮寫,AQS就是根據(jù) CLH 鎖的變種實現(xiàn)的

AQS的同步機制就是依靠CLH隊列實現(xiàn)的。state所代表的意義由子類來定,它的值是根據(jù)子類不同的實現(xiàn)取不同的意義。拿ReentrantLock來說,state初始為0,當線程得到了這把鎖后state會變?yōu)?,重入一次會+1,釋放一次-1,減到0完全釋放,所以在ReentrantLock中state為0和1就代表了加鎖和解鎖。

雙向隊列由鏈表結(jié)構(gòu)實現(xiàn),AQS內(nèi)部定義了

     /**
     * Wait queue node class.
     */
    static final class Node {
       
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        // Node中保存的是一個個線程
        volatile Thread thread;

        Node nextWaiter;

        // 只截取了部分源碼,完整源碼請參照jdk11
    }

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

CLH隊列是FIFO的雙端雙向隊列。線程通過AQS獲取鎖失敗,就會將線程封裝成一個Node節(jié)點,插入隊列尾。當有線程釋放鎖時,會嘗試把
head.next節(jié)點的線程喚醒(LockSupport的unpark方法)。被喚醒的線程獲得鎖后node節(jié)點隨之出隊。


AQS同步隊列.png

當線程acquire(1)以后,如果state的值是0,那就直接拿到鎖,默認是非公平的上來就搶,搶不著就進隊列里acquireQueued()。

來看一下源碼:

    // AQS類方法
    public final void acquire(int arg) {
        // 先tryAcquire嘗試獲取鎖,若返回true表示獲取成功,
        // 由于&&符號邏輯短路,符號后部分不會執(zhí)行
        if (!tryAcquire(arg) &&
            // 若獲取失敗,線程排隊等待
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire失敗會調(diào)用 acquireQueued() 方法:

final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            // 節(jié)點進入等待隊列,執(zhí)行死循環(huán)自旋
            for (;;) {
                // 拿到前一個節(jié)點
                final Node p = node.predecessor();
                // 如果前一個節(jié)點是head,并且當前線程成功獲得了鎖
                if (p == head && tryAcquire(arg)) {
                    // 把當前節(jié)點設置為頭節(jié)點(head指針指向當前節(jié)點)
                    // 并把thread和prev置空(因為現(xiàn)在已經(jīng)獲取到鎖,不再需要記錄這個節(jié)點所對應的線程了)
                    setHead(node);
                    // 把舊的head的next置空,使舊的head出隊
                    p.next = null; // help GC
                    return interrupted;
                }
                // 判定前一個節(jié)點的狀態(tài)是否為 Node.SIGNAL
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

這是一個死循環(huán),除非進入if(p == head && tryAcquire(arg))判定條件,獲取node節(jié)點的前一個節(jié)點p。當p為頭節(jié)點并且當前線程tryAcquire成功時,用setHead(node)把當前節(jié)點設置為頭節(jié)點,同時把節(jié)點屬性thread、prev置空。然后再把p的next節(jié)點置空使使舊的head出隊。
總結(jié)起來就是把舊的頭部斷開,head指針指向一個新的頭部,這個新head內(nèi)thread屬性也是空的,因為現(xiàn)在已經(jīng)獲取到鎖,不再需要記錄這個節(jié)點所對應的線程了。

圖解如下:
acquireQueued示意圖.png
  • 如果上面判定條件失敗
    會首先判定:shouldParkAfterFailedAcquire(p , node),這個方法內(nèi)部會判定前一個節(jié)點的狀態(tài)是否為:Node.SIGNAL,若是則返回true,不是返回false,不過會再做一些操作:判定節(jié)點的狀態(tài)是否大于0,若大于0則認為被 CANCELLED 掉了(大于0的只可能被CANCELLED的狀態(tài)),因此會從前一個節(jié)點開始逐步循環(huán)找到一個沒有被 CANCELLED 節(jié)點,然后與這個節(jié)點的next、prev的引用相互指向;如果前一個節(jié)點的狀態(tài)不是大于0的,則通過CAS嘗試將狀態(tài)修改為“Node.SIGNAL”,自然的如果下一輪循環(huán)的時候會返回值應該會返回true。
    如果這個方法返回了true,則會執(zhí)行:parkAndCheckInterrupt()方法,它是通過LockSupport.park(this)將當前線程掛起到WATING狀態(tài),它需要等待一個中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實現(xiàn)了Lock的操作。

AQS隊列初始化

程序第一個線程直接搶到鎖后并不會入隊,那隊列是什么時候初始化的呢?


AQS隊列初始化.png
    public final void acquire(int arg) {
        // 先tryAcquire嘗試獲取鎖,若返回true表示獲取成功,
        // 由于&&符號邏輯短路,符號后部分不會執(zhí)行
        if (!tryAcquire(arg) &&
            // 若獲取失敗,線程排隊等待
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

可以看到在第二個線程獲取鎖失敗入隊時會調(diào)用 addWaiter(Node.EXCLUSIVE),默認EXCLUSIVE排它鎖,下面再來看一下這個方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(mode);
        // 死循環(huán),直到執(zhí)行完畢
        for (;;) {
            Node oldTail = tail;
            // 第一次入隊未初始化時tail為空
            if (oldTail != null) {
                // 把新節(jié)點的前置節(jié)點設置在等待隊列的末端(prev指針指向oldTail)
                node.setPrevRelaxed(oldTail);
                // cas操作,把新節(jié)點設置為tail末端
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                // tail為空則初始化隊列
                initializeSyncQueue();
            }
        }
    }

可以看到在addWaiter方法中進行了初始化。

如何使用AQS獲取鎖?tryAcquire()

ReentrantLock 可以實現(xiàn)公平鎖和非公平鎖,在NonfairSync、FairSync靜態(tài)內(nèi)部類中分別重寫了tryAcquire()

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
  • 看一下非公平時如何獲取鎖:
    NonfairSync 中 tryAcquire() 調(diào)用了父類Sync 的 nonfairTryAcquire(acquires)
    // jdk11 ReentrantLock 部分源碼
    final boolean nonfairTryAcquire(int acquires) {
        // 獲取當前線程
        final Thread current = Thread.currentThread();
        // 獲取state狀態(tài)
        int c = getState();
        // 如果狀態(tài)值為0
        if (c == 0) {
            // 嘗試用cas操作將0變成1,獲取鎖
            if (compareAndSetState(0, acquires)) {
                // cas操作成功說明搶到鎖,把這個線程設置為獨占這把鎖
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果這個線程是 獨占同步鎖的線程
        else if (current == getExclusiveOwnerThread()) {
            // 改變狀態(tài)值,指鎖重入機制
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

先得到當前線程,然后獲取state的值,如果state的值等于0,用compareAndSetState(0,acquire)方法嘗試把state的值改為1,假如改成了setExclusiveOwnerThread()把當前線程設置為獨占statie這個把鎖的狀態(tài),說明我已經(jīng)得到這把鎖,而且這把鎖是互斥的,我得到以后,別人是得不到的,因為別人再來的時候這個state的值已經(jīng)變成1了,如果說當前線程已經(jīng)是獨占state這把鎖了,就往后加個1就表示可重入了。

  • 公平時如何獲取鎖:
    FairSync自己直接重寫了tryAcquire。源碼跟nonfairTryAcquire差不多,
    區(qū)別是它會先查詢是否已經(jīng)有線程在隊列中等待
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 先判斷隊列中是否有等待線程
        if (!hasQueuedPredecessors() &&
            // 沒有等待線程的話進行cas操作嘗試加鎖,改變state
            compareAndSetState(0, acquires)) {
            // 設置當前線程獨占這把鎖
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 下面代碼跟nonfairTryAcquire的一樣
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

如何釋放鎖,并喚醒隊列中其它線程?

ReentrantLock 的 unlock() 被調(diào)用時會釋放鎖

public void unlock() {
    sync.release(1);
}

它實際調(diào)用的AQS的release:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 喚醒head的next節(jié)點
            // 調(diào)用的LockSupport.unpark()
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    // 改變狀態(tài)
    int c = getState() - releases;
    // 當前線程不是獨占鎖的線程時,拋異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 定義是否被釋放的標記
    boolean free = false;
    // 判斷state狀態(tài)是否為0
    if (c == 0) {
        // 狀態(tài)為0表示鎖被釋放,標記設為true
        free = true;
        // 把AQS中獨占鎖的線程設置為空,表示當前沒有任何線程占用鎖
        setExclusiveOwnerThread(null);
    }
    // 更新狀態(tài)信息
    setState(c);
    return free;
}

AQS中的細節(jié):VarHandle存在的意義

上面講AQS隊列初始化時看的 addWaiter 源碼中調(diào)用了 node.setPrevRelaxed(oldTail) ,點進去看一下:

final void setPrevRelaxed(Node p) {
    PREV.set(this, p);
}

有這個調(diào)用:PREV.set(this, p),這個PREV是這樣定義的:

// AQS的Node類 部分源碼 jdk11
private static final VarHandle PREV;
static {
    try {
        // Lookup 對象是一個創(chuàng)建handles的工廠
        MethodHandles.Lookup l = MethodHandles.lookup();
        // 返回某個類的Var Handle對象
        PREV = l.findVarHandle(Node.class, "prev", Node.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
}

它是VarHandle類型,VarHandle是在JDK1.9開始有的??此拿?,Var 變量,Handle 句柄,意思是方便操縱變量的工具。

舉個例子,Object o = new Object(); 引用o指向內(nèi)存中Object對象,用VarHandle的話它也是指的這個引用。

既然兩個引用都指向同一個對象,為什么還需要VarHandle?

下面寫個小程序來說明:

public class T01_VarHandle {

    int var = 10;
    private static VarHandle varHandle;

    static {
        // 初始化一個handle工廠
        MethodHandles.Lookup l = MethodHandles.lookup();
        try {
            // 初始化var的handle
            varHandle = l.findVarHandle(T01_VarHandle.class, "var", int.class);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        T01_VarHandle instance = new T01_VarHandle();

        // 通過handle讀取變量(讀)
        System.out.println(varHandle.get(instance));

        // 通過handle改變變量(寫)
        varHandle.set(instance, 20);
        System.out.println(instance.var);

        //進行線程安全的累加操作
        varHandle.getAndAdd(instance, 10);
        System.out.println(instance.var);

        // 通過handle進行CAS操作
        varHandle.compareAndSet(instance, 30, 40);
        System.out.println(instance.var);
    }

}

輸出結(jié)果為10 20 30 40。

也就是說可以用VarHandle對變量進行讀寫,這點跟普通引用沒太大區(qū)別。重要的是VarHandle可以做一些compareAndSet操作(原子操作),還可以handle.getAndAdd()操作這也是原子操作,這些是線程安全的。

所以VarHandle除了可以完成普通屬性的原子操作,還可以完成原子性的線程安全的操作,這也是VarHandle的含義。

AQS效率高的原因

AQS中線程進入等待隊列,節(jié)點插入隊尾時需要保證線程安全,如果對整個隊列加鎖效率就太低了,所以AQS用VarHandle的特性,采用CAS的方法插入節(jié)點,這樣效率就會很高。

所以也可以說volatile和CAS是AQS的核心。

AQS隊列為什么是雙向鏈表?

要添加一個線程節(jié)點的時候,需要看一下前面這個節(jié)點的狀態(tài),如果前面的節(jié)點是持有線程的過程中,這個時候就得在后面等著,如果說前面這個節(jié)點已經(jīng)取消掉了,那就應該越過這個節(jié)點,不去考慮它的狀態(tài),所以有需要看前面節(jié)點狀態(tài)的時候,必須是雙向的。

比如說前面看過的acquireQueued()方法的for死循環(huán)中,等待隊列中的節(jié)點需要關注前面節(jié)點的狀態(tài),如果前面成head了,就嘗試獲取鎖。這個過程就要求必須是雙向鏈表。

后記

關于AQS其實還有很多可分析的地方,比如讀寫鎖中的排他鎖和共享鎖是如何實現(xiàn)的,ReentrantLock的newCondition()等。本文暫時就寫這些了,其它源碼有空再讀......

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容