ReentrantLock原理分析

前言

上一篇文章《基于CAS操作的Java非阻塞同步機制》 分析了非同步阻塞機制的實現(xiàn)原理,本篇將分析一種以非同步阻塞機制為基礎(chǔ)實現(xiàn)的重入鎖ReentrantLock。

ReentrantLock能夠代替synchronized關(guān)鍵字完成獨占鎖的功能,并且允許占有鎖線程的重入,顯示地調(diào)用lock、unlock方法使得代碼更靈活,收縮性更好。

因為ReentrantLock的代碼量很多并且邏輯復(fù)雜,所以要將每一部分的細節(jié)講全很難做到。本篇內(nèi)容將圍繞Lock接口的lock、lockInterruptibly、tryLock、unlock、newCondition等方法展開。

初識ReentrantLock

public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }

    static final class NonfairSync extends Sync {
        ...
    }

    static final class FairSync extends Sync {
        ...
    }
}

ReentrantLock實現(xiàn)了Lock接口,并且有三個內(nèi)部類。第一個Sync繼承了AbstractQueuedSynchronizer。另兩個NonfairSync和FairSync繼承自Sync分別實現(xiàn)非公平和公平兩種模式的方法。

下面看下ReentrantLock的主要方法:

  • lock 占有鎖,如果當前線程無法占有鎖則掛起
  • lockInterruptibly 占有鎖,除非當前線程已中斷
  • tryLock 嘗試在當前鎖空閑時占有鎖,如果占有失敗并不會掛起,而是返回false
  • unlock 釋放鎖
  • newCondition 由當前Lock創(chuàng)建一個Condition對象用于調(diào)用await、signal、signalAll等同步方法。

AbstractQueuedSynchronizer

由于ReentrantLock的實現(xiàn)依賴于其內(nèi)部類Sync,而Sync繼承自AbstractQueuedSynchronizer,因此先分析這個類。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;

        static final int SIGNAL    = -1;

        static final int CONDITION = -2;

        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
}

AbstractQueuedSynchronizer內(nèi)部有一個以Node對象為節(jié)點的雙向鏈表隊列,AbstractQueuedSynchronizer中的head變量為隊列頭,tail變量為隊列尾,并且它們都是transient關(guān)鍵字修飾的,這里解釋下它的語義:

當對象被序列化時(寫入字節(jié)序列到目標文件)時,transient阻止實例中那些用此關(guān)鍵字聲明的變量持久化;當對象被反序列化時(從源文件讀取字節(jié)序列進行重構(gòu),這樣的實例變量值不會被持久化和恢復(fù)。

AbstractQueuedSynchronizer中的state屬性是一個以CAS非阻塞同步操作維護的volatile修飾的變量。

Node類中的waitStatus表示當前的等待狀態(tài):

  • CANCELLED 1 表示該Node放棄競爭鎖
  • SIGNAL -1 表示Node內(nèi)部的線程將掛起(通過LockSupport的park方法)
  • CONDITION -2 表示Node在Condition中的隊列等待
  • PROPAGATE -3 本節(jié)點的狀態(tài)將傳遞給下一個節(jié)點?(不太清楚)

Node中還維護了prev、next等雙向鏈表所必要的引用,并且每個Node中維護一個線程對象,該線程即參與通過CAS操作競爭修改AbstractQueuedSynchronizer中state失敗的線程。

AbstractQueuedSynchronizer acquire方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

多線程競爭占有鎖通過acquire方法實現(xiàn),其中tryAcquire方法分別由子類NonfairSync和FairSync實現(xiàn)具體的。

非公平模式NonfairSync 的tryAcquire方法實現(xiàn)如下:

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

公平模式FairSync的tryAcquire方法實現(xiàn)如下:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  • NonfairSync 在AQS的state為0的情況下,利用CAS將state改為acquires,如果成功則調(diào)用setExclusiveOwnerThread方法將exclusiveOwnerThread這個變量設(shè)置為當前線程,表明當前線程占有鎖,然后返回true。在競爭激烈的情況下,CAS可能返回失敗,或者state不為0,表示鎖已被其他線程獨占。因此第二個判斷比較當前線程與exclusiveOwnerThread變量是否相等,如果相等說明是同一線程的操作,將state加上acquires并更新回去返回true。上述條件都不滿足,當前線程競爭鎖失敗返回false。

  • FairSync 相比NonfairSync 在CAS這一步前先執(zhí)行hasQueuedPredecessors方法,該方法確認是否有鎖等待隊列,若沒有等待的線程則返回false,如果鎖等待隊列有等待的線程,并且第一個等待的線程與當前調(diào)用方法的線程不相等則返回true,否則返回false。從公平模式的語義判斷該模式要保證競爭鎖的有序性,因此不像NonFairSync允許后面的線程插隊獲取鎖,而是保證線程獲取鎖的順序與線程隊列的順序一致。

可見公平模式會保證線程占有鎖的順序與AQS內(nèi)部的Node隊列順序相同,非公平模式允許最后將要插入Node隊列尾部的Node線程插隊競爭占有鎖。

對于嘗試獲取鎖失敗的線程下一步執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先是addWaiter方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter先將當前線程封裝為一個Node對象A,如果隊列未初始化就先通過enq方法利用CAS將head和tail節(jié)點設(shè)置為同一個新Node對象,然后把A Node插入到隊列的尾部并返回。如果隊列已經(jīng)有tail節(jié)點,就把A Node插入到tail后,并將tail設(shè)置為A返回。注意這里用CAS完全是為了并發(fā)競爭。

接著執(zhí)行acquireQueued方法:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這里的node參數(shù)即剛才插入隊列的Node對象,暫且還叫它A,首先判斷A的前一個節(jié)點是不是head節(jié)點,如果是表示A是隊列的第一個等待線程。如果在它插入的過程中占有鎖的線程可能執(zhí)行完畢釋放了鎖,所以接著執(zhí)行tryAcquire嘗試獲取鎖,如果成功就將head設(shè)置為A,并且將原h(huán)ead的next設(shè)置為null,這樣head節(jié)點成了A。

如果A不是head節(jié)點后的節(jié)點,或者嘗試獲取鎖失敗,執(zhí)行shouldParkAfterFailedAcquire方法:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

這里有三種情況:

  1. 如果前一個節(jié)點的狀態(tài)為-1(SIGNAL),表示當前線程需要阻塞,返回true。
  2. 如果前一個節(jié)點的狀態(tài)大于0,這里只有CANCELLED狀態(tài)大于0,表示前一個線程已被取消競爭,將前一個節(jié)點移除,直到前一個節(jié)點的狀態(tài)不大于0,并返回false。
  3. 如果前一個節(jié)點的狀態(tài)為為0,將前一節(jié)點的狀態(tài)通過CAS設(shè)置為-1(SIGNAL),返回false,這樣下一次循環(huán)本線程就會阻塞。

對shouldParkAfterFailedAcquire方法返回false的Node進入下一次循環(huán) ,直到前節(jié)點被CAS設(shè)置為-1(SIGNAL)。

若shouldParkAfterFailedAcquire返回true,表示將要對當前線程掛起,詳見parkAndCheckInterrupt方法:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

通過LockSupport的park方法將線程掛起,它的底層實現(xiàn)是UnSafe類的park方法。當調(diào)用LockSupport的unpark方法喚醒線程,返回線程是否中斷并清除中斷狀態(tài)。

占有鎖小結(jié)

占有鎖的實現(xiàn)分為公平和非公平模式,公平模式下線程會按照請求的順序依次獲取鎖,非公平模式下允許最后嘗試獲取鎖的線程插隊與隊列第一個等待線程競爭。

當線程未獲取鎖,先將自身同步插入到鎖等待隊列,接著進入循環(huán),如果當前線程是隊列的第一個,嘗試獲取鎖。如果當前線程未獲取鎖(無論是不是隊列里第一個線程),都嘗試將前一個節(jié)點的狀態(tài)設(shè)置為SIGNAL,然后掛起。

AQS鎖等待隊列.png

所以并發(fā)執(zhí)行acquire方法嘗試獲取鎖的多個線程,最后的結(jié)果就是其中一個線程占有鎖,其他線程都插入到鎖等待隊列里掛起,并且除了尾節(jié)點狀態(tài)為0,其它Node的狀態(tài)是SIGNAL或CANCELLED,引起CANCELLED狀態(tài)的分析在后面的tryLock(long timeout, TimeUnit unit)方法中。

AbstractQueuedSynchronizer release方法

釋放鎖與占有鎖的過程是相反的,相比synchronized,ReentrantLock需要在代碼中顯示調(diào)用AQS的release方法:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先是執(zhí)行tryRelease方法,該方法由子類Sync實現(xiàn):

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

首先將AQS的state剪去參數(shù)releases,然后判斷當前線程是不是持有鎖的線程,如果是就繼續(xù),不是則拋出IllegalMonitorStateException異常。因此在代碼層不允許未占有鎖的線程執(zhí)行release方法。

接下來判斷state是否為0,如果是就將占有鎖的線程設(shè)置為NULL,利用CAS將state為0同步更新,返回true。如果state不為0表示當前線程還將繼續(xù)占有鎖,返回false。

如果tryRelaese方法返回false,則release方法也返回false。如果tryRelease返回true,表示狀態(tài)已重置為0,當前線程不再占有鎖。接下來判斷鎖等待隊列的head是不是空,如果不是空并且waitStatus不為0則執(zhí)行unparkSuccessor方法。為什么waitStatus不為0才執(zhí)行喚醒方法呢,上文中提到當前節(jié)點的線程在掛起前一定要將前一節(jié)點的waitStatus更新為-1,所以如果head節(jié)點的waitStatus如果還是0,表示head以后的節(jié)點線程并未被掛起,該線程會進入下一次循環(huán)嘗試獲取鎖。

unparkSuccessor方法顧名思義就是將head后阻塞中的線程恢復(fù):

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

先將head的狀態(tài)通過CAS同步更新為0,如果head節(jié)點的下一個節(jié)點為空,則什么都不做。

如果head節(jié)點的下一個節(jié)點不為空并且waitStatus小于等于0,將head下一節(jié)點的線程喚醒。被喚醒的線程會繼續(xù)執(zhí)行acquireQueued方法內(nèi)的死循環(huán)競爭獲取鎖。如果當前為非公平模式,此時如果又有一個新的線程嘗試獲取鎖,這個剛被喚醒的線程可能會競爭失敗繼續(xù)掛起。如果當前為公平模式,喚醒的線程會通過CAS成功獲取鎖,因為新線程只會插入到鎖等待隊列的尾部掛起。

如果head節(jié)點的下一個節(jié)點不為空,但是waitStatus大于0,表示下一個線程被取消競爭,此時會從隊列尾部向頭部開始遍歷,找到第一個waitStatus為-1或0并且非head節(jié)點的Node,最后將該Node中的線程喚醒。

釋放鎖小結(jié)

在占有鎖的小結(jié)中提到,多線程競爭鎖以后除了一個線程獲取鎖以外,其他線程都將插入到Node隊列并掛起。而釋放鎖方法的執(zhí)行只對應(yīng)于已占有鎖的線程,該方法會將AQS的state通過CAS同步更新為0,然后喚醒線程隊列中除了head節(jié)點的首個處于SIGNAL或0狀態(tài)的線程。公平模式下該線程會占有鎖并執(zhí)行任務(wù),非公平模式下該線程可能會與同時嘗試獲取鎖的新線程競爭鎖,并且可能競爭失敗重新掛起。

AQS鎖的釋放2.png

ReentrantLock lock()與unlock()

ReentrantLock 的lock方法的實際實現(xiàn)是委托給內(nèi)部的NonfairSync和FairSync的。

    public void lock() {
        sync.lock();
    }

   static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
    }

    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }

公平模式下ReentrantLock的lock方法等同于調(diào)用AQS的acquire(1);
非公平模式下只是先嘗試性通過CAS將state從0更新為1,如果失敗等同于調(diào)用AQS的acquire(1)方法。

    public void unlock() {
        sync.release(1);
    }

unlock方法等同于調(diào)用AQS的release(1)。

由于AQS的acquire和release方法前面都講過,這里就不復(fù)述了。

ReentrantLock tryLock()與tryLock(long timeout, TimeUnit unit)

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

tryLock的方法實現(xiàn)同非公平模式下的tryAcquire方法,嘗試占有可重入鎖,如果失敗返回false,并不像lock方法將線程掛起。

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

tryLock(long timeout, TimeUnit unit)方法就稍微復(fù)雜一點,該方法的實現(xiàn)還是在AQS的tryAcquireNanos方法:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

該方法先調(diào)用兩種模式的tryAcquire方法嘗試占有鎖,如果失敗則執(zhí)行doAcquireNanos方法:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

該方法的實現(xiàn)與acquireQueued非常類似,它在內(nèi)部調(diào)用addWaiter先將線程添加到線程隊列中,然后計算出終止阻塞的時間,接著進入死循環(huán),先嘗試占有鎖,如果成功就將節(jié)點從隊列移除返回true;未占有鎖就通過shouldParkAfterFailedAcquire的CAS操作將前一節(jié)點設(shè)置為SIGNAL狀態(tài)后就阻塞nanosTimeout 長度的時間,該時間是終止時間減去當前時間。當該線程再次被喚醒時會再次嘗試獲取鎖,若還是獲取不到就返回false。并且在失敗的情況下執(zhí)行cancelAcquire方法:

    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

該方法將Node節(jié)點前waitStatus大于0即CANCELLED狀態(tài)的節(jié)點移除隊列,然后將當前超時的Node節(jié)點設(shè)置為CANCELLED狀態(tài)。

如果當前節(jié)點為隊列的尾部,利用CAS將其移除隊列。

如果當前節(jié)點不是尾部節(jié)點,在上個節(jié)點不是head節(jié)點的情況下,如果上個節(jié)點處于SIGNAL狀態(tài)或嘗試將其設(shè)置為SIGNAL狀態(tài)成功,并且上個節(jié)點的線程不為null,如果下一個節(jié)點為SIGNAL狀態(tài),將當前節(jié)點移除。

不滿足上述兩個條件將繼續(xù)執(zhí)行unparkSuccessor方法,即喚醒繼任者線程,在上文中分析過。該方法將找到當前節(jié)點后處于0或-1狀態(tài)的NODE,并將其喚醒。

tryLock方法小結(jié)

不帶參數(shù)的tryLock方法嘗試非公平獲取鎖,如果獲取失敗并不會掛起,而是返回結(jié)果false。
帶參數(shù)的tryLock(long timeout, TimeUnit unit)通過公平與非公平另種模式的tryAcquire方法嘗試獲取鎖,如果獲取失敗當前線程會掛起timeout長度的時間,如果在指定時間還未占有鎖就返回false,并且將當前Node置為CANCELLED狀態(tài),這就是CANCELLED狀態(tài)的由來。

ReentrantLock lockInterruptibly()

該方面的注釋解釋為占有鎖除非線程被中斷了。

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

它的實現(xiàn)依然是AQS定義的acquireInterruptibly方法。注意到如果線程是中斷的則拋出InterruptedExeception。然后先調(diào)用tryAcquire嘗試占有鎖,該方法的實現(xiàn)依然是公平和非公平兩種模式,在獲取鎖失敗的情況下執(zhí)行doAcquireInterruptibly方法:

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

對比doAcquireNanos方法,發(fā)現(xiàn)兩者的實現(xiàn)十分類似。先將線程Node插入到隊列尾部,然后將上個節(jié)點狀態(tài)更新為SIGNAL,接著調(diào)用parkAndCheckInterrupt方法掛起當前線程:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

這里的不設(shè)置阻塞時間上限,如果線程喚醒后已經(jīng)中斷,則拋出InterruptedException異常。

lockInterruptibly方法小結(jié)

lockInterruptibly方法對比lock方法的區(qū)別是,該方法在調(diào)用時就檢查當前線程是否中斷,如果當前線程中斷就不再嘗試獲取鎖而是直接拋出InterruptedException異常。如果線程被掛起,喚醒后同樣也會檢查中斷狀態(tài),一旦發(fā)現(xiàn)線程被中斷就會拋出InterruptedException異常。

而lock方法在調(diào)用時不檢查線程的中斷狀態(tài),調(diào)用lock方法掛起的線程喚醒后雖然也檢查線程是否中斷,但是不會拋出異常,lock方法把中斷延時到了同步區(qū)域去處理異常。

ReentrantLock newCondition()

在使用synchronized同步代碼塊內(nèi)使用Object的wait()、notify()、notifyAll()方法能夠?qū)崿F(xiàn)線程間的生產(chǎn)者消費者模型,而在ReentrantLock中也有它的實現(xiàn)方法。通過newCondition方法可以調(diào)用AQS的newCondition方法:

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

ConditionObject 是AQS內(nèi)的一個成員類,也就是AQS的狀態(tài)和方法也可以在ConditionObject類訪問,它的內(nèi)部也有一個隊列,firstWaiter為隊列頭,lastWaiter為隊列尾。

public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

Condition接口的主要方法:

  • await() 讓當前線程等待直到被通知或中斷
  • awaitUninterruptibly() 讓當前線程等待直到被通知,無法被中斷
  • awaitNanos(long nanosTimeout) 讓當前線程等待直到被通知、中斷或超時
  • awaitUntil(Date deadline) 讓當前線程等待直到被通知、中斷或越過指定時間
  • signal() 喚醒等待中的一個線程
  • signalAll() 喚醒所有等待中的線程

通過newCondition方法可以產(chǎn)生多個ConditionObject 對象,即一個ReentrantLock可以對應(yīng)多個ConditionObject對象 ,而每個ConditionObject對象的await()、signal()、signalAll()方法是相互獨立的。而在Object的wait()、notify()、notifyAll()方法中,這些同步方法只是針對這一個對象。

ConditionObject await()

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

當一個線程調(diào)用了await方法,調(diào)用addConditionWaiter方法,首先調(diào)用unlinkCancelledWaiters方法清除ConditionObject隊列中非Condition狀態(tài)的節(jié)點,接著將自身線程封裝為一個waitStatus為CONDITION的Node, 并插入到ConditionObject內(nèi)部的隊列,這個隊列同AQS的Node隊列也是先進先出。

接著調(diào)用fullyRelease方法獲取當前AQS的state,并調(diào)用release方法釋放鎖,喚醒AQS Node隊列中第一個掛起的線程。到這一步當前線程已經(jīng)交出了鎖的控制權(quán)。

接著進入循環(huán),退出條件是isOnSyncQueue方法返回true,SyncQueue即上文中AQS內(nèi)的Node隊列,該隊列上的線程都在等待鎖,下面分析下這個方法:

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
  • 情況一, 如果插入的節(jié)點為CONDITION 狀態(tài)或它的前一節(jié)點為空,表示該節(jié)點處于等待狀態(tài),并未加入鎖的等待隊列返回false。
  • 情況二,如果插入的節(jié)點有下一個節(jié)點,返回true。
  • 情況三,上述都不滿足調(diào)用findNodeFromTail,該方法從AQS的tail節(jié)點開始找當前節(jié)點是否在AQS的Node隊列中。

循環(huán)內(nèi),當前線程會通過LockSupport的park方法掛起,當線程被喚醒,調(diào)用checkInterruptWhileWaiting方法判斷線程是不是中斷了:

        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

如果線程中斷,就嘗試通過CAS將Node狀態(tài)更新為0,如果成功就插入AQS的Node隊列,返回THROW_IE 標記。如果Node當前狀態(tài)不為CONDITION,則返回REINTERRUPT標記。然后跳出循環(huán)。

跳出循環(huán)后,執(zhí)行acquireQueued方法,該方法在上述AQS的acquire方法的過程中分析過,用于嘗試獲取鎖。取得鎖后先清除ConditionObject中條件等待隊列非Condition狀態(tài)的Node,然后根據(jù)interruptMode標記決定拋出異常(THROW_IE ),還是交給同步代碼塊處理(REINTERRUPT)。

await方法小結(jié)

從await代碼的分析中得知,await方法將當前線程封裝為Node對象插入到Condition的條件等待隊列,然后將AQS鎖完全釋放,喚醒AQS鎖等待隊列中的下一個SIGNAL線程。

接著將當前線程掛起,直到線程被中斷或喚醒,嘗試調(diào)用acquireQueued方法獲取AQS的鎖,該方法在AQS的aquire方法中介紹過,它會判斷node是不是鎖等待隊列HEAD后的節(jié)點,如果是就嘗試占有鎖,否則該線程會清除之前的CACELLED狀態(tài)的節(jié)點后再次判斷,如果還不是就掛起。占有AQS同步鎖后根據(jù)中斷標記決定是直接拋出中斷異常還是由同步代碼塊處理中斷。

await方法執(zhí)行過程.png

這里有個疑問執(zhí)行await方法的線程的Node這是添加到ConditionObject的條件等待隊列,為何線程醒來后要把自己當成AQS的鎖等待隊列的節(jié)點?見signal方法的分析。

ConditionObject signal()

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

首先判斷調(diào)用 signal方法的線程是不是鎖的持有者線程,然后獲取ConditionObject條件等待隊列的頭結(jié)點,對其調(diào)用doSignal方法:

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先將ConditionObjcect內(nèi)的Node隊列的頭結(jié)點指向下一個節(jié)點,即將頭節(jié)點移除,然后執(zhí)行transferForSignal方法,該方法先嘗試同步更新頭節(jié)點為0,如果失敗說明當前Node是無效的,繼續(xù)循環(huán)將頭節(jié)點后移一個。

如果節(jié)點成功更新為狀態(tài)0,將該Node插入到AQS的線程隊列中。僅當該Node為取消狀態(tài)或更新為SIGNAL狀態(tài)失敗才喚醒該線程。

通常情況下,ConditionObject中的Node插入到AQS的鎖等待隊列中后,由unlock方法釋放鎖后由AQS的release方法去喚醒線程,也就是調(diào)用LockSupport的unpark方法,在上文分析過。這里也是Node從條件等待隊列轉(zhuǎn)換到AQS的鎖等待隊列的實現(xiàn),并且將Node從CONDITION狀態(tài)更新為0.

signal方法小結(jié)

await方法執(zhí)行后,線程A會封裝為Node插入到ConditionObject的條件等待隊列中,并且會掛起交出AQS鎖的控制權(quán)。當另一個線程B調(diào)用了signal方法,ConditionObject條件等待隊列中的頭個處于CONDITION狀態(tài)的Node(線程A)會被插入到AQS的鎖等待隊列中并同步更新狀態(tài)為0,當線程B釋放了鎖,會喚醒線程A,線程A獲取鎖后可以繼續(xù)執(zhí)行await方法后的同步代碼。

signal.png

ConditionObject signalAll()

相比signal方法,signalAll其實就是喚醒ConditionObject中條件等待隊列里所有狀態(tài)為CONDITION的線程去競爭鎖。

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

signal方法只會將條件等待隊列的頭個節(jié)點插入到AQS的鎖等待隊列,而signalAll方法嘗試將條件等待隊列的所有狀態(tài)為CONDITION的節(jié)點插入到AQS的鎖等待隊列。

總結(jié)

本以為ReentrantLock的內(nèi)部實現(xiàn)像AtomicInteger一樣簡單的調(diào)用UnSafe類的CAS算法就實現(xiàn)了,實際的分析過程中發(fā)現(xiàn)還是挺復(fù)雜的,主要是這個類牽扯到大量的CAS同步操作競爭鎖,所以看源碼就不能僅僅靠單線程思維,還要發(fā)散成多線程思維,想想這里的同步操作是不是為了避免某個并發(fā)問題。

AbstractQueuedSynchronizer的鎖等待隊列和ConditionObject的條件等待隊列是ReentrantLock實現(xiàn)的關(guān)鍵,這兩個隊列共用了Node類,所不同的是條件等待隊列的Node狀態(tài)一般是CONDITION,而鎖等待隊列的狀態(tài)一般是SIGNAL或者0,兩種隊列的NODE都有CANCELLED狀態(tài)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容