AQS(AbstractQueuedSynchronizer)詳解一

什么是AQS?

  • AQS(AbstractQueuedSynchronizer): 是并發(fā)容器J.U.C(java.util.concurrent)下locks包內(nèi)的一個類. 它實現(xiàn)了一個FIFO(FirstIn、FisrtOut先進先出)的隊列. 底層實現(xiàn)的數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表.

  • AQS的核心思想是, 如果被請求的共享資源是空閑的, 則將當前請求資源的線程設(shè)置為有效線程, 并且將共享的資源設(shè)置為鎖定的狀態(tài). 如果被請求的共享資源被占用, 那么就需要一套線程阻塞/等待以及喚醒進行鎖分配的機制, 這個機制AQS是用CLH(參考:備注1)隊列鎖實現(xiàn)的, 就是將暫時獲取不到鎖的線程加入到隊列中進行等待.

  • AQS定義兩種資源共享方式: Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)

  • AQS同步器是用一個int變量state來表示狀態(tài). 同步功能使用的方法都是子類繼承AbstractQueuedSynchronizer類實現(xiàn)的. 子類通過繼承同步器實現(xiàn)自身需要的方法來管理state狀態(tài), 管理的方式就是通過accquire()/accquireShared()/release()/releaseShared()等方法來操作狀態(tài). 在多線程環(huán)境下狀態(tài)的操作必須保證其原子性, 所以子類在狀態(tài)的管理中需要使用AQS同步器提供的三個方法操作state: getState()/setState(int)/compareAndSetState(int, int).

  • 子類推薦被定義為自定義同步裝置的內(nèi)部類(大佬都是這么實現(xiàn)的, 跟著沒毛病).

備注1:

CLH鎖即Craig, Landin, and Hagersten (CLH) locks. CLH鎖是一個自旋鎖。能確保無饑餓性. 提供先來先服務(wù)的公平性.
CLH鎖也是一種基于鏈表的可擴展、高性能、公平的自旋鎖, 申請線程僅僅在本地變量上自旋, 它不斷輪詢前驅(qū)的狀態(tài), 假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋

AQS中的數(shù)據(jù)結(jié)構(gòu)

節(jié)點和同步隊列

Node節(jié)點屬性

  • 節(jié)點的狀態(tài)waitStatus.
    CANCELLED(1): 表示當前節(jié)點被取消, 進入改狀態(tài)的節(jié)點將不會在發(fā)生變化.
    SIGNAL(-1): 表示后繼結(jié)點在等待當前結(jié)點喚醒. 后繼結(jié)點入隊時, 會將前繼結(jié)點的狀態(tài)更新為SIGNAL.
    CONDITION:(-2) : 表示當前節(jié)點在condition隊列中進行等待. 當其他線程調(diào)用了Condition的signal()方法后, CONDITION狀態(tài)的結(jié)點將從等待隊列轉(zhuǎn)移到同步隊列中, 等待獲取同步鎖.
    PROPAGATE(-3): 共享模式下, 前繼結(jié)點不僅會喚醒其后繼結(jié)點, 同時也可能會喚醒后繼的后繼結(jié)點.
    值為0, 表示當前節(jié)點在sync隊列中,等待著獲取同步鎖.
  • Node prev: 前驅(qū)節(jié)點.
  • Node next: 后繼節(jié)點.
  • Node nextWaiter: 存儲condition隊列中的后繼節(jié)點.
  • Thread thread: 當前線程.

同步隊列數(shù)據(jù)結(jié)構(gòu)

核心方法分析

public final void acquire(int arg)

該方法是獨占模式下線程獲取共享資源的入口, 如果獲取到資源后, 線程直接返回.否則將進入等待隊列, 直到獲取到資源為止(整個過程忽略中斷的影響). 這就是Lock.lock()的語義, 你也可以自定義Lock頂層接口, 參考 Doug Lea對Lock的定義.

  public final void acquire(int var1) {
        if (!tryAcquire(var1) && acquireQueued(addWaiter(Node.EXCLUSIVE), var1)) {
            selfInterrupt();
        }
    }

函數(shù)流程如下:

  1. tryAcquire(): 嘗試直接獲取資源, 如果成功直接返回(調(diào)用tryAcquire更改狀態(tài),需要保證原子性. 這里體現(xiàn)了非公平鎖, 每個線程獲取鎖時會嘗試直接搶占加塞一次, 而CLH隊列中可能還有別的線程在等待).
  2. addWaiter(): 如果獲取不到, 將當前線程構(gòu)造成節(jié)點Node并加入sync隊列的尾部, 并且標記為獨占模式.
  3. acquireQueued(): 使線程阻塞在等待隊列中獲取資源, 一直獲取到資源后才返回. 如果在整個等待過程中被中斷過, 則返回true, 否則返回false.
  4. 如果線程在等待過程中被中斷過, 它是不響應(yīng)的. 只是獲取資源后才再進行自我中斷selfInterrupt(), 將中斷補上(響應(yīng)前面說的, 整個等待過程忽略中斷的影響).
1. tryAcquire()方法

此方法嘗試去獲取獨占資源. 如果獲取成功, 則直接返回true, 否則直接返回false. 這也正是tryLock()的語義, 還是那句話. 當然不僅僅只限于tryLock().
如下是tryAcquire()的源碼

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

這里throw異常是留給我們進行實現(xiàn)的. AQS只是一個框架, 具體資源的獲取和釋放邏輯由我們自定義同步器去實現(xiàn)(就像ReentrantLock類). 需要自定義實現(xiàn)的方法都沒有定義成abstract, 由我們根據(jù)同步器獨占/共享自有選擇.

2. addWaiter(Node)方法
    private Node addWaiter(Node mode) {
        // 以給定模式構(gòu)造結(jié)點. mode有兩種: EXCLUSIVE(獨占)和SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        //嘗試直接將節(jié)點放到sync隊列尾部,
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果放入尾部失敗, 調(diào)用enq()入隊
        enq(node);
        return node;
    }
3. enq(Node)方法
    private Node enq(final Node node) {
        //CAS"自旋", 直到成功加入隊尾
        for (;;) {
            Node t = tail;
            if (t == null) { //  隊列為空, 創(chuàng)建一個空的結(jié)點作為head結(jié)點, 并將tail也指向它, 這是一個初始化的動作
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//正常流程, 放入隊尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

CAS自旋volatile變量, 保證了可見性, 操作上又是原子方法. 這是一種很經(jīng)典的用法

4. acquireQueued(Node, int)方法

當節(jié)點進入同步隊列后, 接下來就是要等待獲取鎖(訪問控制), 同一時刻只有一個線程在運行, 其他都要進入等待狀態(tài). 每個線程節(jié)點都是獨立的, 他們進行自旋判斷, 當發(fā)現(xiàn)前驅(qū)節(jié)點是頭結(jié)點并且獲取了狀態(tài)(tryAcquire()自己實現(xiàn)原子性操作), 那這個線程就可以運行了.

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//標記是否可以成功拿到狀態(tài)
        try {
            boolean interrupted = false;//處理過程中是否被中斷過
            for (;;) {//自旋
                final Node p = node.predecessor();//獲取當前節(jié)點的前驅(qū)節(jié)點
              //如果前驅(qū)節(jié)點是head, 當前節(jié)點就是排第二. 這個時候可以嘗試去獲取資源了(頭結(jié)點可能釋放完喚醒自己了)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);//設(shè)置頭節(jié)點為當前節(jié)點
                    p.next = null; // help GC setHead()中node.prev已置為null, 此處再將head.next置為null. 方便gc回收head節(jié)點.
                    failed = false;//標記成功獲取資源
                    return interrupted;
                }
                //不滿足喚醒條件, 調(diào)用park()進入waiting狀態(tài), 等待unpark(). 如果等待的過程被中斷, 線程會從park()中醒過來, 發(fā)現(xiàn)拿不到資源后繼續(xù)進入park()中等待.
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;//如果線程被終端, 標記interrupted為true, 等待線程獲取到資源后在中斷
            }
        } finally {
            if (failed)//如果等待過程中沒有成功獲取資源(不可控異常), 取消線程在隊列的等待
                cancelAcquire(node);
        }
    }
shouldParkAfterFailedAcquire()方法如果發(fā)現(xiàn)前驅(qū)節(jié)點狀態(tài)不是SIGNAL, 會標記前驅(qū)節(jié)點狀態(tài)為SIGNAL(-1). 如果發(fā)現(xiàn)前驅(qū)節(jié)點放棄等待了就一直往前找節(jié)點, 直到找到正常等待的節(jié)點排隊到它后面.
parkAndCheckInterrupt()使線程進入waiting狀態(tài), 如果發(fā)現(xiàn)被喚醒, 檢查是不是被中斷了并且清除狀態(tài).

acquire()方法總結(jié)

  1. 嘗試直接插隊獲取資源, 如果不成功進入同步隊列排隊.
  2. 調(diào)用park()進入waiting狀態(tài), 等待前驅(qū)節(jié)點調(diào)用unpark()或者interrupt()喚醒自己. interrupt()喚醒拿不到資源繼續(xù)進入waiting狀態(tài).
  3. 被喚醒后嘗試獲取資源, 如果獲取不到資源進入2流程, 獲取到資源就執(zhí)行后續(xù)代碼(如果等待過程被中斷過此時會調(diào)用selfInterrupt()將中斷補上).

public final boolean release(int arg)

該方法是獨占模式下線程釋放共享資源的入口.

    public final boolean release(int arg) {
        if (tryRelease(arg)) {//釋放資源, 自定義函數(shù)實現(xiàn)
            Node h = head;
            if (h != null && h.waitStatus != 0)//拿到頭結(jié)點
                unparkSuccessor(h);//喚醒等待隊列中的下一個線程
            return true;
        }
        return false;
    }
1. tryRelease(arg)方法

需要我們實現(xiàn)的獨占資源釋放函數(shù).

protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
}
2. unparkSuccessor(node) 方法

喚醒等待隊列中的下一個線程

    private void unparkSuccessor(Node node) {
        //當前線程節(jié)點的狀態(tài)
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//設(shè)置當前線程的節(jié)點狀態(tài)為0, 因為已經(jīng)釋放資源
        Node s = node.next; //找到下一個需要喚醒的節(jié)點
        if (s == null || s.waitStatus > 0) {//下一個節(jié)點為空或者已經(jīng)放棄等待就取消喚醒操作
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//從后往前找有效的節(jié)點
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒有效節(jié)點
    }
下一個有效的線程被喚醒后處在acquireQueued()的自旋流程中, 然后進入資源判斷獲取(if (p == head && tryAcquire(arg))).

public final void acquireShared(int arg)

此方法是共享模式下線程獲取共享資源的頂層入口. 它會獲取指定量的資源(state), 獲取成功后直接返回, 獲取失敗進入等待隊列, 直到獲取到資源(整個過程忽略中斷的影響).參考ReentrantReadWriteLock設(shè)計.

 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//改方法需要自定義同步器實現(xiàn). 返回語義負數(shù)表示失敗, 0或者大于零表示獲取成功.
            doAcquireShared(arg);//小于零進入等待隊列, 獲取資源后返回
    }
1. doAcquireShared(arg)方法
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//加入隊列的尾部, 模式為共享. addWaiter()方法參考上面介紹
        boolean failed = true;//成功失敗標識
        try {
            boolean interrupted = false;//是否中斷標識
            for (;;) {//CAS自旋
                final Node p = node.predecessor();//獲取前驅(qū)節(jié)點
                if (p == head) {//前驅(qū)節(jié)點為頭結(jié)點, 嘗試獲取資源(此處有可能是前驅(qū)節(jié)點喚醒了自己)
                    int r = tryAcquireShared(arg);//獲取資源
                    if (r >= 0) {//成功
                        setHeadAndPropagate(node, r);//將head指向自己, 此時r>0, 還有剩余資源喚醒后續(xù)排隊線程
                        p.next = null; // help GC
                        if (interrupted)// 中斷標識
                            selfInterrupt();//補上中斷
                        failed = false;
                        return;
                    }
                }
                 //不滿足喚醒條件, 調(diào)用park()進入waiting狀態(tài), 等待unpark(). 如果等待的過程被中斷, 線程會從park()中醒過來, 發(fā)現(xiàn)拿不到資源后繼續(xù)進入park()中等待.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
2. setHeadAndPropagate(Node, int)方法
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);/head指向自己
        //如果還有剩余量, 繼續(xù)喚醒下一個排隊的線程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

acquireShared()方法總結(jié)

  1. tryAcquireShared()方法嘗試獲取資源, 成功直接返回, 如果不成功進入同步隊列排隊.
  2. 調(diào)用park()進入waiting狀態(tài), 等待前驅(qū)節(jié)點調(diào)用unpark()或者interrupt()喚醒自己.
  3. 被喚醒后嘗試獲取資源, 如果獲取不到資源進入2流程, 獲取到資源就執(zhí)行后續(xù)代碼.
    其實同acquir()方法一樣, 只不過該方法在自己拿到資源后回去喚醒后繼線程

public final boolean releaseShared(int arg)

該方法是共享模式下線程釋放共享資源的入口. 跟獨占模式下的資源釋放方法release()很相似, 不同的是獨占模式一般是完全釋放資源(state=0)后才允許去喚醒其他線程, 而共享模式往往不會這么控制, 具體實現(xiàn)要看自定義同步器的邏輯.

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//嘗試釋放資源, 該方法需要自定義共享同步器實現(xiàn).
            doReleaseShared();//喚醒后繼節(jié)點
            return true;
        }
        return false;
    }
1. tryReleaseShared()方法

需要我們自己實現(xiàn)的共享資源釋放方法.

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
2. doReleaseShared()方法

該方法是用來喚醒后繼節(jié)點的.

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;           
                    unparkSuccessor(h);//喚醒后繼節(jié)點
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)  //head節(jié)點如果發(fā)生變化即退出自旋
                break;
        }
    }

releaseShared()方法總結(jié)

  1. tryReleaseShared()方法進行共享資源的釋放.
  2. doReleaseShared()方法用來喚醒后繼節(jié)點.
以上是幾個AQS常用的資源獲取和釋放的基本方法, 其實還有一些方法和上面分析的方法略有不同, 如下:
  • 獨占式獲取資源
    1. acquireInterruptibly(int arg): 類似于acquire()方法, 不同的地方是該方法響應(yīng)外界對線程的中斷信號, 并且拋出InterruptedException()異常.
    2. tryAcquireNanos(int arg, long nanosTimeout) : 類似于acquire()方法, 同樣響應(yīng)中斷拋出InterruptedException()異常, 并且該方法有獲取超時時間.
  • 共享式獲取資源
    1. acquireSharedInterruptibly(int arg): 類似acquireInterruptibly()方法的共享實現(xiàn), 同樣響應(yīng)中斷拋出InterruptedException()異常.
    2. tryAcquireSharedNanos(int arg, long nanosTimeout)(): 類似tryAcquireNanos()方法的共享實現(xiàn), 同樣響應(yīng)中斷拋出InterruptedException()異常, 并且該方法有獲取超時時間.

測試案例

1. ExclusiveLock(自定義獨占鎖)

ExclusiveLock是互斥的不可重入鎖實現(xiàn), 對鎖資源State的操作只有0和1兩個狀態(tài), 0代表未鎖定,1代表鎖定. 按照上面的分析, 我們需要實現(xiàn)AQS的tryAcquire()和tryRelease()方法.

public class ExclusiveLock implements Lock {
    //自定義內(nèi)部類同步器
    private static class ExclusiveSync extends AbstractQueuedSynchronizer {
        //判斷是否是鎖定狀態(tài)
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        //嘗試獲取資源, 如果成功直接返回. 獲取成功返回true, 否則返回false.
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)){//狀態(tài)變更必須為CAS原子操作, 保證原子性
                setExclusiveOwnerThread(Thread.currentThread());//同樣也是原子操作
                return true;
            }
            return false;
        }
        //嘗試釋放資源
        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0){
                throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        
    }
    //創(chuàng)建自定義同步器的實現(xiàn)
    private final ExclusiveSync sync = new ExclusiveSync();
    //獲取資源, 同acquire()語義一樣, 獲取不到進入同步隊列等待成功返回
    @Override
    public void lock() {
        sync.acquire(1);
    }
   //判斷鎖是否被占有
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    //獲取資源, 立刻返回結(jié)果
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    //
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    //釋放資源
    @Override
    public void unlock() {
        sync.release(1);
    }
    
}

1. ShareLock(自定義共享鎖)

ShareLock為一個共享同步器的實現(xiàn), 設(shè)計同一時刻可以有兩個線程獲取到資源, 超過兩個進行同步隊列阻塞. 按照上面的分析, 我們實現(xiàn)AQS的tryAcquireShared()和tryReleaseShared()方法.

public class ShareLock implements Lock {
    
    
    public static class ShareSync extends AbstractQueuedSynchronizer{
        
        //定義同步器的初始狀態(tài)為2
        ShareSync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }

        }

        @Override
        protected boolean tryReleaseShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current + reduceCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }
    
    
    private final ShareSync sync = new ShareSync(2);
    
    
    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容