JUC系列 - AQS CLH同步隊(duì)列

上一篇文章:JUC系列 - AQS簡介 介紹了AQS,本篇將結(jié)合AbstractQueuedSynchronizer源碼來分析。

CLH同步隊(duì)列

CLH同步隊(duì)列是一個FIFO雙向隊(duì)列,AQS依賴它來完成同步狀態(tài)的管理,當(dāng)前線程如果獲取同步狀態(tài)失敗時,AQS則會將當(dāng)前線程已經(jīng)等待狀態(tài)等信息構(gòu)造成一個節(jié)點(diǎn)(Node)并將其加入到CLH同步隊(duì)列,同時會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時,會把首節(jié)點(diǎn)喚醒(公平鎖),使其再次嘗試獲取同步狀態(tài)。

在CLH同步隊(duì)列中,一個節(jié)點(diǎn)表示一個線程,它保存著線程的引用(thread)、狀態(tài)(waitStatus)、前驅(qū)節(jié)點(diǎn)(prev)、后繼節(jié)點(diǎn)(next),其定義如下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();
    /** 獨(dú)占 */
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    
    static final int SIGNAL    = -1;
    
    static final int CONDITION = -2;
    
    static final int PROPAGATE = -3;

    
    volatile int waitStatus;

    /** 前驅(qū)節(jié)點(diǎn) */
    volatile Node prev;

    /** 后繼節(jié)點(diǎn) */
    volatile Node next;

    /** 獲取同步狀態(tài)的線程 */
    volatile Thread thread;

    
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
CLH.png

入隊(duì)(enqueue)

熟悉數(shù)據(jù)結(jié)構(gòu)的同學(xué),CLH隊(duì)列入隊(duì)是再簡單不過了,無非就是tail指向新節(jié)點(diǎn)、新節(jié)點(diǎn)的prev指向當(dāng)前最后的節(jié)點(diǎn),當(dāng)前最后一個節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)。


    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * 入隊(duì)(enqueue)
     */
    private Node enq(final Node node) {
        for (;;) {  // 自旋過程
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter(Node node)先通過快速嘗試設(shè)置尾節(jié)點(diǎn),如果失敗,則調(diào)用enq(Node node)方法設(shè)置尾節(jié)點(diǎn)。

enqueue.png

出隊(duì)(dequeue)

CLH同步隊(duì)列遵循FIFO,首節(jié)點(diǎn)的線程釋放同步狀態(tài)后,將會喚醒它的后繼節(jié)點(diǎn)(next),而后繼節(jié)點(diǎn)將會在獲取同步狀態(tài)成功時將自己設(shè)置為首節(jié)點(diǎn),這個過程非常簡單,head執(zhí)行該節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的next和當(dāng)前節(jié)點(diǎn)的prev即可,注意在這個過程是不需要使用CAS來保證的,因?yàn)橹挥幸粋€線程能夠成功獲取到同步狀態(tài)。

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

獨(dú)占式同步狀態(tài)獲取

acquire(int arg)方法為AQS提供的模板方法,該方法為獨(dú)占式獲取同步狀態(tài),但是該方法對中斷不敏感,也就是說由于線程獲取同步狀態(tài)失敗加入到CLH同步隊(duì)列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步隊(duì)列中移除。代碼如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容