并發(fā)編程之AQS初探

Java并發(fā)編程核心在于java.concurrent.util包,而juc當(dāng)中的大多數(shù)同步器實(shí)現(xiàn)都是圍繞著共同的基礎(chǔ)行為,比如等待隊(duì)列、條件隊(duì)列、獨(dú)占獲取、共享獲取等,而這個(gè)行為的抽象就是基于AbstractQueuedSynchronizer,簡稱AQS。

AQS具備特性
?阻塞等待隊(duì)列
?共享/獨(dú)占
?公平/非公平
?可重入
?允許中斷

可以說,AQS貫穿了整個(gè)并發(fā)包設(shè)計(jì),是juc的核心,對于并發(fā)編程實(shí)現(xiàn)的理解至關(guān)重要。

AQS是什么

JDK源碼對AQS有十分具體的解釋,下面這段英文摘自java.util.concurrent.locks包下的AbstractQueuedSynchronizer.java源碼文件。

 *Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class.  Class
 * {@code AbstractQueuedSynchronizer} does not implement any
 * synchronization interface.  Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not &quot;understand&quot; these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state.  No
 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it.  The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an {@code AbstractQueuedSynchronizer} for their
 * synchronization mechanics.

英語好的同學(xué)可以直接閱讀上述源碼作者Doug Lea給出的注解,這里為了讓更多人更好的理解,給出英文的大致含義。

AQS同步器是用來構(gòu)建鎖和其他同步組件的基礎(chǔ)框架,它的實(shí)現(xiàn)主要依賴一個(gè)int成員變量來表示同步狀態(tài)以及通過一個(gè)FIFO隊(duì)列構(gòu)成等待隊(duì)列。它的子類必須重寫AQS的幾個(gè)protected修飾的用來改變同步狀態(tài)的方法,其他方法主要是實(shí)現(xiàn)了排隊(duì)和阻塞機(jī)制。狀態(tài)的更新使用getState,setState以及compareAndSetState這三個(gè)方法。
子類被推薦定義為自定義同步組件的非public靜態(tài)內(nèi)部類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)的獲取和釋放方法來供自定義同步組件的使用。
同步器既支持獨(dú)占式獲取同步狀態(tài),也可以支持共享式獲取同步狀態(tài),這樣就可以方便的實(shí)現(xiàn)不同類型的同步組件。
同步器定義了一個(gè)嵌套的條件對象類COnditionObject,可以被子類使用作為Contidition用于支持獨(dú)占模式。

同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語義??梢赃@樣理解二者的關(guān)系:鎖是面向使用者,它定義了使用者與鎖交互的接口,隱藏了實(shí)現(xiàn)細(xì)節(jié);同步器是面向鎖的實(shí)現(xiàn)者,它簡化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)的管理,線程的排隊(duì),等待和喚醒等底層操作。鎖和同步器很好的隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域。

AQS的模板方法設(shè)計(jì)模式

AQS的設(shè)計(jì)采用了模板方法的設(shè)計(jì)模式,它將一些方法開放給子類進(jìn)行重寫,根據(jù)java多態(tài)性,同步器給同步組件所提供模板方法在調(diào)用這些被重寫的方法時(shí),實(shí)際調(diào)用的是子類的實(shí)現(xiàn)。

AQS繼承關(guān)系.png

以ReentrantLock為例舉個(gè)例子,AQS中需要重寫的方法tryAcquire:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

ReentrantLock中NonfairSync(繼承AQS)會(huì)重寫該方法為:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

而AQS中的模板方法acquire():

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }

會(huì)調(diào)用tryAcquire方法,而此時(shí)當(dāng)繼承AQS的NonfairSync調(diào)用模板方法acquire時(shí)就會(huì)調(diào)用已經(jīng)被NonfairSync重寫的tryAcquire方法。這就是使用AQS的方式,在弄懂這點(diǎn)后會(huì)lock的實(shí)現(xiàn)理解有很大的提升??梢詺w納總結(jié)為這么幾點(diǎn):

  1. 同步組件(這里不僅僅指鎖,還包括CountDownLatch等)的實(shí)現(xiàn)依賴于同步器AQS,在同步組件實(shí)現(xiàn)中,使用AQS的方式被推薦為定義繼承AQS的靜態(tài)內(nèi)部類;
  2. AQS采用模板方法進(jìn)行設(shè)計(jì),AQS的protected修飾的方法需要由繼承AQS的子類進(jìn)行重寫實(shí)現(xiàn),當(dāng)調(diào)用AQS的子類的方法時(shí)就會(huì)調(diào)用被重寫的方法;
  3. AQS負(fù)責(zé)同步狀態(tài)的管理,線程的排隊(duì),等待和喚醒這些底層操作,而Lock等同步組件主要專注于實(shí)現(xiàn)同步語義;
  4. 在重寫AQS的方式時(shí),使用AQS提供的getState(),setState(),compareAndSetState()方法進(jìn)行修改同步狀態(tài)

AQS可重寫的方法如下圖(摘自《java并發(fā)編程的藝術(shù)》一書):

image

在實(shí)現(xiàn)同步組件時(shí)AQS提供的模板方法如下圖:

image

AQS提供的模板方法可以分為3類:

  1. 獨(dú)占式獲取與釋放同步狀態(tài);
  2. 共享式獲取與釋放同步狀態(tài);
  3. 查詢同步隊(duì)列中等待線程情況;

同步組件通過AQS提供的模板方法實(shí)現(xiàn)自己的同步語義。

一個(gè)例子

下面使用一個(gè)例子來進(jìn)一步理解下AQS的使用。這個(gè)例子也是來源于AQS源碼中的example。

class Mutex implements Lock, java.io.Serializable {
    // Our internal helper class
    // 繼承AQS的靜態(tài)內(nèi)存類
    // 重寫方法
    private static class Sync extends AbstractQueuedSynchronizer {
        // Reports whether in locked state
        //判斷同步器是否處于鎖定狀態(tài)
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquires the lock if state is zero
        //只有當(dāng)state值為0,也就是沒有線程獲取著該鎖,才能拿到鎖的使用權(quán)
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();
    //使用同步器的模板方法實(shí)現(xiàn)自己的同步語義
    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

MutexDemo:

public class MutextDemo {
    private static Mutex mutex = new Mutex();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                mutex.lock();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.unlock();
                }
            });
            thread.start();
        }
    }
}

執(zhí)行情況:

image

上面的這個(gè)例子實(shí)現(xiàn)了獨(dú)占鎖的語義,在同一個(gè)時(shí)刻只允許一個(gè)線程占有鎖。MutexDemo新建了10個(gè)線程,分別睡眠3s。從執(zhí)行情況也可以看出來當(dāng)前Thread-6正在執(zhí)行占有鎖而其他Thread-7,Thread-8等線程處于WAIT狀態(tài)。按照推薦的方式,Mutex定義了一個(gè)繼承AQS的靜態(tài)內(nèi)部類Sync,并且重寫了AQS的tryAcquire等等方法,而對state的更新也是利用了setState(),getState(),compareAndSetState()這三個(gè)方法。在實(shí)現(xiàn)lock接口中的方法也只是調(diào)用了AQS提供的模板方法(因?yàn)镾ync繼承AQS)。
從這個(gè)例子就可以很清楚的看出來,在同步組件的實(shí)現(xiàn)上主要是利用了AQS,而AQS“屏蔽”了同步狀態(tài)的修改,線程排隊(duì)等底層實(shí)現(xiàn),通過AQS的模板方法可以很方便的給同步組件的實(shí)現(xiàn)者進(jìn)行調(diào)用。而針對用戶來說,只需要調(diào)用同步組件提供的方法來實(shí)現(xiàn)并發(fā)編程即可。同時(shí)在新建一個(gè)同步組件時(shí)需要把握的兩個(gè)關(guān)鍵點(diǎn)是:

  1. 實(shí)現(xiàn)同步組件時(shí)推薦定義繼承AQS的靜態(tài)內(nèi)存類,并重寫需要的protected修飾的方法;
  2. 同步組件語義的實(shí)現(xiàn)依賴于AQS的模板方法,而AQS模板方法又依賴于被AQS的子類所重寫的方法。

通俗點(diǎn)說,因?yàn)锳QS整體設(shè)計(jì)思路采用模板方法設(shè)計(jì)模式,同步組件以及AQS的功能實(shí)際上別切分成各自的兩部分:

同步組件實(shí)現(xiàn)者的角度:

通過可重寫的方法:獨(dú)占式: tryAcquire()(獨(dú)占式獲取同步狀態(tài)),tryRelease()(獨(dú)占式釋放同步狀態(tài));共享式 :tryAcquireShared()(共享式獲取同步狀態(tài)),tryReleaseShared()(共享式釋放同步狀態(tài));告訴AQS怎樣判斷當(dāng)前同步狀態(tài)是否成功獲取或者是否成功釋放。同步組件專注于對當(dāng)前同步狀態(tài)的邏輯判斷,從而實(shí)現(xiàn)自己的同步語義。這句話比較抽象,舉例來說,上面的Mutex例子中通過tryAcquire方法實(shí)現(xiàn)自己的同步語義,在該方法中如果當(dāng)前同步狀態(tài)為0(即該同步組件沒被任何線程獲?。?,當(dāng)前線程可以獲取同時(shí)將狀態(tài)更改為1返回true,否則,該組件已經(jīng)被線程占用返回false。很顯然,該同步組件只能在同一時(shí)刻被線程占用,Mutex專注于獲取釋放的邏輯來實(shí)現(xiàn)自己想要表達(dá)的同步語義。

AQS的角度

而對AQS來說,只需要同步組件返回的true和false即可,因?yàn)锳QS會(huì)對true和false會(huì)有不同的操作,true會(huì)認(rèn)為當(dāng)前線程獲取同步組件成功直接返回,而false的話就AQS也會(huì)將當(dāng)前線程插入同步隊(duì)列等一系列的方法。

總的來說,同步組件通過重寫AQS的方法實(shí)現(xiàn)自己想要表達(dá)的同步語義,而AQS只需要同步組件表達(dá)的true和false即可,AQS會(huì)針對true和false不同的情況做不同的處理,至于底層實(shí)現(xiàn),可以看這篇文章。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容