AQS源碼學(xué)習(xí)

1、AQS介紹

AQS全稱AbstractQueuedSynchronizer,是一個同步器,用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。內(nèi)部主要使用一個volatile修飾的state變量和一個FIFO雙向隊列來實現(xiàn)的。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

ReentrantLock、Semaphore、CountDownLatch等都是基于AQS實現(xiàn)的。

同步器的主要使用方式是繼承,子類通過繼承同步器并實現(xiàn)它的抽象方法來管理同步狀 態(tài),在抽象方法的實現(xiàn)過程中免不了要對同步狀態(tài)進行更改,這時就需要使用同步器提供的3 個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進行操 作,因為它們能夠保證狀態(tài)的改變是安全的。

image-20211117224126737.png

子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器自身沒有實現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來 供自定義同步組件使用,同步器既可以支持獨占式地獲取同步狀態(tài),也可以支持共享式地獲 取同步狀態(tài),這樣就可以方便實現(xiàn)不同類型的同步組件(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等)

public class Main {
    static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return super.tryAcquire(arg);
        }

        @Override
        protected boolean tryRelease(int arg) {
            return super.tryRelease(arg);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            return super.tryAcquireShared(arg);
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            return super.tryReleaseShared(arg);
        }

        @Override
        protected boolean isHeldExclusively() {
            return super.isHeldExclusively();
        }
    }
}

同步器是實現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實現(xiàn)中聚合同步器,利用同步器實現(xiàn)鎖的語義??梢赃@樣理解二者之間的關(guān)系:鎖是面向使用者的,它定義了使用者與鎖交 互的接口(比如可以允許兩個線程并行訪問),隱藏了實現(xiàn)細節(jié);同步器面向的是鎖的實現(xiàn)者,它簡化了鎖的實現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現(xiàn)者所需關(guān)注的領(lǐng)域。

2、同步器的接口與示例

同步器的設(shè)計是基于模板方法模式的,也就是說,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法將會調(diào)用使用者重寫的方法。 這句話聽起來很繞,慢慢往后看就懂了。

我們在繼承AQS并重寫那5個方法的時候,需要調(diào)用AQS提供的幾個方法去訪問或修改stats變量的狀態(tài)。

// 獲取當(dāng)前同步狀態(tài)
protected final int getState() {
    return state;
}

// 設(shè)置當(dāng)前同步狀態(tài)
protected final void setState(int newState) {
    state = newState;
}

// 使用CAS設(shè)置當(dāng)前狀態(tài),CAS能保證操作的原子性
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

我們在繼承同步器時可重寫的方法如下

image-20211117224924467.png

在實現(xiàn)自定義同步組件時,也會調(diào)用同步器提供的模板方法,部分模板方法如下

image-20211118202508275.png

AQS提供的模板方法主要分為三類:

(1)獨占式獲取和釋放同步狀態(tài)。

(2)共享式獲取和釋放同步狀態(tài)。

(3)查詢同步隊列中等待的線程情況。

3、實現(xiàn)一個獨占鎖

獨占鎖,就是同一時刻只能一個線程擁有鎖?;贏QS,我們可以很方便的實現(xiàn)。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 基于AQS實現(xiàn)獨占鎖
 */
public class ExclusiveLock implements Lock {

    /**
     * 實現(xiàn)自定義的同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 獲取鎖
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            // 設(shè)置同步變量state為1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 是否處于占用狀態(tài)
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 每個condition都包含一個condition隊列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

一般在實現(xiàn)自定義同步器時,都會把它作為靜態(tài)內(nèi)部類去實現(xiàn)。上面實現(xiàn)的ExclusiveLock類中就是定義了一個靜態(tài)內(nèi)部類Sync去繼承AQS實現(xiàn)獨占鎖邏輯的。通過CAS設(shè)置同步變量state值為1表示獲取鎖成功,釋放鎖時把state設(shè)置為0即可。

4、AQS的實現(xiàn)分析

4.1 同步隊列

AQS內(nèi)部依賴同步隊列(雙向FIFO隊列)來進行線程的管理,當(dāng)線程獲取鎖失敗時,會將線程以及等待狀態(tài)信息構(gòu)造為一個節(jié)點Node并將其放入同步隊列隊尾,然后阻塞該線程。當(dāng)鎖釋放的時候,會把隊首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

Node是AQS中的一個靜態(tài)內(nèi)部類,用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點,主要字段如下

static final class Node {
        // 表示節(jié)點是共享模式
        static final Node SHARED = new Node();
        // 表示節(jié)點是獨占模式
        static final Node EXCLUSIVE = null;

        // 由于在同步隊列中等待的線程等待超時或者被中斷,需要從同步隊列中取消等待,節(jié)點進入該狀態(tài)將不會改變。
        static final int CANCELLED =  1;
        // 后繼節(jié)點的線程處于等待狀態(tài),而當(dāng)前節(jié)點的線程如果釋放了同步狀態(tài)或者被取消,將會通知后繼節(jié)點,使后繼節(jié)點的線程得以運行。
        static final int SIGNAL    = -1;
        // 節(jié)點在等待隊列中,節(jié)點線程等待在Condition上,當(dāng)其他線程對Condition調(diào)用了signal()方法后,該節(jié)點將會從等待隊列轉(zhuǎn)移到同步隊列中,加入到對同步狀態(tài)的獲取中。
        static final int CONDITION = -2;
        // 表示下一次共享式同步狀態(tài)獲取將會無條件的傳播下去。
        static final int PROPAGATE = -3;

        /**
         * 等待狀態(tài),取值如下
         * SIGNAL 值為-1,
         *
         * CANCELLED 值為1,
         *
         * CONDITION 值為-2,
         *
         * PROPAGATE 值為-3,
         *
         * INITIAL 值為0,表示初始狀態(tài)。
         */
        volatile int waitStatus;

        // 前驅(qū)節(jié)點,當(dāng)節(jié)點加入同步隊列時被設(shè)置(尾部添加)
        volatile Node prev;

        // 后繼節(jié)點
        volatile Node next;

        // 獲取同步狀態(tài)的線程
        volatile Thread thread;

        // 等待隊列中的后繼節(jié)點。如果當(dāng)前節(jié)點是共享的,那么這個字段將是一個SHARED常量,也就是說節(jié)點類型(獨占和共享)和等待隊列中的后繼節(jié)點共用同一個字段
        Node nextWaiter;
}

沒有成功獲取同步狀態(tài)的線程將會加入同步隊列的尾部,這個加入的過程也必須要保證線程安全,因此AQS提供了compareAndSetTail(Node expect, Node update)方法。同步隊列的結(jié)構(gòu)大致如下

image-20211122215003010.png

同步隊列設(shè)置尾結(jié)點的過程大致如下

image-20211122215318789.png

同步隊列的首節(jié)點是獲取鎖成功的線程,首節(jié)點的線程在釋放鎖后,將會喚醒后繼節(jié)點,后繼節(jié)點如果獲取鎖成功的同時會把自己設(shè)置為頭結(jié)點

image-20211122215510970.png

設(shè)置頭結(jié)點是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個線程能成功獲取到同步狀態(tài),因此設(shè)置頭結(jié)點的方法并不需要CAS來保證,它只需要將首節(jié)點設(shè)置為頭結(jié)點的后繼節(jié)點然后斷開首節(jié)點即可。

4.2 獨占式同步狀態(tài)獲取和釋放

我們在實現(xiàn)自定義的獨占式同步器時,主要重寫了AQS的tryAcquiretryRelease方法,通過操作同步變量state完成同步狀態(tài)的獲取和釋放。

我們可以調(diào)用AQS對外提供的acquire獲取同步狀態(tài),該方法會調(diào)用我們重新的tryAcquire方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果調(diào)用tryAcquire方法返回為false,則通過addWaiter把線程加入同步隊列隊尾,并標(biāo)志位獨占Node.EXCLUSIVE。通過CAS確保節(jié)點能安全的添加到隊列尾。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

加入隊列尾后,通過CAS不斷的嘗試獲取同步狀態(tài)。只有在前驅(qū)節(jié)點是頭結(jié)點的情況下,才有可能獲取到同步狀態(tài)。如果獲取不到則阻塞節(jié)點中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點的出隊或阻塞線程被中斷來實現(xiàn)。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued(final Node node,int arg)方法中,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點是頭節(jié)點才能夠嘗試獲取同步狀態(tài),這是為什么?原因有兩個,如下。

(1)頭節(jié)點是成功獲取到同步狀態(tài)的節(jié)點,而頭節(jié)點的線程釋放了同步狀態(tài)之后,將會喚醒其后繼節(jié)點,后繼節(jié)點的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點是否是頭節(jié)點。

(2)維護同步隊列的FIFO原則。

image-20211122220135617.png

獨占式同步狀態(tài)獲取流程,也就是acquire(int arg)方法調(diào)用流程大致如下

image-20211122222721588.png

上圖中,當(dāng)同步狀態(tài)獲取成功之后,當(dāng)前線程從acquire(int arg)方法返回,如果對于鎖這種并發(fā)組件而言,代表著當(dāng)前線程獲取了鎖。

當(dāng)線程獲取同步狀態(tài)并執(zhí)行完相應(yīng)的邏輯后,就需要釋放同步狀態(tài),通過調(diào)用AQS提供的release方法。該方法在釋放了同步狀態(tài)之后,會喚醒其后繼節(jié)點(進而使后繼節(jié)點重新嘗試獲取同步狀態(tài))。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

該方法執(zhí)行時,會喚醒頭節(jié)點的后繼節(jié)點線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程。

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

分析了獨占式同步狀態(tài)獲取和釋放過程后,適當(dāng)做個總結(jié):在獲取同步狀態(tài)時,同步器維護一個同步隊列,獲取狀態(tài)失敗的線程都會被加入到隊列中并在隊列中進行自旋;移出隊列(或停止自旋)的條件是前驅(qū)節(jié)點為頭節(jié)點且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點的后繼節(jié)點

4.3 共享式同步狀態(tài)獲取和釋放

共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例,如果一個程序在對文件進行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問。

通過調(diào)用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態(tài)

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireShared方法中,會調(diào)用我們重寫的tryAcquireShared方法嘗試獲取同步狀態(tài),該方法返回值為int,返回值大于等于0時,表示獲取成功。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態(tài)并退出自旋的條件就是tryAcquireShared(int arg)方法返回值大于等于0??梢钥吹?,在doAcquireShared(int arg)方法的自旋過程中,如果當(dāng)前節(jié)點的前驅(qū)為頭節(jié)點時,嘗試獲取同步狀態(tài),如果返回值大于等于0,表示該次獲取同步狀態(tài)成功并從自旋過程中退出。

與獨占式一樣,共享式獲取也需要釋放同步狀態(tài),通過調(diào)用releaseShared(int arg)方法可以釋放同步狀態(tài)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

該方法在釋放同步狀態(tài)之后,將會喚醒后續(xù)處于等待狀態(tài)的節(jié)點。對于能夠支持多個線程同時訪問的并發(fā)組件(比如Semaphore),它和獨占式主要區(qū)別在于tryReleaseShared(int arg)方法必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放,一般是通過循環(huán)和CAS來保證的,因為釋放同步狀態(tài)的操作會同時來自多個線程。

4.4 超時獲取同步狀態(tài)

通過調(diào)用同步器的doAcquireNanos(int arg,long nanosTimeout)doAcquireSharedNanos(int arg, long nanosTimeout)方法可以超時獲取同步狀態(tài),即在指定的時間段內(nèi)獲取同步狀態(tài),如果獲取到同步狀態(tài)則返回true,否則,返回false。該方法提供了傳統(tǒng)Java同步操作(比如synchronized關(guān)鍵字)所不具備的特性。

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 從現(xiàn)在起經(jīng)過nanosTimeout后超時
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 調(diào)用tryAcquire方法獨占式獲取同步狀態(tài)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 更新超時時間
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

大致流程如下

image-20211120113200693.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容