淺析AQS

有沒有想過ReentrantLock,Semaphores等是怎么實現(xiàn)同步的?這一切歸功于幕后功臣AQS,全名AbstractQueuedSynchronizer--抽象的隊列同步器

jdk文檔上的說明:

提供一個依賴于先進先出(FIFO)等待隊列,實現(xiàn)阻塞鎖和相關(guān)同步器(信號量、事件等)的框架

也就是說,AbstractQueuedSynchronizer是一個用隊列來實現(xiàn)的同步框架

首先瞄一瞄它的子類


再勾畫出它的繼承圖


可以看到,ReentrantLock,Semaphores,ThreadPoolExecutor,CountDownLatch,ReentrantReadWriteLock的同步機制的實現(xiàn),都依賴于AbstractQueuedSynchronizer

那么它們是怎么通過AQS實現(xiàn)同步的呢?

//ReentrantLock類的公平鎖實現(xiàn)
final void lock() {
    acquire(1);
}
//ReentrantLock類的非公平鎖實現(xiàn)
final void lock() {
    if (compareAndSetState(0, 1)) 
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
//ReentrantReadWriteLock類lock方法實現(xiàn)
public void lock() {
    sync.acquire(1);
}

可以看到這幾種鎖的實現(xiàn)都依賴AQS類的一個很重要的方法--acquire,我們一起來揭開acquire神秘的內(nèi)衣,不,面紗

/**
 * 獨占模式獲取
 * AQS類的子類需實現(xiàn)本方法里的tryAcquire方法
 * 否則,線程將排隊,可能會重復(fù)阻塞和解除阻塞
 * 直到調(diào)用tryAcquire成功為止.
 * 該方法可用于實現(xiàn)鎖方法
 *
 * @param arg
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

此方法是使當前線程獲取獨占模式,說人話就是獲取線程的運行權(quán),使線程進入RUNNABLE狀態(tài).
在acquire里,首先調(diào)用tryAcquire方法,如果tryAcquire返回true,線程直接變?yōu)檫\行狀態(tài),若不成功則調(diào)用acquireQueued和addWaiter方法,使線程進入CLH隊列,這個后面會說到.
但我們看到tryAcquire直接拋異常了,什么鬼!我來翻譯一下jdk文檔的注釋

/**
 * 試圖以獨占模式獲取.
 * 此方法應(yīng)該查詢狀態(tài)是否允許存在獨占模式,允許才進行獲取
 *
 * @param arg 此參數(shù)一般傳1,不過如你有其他需要傳多少都行
 * @return {@code true}
 * @throws IllegalMonitorStateException  如果AQS的狀態(tài)是非法狀態(tài)就會拋出次異常
 * @throws UnsupportedOperationException 如果實現(xiàn)類不支持獨占模式就會拋出此異常
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

原來,AQS類的子類需覆蓋tryAcquire方法,直接拋異常是考慮到,子類若不支持獨占鎖模式,就無須覆蓋此方法,調(diào)用的時候自然就拋出UnsupportedOperationException.還有arg參數(shù)一般傳1,這是因為可重入鎖每次重入,狀態(tài)state+1.

至于各種Lock是怎么覆蓋tryAcquire方法,以及重入狀態(tài)為什么是+1,請參考線程八鎖

我們繼續(xù)來扒一扒acquireQueued方法,在說這個方法前,先說說AQS的底層數(shù)據(jù)結(jié)構(gòu)--CLH隊列--的變體--不知道什么鬼隊列.反正它叫隊列同步器,顧名思義,需要一個隊列來控制同步,而隊列的節(jié)點依賴于內(nèi)部類Node

/**
 * 等待隊列節(jié)點類
 * 此等待隊列是CLH鎖隊列的一種變體
 * 每個節(jié)點都有一個"status"字段來表示該節(jié)點中的線程是否應(yīng)該被阻塞
 */
static final class Node {
    /**
     * 共享節(jié)點
     */
    static final Node SHARED = new Node();
    /**
     * 獨占節(jié)點
     */
    static final Node EXCLUSIVE = null;

    /**
     * 表示線程已被取消
     */
    static final int CANCELLED = 1;
    /**
     * 表示當前節(jié)點的后繼線程需要阻塞
     */
    static final int SIGNAL = -1;
    /**
     * 表示線程在等待條件
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次acquireShared應(yīng)該無條件傳播
     */
    static final int PROPAGATE = -3;

    /**
     * SIGNAL: 此節(jié)點的后繼線程是阻塞的或即將阻塞,所以此節(jié)點的線程釋放或被取消時,必須喚醒它的后
     * 繼.為了避免產(chǎn)生競爭,獲取方法必須表明它們需要一個信號,然后重新原子獲取,若失敗則阻塞
     * CANCELLED:  由于超時或中斷,此節(jié)點被取消.節(jié)點永遠不會離開此狀態(tài).特別是,
     * 節(jié)點的線程處于取消狀態(tài)再也不會阻塞
     * CONDITION:  此節(jié)點當前位于條件隊列中.它不會用作同步隊列節(jié)點,直至狀態(tài)改變?yōu)橹?當時的狀態(tài)
     * 將被設(shè)置為0
     * PROPAGATE:  已發(fā)布的節(jié)點應(yīng)該傳播到其他節(jié)點。這在doReleaseShared中設(shè)置(僅針對head節(jié)
     * 點),以確保傳播繼續(xù)進行,即使其他操作已經(jīng)介入。
     * 0:          以上都不是
     */
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    //將線程構(gòu)造成一個Node,添加到等待隊列
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //這個方法會在Condition隊列使用
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

從源碼可以看出,一個Node對象里面包含前繼節(jié)點,后繼節(jié)點和一個線程對象,所以Node對象組成的隊列是基于雙向鏈表的FIFO(先進先出)隊列.大概長這樣(懶的畫,偷了張圖回來,既然上面有水印我就不注明出處了)

先看看addWaiter源碼

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {//若尾節(jié)點非空
        node.prev = pred;//將尾節(jié)點作為新節(jié)點的前繼
        if (compareAndSetTail(pred, node)) {//CAS把新節(jié)點設(shè)置為尾節(jié)點
            pred.next = node;//把原尾節(jié)點的后繼設(shè)置為新節(jié)點
            return node;
        }
    }
    enq(node);
    return node;
}
/**
 * 通過自旋操作把當前節(jié)點加入到隊列尾部
 */
private Node enq(final Node node) {
    for (; ; ) {
        Node t = tail; //如果是第一次添加到隊列,那么tail=null
        if (t == null) { // Must initialize
            //CAS的方式創(chuàng)建一個空的Node作為頭結(jié)點
            if (compareAndSetHead(new Node()))
                //此時隊列中只一個頭結(jié)點,所以tail也指向它
                tail = head;
        } else {
            //進行第二次循環(huán)時,tail不為null,進入else區(qū)域。
            //將當前線程的Node結(jié)點的prev指向tail,然后使用CAS將tail指向Node
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                //t此時指向tail,所以可以CAS成功,將tail重新指向Node。
                //此時t為更新前的tail的值,即指向空的頭結(jié)點,t.next=node,就將頭結(jié)點的后續(xù)結(jié)點指向Node,返回頭結(jié)點
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter方法大致流程是:

  1. 創(chuàng)建當前線程節(jié)點
  2. 若尾節(jié)點不為空,則把新節(jié)點添加到尾部,成為新的尾節(jié)點
  3. 若尾節(jié)點為空,則調(diào)用enq方法,首先初始化隊列,第二次循環(huán)時,再把新節(jié)點添加到尾部,成為新的尾節(jié)點

再看看acquireQueued的源碼

/**
* 獲取已處于獨占不可中斷模式的線程隊列
* @return 是否在等待時被打斷
*/
final boolean acquireQueued(/**此node為addWaiter所返回,即已成為尾節(jié)點的當前線程節(jié)點**/final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//自旋
            final Node p = node.predecessor();//獲取前繼節(jié)點
            //若前繼是head節(jié)點才有資格獲取鎖
            if (p == head && 
                //在這里還是會調(diào)用tryAcquire
                tryAcquire(arg)) {
                //若獲取鎖成功,設(shè)置當前節(jié)點為頭節(jié)點
                setHead(node);
                //凡是head節(jié)點,head.thread與head.prev永遠為null, 但是head.next不為null
                p.next = null; // help GC--有助于GC回收
                failed = false;
                return interrupted;
            }
            //如果獲取鎖失敗或前繼節(jié)點不是頭節(jié)點,則根據(jù)節(jié)點的waitStatus決定是否需要掛起線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())// 若前面為true,則執(zhí)行掛起,待下次喚醒的時候檢測中斷的標志
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);// 如果拋出異常則取消鎖的獲取,進行出隊(sync queue)操作
    }
}

小結(jié)acquireQueued的大致流程:

  1. 判斷當前節(jié)點的前驅(qū)節(jié)點是否頭節(jié)點
  2. 若是,則嘗試獲取鎖,如果獲取鎖成功,則設(shè)置當前為頭節(jié)點
  3. 若當前節(jié)點的前驅(qū)節(jié)點不是頭節(jié)點或嘗試獲取鎖失敗,則根據(jù)節(jié)點的waitStatus決定是否需要掛起線程
  4. 若上述過程出現(xiàn)異常(因為上述操作在自旋中,不出現(xiàn)異常不會跳到finally代碼塊中),則判斷頭節(jié)點是否設(shè)置失敗,若失敗,則取消正在進行的獲取鎖的嘗試

釋放鎖移除節(jié)點
head節(jié)點表示獲取鎖成功的節(jié)點,當頭結(jié)點在釋放同步狀態(tài)時,會喚醒后繼節(jié)點,如果后繼節(jié)點獲得鎖成功,會把自己設(shè)置為頭結(jié)點,節(jié)點的變化過程如下


  • 修改head節(jié)點指向下一個獲得鎖的節(jié)點
  • 新的獲得鎖的節(jié)點,將prev的指針指向null

這里有一個小的變化,就是設(shè)置head節(jié)點不需要用CAS,原因是設(shè)置head節(jié)點是由獲得鎖的線程來完成的,而同步鎖只能由一個線程獲得,所以不需要CAS保證,只需要把head節(jié)點設(shè)置為原首節(jié)點的后繼節(jié)點,并且斷開原h(huán)ead節(jié)點的next引用即可

對應(yīng)源碼

/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *            {@link #tryRelease} but is otherwise uninterpreted and
 *            can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(long arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

參考文獻
深入分析AQS實現(xiàn)原理

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容