五、AQS源碼解讀

AQS的全稱是AbstractQueuedSynchronizer抽象同步隊列,它是并發(fā)組件里的基礎(chǔ)組件。
其內(nèi)部包含兩個內(nèi)部類ConditionObject、Node,這篇文章先從這兩個類開始講起,分析AQS如何工作


目錄

  1. AbstractQueuedSynchronizer
  2. ConditionObject
  3. 結(jié)語

AbstractQueuedSynchronizer介紹

剛開始,我學(xué)習(xí)AQS的時候一團麻,因為里面的函數(shù)、內(nèi)部類ConditionObject比較復(fù)雜,一直沒有濾清關(guān)系。
等后面看到就發(fā)現(xiàn)不能這么一股腦的看,那應(yīng)該怎么看呢?

  1. 首先你就把它想象成替代synchronized的替代品,synchronized的功能是什么呢,我們回憶一下,就是請求獲取內(nèi)部監(jiān)視器鎖,然后鎖住資源,其他線程在請求的時候就會阻塞,直到其執(zhí)行完并或釋放鎖。
  2. 了解AQS的結(jié)構(gòu)也就是它的屬性,知道它是一個FIFO隊列,并用state來維持同步狀態(tài)
  3. 結(jié)合以上,再去理解AQS是如何實現(xiàn)獲取鎖和釋放鎖的步驟的。
  4. 理解了以上,我們再去理解ConditionObject,這個內(nèi)部類同樣可以和原生并發(fā)中的一一對應(yīng),也就是Object的notify和wait。AQS通過ConditionObject實現(xiàn)signal、await

AQS主要屬性

  • volatile Node head 隊列頭結(jié)點
  • volatile Node tail 隊列尾節(jié)點
  • state 同步狀態(tài)

AQS實現(xiàn)原理最重要的就是以上三個屬性,通過他們來實現(xiàn)加鎖、釋放、同步、FIFO。

單獨說一下state這個屬性,因為它比較難理解,但又是核心屬性??梢詫⑵淅斫鉃橘Y源,也可以理解為鑰匙、還可以理解為鎖狀態(tài)。舉個例子比如獨占鎖:當state為0時,表示還沒人拿到這把鎖,當state為1時,表示鎖已經(jīng)被其他線程持有了,我們就必須等待再請求,我們占據(jù)、釋放鎖,也就是對state進行了CAS原子操作。

Node

Node是同步隊列中的元素,就像鏈表中的元素,結(jié)構(gòu)比較簡單,通過next和pre實現(xiàn)一個雙向的數(shù)據(jù)結(jié)構(gòu)。

Node屬性

  • thread:用來記錄進入AQS隊列里面的線程
  • prev: 前驅(qū)節(jié)點
  • next: 后繼節(jié)點
  • nextWaiter: 下個等待節(jié)點(也就是狀態(tài)為CONDITION)
  • waitStatus: 記錄當前線程等待狀態(tài)
    • 1 CANCELLED 表示被取消了
    • -1 SIGNAL 表示當前節(jié)點的后續(xù)節(jié)點需要被喚醒,也就是釋放此資源,要通知其他人
    • -2 CONDITION 在條件隊列里面等待
    • -3 PROPAGATE 釋放共享資源時需要通知其他節(jié)點,這個是專屬共享鎖
    • 0 其他狀態(tài),在同步隊列中,等待acquire
  • static final Node SHARED:用來標記該線程是在獲取共享資源時被掛起的
  • static final Node EXCLUSIVE: 用來標記該線程是在獲取獨占資源時被掛起的
    思考:
  1. next和nextWaiter可能讓人有點迷惑
  2. SHARED和EXCLUSIVE為什么類型是Node而不是用int或枚舉這種?

Node方法

Node一共就5個方法,其中有3個是構(gòu)造方法,另外一個是判斷是否共享、一個是獲取前驅(qū)節(jié)點

final boolean isShared() {
    return nextWaiter == SHARED;
}
Node() {    // Used to establish initial head or SHARED marker
}
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
Node(Thread thread, Node mode) {     // Used by addWaiter 同步隊列
    this.nextWaiter = mode;
    this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition 條件隊列
    this.waitStatus = waitStatus;
    this.thread = thread;
}

如果對上面函數(shù)不太理解,可以先放一放,繼續(xù)往下看

獲取鎖

如上面所說,獲取鎖其實就是獲取state,請求這個資源。下面我們看具體源代碼

  • acquire() 獲取入口

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&  // 嘗試獲取鎖,如果失敗則
            // 1. 調(diào)用addWaiter 插入同步隊列的,隊尾插入
            // 2. 調(diào)用acquireQueued,在同步隊列中自旋或者掛起。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
  • addWaiter 以當前線程作為參數(shù)新建節(jié)點加入隊列

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node; // 此處有可能并發(fā),所以并不保證后一個節(jié)點。
                return node;
            }
        }
        // 如果失敗如其他線程加入,則進行自旋不斷嘗試加入隊尾
        enq(node);
        return node;
    }
    

    注意失敗有兩種情況:

    • 一種是隊列沒有初始化tail為空
    • 一種是cas失敗,也就是說其他線程插入到尾節(jié)點了
  • enq: 自旋加入隊尾

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))// 重點
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    這段代碼重點是初始化隊列的方法,使用一個空節(jié)點作為head來初始化隊列,這就是為什么很多地方處理都是從頭結(jié)點的下一個節(jié)點開始
    頭結(jié)點是空節(jié)點
    頭結(jié)點是空節(jié)點

  • acquireQueued 同樣也是一個循環(huán)自旋,去獲取資源,獲取到則進行出隊操作

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; // 中斷標識
            for (;;) {
                final Node p = node.predecessor();
                // 如果前驅(qū)節(jié)點是頭節(jié)點則重新設(shè)置頭節(jié)點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果不是,則判斷該節(jié)點是否應(yīng)該park掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private final boolean parkAndCheckInterrupt() {
        //使用LockSupport,掛起當前線程
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    重點分析下頭節(jié)點的交換過程, p == head && tryAcquire(arg),這句話的表示當前線程的前驅(qū)節(jié)點是頭節(jié)點,且當前線程已經(jīng)獲取到鎖了則執(zhí)行如下操作:

    AQS出隊

    • head出列,釋放head
    • 將當前線程節(jié)點作為head,并使之成為一個空節(jié)點
    • 這個過程不會產(chǎn)生并發(fā),因為同一時間只有一個線程能獲取鎖。
  • shouldParkAfterFailedAcquire
    根據(jù)節(jié)點阻塞狀態(tài)來判斷是否park,進行到這一步,說明上一步失敗,有兩種情況導(dǎo)致失敗,一是當前的前驅(qū)節(jié)點不是頭結(jié)點,二是在嘗試獲取線程失敗了,也就是有其他線程搶占到了資源。那么我們就去判斷是否需要park(這里和自旋操作不一樣了,不再是無限自旋浪費cpu,而是阻塞等待)
    下面的代碼比較簡單,就是根據(jù)node的waitStatus來判斷:

    1. 如果前驅(qū)節(jié)點是無效節(jié)點,則放棄前驅(qū)節(jié)點,向上找一個有效節(jié)點作為前驅(qū)。
    2. 如果前驅(qū)節(jié)點是SIGNAL,則安全的park
    3. 其他狀態(tài)則將前驅(qū)節(jié)點設(shè)為SIGNAL
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 當前節(jié)點的前驅(qū)節(jié)點為SIGNAL,說明要喚醒當前線程,則當前線程可以安全的park
            return true;
        if (ws > 0) {
            // 如果節(jié)點被取消,則放棄前面的節(jié)點直到有效節(jié)點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //其他(這里只有0或者-3),需要設(shè)置前驅(qū)節(jié)點為SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

至此,完成一個請求鎖(資源)的過程,總結(jié)獲取鎖的三個重要步驟及其原理

  1. 嘗試獲取鎖,CAS操作
  2. 如果獲取不到嘗試加隊列并自旋請求獲取鎖
  3. 使用LockSupport,減少不停的自旋帶來的開銷。

獲取鎖時獨占和共享區(qū)別

共享模式下調(diào)用acquireShared(),步驟大體相似,我們重點看一下不同的地方。

  1. acquireShared
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) // 也是想try,只不過判斷方式不一樣,一般共享下,state表示剩余占有次數(shù)。
            doAcquireShared(arg); // 如果失敗則加入同步隊列并等待
    }
    
  2. doAcquireShared
    這個方法和獨占鎖相對應(yīng)其實是 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))結(jié)合體,它將加入隊列和阻塞等待、獲取到鎖后出列操作放到一起。之所以獨占鎖分成兩個函數(shù),是因為acquireQueued()函數(shù)在條件隊列里也有調(diào)用
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); // 加入同步隊列
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {          // 自旋嘗試獲取鎖
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 出列并喚醒其他等待節(jié)點,這里叫傳播,因為可能喚醒不止一個節(jié)點
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 同樣沒有獲取到則阻塞
                if (shouldParkAfterFailedAcquire(p, node) && 
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    
  3. setHeadAndPropagate(node, r); 這個函數(shù)是重點不一樣的地方,前面說過獨占模式調(diào)用的是setHeader,進行一個頭結(jié)點出列的處理。但是共享模式不一樣,它是一個傳播的方式。
    private void setHeadAndPropagate(Node node, int propagate) { // node 前驅(qū)節(jié)點
        Node h = head; // Record old head for check below
        setHead(node);      // 和獨占鎖一樣,頭結(jié)點出列
        // 但共享鎖還有進一步操作,依序獲取后繼節(jié)點,如果是空或者是共享的,則依次釋放
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

釋放鎖

釋放鎖過程相對來說更簡單一點:

  • release
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 如果釋放資源成功,則喚醒有效頭結(jié)點
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    這段代碼重點關(guān)注if (h != null && h.waitStatus != 0),waitStatus在什么時候不為0?-3或者-1,而這兩種狀態(tài)都需要喚醒下一個線程,反過來講,如果waitStatus為0,代表當前節(jié)點后面的節(jié)點沒有掛起等待,那就不需要unpark了。
  • unparkSuccessor
    private void unparkSuccessor(Node node) {
        // 將頭節(jié)點的狀態(tài)改為初始狀態(tài)
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        //如果頭節(jié)點的后繼節(jié)點為無效狀態(tài)則從隊尾開始找并喚醒
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

釋放鎖時獨占和共享區(qū)別

  1. releaseShared
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();  // 同樣只有稍有不同
                return true;
            }
            return false;
        }
    
  2. doReleaseShared
    private void doReleaseShared() {
        for (;;) {  // 這里區(qū)別比較大,是一個循環(huán),也就是達到傳播的目的
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {// 如果是Signal,則重置當前節(jié)點并釋放下一個節(jié)點
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    
  3. 總結(jié)
    共享和獨占模式下,獲取鎖和釋放鎖都需要以傳播的方式進行,也就是說只要拿到鎖,并且可以共享獲取,那么就擴散到其他節(jié)點,讓其他節(jié)點也可以獲取。

借三個問題復(fù)習(xí)一下:
① 為什么從隊尾往后找?
② 當一個head節(jié)點的waitStatus為0說明什么呢?
③ 為什么喚醒的是頭結(jié)點的后繼節(jié)點而不是頭結(jié)點?

ConditionObject

正如上面所講,notify和wait是配合synchronized內(nèi)置鎖實現(xiàn)線程間同步的基礎(chǔ)一樣,條件變量的signal和await方法也是配合aqs鎖來實現(xiàn)線程間同步的。而且AQS的獨占模式才支持ConditionObject

屬性

private transient Node firstWaiter;
private transient Node lastWaiter;

只有兩個屬性,可以看出設(shè)計者的思想就是在隊首和隊尾操作,一個先進先出的隊列。
而且注意都是以waiter結(jié)尾,其實跟Node中的屬性nextWaiter對應(yīng),在條件隊列中,只有nextWaiter有用,其prev和next都是null

await方法

  1. await

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();// 插入一個節(jié)點(當前線程)
        int savedState = fullyRelease(node); // 釋放鎖
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {// 如果不在同步隊列中則一直掛起
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //通過acquireQueued來獲取鎖狀態(tài)直至成功,該方法返回值指示該過程是否被中斷
        // 到這一步說明已經(jīng)在同步隊列了,接下來就是去競爭獲取鎖了。
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //signal方法會將節(jié)點的nextWaiter字段賦值為null,但有可能是因為中斷而取消的
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        //根據(jù)中斷模式,進行相應(yīng)的處理
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    由上面可以看出await做了以下事情:

    1. 插入一個ConditionWaiter節(jié)點
    2. 釋放鎖
    3. 循環(huán)判斷是否在同步隊列中(因為有可能喚醒自己或者被其他喚醒),如果不在則park
    4. 如果在同步隊列中則調(diào)用aqs的acquireQueued方法嘗試獲取資源(上節(jié)已有介紹)
    5. 后續(xù)處理

    接下來逐個解析源碼

  2. addConditionWaiter
    插入一個節(jié)點,將當前線程插入條件隊列,狀態(tài)默認是CONDITION

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters(); // 如果隊尾的節(jié)點狀態(tài)不為CONDITION,觸發(fā)清理
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

    從上面看到,都沒有加鎖或者cas操作,那是因為在獨占模式下,進入到此處的已經(jīng)獲取鎖了,不存在并發(fā)情況。

  3. fullyRelease
    釋放資源

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {// 釋放鎖,參考aqs解析
                failed = false;
                return savedState;
            } else {
                // 順便提一下,release的返回值由tryRelease來決定,也就是子類來定義
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    
  4. isOnSyncQueue
    判斷是否在同步隊列中

     final boolean isOnSyncQueue(Node node) {
        // 只有一種情況同步隊列prev==null,也就是頭結(jié)點,但之前也說過,此時的頭結(jié)點已經(jīng)拿到鎖了,所以不可能進入此(進入這個函數(shù)說明已經(jīng)釋放了鎖)
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * 這里為什么不用node.prev != null,因為插入同步隊列的操作是先node.prev =  
         * pred; 再compareAndSetTail(pred, node),也就是說cas之前prev是有可能
         * 有值的,這個時候使用node.pre搜出來并不代表它在隊列中,
         * 但只有cas成功才算加入同步隊列。  
         * 同樣下面搜索過程因為如果我們正好在cas放置為tail之后,設(shè)置pred.next=node之
         * 前,也就是說,如隊尾的時候先cas成功,再設(shè)置它的前驅(qū)節(jié)點的next為此節(jié)點,那么當
         * 我們搜索的時候正好介于這兩個步驟之間如果從前往后搜,則無法搜到  
         * 所以需要從后往前搜
         */
        return findNodeFromTail(node);
    }   
    
  5. 響應(yīng)中斷
    transferAfterCancelledWait方法在發(fā)生中斷時,將節(jié)點從condition queue中轉(zhuǎn)移到sync queue中去

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    } 
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    
  6. signal
    將一個節(jié)點(線程)從條件變量的阻塞隊列(condition queue)中移動到同步隊列中(sync queue),讓該節(jié)點重新嘗試獲取資源

    public final void signal() {
        if (!isHeldExclusively())//當前線程是否在獨占資源
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);  // 總是喚醒隊首
    }
    // 首先修改隊首,然后調(diào)用transferForSignal()(AbstractQueuedSynchronizer方法)到同步隊列
    // 如果失敗,則說明該節(jié)點被取消,則繼續(xù)循環(huán)直到找到一個等待節(jié)點
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
    }
    
  7. doSignal

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
                    // 更新頭結(jié)點,直到成功,或者隊列為空
    }
    final boolean transferForSignal(Node node) {
        /**
        * 可能存在競爭:
        * 另一個線程在執(zhí)行await時,被中斷,然后執(zhí)行transferAfterCancelledWait方法;
        * 當前線程執(zhí)行signal,然后執(zhí)行次方法。這兩個方法都會通過CAS操作將節(jié)的狀態(tài)從CONDITION改為0
        * 或者節(jié)點的狀態(tài)已經(jīng)是CANCELLED
        **/
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 將頭結(jié)點加入同步隊列,并unpark喚醒(記得上面await中的park嗎)
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

結(jié)語

其他方式的喚醒和阻塞等待原理類似。
再回顧一下重點:

  1. ConditionObject是只有獨占鎖才有的,因此學(xué)習(xí)AQS可以先將它放一邊;
  2. prev和next是同步隊列的,nextWaiter是條件隊列的;
  3. 同步隊列中,頭結(jié)點永遠是對標拿到鎖的當前線程,但是個空節(jié)點;
  4. 同步隊列,從后往前是可靠的,從前往后是不可靠的;
  5. AQS的實現(xiàn)主要在于鎖如何操作,也就是state
作用 wait() park()
如何喚醒 notify,有順序要求 unpark(),無順序要求,主要獲取許可,就可喚醒
休眠時狀態(tài) waiting waiting
是否釋放鎖
是否響應(yīng)中斷 報錯 響應(yīng)中斷

下一章節(jié):并發(fā)包中其他鎖

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容