【死磕Java并發(fā)】-----J.U.C之阻塞隊列:SynchronousQueue

原文出處http://cmsblogs.com/chenssy

【注】:SynchronousQueue實現(xiàn)算法看的暈乎乎的,寫了好久才寫完,如果當(dāng)中有什么錯誤之處,忘各位指正

作為BlockingQueue中的一員,SynchronousQueue與其他BlockingQueue有著不同特性:

  1. SynchronousQueue沒有容量。與其他BlockingQueue不同,SynchronousQueue是一個不存儲元素的BlockingQueue。每一個put操作必須要等待一個take操作,否則不能繼續(xù)添加元素,反之亦然。
  2. 因為沒有容量,所以對應(yīng) peek, contains, clear, isEmpty ... 等方法其實是無效的。例如clear是不執(zhí)行任何操作的,contains始終返回false,peek始終返回null。
  3. SynchronousQueue分為公平和非公平,默認(rèn)情況下采用非公平性訪問策略,當(dāng)然也可以通過構(gòu)造函數(shù)來設(shè)置為公平性訪問策略(為true即可)。
  4. 若使用 TransferQueue, 則隊列中永遠(yuǎn)會存在一個 dummy node(這點后面詳細(xì)闡述)。

SynchronousQueue非常適合做交換工作,生產(chǎn)者的線程和消費者的線程同步以傳遞某些信息、事件或者任務(wù)。

SynchronousQueue

與其他BlockingQueue一樣,SynchronousQueue同樣繼承AbstractQueue和實現(xiàn)BlockingQueue接口:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue提供了兩個構(gòu)造函數(shù):

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // 通過 fair 值來決定公平性和非公平性
        // 公平性使用TransferQueue,非公平性采用TransferStack
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

TransferQueue、TransferStack繼承Transferer,Transferer為SynchronousQueue的內(nèi)部類,它提供了一個方法transfer(),該方法定義了轉(zhuǎn)移數(shù)據(jù)的規(guī)范,如下:

    abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }

transfer()方法主要用來完成轉(zhuǎn)移數(shù)據(jù)的,如果e != null,相當(dāng)于將一個數(shù)據(jù)交給消費者,如果e == null,則相當(dāng)于從一個生產(chǎn)者接收一個消費者交出的數(shù)據(jù)。

SynchronousQueue采用隊列TransferQueue來實現(xiàn)公平性策略,采用堆棧TransferStack來實現(xiàn)非公平性策略,他們兩種都是通過鏈表實現(xiàn)的,其節(jié)點分別為QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用,SynchronousQueue的put、take操作都是委托這兩個類來實現(xiàn)的。

TransferQueue

TransferQueue是實現(xiàn)公平性策略的核心類,其節(jié)點為QNode,其定義如下:

    static final class TransferQueue<E> extends Transferer<E> {
        /** 頭節(jié)點 */
        transient volatile QNode head;
        /** 尾節(jié)點 */
        transient volatile QNode tail;
        // 指向一個取消的結(jié)點
        //當(dāng)一個節(jié)點中最后一個插入時,它被取消了但是可能還沒有離開隊列
        transient volatile QNode cleanMe;

        /**
         * 省略很多代碼O(∩_∩)O
         */
    }

在TransferQueue中除了頭、尾節(jié)點外還存在一個cleanMe節(jié)點。該節(jié)點主要用于標(biāo)記,當(dāng)刪除的節(jié)點是尾節(jié)點時則需要使用該節(jié)點。

同時,對于TransferQueue需要注意的是,其隊列永遠(yuǎn)都存在一個dummy node,在構(gòu)造時創(chuàng)建:

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

在TransferQueue中定義了QNode類來表示隊列中的節(jié)點,QNode節(jié)點定義如下:

    static final class QNode {
        // next 域
        volatile QNode next;
        // item數(shù)據(jù)項
        volatile Object item;
        //  等待線程,用于park/unpark
        volatile Thread waiter;       // to control park/unpark
        //模式,表示當(dāng)前是數(shù)據(jù)還是請求,只有當(dāng)匹配的模式相匹配時才會交換
        final boolean isData;

        QNode(Object item, boolean isData) {
            this.item = item;
            this.isData = isData;
        }

        /**
         * CAS next域,在TransferQueue中用于向next推進
         */
        boolean casNext(QNode cmp, QNode val) {
            return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * CAS itme數(shù)據(jù)項
         */
        boolean casItem(Object cmp, Object val) {
            return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         *  取消本結(jié)點,將item域設(shè)置為自身
         */
        void tryCancel(Object cmp) {
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
        }

        /**
         * 是否被取消
         * 與tryCancel相照應(yīng)只需要判斷item釋放等于自身即可
         */
        boolean isCancelled() {
            return item == this;
        }
        
        
        boolean isOffList() {
            return next == this;
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = QNode.class;
                itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

上面代碼沒啥好看的,需要注意的一點就是isData,該屬性在進行數(shù)據(jù)交換起到關(guān)鍵性作用,兩個線程進行數(shù)據(jù)交換的時候,必須要兩者的模式保持一致。

TransferStack

TransferStack用于實現(xiàn)非公平性,定義如下:

    static final class TransferStack<E> extends Transferer<E> {

        static final int REQUEST    = 0;

        static final int DATA       = 1;

        static final int FULFILLING = 2;

        volatile SNode head;

        /**
         * 省略一堆代碼  O(∩_∩)O~
         */

    }

TransferStack中定義了三個狀態(tài):REQUEST表示消費數(shù)據(jù)的消費者,DATA表示生產(chǎn)數(shù)據(jù)的生產(chǎn)者,F(xiàn)ULFILLING,表示匹配另一個生產(chǎn)者或消費者。任何線程對TransferStack的操作都屬于上述3種狀態(tài)中的一種(對應(yīng)著SNode節(jié)點的mode)。同時還包含一個head域,表示頭結(jié)點。

內(nèi)部節(jié)點SNode定義如下:

    static final class SNode {
        // next 域
        volatile SNode next;
        // 相匹配的節(jié)點
        volatile SNode match;
        // 等待的線程
        volatile Thread waiter;
        // item 域
        Object item;                // data; or null for REQUESTs

        // 模型
        int mode;

        /**
         * item域和mode域不需要使用volatile修飾,因為它們在volatile/atomic操作之前寫,之后讀
         */
       
        SNode(Object item) {
            this.item = item;
        }

        boolean casNext(SNode cmp, SNode val) {
            return cmp == next &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * 將s結(jié)點與本結(jié)點進行匹配,匹配成功,則unpark等待線程
         */
        boolean tryMatch(SNode s) {
            if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        void tryCancel() {
            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }

        boolean isCancelled() {
            return match == this;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long matchOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = SNode.class;
                matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

上面簡單介紹了TransferQueue、TransferStack,由于SynchronousQueue的put、take操作都是調(diào)用Transfer的transfer()方法,只不過是傳遞的參數(shù)不同而已,put傳遞的是e參數(shù),所以模式為數(shù)據(jù)(公平isData = true,非公平mode= DATA),而take操作傳遞的是null,所以模式為請求(公平isData = false,非公平mode = REQUEST),如下:

    // put操作
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

    // take操作
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

公平模式

公平性調(diào)用TransferQueue的transfer方法:

    E transfer(E e, boolean timed, long nanos) {
        QNode s = null;
        // 當(dāng)前節(jié)點模式
        boolean isData = (e != null);

        for (;;) {
            QNode t = tail;
            QNode h = head;
            // 頭、尾節(jié)點 為null,沒有初始化
            if (t == null || h == null)
                continue;

            // 頭尾節(jié)點相等(隊列為null) 或者當(dāng)前節(jié)點和隊列節(jié)點模式一樣
            if (h == t || t.isData == isData) {
                // tn = t.next
                QNode tn = t.next;
                // t != tail表示已有其他線程操作了,修改了tail,重新再來
                if (t != tail)
                    continue;
                // tn != null,表示已經(jīng)有其他線程添加了節(jié)點,tn 推進,重新處理
                if (tn != null) {
                    // 當(dāng)前線程幫忙推進尾節(jié)點,就是嘗試將tn設(shè)置為尾節(jié)點
                    advanceTail(t, tn);
                    continue;
                }
                //  調(diào)用的方法的 wait 類型的, 并且 超時了, 直接返回 null
                // timed 在take操作闡述
                if (timed && nanos <= 0)
                    return null;

                // s == null,構(gòu)建一個新節(jié)點Node
                if (s == null)
                    s = new QNode(e, isData);

                // 將新建的節(jié)點加入到隊列中,如果不成功,繼續(xù)處理
                if (!t.casNext(null, s))
                    continue;

                // 替換尾節(jié)點
                advanceTail(t, s);

                // 調(diào)用awaitFulfill, 若節(jié)點是 head.next, 則進行自旋
                // 若不是的話, 直接 block, 直到有其他線程 與之匹配, 或它自己進行線程的中斷
                Object x = awaitFulfill(s, e, timed, nanos);

                // 若返回的x == s表示,當(dāng)前線程已經(jīng)超時或者中斷,不然的話s == null或者是匹配的節(jié)點
                if (x == s) {
                    // 清理節(jié)點S
                    clean(t, s);
                    return null;
                }

                // isOffList:用于判斷節(jié)點是否已經(jīng)從隊列中離開了
                if (!s.isOffList()) {
                    // 嘗試將S節(jié)點設(shè)置為head,移出t
                    advanceHead(t, s);
                    if (x != null)
                        s.item = s;
                    // 釋放線程 ref
                    s.waiter = null;
                }

                // 返回
                return (x != null) ? (E)x : e;

            }

            // 這里是從head.next開始,因為TransferQueue總是會存在一個dummy node節(jié)點
            else {
                // 節(jié)點
                QNode m = h.next;

                // 不一致讀,重新開始
                // 有其他線程更改了線程結(jié)構(gòu)
                if (t != tail || m == null || h != head)
                    continue;

                /**
                 * 生產(chǎn)者producer和消費者consumer匹配操作
                 */
                Object x = m.item;
                // isData == (x != null):判斷isData與x的模式是否相同,相同表示已經(jīng)匹配了
                // x == m :m節(jié)點被取消了
                // !m.casItem(x, e):如果嘗試將數(shù)據(jù)e設(shè)置到m上失敗
                if (isData == (x != null) ||  x  == m || !m.casItem(x, e)) {
                    // 將m設(shè)置為頭結(jié)點,h出列,然后重試
                    advanceHead(h, m);
                    continue;
                }

                // 成功匹配了,m設(shè)置為頭結(jié)點h出列,向前推進
                advanceHead(h, m);
                // 喚醒m上的等待線程
                LockSupport.unpark(m.waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }

整個transfer的算法如下:

  1. 如果隊列為null或者尾節(jié)點模式與當(dāng)前節(jié)點模式一致,則嘗試將節(jié)點加入到等待隊列中(采用自旋的方式),直到被匹配或、超時或者取消。匹配成功的話要么返回null(producer返回的)要么返回真正傳遞的值(consumer返回的),如果返回的是node節(jié)點本身則表示當(dāng)前線程超時或者取消了。
  2. 如果隊列不為null,且隊列的節(jié)點是當(dāng)前節(jié)點匹配的節(jié)點,則進行數(shù)據(jù)的傳遞匹配并返回匹配節(jié)點的數(shù)據(jù)
  3. 在整個過程中都會檢測并幫助其他線程推進

當(dāng)隊列為空時,節(jié)點入列然后通過調(diào)用awaitFulfill()方法自旋,該方法主要用于自旋/阻塞節(jié)點,直到節(jié)點被匹配返回或者取消、中斷。

    Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {

        // 超時控制
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        // 自旋次數(shù)
        // 如果節(jié)點Node恰好是head節(jié)點,則自旋一段時間,這里主要是為了效率問題,如果里面阻塞,會存在喚醒、線程上下文切換的問題
        // 如果生產(chǎn)者、消費者者里面到來的話,就避免了這個阻塞的過程
        int spins = ((head.next == s) ?
                (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        // 自旋
        for (;;) {
            // 線程中斷了,剔除當(dāng)前節(jié)點
            if (w.isInterrupted())
                s.tryCancel(e);

            // 如果線程進行了阻塞 -> 喚醒或者中斷了,那么x != e 肯定成立,直接返回當(dāng)前節(jié)點即可
            Object x = s.item;
            if (x != e)
                return x;
            // 超時判斷
            if (timed) {
                nanos = deadline - System.nanoTime();
                // 如果超時了,取消節(jié)點,continue,在if(x != e)肯定會成立,直接返回x
                if (nanos <= 0L) {
                    s.tryCancel(e);
                    continue;
                }
            }

            // 自旋- 1
            if (spins > 0)
                --spins;

            // 等待線程
            else if (s.waiter == null)
                s.waiter = w;

            // 進行沒有超時的 park
            else if (!timed)
                LockSupport.park(this);

            // 自旋次數(shù)過了, 直接 + timeout 方式 park
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

在自旋/阻塞過程中做了一點優(yōu)化,就是判斷當(dāng)前節(jié)點是否為對頭元素,如果是的則先自旋,如果自旋次數(shù)過了,則才阻塞,這樣做的主要目的就在如果生產(chǎn)者、消費者立馬來匹配了則不需要阻塞,因為阻塞、喚醒會消耗資源。在整個自旋的過程中會不斷判斷是否超時或者中斷了,如果中斷或者超時了則調(diào)用tryCancel()取消該節(jié)點。

tryCancel

            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }

取消過程就是將節(jié)點的item設(shè)置為自身(itemOffset是item的偏移量)。所以在調(diào)用awaitFulfill()方法時,如果當(dāng)前線程被取消、中斷、超時了那么返回的值肯定時S,否則返回的則是匹配的節(jié)點。如果返回值是節(jié)點S,那么if(x == s)必定成立,如下:

                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

如果返回的x == s成立,則調(diào)用clean()方法清理節(jié)點S:

    void clean(QNode pred, QNode s) {
        //
        s.waiter = null;

        while (pred.next == s) {
            QNode h = head;
            QNode hn = h.next;
            // hn節(jié)點被取消了,向前推進
            if (hn != null && hn.isCancelled()) {
                advanceHead(h, hn);
                continue;
            }

            // 隊列為空,直接return null
            QNode t = tail;
            if (t == h)
                return;

            QNode tn = t.next;
            // 不一致,說明有其他線程改變了tail節(jié)點,重新開始
            if (t != tail)
                continue;

            // tn != null 推進tail節(jié)點,重新開始
            if (tn != null) {
                advanceTail(t, tn);
                continue;
            }

            // s 不是尾節(jié)點 移出
            if (s != t) {
                QNode sn = s.next;
                // 如果s已經(jīng)被移除退出循環(huán),否則嘗試斷開s
                if (sn == s || pred.casNext(s, sn))
                    return;
            }

            // s是尾節(jié)點,則有可能會有其他線程在添加新節(jié)點,則cleanMe出場
            QNode dp = cleanMe;
            // 如果dp不為null,說明是前一個被取消節(jié)點,將其移除
            if (dp != null) {
                QNode d = dp.next;
                QNode dn;
                if (d == null ||               // 節(jié)點d已經(jīng)刪除
                        d == dp ||                 // 原來的節(jié)點 cleanMe 已經(jīng)通過 advanceHead 進行刪除
                        !d.isCancelled() ||        // 原來的節(jié)點 s已經(jīng)刪除
                        (d != t &&                 // d 不是tail節(jié)點
                                (dn = d.next) != null &&  //
                                dn != d &&                //   that is on list
                                dp.casNext(d, dn)))       // d unspliced
                    // 清除 cleanMe 節(jié)點, 這里的 dp == pred 若成立, 說明清除節(jié)點s,成功, 直接return, 不然的話要再次循環(huán)
                    casCleanMe(dp, null);
                if (dp == pred)
                    return;
            } else if (casCleanMe(null, pred))  // 原來的 cleanMe 是 null, 則將 pred 標(biāo)記為 cleamMe 為下次 清除 s 節(jié)點做標(biāo)識
                return;
        }
    }

這個clean()方法感覺有點兒難度,我也看得不是很懂。這里是引用http://www.itdecent.cn/p/95cb570c8187

  1. 刪除的節(jié)點不是queue尾節(jié)點, 這時 直接 pred.casNext(s, s.next) 方式來進行刪除(和ConcurrentLikedQueue中差不多)

  2. 刪除的節(jié)點是隊尾節(jié)點

  • 此時 cleanMe == null, 則 前繼節(jié)點pred標(biāo)記為 cleanMe, 為下次刪除做準(zhǔn)備
  • 此時 cleanMe != null, 先刪除上次需要刪除的節(jié)點, 然后將 cleanMe至null, 讓后再將 pred 賦值給 cleanMe

非公平模式

非公平模式transfer方法如下:

    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; // constructed/reused as needed
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            SNode h = head;
            // 棧為空或者當(dāng)前節(jié)點模式與頭節(jié)點模式一樣,將節(jié)點壓入棧內(nèi),等待匹配
            if (h == null || h.mode == mode) {
                // 超時
                if (timed && nanos <= 0) {
                    // 節(jié)點被取消了,向前推進
                    if (h != null && h.isCancelled())
                        //  重新設(shè)置頭結(jié)點(彈出之前的頭結(jié)點)
                        casHead(h, h.next);
                    else
                        return null;
                }
                // 不超時
                // 生成一個SNode節(jié)點,并嘗試替換掉頭節(jié)點head (head -> s)
                else if (casHead(h, s = snode(s, e, h, mode))) {
                    // 自旋,等待線程匹配
                    SNode m = awaitFulfill(s, timed, nanos);
                    // 返回的m == s 表示該節(jié)點被取消了或者超時、中斷了
                    if (m == s) {
                        // 清理節(jié)點S,return null
                        clean(s);
                        return null;
                    }

                    // 因為通過前面一步將S替換成了head,如果h.next == s ,則表示有其他節(jié)點插入到S前面了,變成了head
                    // 且該節(jié)點就是與節(jié)點S匹配的節(jié)點
                    if ((h = head) != null && h.next == s)
                        // 將s.next節(jié)點設(shè)置為head,相當(dāng)于取消節(jié)點h、s
                        casHead(h, s.next);

                    // 如果是請求則返回匹配的域,否則返回節(jié)點S的域
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
            }

            // 如果棧不為null,且兩者模式不匹配(h != null && h.mode != mode)
            // 說明他們是一隊對等匹配的節(jié)點,嘗試用當(dāng)前節(jié)點s來滿足h節(jié)點
            else if (!isFulfilling(h.mode)) {
                // head 節(jié)點已經(jīng)取消了,向前推進
                if (h.isCancelled())
                    casHead(h, h.next);

                // 嘗試將當(dāng)前節(jié)點打上"正在匹配"的標(biāo)記,并設(shè)置為head
                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    // 循環(huán)loop
                    for (;;) {
                        // s為當(dāng)前節(jié)點,m是s的next節(jié)點,
                        // m節(jié)點是s節(jié)點的匹配節(jié)點
                        SNode m = s.next;
                        // m == null,其他節(jié)點把m節(jié)點匹配走了
                        if (m == null) {
                            // 將s彈出
                            casHead(s, null);
                            // 將s置空,下輪循環(huán)的時候還會新建
                            s = null;
                            // 退出該循環(huán),繼續(xù)主循環(huán)
                            break;
                        }
                        // 獲取m的next節(jié)點
                        SNode mn = m.next;
                        // 嘗試匹配
                        if (m.tryMatch(s)) {
                            // 匹配成功,將s 、 m彈出
                            casHead(s, mn);     // pop both s and m
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        } else
                            // 如果沒有匹配成功,說明有其他線程已經(jīng)匹配了,把m移出
                            s.casNext(m, mn);
                    }
                }
            }
            // 到這最后一步說明節(jié)點正在匹配階段
            else {
                // head 的next的節(jié)點,是正在匹配的節(jié)點,m 和 h配對
                SNode m = h.next;

                // m == null 其他線程把m節(jié)點搶走了,彈出h節(jié)點
                if (m == null)
                    casHead(h, null);
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))
                        casHead(h, mn);
                    else 
                        h.casNext(m, mn);
                }
            }
        }
    }

整個處理過程分為三種情況,具體如下:

  1. 如果當(dāng)前棧為空獲取節(jié)點模式與棧頂模式一樣,則嘗試將節(jié)點加入棧內(nèi),同時通過自旋方式等待節(jié)點匹配,最后返回匹配的節(jié)點或者null(被取消)
  2. 如果棧不為空且節(jié)點的模式與首節(jié)點模式匹配,則嘗試將該節(jié)點打上FULFILLING標(biāo)記,然后加入棧中,與相應(yīng)的節(jié)點匹配,成功后將這兩個節(jié)點彈出棧并返回匹配節(jié)點的數(shù)據(jù)
  3. 如果有節(jié)點在匹配,那么幫助這個節(jié)點完成匹配和出棧操作,然后在主循環(huán)中繼續(xù)執(zhí)行

當(dāng)節(jié)點加入棧內(nèi)后,通過調(diào)用awaitFulfill()方法自旋等待節(jié)點匹配:

    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        // 超時
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 當(dāng)前線程
        Thread w = Thread.currentThread();

        // 自旋次數(shù)
        // shouldSpin 用于檢測當(dāng)前節(jié)點是否需要自旋
        // 如果棧為空、該節(jié)點是首節(jié)點或者該節(jié)點是匹配節(jié)點,則先采用自旋,否則阻塞
        int spins = (shouldSpin(s) ?
                (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            // 線程中斷了,取消該節(jié)點
            if (w.isInterrupted())
                s.tryCancel();

            // 匹配節(jié)點
            SNode m = s.match;

            // 如果匹配節(jié)點m不為空,則表示匹配成功,直接返回
            if (m != null)
                return m;
            // 超時
            if (timed) {
                nanos = deadline - System.nanoTime();
                // 節(jié)點超時,取消
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }

            // 自旋;每次自旋的時候都需要檢查自身是否滿足自旋條件,滿足就 - 1,否則為0
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;

            // 第一次阻塞時,會將當(dāng)前線程設(shè)置到s上
            else if (s.waiter == null)
                s.waiter = w;

            // 阻塞 當(dāng)前線程
            else if (!timed)
                LockSupport.park(this);
            // 超時
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

awaitFulfill()方法會一直自旋/阻塞直到匹配節(jié)點。在S節(jié)點阻塞之前會先調(diào)用shouldSpin()方法判斷是否采用自旋方式,為的就是如果有生產(chǎn)者或者消費者馬上到來,就不需要阻塞了,在多核條件下這種優(yōu)化是有必要的。同時在調(diào)用park()阻塞之前會將當(dāng)前線程設(shè)置到S節(jié)點的waiter上。匹配成功,返回匹配節(jié)點m。

shouldSpin()方法如下:

        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }

同時在阻塞過程中會一直檢測當(dāng)前線程是否中斷了,如果中斷了,則調(diào)用tryCancel()方法取消該節(jié)點,取消過程就是將當(dāng)前節(jié)點的math設(shè)置為當(dāng)前節(jié)點。所以如果線程中斷了,那么在返回m時一定是S節(jié)點自身。

            void tryCancel() {
                UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
            }

awaitFullfill()方法如果返回的m == s,則表示當(dāng)前節(jié)點已經(jīng)中斷取消了,則需要調(diào)用clean()方法,清理節(jié)點S:

    void clean(SNode s) {

        // 清理item域
        s.item = null;
        // 清理waiter域
        s.waiter = null;

        // past節(jié)點
        SNode past = s.next;
        if (past != null && past.isCancelled())
            past = past.next;

        // 從棧頂head節(jié)點,取消從棧頂head到past節(jié)點之間所有已經(jīng)取消的節(jié)點
        // 注意:這里如果遇到一個節(jié)點沒有取消,則會退出while
        SNode p;
        while ((p = head) != null && p != past && p.isCancelled())
            casHead(p, p.next);     // 如果p節(jié)點已經(jīng)取消了,則剔除該節(jié)點

        // 如果經(jīng)歷上面while p節(jié)點還沒有取消,則再次循環(huán)取消掉所有p 到past之間的取消節(jié)點
        while (p != null && p != past) {
            SNode n = p.next;
            if (n != null && n.isCancelled())
                p.casNext(n, n.next);
            else
                p = n;
        }
    }

clean()方法就是將head節(jié)點到S節(jié)點之間所有已經(jīng)取消的節(jié)點全部移出?!静磺宄楹我脙蓚€while,一個不行么】

至此,SynchronousQueue的源碼分析完成了,說下我個人感覺吧:個人感覺SynchronousQueue實現(xiàn)好復(fù)雜(可能是自己智商不夠吧~~~(>_<)~~~),源碼看了好久,這篇博客寫了將近一個星期,如果有什么錯誤之處,煩請各位指正?。?/strong>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容