2. AbstractQueuedSynchronizer

隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構(gòu)建鎖和其他同步組件的基礎(chǔ)框架。它使用了一個int成員變量來表示同步狀態(tài),通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作,并發(fā)包的作者(Doug Lea)期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。

同步器的主要使用方式是繼承,子類通過繼承同步器并實現(xiàn)它的抽象方法管理同步狀態(tài),在抽象方法的實現(xiàn)過程中免不了要對同步狀態(tài)進行更改,這時就需要使用同步器提供的3個方法(getState(),setState(int),compareAndSetState(int,int))來進行操作,因為它們能夠保證狀態(tài)的改變是安全的。子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器本身沒有實現(xiàn)任何同步接口,它僅僅定義了若干同步狀態(tài)獲取和釋放的方法供自定義同步組件使用,同步器既支持獨占式獲取同步狀態(tài),也支持共享式獲取同步狀態(tài)。當(dāng)以獨占式模式獲取時,其他線程嘗試獲取會失敗,而以共享式模式獲取時,多個線程獲取則可能會成功。

同步器是實現(xiàn)鎖(也可以是任何同步組件)的關(guān)鍵,在鎖的實現(xiàn)中聚合同步器,利用同步器實現(xiàn)鎖的語義。可以這樣理解二者之間的關(guān)系:鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現(xiàn)細節(jié);同步器是面向鎖的實現(xiàn)者的,它簡化了鎖的實現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊、等待與喚醒等底層操作。

使用

為了使用此類作為同步組件的基礎(chǔ),需要重新定義以下方法,通過使用getState(),setState(int),compareAndSetState(int,int)方法來檢查和修改同步狀態(tài):

方法名稱 描述
tryAcquire(int) 獨占式獲取同步狀態(tài),實現(xiàn)該方法需要查詢當(dāng)前狀態(tài)把那個且判斷同步狀態(tài)是否符合預(yù)期,然后再進行CAS設(shè)置同步狀態(tài)
tryRelease(int) 獨占式釋放同步狀態(tài),等待獲取同步狀態(tài)的線程將有機會獲取同步狀態(tài)
tryAcquireShared(int) 共享式獲取同步狀態(tài),返回大于等于0的值,表示獲取成功,反之,獲取失敗
tryReleaseShared(int) 共享式釋放同步狀態(tài)
isHeldExclusively() 同步器是否被在獨占模式下被線程占用

每個方法的默認實現(xiàn)是拋出UnsupportedOperationException異常。這個方法的實現(xiàn)必須是線程安全的,簡短的并且非阻塞的。定義這些方法是此類唯一支持的操作,其他方法被聲明為final因為它們不能被獨立改變。

    > line: 1115
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

你可能也發(fā)現(xiàn)了從AbstractOwnableSynchronizer類中繼承的方法,這些方法對于追蹤線程獲取獨占式同步器很有用。你應(yīng)該去使用它們,這確保了監(jiān)視器和錯誤處理組件能幫助用戶確定哪個線程持有了鎖。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    /**
     * 獨占模式同步器的持有線程
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 設(shè)置同步器的擁有者。如果傳入的參數(shù)為null,表示當(dāng)前沒有線程占有同步器
     * 這個方法沒有強加任何同步措施,例如synchronized或者volatile訪問
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 返回最后一次使用setExclusiveOwnerThread設(shè)置的線程,如果沒有設(shè)置過,
     * 則返回null。這個方法沒有強加任何同步措施,例如synchronized或者volatile訪問
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

這個類基于內(nèi)部的FIFO隊列,獨占式同步器的核心如下:

Acquire:
    while (!tryAcquire(arg)) {
        如果還未入隊,那么將此線程入隊;
        可能會將當(dāng)前線程阻塞;
    }

Release:
    if (tryRelease(arg))
        喚醒第一個等待線程;

共享模式與此相似。

示例

這里是一個不可重入的獨占鎖類,使用0代表解鎖狀態(tài),1代表加鎖狀態(tài)。雖然一個不可重入鎖不強制要求記錄當(dāng)前持有同步器的線程,但是這個類這樣做的原因是使得使用者更容易監(jiān)視。

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Reports whether in locked state
        public boolean isLocked() {
            return getState() != 0;
        }

        public boolean isHeldExclusively() {
            // a data race, but safe due to out-of-thin-air guarantees
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // Provides a Condition
        public Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isLocked();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

這里是一個類似java.util.concurrent.CountDownLatch的柵欄類,除了它只需要釋放一個信號。因為柵欄是非獨占式的,它將使用共享式獲取和釋放方法。

class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}

從上面的實現(xiàn)類中可以看出使用AQS框架能夠大大降低一個可靠的自定義同步器的實現(xiàn)門檻。

AQS實現(xiàn)分析

接下來從實現(xiàn)角度分析同步器是如何完成線程同步狀態(tài)的管理,主要包括:同步隊列、獨占式同步狀態(tài)獲取與釋放、共享式同步狀態(tài)獲取與釋放以及超時獲取同步狀態(tài)等同步器的核心數(shù)據(jù)結(jié)構(gòu)和模板方法。

同步隊列

同步器依賴內(nèi)部的同步隊列來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程以及等待信息等狀態(tài)狗造成一個節(jié)點并將其增加到同步隊列,同時會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時,會把首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

同步隊列中的節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)后繼結(jié)點,節(jié)點的屬性如下所示:

    static final class Node {
        /** 標(biāo)記節(jié)點在共享式模式下等待 */
        static final Node SHARED = new Node();
        /** 標(biāo)記節(jié)點在獨占式模式下等待 */
        static final Node EXCLUSIVE = null;

        /** waitStatus value 表示線程已經(jīng)被取消獲取共享狀態(tài) */
        static final int CANCELLED =  1;
        /** waitStatus value 表示后繼節(jié)點需要被喚醒 */
        static final int SIGNAL    = -1;
        /** waitStatus value 表示節(jié)點在condition上等待 */
        static final int CONDITION = -2;
        /**
         * waitStatus value 表示下一個共享式節(jié)點需要被無條件傳遞
         */
        static final int PROPAGATE = -3;

        /**
         * 狀態(tài)域,只能有以下幾種值:
         *   SIGNAL:     這個節(jié)點的后續(xù)節(jié)點(或者即將成為后繼節(jié)點)被阻塞,
         *               所以當(dāng)前節(jié)點必須在它釋放共享狀態(tài)或取消獲取共享狀態(tài)時喚醒它的后繼節(jié)點。
         *               為了避免競爭,acquire方法必須首先表示它們需要一個信號,然后重新進行原子獲取,
         *               如果失敗,則被阻塞

         *   CANCELLED:  這個節(jié)點由于超時或者中斷而被取消。節(jié)點不會離開這個狀態(tài)。
         *               特別的,有cancelled節(jié)點的線程將不會再次被阻塞。
         *   CONDITION:  這個節(jié)點現(xiàn)在處于condition等待隊列中。
         *               它不會作為同步隊列節(jié)點使用直到它的狀態(tài)被設(shè)置為0并且被轉(zhuǎn)移到同步隊列中
         *              (使用0這個值與這個域的其他使用沒有任何關(guān)系,簡化了機制)。
         *   PROPAGATE:  一次共享式同步狀態(tài)釋放應(yīng)當(dāng)被傳遞給其他節(jié)點。
         *               這只會在doReleaseShared()方法中被設(shè)置來確保傳遞進行。
         *   0:          初始值
         *
         * 這個值被安排為數(shù)字主要是簡化使用。肺腑值意味著節(jié)點不需要被通知。
         * 因此,大多數(shù)代碼不需要只為了通知而去檢查特定的值。
         *
         * 此成員變量對于普通同步節(jié)點初始化為0,對于condition節(jié)點初始化為CONDITION。
         * 它使用CAS操作進行修改,或者如果可能的話,使用volatile寫。
         */
        volatile int waitStatus;

        /**
         * 前驅(qū)結(jié)點。當(dāng)節(jié)點入隊時被設(shè)置,出隊時被設(shè)為null以幫助 gc。
         * 當(dāng)然,當(dāng)前驅(qū)結(jié)點被取消,我們快速循環(huán)搜索整個隊列來尋找
         * 一個未被取消的節(jié)點,它總是會存在因為頭節(jié)點不會被取消:
         * 一個節(jié)點成為頭節(jié)點只會因為成功獲取到了同步狀態(tài)。一個被取消
         * 的線程將不會再繼續(xù)及進行獲取同步狀態(tài),并且它只會取消自己的節(jié)點
         */
        volatile Node prev;

        /**
         * 后繼節(jié)點。當(dāng)節(jié)點入隊時被設(shè)置,刪除被取消的節(jié)點時被修改,出隊
         * 時被設(shè)置為null以幫助 gc。入隊操作不會設(shè)置前驅(qū)結(jié)點的next域直到
         * 入隊成功,所以發(fā)現(xiàn)next域的值為null不一定代表此節(jié)點為最后一個
         * 節(jié)點。被取消的節(jié)點的next域被設(shè)置為指向它自己。
         */
        volatile Node next;

        /**
         * 插入此節(jié)點的線程。在構(gòu)造函數(shù)中初始化,使用完后設(shè)置為null。
         */
        volatile Thread thread;

        /**
         * 等待隊列中的后繼節(jié)點。如果當(dāng)前節(jié)點是共享的,那么這個字段
         * 將是一個SHARED常量。因為等待隊列只有當(dāng)處于獨占模式時才能
         * 訪問,所以我們只需要一個簡單的鏈接隊列來保存節(jié)點,當(dāng)它們在
         * condition上等待時。它們再次嘗試獲取同步狀態(tài)會被轉(zhuǎn)移到同步隊列。
         * 因為condition只可以是獨占式的,所以我們使用一個特殊值來表示
         * 共享模式。
         */
        Node nextWaiter;

        /**
         * 如果這個節(jié)點在共享模式等待,則返回true。
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前驅(qū)結(jié)點,如果為null則拋出 NullPointerException 異常。
         * 當(dāng)前驅(qū)結(jié)點不為null時使用。null檢查可以去掉,但是存在可以幫助VM。 
         * @return the predecessor of this node
         */
        final Node predecessor() {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        /** Establishes initial head or SHARED marker. */
        Node() {}

        /** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }

        /** Constructor used by addConditionWaiter. */
        Node(int waitStatus) {
            WAITSTATUS.set(this, waitStatus);
            THREAD.set(this, Thread.currentThread());
        }

        /** CASes waitStatus field. */
        final boolean compareAndSetWaitStatus(int expect, int update) {
            return WAITSTATUS.compareAndSet(this, expect, update);
        }

        /** CASes next field. */
        final boolean compareAndSetNext(Node expect, Node update) {
            return NEXT.compareAndSet(this, expect, update);
        }

        final void setPrevRelaxed(Node p) {
            PREV.set(this, p);
        }

        // VarHandle 機制。將上面字段使用VarHandle封裝,只是為了提高代碼性能。      
        // 這是jdk提供的一種底層操作機制,java8前為Unsfe類。
        private static final VarHandle NEXT;
        private static final VarHandle PREV;
        private static final VarHandle THREAD;
        private static final VarHandle WAITSTATUS;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                NEXT = l.findVarHandle(Node.class, "next", Node.class);
                PREV = l.findVarHandle(Node.class, "prev", Node.class);
                THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
                WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

節(jié)點是構(gòu)成同步隊列的基礎(chǔ),同步器擁有首節(jié)點head和尾節(jié)點tail,沒有成功獲取同步狀態(tài)的線程將會成為節(jié)點加入該隊列的尾部,同步隊列的基本結(jié)構(gòu)如下:

    > line: 563      處于源代碼中的位置
    /**
     * 同步隊列的頭節(jié)點,延遲初始化。除了初始化以外,只能通過setHead方法
     * 來修改。注意:如果頭節(jié)點存在,它的waitStatus被保證不會是CANCELLED 
     */
    private transient volatile Node head;

    /**
     * 同步隊列的尾節(jié)點,延遲初始化。只會通過enq方法增加新節(jié)點才會被改變。 
     */
    private transient volatile Node tail;

    /**
     * 同步狀態(tài)
     */
    private volatile int state;
    
    > line: 2293
    // VarHandle 機制。將上面三個字段使用VarHandle封裝,只是為了提高代碼性能。      
    private static final VarHandle STATE;
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
            HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
            TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

試想一下,當(dāng)一個線程成功獲取了同步狀態(tài),其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點并加入到同步隊列中,而這個加入隊列的過程必須要保證是線程安全的,因為同步器提供了一個基于CAS的設(shè)置尾節(jié)點的方法:compareAndSetTail(Node expect, Node update),它需要傳入一個當(dāng)前線程“認為”的尾節(jié)點和當(dāng)前節(jié)點,然后進行CAS更新。

    > line: 2325
    /**
     * CASes tail field.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }

同步器將節(jié)點加入到同步隊列的過程如下:

同步隊列遵循FIFO,首節(jié)點是獲取同步狀態(tài)的節(jié)點,首節(jié)點的線程在釋放同步狀態(tài)時,會喚醒后繼節(jié)點,而后繼節(jié)點會在成功獲取同步狀態(tài)時將自己設(shè)為頭節(jié)點:

設(shè)置頭節(jié)點是通過獲取同步狀態(tài)成功的節(jié)點來完成的,由于只有一個線程能夠獲取到同步狀態(tài),因此設(shè)置頭節(jié)點的方法并不需要使用CAS來保證。

    > line: 674
    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

獨占式同步狀態(tài)的獲取與釋放

通過調(diào)用同步器的acquire(int)方法可以獲取同步狀態(tài),該方法對中斷不敏感,也就是由于線程獲取同步狀態(tài)失敗后進入同步隊列中,后續(xù)如果對線程進行中斷操作,線程不會從同步隊列中移除。

> line: 1236
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果獲取同步狀態(tài)的過程中被中斷了,那么acquireQueued返回后就將自己中斷
        selfInterrupt();
}

> line: 873
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

上述代碼主要完成了同步狀態(tài)獲取、節(jié)點構(gòu)造、加入同步隊列以及在同步隊列中自旋等待的相關(guān)工作,其主要邏輯是:首先調(diào)用tryAcquire(int)嘗試獲得鎖,如果獲取鎖失敗,構(gòu)造一個獨占型結(jié)點并增加到隊尾。
然后循環(huán)CAS獲取鎖,如果前驅(qū)結(jié)點為head并且此時tryAcquire(int)成功,獲取到鎖,設(shè)置頭節(jié)點為此節(jié)點。如果失敗,調(diào)用shouldParkAfterFailedAcquire()方法判斷,如果返回true則阻塞當(dāng)前線程,否則循環(huán)再次獲取。
note: 前驅(qū)結(jié)點waitStatus為SIGNAL返回true,為其他則CAS設(shè)置為SIGNAL并返回false,即第二次調(diào)用一般返回true,即每個線程被構(gòu)造結(jié)點后有兩次機會嘗試獲取鎖,失敗則被阻塞。如果發(fā)生了異常,則取消此節(jié)點。

  1. 首先調(diào)用tryAcquire(int)嘗試獲得鎖,如果獲取鎖失敗,構(gòu)造一個獨占型結(jié)點并增加到隊尾
> line: 650
private Node addWaiter(Node mode) {
    //以獨占模式構(gòu)造一個節(jié)點
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        //獲取尾節(jié)點,如果為null則初始化同步隊列(對應(yīng)前面注釋中的延遲初始化)
        if (oldTail != null) {
            //設(shè)置prev結(jié)點
            node.setPrevRelaxed(oldTail);
            //CAS設(shè)置尾節(jié)點
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

> line: 2316
private final void initializeSyncQueue() {
    Node h;
    //使用CAS設(shè)置頭節(jié)點,此時head和tail指向同一節(jié)點
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

> line: 561   Node#setPrevRelaxed(Node p)
final void setPrevRelaxed(Node p) {
        PREV.set(this, p);
}

上面使用compareAndSetTail(oldTail, node)方法來去報節(jié)點能夠被線程安全添加。試想一下:如果使用一個普通的LinkedList來維護節(jié)點之間的關(guān)系,那么當(dāng)一個線程獲取了同步狀態(tài),而其他多個線程調(diào)用tryAcquire(int)獲取同步狀態(tài)失敗而被并發(fā)添加到LinkedList中時,LinkedList將難以保證Node的正確添加,最終的結(jié)果可能時節(jié)點數(shù)量有偏差,而且順序也是混亂的。

  1. 然后循環(huán)CAS獲取鎖,如果前驅(qū)結(jié)點為head并且此時tryAcquire(int)成功,獲取到鎖,設(shè)置頭節(jié)點為此節(jié)點。如果失敗,調(diào)用shouldParkAfterFailedAcquire()方法判斷,如果返回true則阻塞當(dāng)前線程,否則循環(huán)再次獲取。
> line: 904
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            // 獲取前驅(qū)結(jié)點
            final Node p = node.predecessor();
            // 如果前驅(qū)結(jié)點為頭節(jié)點并且tryAcquire(int)成功
            if (p == head && tryAcquire(arg)) {
                // 將自己設(shè)置為頭節(jié)點
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        //如果在獲取同步狀態(tài)時出現(xiàn)異常,則取消此節(jié)點
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

> line: 842
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前驅(qū)結(jié)點已經(jīng)被設(shè)置為SIGNAL,要求線程釋放同步狀態(tài)時通知此節(jié)點,
         * 所以它可以被安全的阻塞了。
         * 如果線程第一次獲取同步狀態(tài)失敗,它前驅(qū)節(jié)點的waitStatus就會被設(shè)置為SIGNAL,
         * 如果第二次再次失敗,它將會被阻塞。
         */
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 新構(gòu)造的結(jié)點的waitStatus默認為0,將前驅(qū)節(jié)點的waitStatus CAS設(shè)置為SIGNAL。
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

> line: 882
private final boolean parkAndCheckInterrupt() {
    // 阻塞當(dāng)前線程
    LockSupport.park(this);
    // 返回線程的中斷標(biāo)志位
    return Thread.interrupted();
}

> line: 789  
private void cancelAcquire(Node node) {
    // 如果此節(jié)點不存在則忽略
    if (node == null)
        return;
    node.thread = null;

    // 跳過前面被取消的結(jié)點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext是前面第一個不需要刪除的結(jié)點。下面的CAS如果失敗了,
    // 也就是我們和其他的cancel或signal操作競爭時失敗了,那么更后面
    // 的操作就沒必要執(zhí)行了。
    Node predNext = pred.next;

    // 將waitStatus設(shè)置為CANCELLED
    // 可以使用volatile寫代替CAS。此步完成后,其他結(jié)點能夠跳過此節(jié)點。
    // 在此之前不受其他線程的干擾。
    node.waitStatus = Node.CANCELLED;

    // 如果是尾節(jié)點,則移除自己
    if (node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        // 如果它的后繼節(jié)點需要喚醒,嘗試將其設(shè)置為前驅(qū)節(jié)點的next,
        // 如此便可以喚醒它。否則,我們就直接喚醒它。
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

> 看完cancelAcquire的源碼后,我們回頭再看shouldParkAfterFailedAcquire剩余部分
> line: 842
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;

    // 如果前驅(qū)結(jié)點的waitStatus為CANCELLED,那么跳過前面連續(xù)的CANCELLED結(jié)點,
    // 尋找到一個waitStatus<=0的結(jié)點,并將其next指向此節(jié)點。
    // 因為cancelAcquire正常執(zhí)行完CANCELLED結(jié)點的next會指向自己,當(dāng)pred
    // 的next不再指向它時,將不再會有對象引用它,下一此gc時這些CACCELLED節(jié)點就會被回收
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

acquireQueued(final Node node, int arg)方法中,當(dāng)前線程在循環(huán)中嘗試獲取同步狀態(tài),而只有前驅(qū)結(jié)點時頭節(jié)點時才能嘗試獲取同步狀態(tài),這是為什么?

  1. 頭節(jié)點是成功獲取到同步狀態(tài)的節(jié)點,而頭節(jié)點的線程釋放了同步狀態(tài)后,將會喚醒其后繼節(jié)點,后繼節(jié)點的線程被喚醒后需要檢查自己的前驅(qū)結(jié)點是否為頭節(jié)點。
  2. 維護同步隊列的FIFO原則。該方法中,節(jié)點自旋獲取同步狀態(tài)的行為如下:

由于非頭節(jié)點線程的前驅(qū)節(jié)點出隊或者被中斷線程會從等待狀態(tài)返回,隨后檢查自己的前驅(qū)節(jié)點是否為頭節(jié)點,如果是則嘗試獲取同步狀態(tài)??梢钥吹焦?jié)點與節(jié)點之間在循環(huán)檢查的過程中基本不相互通信,而是簡單的判斷自己的前驅(qū)是否為頭節(jié)點,這就使得節(jié)點的釋放規(guī)則符合FIFO,并且也便于對過早通知的處理(過早通知是指前驅(qū)節(jié)點不是頭節(jié)點的線程被意外喚醒或者因為被中斷而喚醒)。

acquire(int)方法調(diào)用流程如下所示:

可中斷獲取

> line: 1256
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 如果在嘗試獲取同步狀態(tài)前便被中斷了,那么立刻拋出InterruptedException
    // note:其他線程對其調(diào)用Thread#interrupt()方法,此線程還能繼續(xù)運行,
    //       只是中斷標(biāo)志位被設(shè)置為true。
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

> line: 929
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //構(gòu)造節(jié)點并增加到同步隊列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            // 如果在獲取同步狀態(tài)的過程中被中斷了,那么拋出InterruptedException
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

可以看出此方法和acquire(int)方法幾乎一樣,除了線程如果被中斷會立刻拋出InterruptedException異常,而不是只被記錄下來。

超時獲取

> line: 1281
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

> line: 957
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果超時時間<=0,立刻返回
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 如果超時時間結(jié)束,那么就取消此節(jié)點獲取同步狀態(tài)并返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 如果剩余時間大于1000ns,則線程睡眠
            // 否則,線程進入快速的自旋中,嘗試獲取同步狀態(tài),
            // 提高獲取同步狀態(tài)的可能。同時,1000ns以下的睡眠
            // 無法保證精準(zhǔn)的執(zhí)行。
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        U.park(false, nanos);
        setBlocker(t, null);
    }
}

可以看出此版本與acquire(int)也沒有太大區(qū)別,只是增加了中斷響應(yīng)和超時控制。

釋放同步狀態(tài)

當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)邏輯后,就需要釋放同步狀態(tài),使得后續(xù)節(jié)點能夠繼續(xù)獲取同步狀態(tài)。通過調(diào)用relaase(int)方法可以釋放同步狀態(tài),該方法釋放了同步狀態(tài)后,會喚醒其后繼節(jié)點。

>line: 1299
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 喚醒后繼節(jié)點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

> line: 685
private void unparkSuccessor(Node node) {
    /*
     * 如果waitStatus為負值(可能需要喚醒后繼節(jié)點),嘗試清除它。
     * 如果這個CAS失敗了或者waitStatus被等待線程改變也可以接受。
     */
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * 需要被喚醒的線程存儲在后繼節(jié)點中,一般都是下一個節(jié)點。
     * 但是如果下一個節(jié)點被取消了或者是null,那么從隊列尾部開始
     * 遍歷尋找沒有被取消的后繼節(jié)點。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // 作用是解除對下一個被取消節(jié)點的引用,使其能夠被gc
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

分析完獨占式同步狀態(tài)獲取和釋放過程后,做一個總結(jié):在獲取同步狀態(tài)時,同步器維護一個同步隊列,獲取狀態(tài)失敗的線程都會被增加到同步隊列中并在隊列中自旋(只有兩次嘗試的機會,都失敗就會被阻塞,防止浪費CPU資源);移除隊列的條件是前驅(qū)結(jié)點為頭節(jié)點并且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用tryRelease(int)方法釋放同步狀態(tài),然后喚醒頭節(jié)點的后繼節(jié)點。

共享式同步狀態(tài)獲取與釋放

共享式與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài),例如文件讀寫,可以有多個線程同時讀文件,但是只能有一個線程寫文件。

通過同步器的acquireShared(int)方法可以共享式獲取同步狀態(tài):

> line: 1320
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

> line: 992
private void doAcquireShared(int arg) {
    // 增加一個SHARED節(jié)點
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 前驅(qū)結(jié)點為頭節(jié)點時嘗試獲取同步狀態(tài)
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 設(shè)置頭節(jié)點并嘗試喚醒后繼節(jié)點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

> line: 755
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 如果滿足以下條件之一,嘗試喚醒下一個節(jié)點:
     * 1. propagate>0,表示還有同步狀態(tài)可供獲取
     * 2. waitStatus<0(在setHead()方法調(diào)用前后被其他操作修改),
     *    表示喚醒后繼節(jié)點。注意:PROPAGATE在shouldParkAfterFailedAcquire
     *     中可能會被更改為SIGNAL
     * 并且下一個節(jié)點在共享模式下等待,或者為null 。
     * 
     * 這些檢查可能會導(dǎo)致不必要的線程喚醒,當(dāng)時只有在多個線程正在
     * 競爭acquires/releases時會發(fā)生,所以大部分情況都需要立刻喚醒
     * 后面的共享節(jié)點。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

> line: 717
private void doReleaseShared() {
    /*
     * 確保釋放同步狀態(tài)傳遞,即使有其他正在進行中的acquires/releases。
     * 如果waitStatus為SIGNAL使用unparkSuccessor(head)喚醒后繼節(jié)點。
     * 但是如果不是,那么設(shè)置waitStatus為PROPAGATE來確保釋放傳遞
     * 繼續(xù)下去。另外,當(dāng)我們這樣做時必須循環(huán)進行防止一個新節(jié)點被增加。
     * 同時,不像unparkSuccessor在其他地方的使用,我們需要直到CAS是否
     * 失敗,如果是那么進入下一個循環(huán)重新進行。
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 當(dāng)前只有一個線程獲取了同步狀態(tài),沒有其他線程嘗試獲取,
            // 所以waitStatus為0,CAS修改為PROPAGATE確保傳遞繼續(xù),因為
            // 后續(xù)節(jié)點獲取同步狀態(tài)失敗時會調(diào)用shouldParkAfterFailedAcquire
            // 方法將其設(shè)置為SIGNAL。
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

acquireShared(int)方法中,同步器調(diào)用tryAcquireShared(arg)方法嘗試獲取同步狀態(tài),tryAcquireShared(arg)方法返回值為int,當(dāng)返回值大于等于0時,表示能夠獲取到同步狀態(tài)。因此,在共享式獲取的自旋過程中,成功獲取同步狀態(tài)并退出自旋的條件就是tryAcquireShared(arg)方法返回值大于等于0。

可中斷獲取

> line: 1338
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

> line: 1022
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

與獨占式獲取一樣,可中斷獲取版本能夠響應(yīng)中斷,當(dāng)線程被中斷后,立刻拋出InterruptedException異常。

超時獲取

> line: 1362
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

> line: 1053
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

超時獲取和獨占式超時獲取也無差別,不再詳細贅述。

超時獲取同步狀態(tài)的流程如下:

共享同步狀態(tài)釋放

> line: 1379
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared(int)方法前面已經(jīng)分析過。該方法釋放同步狀態(tài)后,將會喚醒處于等待狀態(tài)的節(jié)點。

剩余的一些監(jiān)視方法以及輔助方法不屬于核心機制部分,不對其進行分析,讀者有興趣可自行查看源碼,相信看懂上述部分后理解這些方法也不困難。同時,關(guān)于Condition的部分將在后面部分講解,下面先編寫一個自定義同步組件鞏固上面的分析。

自定義組件 —— TwinsLock

在前面幾節(jié)中,已經(jīng)對同步器AQS進行了主要功能實現(xiàn)的分析,本節(jié)通過編寫一個自定義同步組件來加深對同步器的理解。

設(shè)計一個同步工具:該工具在同一時刻,只允許最多兩個線程同時訪問,超時兩個線程的訪問將被阻塞。

首先,確定訪問模式。TwinsLock能夠同一時刻支持多個線程訪問,顯然是一個共享式訪問,因此需要AQS提供的acquiredShared(int)方法等和shared相關(guān)的方法,這就要求TwinsLock必須重新tryAcquiredShared(int)tryReleaseShared(int)方法,這樣才能保證同步器的共享式同步狀態(tài)的獲取和釋放方法得以執(zhí)行。

其次,定義資源數(shù)。TwinsLock同一時刻允許最多兩個線程訪問,表明同步資源數(shù)為2,這樣可以設(shè)置初始狀態(tài)為2,當(dāng)一個線程進行獲取時,status減1,線程釋放,status加1,狀態(tài)的合法范圍是0、1、2。其中0表示當(dāng)前已經(jīng)有兩個線程獲取了同步資源,此時如果還有其他線程嘗試獲取同步狀態(tài),只能被阻塞。在同步狀態(tài)改變時,需要使用compareAndSet(int, int)方法做原子性保障。

最后,組合自定義同步器。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        private volatile int count;

        Sync(int count) {
            if(count <= 0) {
                throw new IllegalArgumentException("count must large than 0");
            }
            this.count = count;
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for(;;) {
                int current = getState();
                int newCount = current - arg;
                if(newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for(;;) {
                int current = getState();
                int newCount = current + arg;
                if(newCount > count) {
                    throw new IllegalMonitorStateException("Unexpected unlock");
                }
                if(compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

在上述示例中,TwinsLock實現(xiàn)了Lock接口,提供了面向使用者的接口,使用者調(diào)用lock()方法獲取鎖,隨后調(diào)用unlock()方法釋放鎖。TwinsLock同時包含了一個自定義同步器Sync,該同步器面向線程訪問和同步狀態(tài)控制。

同步器作為一個橋梁,連接線程訪問以及同步狀態(tài)控制等底層技術(shù)與不同并發(fā)組件(如Lock、CountDownLatch等)的接口語義。

下面編寫一個測試驗證TwinsLock能否正常工作。

public class TwinsLockTest {
    @Test
    public void test() {
        final Lock lock = new TwinsLock();
        class Worker implements Runnable {
            @Override
            public void run() {
                lock.lock();
                try {
                    SleepUtils.sleep(1);
                    System.out.println(Thread.currentThread().getName());
                    SleepUtils.sleep(1);
                } finally {
                    lock.unlock();
                }
            }
        }
        for (int i = 0; i < 10; ++i) {
            Thread worker = new Thread(new Worker(), "Worker-" + i);
            worker.setDaemon(true);
            worker.start();
        }
        for (int i = 0; i < 10; ++i) {
            SleepUtils.sleep(1);
            System.out.println();
        }
    }
}

輸出如下:

Worker-2
Worker-0

Worker-1
Worker-5

Worker-3
Worker-4

Worker-8
Worker-9

Worker-6
Worker-7

可以看出線程名稱成對輸出,也就是同一時刻最多只有兩個線程能夠獲取到鎖,TwinsLock可以按預(yù)期正常工作。

Condition 接口

任意一個Java對象,都擁有一組監(jiān)視器方法(定義在java.lang.Object)上,主要包括wait(),wait(long),notify(),notifyAll()方法,這些方法與synchronized關(guān)鍵字配合,能夠?qū)崿F(xiàn)等待/通知機制。Condition接口也提供了類似Object的監(jiān)視器方法,與Lock配合也可以實現(xiàn)等待/通知機制,但是這兩者在使用方式以及功能特性上有所區(qū)別。

通過對比Object的監(jiān)視器方法和Condition接口,可以更詳細地了解Condition的特性。

對比項 Object Monitor Methods Conditon
前置條件 獲取對象的鎖 調(diào)用Lock.lock()獲取鎖
調(diào)用Lock.newCondtion()獲取Condition對象
調(diào)用方式 直接調(diào)用,如object.wait() 直接調(diào)用,如condition.await()
等待隊列個數(shù) 一個 多個
當(dāng)前線程釋放鎖并進入等待狀態(tài) 支持 支持
當(dāng)前線程釋放鎖并進入等待狀態(tài)
后響應(yīng)中斷
不支持 支持
當(dāng)前線程釋放鎖并進入超時等待狀態(tài) 支持 支持
當(dāng)前線程釋放鎖并進入等待狀態(tài)到將來某個時間 不支持 支持
喚醒等待隊列的一個線程 支持 支持
喚醒等待隊列的全部線程 支持 支持
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容