帶你徹底弄懂ReentrantReadWriteLock

前言

ReentrantReadWriteLock可以說(shuō)是最復(fù)雜的鎖實(shí)現(xiàn)類(lèi),這篇文章帶你弄懂ReentrantReadWriteLock實(shí)現(xiàn)讀寫(xiě)鎖的所有細(xì)節(jié),在閱讀本文之前,讀者需要了解一些前置的知識(shí)點(diǎn),比如AQS,公平鎖以及非公平鎖,CAS等等,了解了這些知識(shí)點(diǎn)后,可以更加輕松的理解ReentrantReadWriteLock,因?yàn)镽eentrantReadWriteLock是在這些的基礎(chǔ)上構(gòu)建的。

ReentrantReadWriteLock總體邏輯

AQS中維護(hù)這一個(gè)阻塞線程隊(duì)列,在該阻塞線程隊(duì)列為空時(shí),讀線程可以同時(shí)獲取讀鎖,在所有讀線程釋放讀鎖之前,如果有寫(xiě)線程想要獲取寫(xiě)鎖,則阻塞該線程,并加入阻塞隊(duì)列中。后續(xù)的讀寫(xiě)線程在第一個(gè)寫(xiě)線程之前的所有讀線程釋放鎖之前都會(huì)通過(guò)尾插的方式加入阻塞隊(duì)列。當(dāng)?shù)谝粋€(gè)寫(xiě)線程之前的所有讀線程釋放鎖后,才會(huì)陸續(xù)喚醒隊(duì)列中的線程嘗試加鎖。

ReentrantReadWriteLock的基本內(nèi)部單元

Sync繼承AQS,讀寫(xiě)鎖的核心業(yè)務(wù)邏輯都在AQS中,不管是公平鎖還是非公平鎖,讀鎖還是寫(xiě)鎖,他們都只是調(diào)用Sync中的方法實(shí)現(xiàn)公平競(jìng)爭(zhēng)和非公平競(jìng)爭(zhēng),加鎖和解鎖的業(yè)務(wù)邏輯。寫(xiě)鎖調(diào)用Sync#tryAcquire()方法加鎖,調(diào)用Sync#tryRelease()方法解鎖。讀鎖調(diào)用Sync#tryAcquireShared()加鎖,調(diào)用Sync#tryReleaseShared()加鎖。而這些方法的本質(zhì)都是通過(guò)改變AQS#state的值來(lái)改變鎖狀態(tài)。

// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫(xiě)鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 抽象靜態(tài)內(nèi)部類(lèi),繼承AQS,讀寫(xiě)鎖的核心的業(yè)務(wù)邏輯都在Sync類(lèi)中
final Sync sync;
// 非公平鎖,繼承Sync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false;
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}
// 公平鎖,繼承Sync
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
// 讀鎖
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;
    // 構(gòu)造函數(shù)
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    // 讀鎖的加鎖方法
    public void lock() {
        sync.acquireShared(1);
    }
    // 讀鎖的解鎖方法
    public void unlock() {
        sync.releaseShared(1);
    }
}

public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;
    // 構(gòu)造函數(shù)
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    // 寫(xiě)鎖加鎖方法
    public void lock() {
        sync.acquire(1);
    }
    // 寫(xiě)鎖解鎖方法
    public void unlock() {
        sync.release(1);
    }
    
}

下面來(lái)看下抽象內(nèi)部類(lèi)Sync的基本組成,Java中不管哪個(gè)并發(fā)類(lèi)都是通過(guò)定義AQS中int類(lèi)型的狀態(tài)碼state實(shí)現(xiàn)的。ReentrantReadWriteLock將狀態(tài)碼state分為高16位和低16位。狀態(tài)碼state的高16位用于存儲(chǔ)并發(fā)讀取的線程數(shù)(讀線程重入的次數(shù)也加入計(jì)數(shù)),即狀態(tài)碼state的高16位=(線程1的重入次數(shù)+線程2的重入次數(shù)+...+線程n的重入次數(shù))。狀態(tài)碼state的低16位用于存儲(chǔ)單個(gè)寫(xiě)線程的重入次數(shù)(寫(xiě)鎖為互斥鎖,只有一個(gè)線程能獲取寫(xiě)鎖)。

abstract static class Sync extends AbstractQueuedSynchronizer {
    static final int SHARED_SHIFT   = 16;
    // 用于高16加一操作(高16位用于存儲(chǔ)并發(fā)讀取的線程數(shù))
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 最大并發(fā)讀取的線程數(shù)(讀線程的重入也加入計(jì)數(shù)) = 最大的單個(gè)寫(xiě)線程的重入次數(shù) = 65535
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 16位的二進(jìn)制1,用于計(jì)算低16位的寫(xiě)單個(gè)寫(xiě)進(jìn)程沖入次數(shù)
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // 返回高16位并發(fā)讀的線程數(shù)
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 返回低16位單個(gè)寫(xiě)進(jìn)程重入次數(shù)
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    // 用于保存線程id和對(duì)應(yīng)的重入次數(shù)
    static final class HoldCounter {
        int count = 0;
        final long tid = getThreadId(Thread.currentThread());
    }
    // 繼承自ThreadLocal,用于保存除了第一個(gè)讀線程外,其他讀取線程各自的重入次數(shù)
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    // 用于保存除了第一個(gè)讀線程外,其他讀取線程的重入次數(shù)
    private transient ThreadLocalHoldCounter readHolds;
    // 用于上一次讀讀線程的id和重入次數(shù)
    private transient HoldCounter cachedHoldCounter;
    
    // 用于報(bào)錯(cuò)第一個(gè)讀線程對(duì)象
    private transient Thread firstReader = null;
    // 用于保存第一個(gè)讀線程的重入次數(shù)
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }

    abstract boolean readerShouldBlock();

    abstract boolean writerShouldBlock();
    // 寫(xiě)線程解鎖方法
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

    // 寫(xiě)線程加鎖方法
    protected final boolean tryAcquire(int acquires) {

        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    // 讀線程解鎖方法
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    // 讀線程加鎖方法
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }

    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                if (firstReader == current) {
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh;
                }
                return 1;
            }
        }
    }

    }

ReentrantReadWriteLock初始化過(guò)程

下面先來(lái)看下ReentrantReadWriteLock的初始化過(guò)程的源碼

public class LearnReadWriteLock {
    public static void main(String[] args){
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    }
}
/*ReentrantReadWriteLock#ReentrantReadWriteLock()*/
public ReentrantReadWriteLock() {
        this(false);
    }
/*ReentrantReadWriteLock#ReentrantReadWriteLock(boolean fair)*/
public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
/*ReentrantReadWriteLock$ReadLock#ReadLock(ReentrantReadWriteLock lock)*/
protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
/*ReentrantReadWriteLock$WriteLock#WriteLock(ReentrantReadWriteLock lock)*/
protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

我們可以看到,默認(rèn)初始化為非公平鎖。我們看讀寫(xiě)鎖的初始化邏輯,他們都傳入了當(dāng)前ReentrantReadWriteLock對(duì)lock,并將sync參數(shù)設(shè)置為lock.sync。讀寫(xiě)鎖的業(yè)務(wù)邏輯都是通過(guò)調(diào)用sync中的方法實(shí)現(xiàn),這里就可以看出sync才是實(shí)現(xiàn)讀寫(xiě)鎖的關(guān)鍵類(lèi)。

ReentrantReadWriteLock讀鎖加鎖過(guò)程

public class LearnReadWriteLock {
    public static void main(String[] args){
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        Lock readLock = readWriteLock.readLock();
        readLock.lock();
    }
}
/*ReentrantReadWriteLock$ReadLock#lock()*/
public void lock() {
    sync.acquireShared(1);
}
/*AbstractQueuedSynchronizer#acquireShared()*/
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
/*ReentrantReadWriteLock$Sync#tryAcquireShared()*/
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 獲取AQS中的狀態(tài)碼state
    int c = getState();
    // 如果低16位的獨(dú)占寫(xiě)線程的重入次數(shù)不為0,并且獨(dú)占線程不等于當(dāng)前線程則返回-1。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 獲取高16位所有讀線程的重入次數(shù)之和
    int r = sharedCount(c);
    // 如果阻塞隊(duì)列為空或者阻塞隊(duì)列第一個(gè)線程獲取讀鎖&&高16位所有讀線程的重入次數(shù)之和小于65535則當(dāng)前線程允許獲取讀鎖&&使用CAS將高16位所有讀線程的重入次數(shù)之和加一成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果是第一個(gè)獲取讀鎖的線程,則使用firstReader記錄第一個(gè)獲取讀鎖的線程對(duì)象,并設(shè)置第一個(gè)獲取讀鎖的線程的重入次數(shù)firstReaderHoldCount為1。
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
            // 如果當(dāng)前線程是第一次獲取鎖的線程
        } else if (firstReader == current) {
            // 將第一次獲取鎖的線程的重入次數(shù)加1
            firstReaderHoldCount++;
        } else {
            // 如果不是第一個(gè)獲取讀鎖的線程
            // 獲取上一次獲取讀鎖的線程的相關(guān)記錄(上次獲取讀鎖的線程的id和重入次數(shù))
            HoldCounter rh = cachedHoldCounter;
            // 如果沒(méi)有上一個(gè)獲取獲取讀鎖的線程的信息(第一個(gè)獲取讀鎖的線程不算)||上一個(gè)獲取讀鎖的線程id不等于當(dāng)前的線程的id
            if (rh == null || rh.tid != getThreadId(current))
                // 從TreadLocal中獲取當(dāng)前線程的信息(重入信息),將上次獲取讀鎖的線程信息設(shè)置為當(dāng)前獲取讀鎖的線程信息。
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            // 當(dāng)前線程的重入次數(shù)加1
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

非公平獲取讀鎖:在線程獲取讀鎖的過(guò)程中,有一步很關(guān)鍵,就是通過(guò)NonfairSync#readerShouldBlock()判斷阻塞隊(duì)列的對(duì)頭的等待線程是請(qǐng)求讀鎖還是寫(xiě)鎖。如果對(duì)頭的等待線程請(qǐng)求的是讀鎖,則當(dāng)前線程不需要加入阻塞隊(duì)列,可以非公平的競(jìng)爭(zhēng)讀鎖。如果對(duì)頭的等待線程獲取的寫(xiě)鎖,則當(dāng)前線程加入阻塞隊(duì)列并掛起。

/*ReentrantReadWriteLock$NonfairSync#readerShouldBlock()*/
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
/*AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive()*/
final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
/*AbstractQueuedSynchronizer#isShared()*/
final boolean isShared() {
    return nextWaiter == SHARED;
}

ReentrantReadWriteLock寫(xiě)鎖加鎖過(guò)程

public class LearnReadWriteLock {
    public static void main(String[] args){
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        Lock writeLock = readWriteLock.writeLock();
        writeLock.lock();
    }
}
/*ReentrantReadWriteLock$WriteLock#lock()*/
public void lock() {
    sync.acquire(1);
}
/*AbstractQueuedSynchronizer#acquire()*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/*ReentrantReadWriteLock$Sync#tryAcquire()*/
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 獲取AQS中的狀態(tài)碼state
    int c = getState();
    // 獲取低16位獨(dú)占線程的重入次數(shù)
    int w = exclusiveCount(c);
    if (c != 0) {
        // 如果獨(dú)占線程的重入次數(shù)w為0(狀態(tài)碼c不為0,獨(dú)占線程的重入次數(shù)w為0說(shuō)明當(dāng)前有讀鎖還沒(méi)釋放,不能加寫(xiě)鎖)|| 獨(dú)占線程的重入次數(shù)w為不為零但是當(dāng)前線程不是獲得寫(xiě)鎖得獨(dú)占線程。
        if (w == 0 || current != getExclusiveOwnerThread())
            // 加鎖失敗
            return false;
        // 獨(dú)占線程的重入次數(shù)w加1后如果大于最大得重入次數(shù),則拋出異常
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 獨(dú)占線程的重入次數(shù)w加1,不需要CAS進(jìn)行加一,因?yàn)槭仟?dú)占線程地重入(不存在并發(fā))
        setState(c + acquires);
        // 加鎖成功
        return true;
    }
    // 在狀態(tài)碼c=0時(shí),說(shuō)明當(dāng)前沒(méi)有任何線程獲取鎖。寫(xiě)線程可以直接獲取寫(xiě)鎖,不需要阻塞,所以writerShouldBlock()直接返回false, 并使用CAS對(duì)寫(xiě)?yīng)氄季€程的重入次數(shù)加1
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 設(shè)獨(dú)占線程為當(dāng)前線程
    setExclusiveOwnerThread(current);
    return true;
}
/*ReentrantReadWriteLock$NonfairSync#writerShouldBlock*/
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

寫(xiě)鎖的加鎖過(guò)程比較簡(jiǎn)單,直接注釋就能看懂,這里不在贅述。

ReentrantReadWriteLock讀鎖阻塞過(guò)程

線程在獲取讀鎖時(shí),有兩種情況會(huì)導(dǎo)致線程阻塞:

  • 當(dāng)前已經(jīng)有線程獲取寫(xiě)鎖,并且還沒(méi)有釋放寫(xiě)鎖。
  • 已經(jīng)有線程獲取了讀鎖同時(shí)沒(méi)有釋放讀鎖并且阻塞隊(duì)列的第一個(gè)阻塞線程請(qǐng)求寫(xiě)鎖。

第一種情況很好理解,當(dāng)已經(jīng)有線程獲取了寫(xiě)鎖時(shí),任何其他線程嘗試加鎖都會(huì)失敗并并加入阻塞隊(duì)列。我們下面來(lái)看下第二種情況。

public class LearnReadWriteLock {
    public static void main(String[] args){
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        Lock readLock = readWriteLock.readLock();
        Lock writeLock = readWriteLock.writeLock();
        // 線程1獲取讀鎖
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                readLock.lock();
            }
        }, "Thread1-tryRead");
        thread1.start();
        // 線程2獲取寫(xiě)鎖
        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                writeLock.lock();
            }
        }, "Thread2-tryWrite");
        thread2.start();
        // 線程3獲取讀鎖
        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                readLock.lock();
            }
        }, "Thread3-tryRead");
        thread3.start();
    }
}

加入阻塞隊(duì)列。head指向阻塞隊(duì)列的頭節(jié)點(diǎn)(頭節(jié)點(diǎn)不存具體阻塞線程的信息,只是用來(lái)指向阻塞隊(duì)列的第一個(gè)阻塞線程對(duì)應(yīng)的Node),head指向隊(duì)列的節(jié)點(diǎn),tail指向隊(duì)列的尾節(jié)點(diǎn)。

image

我們先開(kāi)看下thread請(qǐng)求寫(xiě)鎖的失敗后的操作,即調(diào)用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的源碼,下面來(lái)看下阻塞隊(duì)列的插入過(guò)程addWaiter(Node.EXCLUSIVE)。Node.EXCLUSIVE為空,表示當(dāng)前阻塞線程請(qǐng)求的是寫(xiě)鎖。

/*ReentrantReadWriteLock$WriteLock#lock()*/
public void lock() {
    sync.acquire(1);
}
/*AbstractQueuedSynchronizer#acquire()*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/*AbstractQueuedSynchronizer#acquire()*/
private Node addWaiter(Node mode) {// Node.EXCLUSIVE(Node EXCLUSIVE = null);
    // 為當(dāng)前阻塞線程創(chuàng)建Node節(jié)點(diǎn)用于存儲(chǔ)阻塞線程的基本信息
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果尾節(jié)點(diǎn)為空,則創(chuàng)建首尾節(jié)點(diǎn),并使用尾插法在尾節(jié)點(diǎn)tail后面插入當(dāng)前node
    enq(node);
    return node;
}
/*AbstractQueuedSynchronizer$Node#Node(Thread thread, Node mode)*/
Node(Thread thread, Node mode) {
    // nextWaiter = null代表當(dāng)前阻塞線程請(qǐng)求獲取寫(xiě)鎖
    this.nextWaiter = mode;
    // 用于thread用于存儲(chǔ)阻塞線程對(duì)象
    this.thread = thread;
}
/*AbstractQueuedSynchronizer$Node#enq(final Node node)*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 使用CAS使head指向新建的頭節(jié)點(diǎn)
            if (compareAndSetHead(new Node()))
                // 由于當(dāng)前阻塞隊(duì)列只有一個(gè)頭節(jié)點(diǎn),因此頭節(jié)點(diǎn)就是尾節(jié)點(diǎn)
                tail = head;
        } else {
            // 新的阻塞節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)指尾節(jié)點(diǎn)tail
            node.prev = t;
            // 使用CAS將尾節(jié)點(diǎn)指針tail指向當(dāng)前節(jié)點(diǎn)
            if (compareAndSetTail(t, node)) {
                // 前任尾節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)指向新的尾節(jié)點(diǎn)
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter(Node.EXCLUSIVE)執(zhí)行完后,節(jié)點(diǎn)插入到阻塞隊(duì)列,如下圖所示,head指向阻塞隊(duì)列的頭節(jié)點(diǎn)(頭節(jié)點(diǎn)不存具體阻塞線程的信息,只是用來(lái)指向阻塞隊(duì)列的第一個(gè)阻塞線程對(duì)應(yīng)的Node),head指向隊(duì)列的節(jié)點(diǎn),tail指向隊(duì)列的尾節(jié)點(diǎn)。在線程2對(duì)應(yīng)的阻塞節(jié)點(diǎn)插入阻塞隊(duì)列后,阻塞隊(duì)列如下圖所示:

image

線程2對(duì)應(yīng)的阻塞節(jié)點(diǎn)插入到阻塞隊(duì)列后,線程2還未掛起。還要繼續(xù)執(zhí)行acquireQueued(node, arg)方法,下面來(lái)看下acquireQueued(final Node node, int arg)源碼。

/*AbstractQueuedSynchronizer#acquireQueued(final Node node, int arg)*/
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 前一個(gè)節(jié)點(diǎn)如果是頭節(jié)點(diǎn)則再次嘗試獲取寫(xiě)鎖
                if (p == head && tryAcquire(arg)) {
                    // 如果寫(xiě)鎖獲取成功,將當(dāng)前線程對(duì)應(yīng)的阻塞節(jié)點(diǎn)從阻塞隊(duì)列中移除
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // shouldParkAfterFailedAcquire(p,node)會(huì)將當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus設(shè)為-1,并再次進(jìn)去for循環(huán),如果還是不能獲取寫(xiě)鎖則掛起當(dāng)前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued(final Node node, int arg)方法會(huì)進(jìn)行兩次for循環(huán)讓線程2嘗試獲取寫(xiě)鎖,當(dāng)然嘗試獲取寫(xiě)鎖的前提的線程2對(duì)應(yīng)的阻塞節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn)。顯然這里滿足這個(gè)條件,于是線程2會(huì)兩次嘗試獲取寫(xiě)鎖,但是還是以失敗告終,線程2最終調(diào)用parkAndCheckInterrupt()掛起。在線程2掛起后阻塞隊(duì)列圖下圖所示。

image

最后我們來(lái)看看線程3獲取讀鎖的過(guò)程,判斷線程3是否需要阻塞的條件是隊(duì)列的第一個(gè)阻塞線程請(qǐng)求的是否是讀鎖,如果請(qǐng)求的是讀鎖,線程3嘗試獲取讀鎖(非公平)。如果請(qǐng)求的是寫(xiě)鎖,則線程3需要阻塞。

/*ReentrantReadWriteLock$ReadLock#lock()*/
public void lock() {
    sync.acquireShared(1);
}
/*AbstractQueuedSynchronizer#acquireShared()*/
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
/*ReentrantReadWriteLock$Sync#tryAcquireShared()*/
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 獲取AQS中的狀態(tài)碼state
    int c = getState();
    // 如果低16位的獨(dú)占寫(xiě)線程的重入次數(shù)不為0,并且獨(dú)占線程不等于當(dāng)前線程則返回-1。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 獲取高16位所有讀線程的重入次數(shù)之和
    int r = sharedCount(c);
    // 如果阻塞隊(duì)列為空或者阻塞隊(duì)列第一個(gè)線程獲取讀鎖&&高16位所有讀線程的重入次數(shù)之和小于65535則當(dāng)前線程允許獲取讀鎖&&使用CAS將高16位所有讀線程的重入次數(shù)之和加一成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
/*ReentrantReadWriteLock$NonfairSync#readerShouldBlock()*/
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
/*AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive()*/
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}
/*AbstractQueuedSynchronizer#isShared()*/
final boolean isShared() {
    return nextWaiter == SHARED;
}

顯然,當(dāng)前阻塞隊(duì)列的第一個(gè)阻塞線程2嘗試獲取的是寫(xiě)鎖,接下來(lái)運(yùn)行阻塞方法doAcquireShared(arg); doAcquireShared(arg)會(huì)將線程3對(duì)應(yīng)的阻塞節(jié)點(diǎn)插入到阻塞隊(duì)列中,并兩次嘗試獲取讀鎖,如果還是獲取不到,則掛起當(dāng)前線程。顯然這里線程3最后還是會(huì)獲取讀鎖失敗。

/*AbstractQueuedSynchronizer#doAcquireShared()*/
private void doAcquireShared(int arg) {
    // 插入當(dāng)前線程對(duì)應(yīng)的阻塞節(jié)點(diǎn)到阻塞隊(duì)列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果當(dāng)前線程對(duì)應(yīng)的阻塞節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn),直接嘗試獲取讀鎖
            if (p == head) {
                // 獲取讀鎖成功
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire(p,node)會(huì)將當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus設(shè)為-1,并再次進(jìn)去for循環(huán),如果還是不能獲取讀鎖則掛起當(dāng)前線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/*AbstractQueuedSynchronizer#acquire()*/
private Node addWaiter(Node mode) {// Node.SHARED(Node SHARED = new Node();)
    // 為當(dāng)前阻塞線程創(chuàng)建Node節(jié)點(diǎn)用于存儲(chǔ)阻塞線程的基本信息
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 在尾部插入線程3的阻塞節(jié)點(diǎn)
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

在線程3掛起后阻塞隊(duì)列如下圖所示。


image

總結(jié)

以上就是ReentrantReadWriteLock的獲取讀寫(xiě)鎖的全部?jī)?nèi)容,關(guān)于解鎖部分的原理,這里不在贅述。需要注意一下幾點(diǎn):

  • 如果已經(jīng)有線程獲取了寫(xiě)鎖,則其他任何線程如果嘗試獲取讀寫(xiě)鎖都會(huì)失敗。
  • 如果已經(jīng)有線程獲取了讀鎖,其他線程嘗試獲取讀鎖時(shí),需要判斷阻塞隊(duì)列的第一個(gè)阻塞線程嘗試獲取的鎖的類(lèi)型,如果是讀鎖,則可以非公平的競(jìng)爭(zhēng)讀鎖。如果是寫(xiě)鎖,則加入阻塞隊(duì)列,在兩次嘗試獲取讀鎖失敗后被掛起。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容