ReentrantReadWriteLock

ReentrantLock是獨(dú)占鎖,只允許一個(gè)線程執(zhí)行;CountDownLatch,Semaphore等是共享鎖;它們分別利用了AQS的獨(dú)占與共享功能;那么如果在讀操作遠(yuǎn)多于寫操作的情況下該如何選擇?讀寫鎖,之前的文章中介紹了如何自己實(shí)現(xiàn)一個(gè)讀寫鎖,還實(shí)現(xiàn)了重入功能,讀讀,寫寫,讀寫,寫讀四種重入?,F(xiàn)在來看看JUC包下的ReentrantReadWriteLock的實(shí)現(xiàn)。
先來大致了解下ReentrantReadWriteLock:

  • 讀鎖是個(gè)共享鎖,寫鎖是個(gè)獨(dú)占鎖。讀鎖同時(shí)被多個(gè)線程獲取,寫鎖只能被一個(gè)線程獲取。讀鎖與寫鎖不能同時(shí)存在。
  • 一個(gè)線程可以多次重復(fù)獲取讀鎖和寫鎖
  • 鎖降級(jí):獲取寫鎖的線程又獲取了讀鎖,之后釋放寫鎖,就完成了一次鎖降級(jí)。
  • 鎖升級(jí):不支持升級(jí)。獲取讀鎖的線程去獲取寫鎖的化會(huì)造成死鎖。
  • 重入數(shù):讀鎖和寫鎖的最大重入數(shù)為65535
  • 公平與非公平兩種模式

AQS維護(hù)了一個(gè)int值,表示同步狀態(tài);對于ReentrantLock,state會(huì)在0與1之間變化,1表示已被占有后續(xù)線程入隊(duì)列等待,0表示free。對于CountDownLatch,會(huì)先將state賦予個(gè)大于0的值,在該值變?yōu)?后喚醒等待隊(duì)列中的線程。那么如何用它來即表示讀鎖又表示寫鎖呢?讀鎖我們是允許多個(gè)線程同步運(yùn)行的,我們還允許重入,那么拿什么來記錄每個(gè)線程讀鎖的重入數(shù)?

針對上面兩個(gè)問題,對于同步狀態(tài)status,高16位表示所有線程持有的讀鎖總數(shù),低16位為一個(gè)線程的寫鎖總數(shù),包括重入。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
       
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

采用ThreadLocal來記錄每個(gè)線程鎖持有的讀鎖數(shù)目。

        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        private transient ThreadLocalHoldCounter readHolds;
        private transient HoldCounter cachedHoldCounter;
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
  • HoldCounter 靜態(tài)內(nèi)部類用來記錄一個(gè)線程的讀鎖重入數(shù),以及id;
  • ThreadLocalHoldCounter繼承了ThreadLocal,實(shí)現(xiàn)了initialValue方法,作用是在沒有set前調(diào)用get的話initialValue會(huì)被調(diào)用,HoldCounter對象會(huì)被存儲(chǔ)到Entry里,并返回它。變量名為readHolds,它用來存儲(chǔ)/獲取線程的讀鎖數(shù)量。因?yàn)樽x鎖是共享的,我們利用同步狀態(tài)的高16位來記錄總數(shù),用threadlocal來記錄每個(gè)線程所持有的讀鎖數(shù)目。對于寫鎖來說它是獨(dú)占鎖,低16位代表的就是當(dāng)前線程持有的寫鎖數(shù)目。
  • cachedHoldCounter:它是一種優(yōu)化的手段,為了避免頻繁的調(diào)用ThreadLocalHoldCounter的讀取,更改甚至刪除操作,于是緩存最新一個(gè)成功獲取鎖的線程的HoldCounter,意思是當(dāng)一個(gè)線程需要記錄值的時(shí)候會(huì)先檢查自己是否是cachedHoldCounter中緩存的那個(gè)線程,是的話就不用再從readHolds中獲取了,減少對ThreadLocal的操作。
  • firstReader 與firstReaderHoldCount:代表首個(gè)獲取讀鎖的線程與其所持有的讀鎖數(shù),該讀鎖數(shù)不會(huì)存儲(chǔ)進(jìn)readHolds,這是種優(yōu)化,針對只有一個(gè)線程的情況,避免頻繁操作readHolds。

ReentrantReadWriteLock繼承了ReadWriteLock,這兩個(gè)方法分別返回讀鎖與寫鎖。ReentrantReadWriteLock內(nèi)部實(shí)現(xiàn)了兩個(gè)類:ReadLock&WriteLock分別實(shí)現(xiàn)讀鎖與寫鎖。

public interface ReadWriteLock {
    Lock readLock();

    Lock writeLock();
}

同ReentrantLock一樣內(nèi)部實(shí)現(xiàn)了非公平與公平兩種同步器:NonfairSync &FairSync ,繼承自同一同步器Sync。

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

只定義了writerShouldBlock & readerShouldBlock兩種方法,它們作用在獲取鎖的過程中,決定當(dāng)前線程是否該阻塞。

一 讀鎖

1.獲取讀鎖

        public void lock() {
            sync.acquireShared(1);
        }

定位到AQS的acquireShared,該方法之前介紹過。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

來看看Sync重寫的tryAcquireShared方法

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

寫鎖不為零且持有寫鎖的并非本線程,則返回-1,之后在acquireShared中將線程的節(jié)點(diǎn)放入到等待隊(duì)列中。寫鎖不為零但是正是本線程持有的,則代表寫讀重入。之后在readerShouldBlock返回false與CAS操作成功后,更新HoldCounter 的值,這里會(huì)對之前提到的firstReader ,firstReaderHoldCount 或cachedHoldCounter進(jìn)行相應(yīng)的操作。

如果CAS失敗或者readerShouldBlock返回true,則會(huì)調(diào)用fullTryAcquireShared,該方法會(huì)繼續(xù)嘗試獲取讀鎖,可以看成是tryAcquireShared的升級(jí)版。

先來看看readerShouldBlock()方法:
公平模式下,根據(jù)隊(duì)列中當(dāng)前線程之前有沒有等待的線程來決定。

        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }

非公平模式

        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }

調(diào)用apparentlyFirstQueuedIsExclusive()

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
  • 返回true代表等待隊(duì)列head.next節(jié)點(diǎn)是等待寫鎖的線程,該方法的目的是不讓寫鎖一直等待下去;比如在上一篇自己實(shí)現(xiàn)的讀寫鎖中,通過增加一個(gè)寫請求變量來防止寫?zhàn)囸I,讓寫鎖的優(yōu)先級(jí)高于讀鎖。這里有相似的目的。
  • 這個(gè)方法是不可靠的,因?yàn)樵跈z測過程中隊(duì)列結(jié)構(gòu)是在變化的,;但是我們并不依賴于它的準(zhǔn)確表達(dá),它更多是一種探測,一種優(yōu)化,我們希望它來防止寫鎖的饑餓;而且并不是該方法返回了true,線程就會(huì)被放入阻塞隊(duì)列退出競爭,來看fullTryAcquireShared的邏輯
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

之前說fullTryAcquireShared是tryAcquireShared的升級(jí)版,它處理了CAS失敗和readerShouldBlock返回true的情況;

  1. 先檢查寫鎖是否被其他線程占用;
  2. 調(diào)用readerShouldBlock檢查當(dāng)前線程是否應(yīng)該進(jìn)入等待隊(duì)列,返回true也不代表該線程要進(jìn)入等待隊(duì)列,我們看它的處理邏輯:如果firstReader == current代表當(dāng)前只有你一個(gè)讀線程,那么不用等待可以獲取讀鎖;只有在rh.count == 0(意味著該線程沒有持有讀鎖)的情況下返回-1代表線程要進(jìn)入等待隊(duì)列。為什么?持有讀鎖的線程不能進(jìn)入同步隊(duì)列?
  3. 之后便是CAS,成功便更改HoldCounter值返回1,代表獲取讀鎖成功,否則循環(huán)再次檢查,總之不能輕易的將線程放入等待隊(duì)列,容易造成死鎖。

上面問題的解答:一個(gè)線程是不能隨便放入隊(duì)列中等待的,容易造成死鎖,看下面兩種情況:

  1. 如果一個(gè)線程持有讀鎖,重入失敗被放入等待隊(duì)列,若等待隊(duì)列中排在它前面的線程里有等待寫鎖的線程,那么就會(huì)造成死鎖,因?yàn)樽x鎖與寫鎖是互斥的。
  2. 假設(shè)一個(gè)線程持有寫鎖進(jìn)行“鎖降級(jí)申請”,被放入同步隊(duì)列,那么不僅之后的讀寫線程都會(huì)被放入隊(duì)列,隊(duì)列中之前有等待線程,無論等待的是讀或?qū)戞i都將造成死鎖。

回到fullTryAcquireShared

  1. 它先判斷是否有寫鎖,如果有且就是本線程就不會(huì)進(jìn)行readerShouldBlock判斷,直接CAS,這樣便解決了情況2的問題;
  2. 針對情況1線程持有讀鎖的情況,即使readerShouldBlock返回true,rh.count == 0不符合不會(huì)返回-1,也就不會(huì)將線程放入隊(duì)列。

總結(jié)就是fullTryAcquireShared,采用for循環(huán)方式讓線程不斷判斷與嘗試,且只有在一種情況下才會(huì)將線程放入隊(duì)列:readerShouldBlock返回true(原因可能是公平模式或者第一個(gè)等待線程(head.next)在等待寫鎖),當(dāng)前線程不是第一個(gè)讀線程且沒有持有讀鎖。

2.釋放讀鎖

        public void unlock() {
            sync.releaseShared(1);
        }

定位到AQS的releaseShared,之前介紹過。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

來看看同步器Syn實(shí)現(xiàn)的tryReleaseShared

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

1.如果是firstReader ,就對它及firstReader進(jìn)行修改;2.如果不是,就對readHolds進(jìn)行修改;3. 自旋CAS修改status
返回true代表status == 0,表示既沒有讀鎖也沒有寫鎖。

3. tryLock

        public boolean tryLock() {
            return sync.tryReadLock();
        }

調(diào)用了同步器Syn的tryReadLock

        final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

感覺與tryAcquireShared很像,不同在于tryReadLock只嘗試獲取讀鎖一次,成功就返回true,否則false;這是由于方法用途不同,所以設(shè)計(jì)自然不同。

二, 寫鎖

1,獲取寫鎖

        public void lock() {
            sync.acquire(1);
        }

定位到AQS的acquire中,AQS文章里介紹過。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

來看看同步器Syn重寫的tryAcquire方法

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

一 :c != 0下分兩種 1. w == 0 代表有讀鎖;2. w != 0 && current != getExclusiveOwnerThread() 代表有其他寫鎖;以上兩種返回false,線程要進(jìn)入等待隊(duì)列。否則就設(shè)置status值,返回true,獲取寫鎖成功。
二 :c == 0情況下要看writerShouldBlock的情況,返回false就會(huì)去CAS更改同步狀態(tài),成功就將AOS里的exclusiveOwnerThread設(shè)置位當(dāng)前線程,最后返回true;

注:情況一用setState更改同步狀態(tài),情況二用compareAndSetState?情況一執(zhí)行到setState這步說明當(dāng)前線程已持有寫鎖,是在重入,其他線程都會(huì)被排斥不同擔(dān)心線程安全問題,所以setState就可以,同步狀態(tài)status是volatile的。情況二里當(dāng)前即無讀鎖也無寫鎖,當(dāng)前線程始于其他線程在競爭,所以要利用CAS來保證原子性。
持有讀鎖線程不能申請寫鎖,即不能升級(jí),從tryAcquire可以看出持有讀鎖線程一定會(huì)返回false,也就是會(huì)被放入隊(duì)列中等待寫鎖,但是它持有的讀鎖將不會(huì)被釋放,那么寫鎖就不肯能獲取到,雖然不影響讀鎖的獲取,但所有寫鎖都將不能被獲取到。

來看看writerShouldBlock方法
非公平模式:直接返回false,也就是寫請求可以插隊(duì),即寫優(yōu)先級(jí)高;

        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }

公平模式:考慮的是當(dāng)前隊(duì)列是否有等待的線程

        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }

2,釋放寫鎖

        public void unlock() {
            sync.release(1);
        }

定位到AQS中的release

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

來看看同步器Syn重寫的tryRelease

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

注意返回值:返回true代表寫鎖個(gè)數(shù)為0,也就是寫鎖可用;返回false表示寫鎖仍被當(dāng)前線程占著,可能是因?yàn)楫?dāng)前線程重入了寫鎖。

3,tryLock

        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }

定位到同步器Syn

        final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

tryWriteLock方法看上去跟tryAcquire方法真的很像。唯一的區(qū)別在于,tryWriteLock忽略的writerShouldBlock方法;該方法的調(diào)用就是去搶寫鎖,搶不到返回false就行了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容