Java并發(fā)之ReentrantReadWriteLock

ReentrantReadWriteLock可重入讀寫鎖,先從功能以及具體實現(xiàn)有一個簡單的了解

一、兩把鎖

writerLock,readerLock分為讀鎖跟寫鎖,他們之間的共存與互斥進(jìn)行一個簡單羅列

寫寫互斥、讀寫互斥、寫讀可降級,讀寫不可升級、讀讀可并行

這是我的總結(jié),另外競爭狀態(tài)同樣是使用state變量進(jìn)行的,通過高16進(jìn)行共享鎖也就是讀讀線程之間的共享模式調(diào)度,低16位用于寫鎖的阻塞模式。
也就是說寫鎖重入次數(shù)是int最大取值范圍的差不多一半,不過這也夠用了。

二、同步器代碼

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

這幾個成員屬性主要是做state值的切分與轉(zhuǎn)換的,

SHARED_SHIFT =16 等于是將int的state 4字節(jié) 32位進(jìn)行切分,16位
SHARED_UNIT 左移位16位也就是實際share模式(讀鎖時候state的)存儲時候是以65536為底,
MAX_COUNT 讀鎖或者寫鎖最大的資源數(shù),65535
EXCLUSIVE_MASK 字面意思獨占模式的掩碼,其實就是為了方便進(jìn)行獨占模式進(jìn)行位&運算
sharedCount(int c) 通過無符號右移操作得出目前share資源的剩余數(shù)量
exclusiveCount(int c) 通過掩碼的與運算,"抹掉"高位的讀鎖占用的變量值,不得不說你大爺還是你大爺,如果是我也就是加加減減那么用。

用于存儲讀鎖的計數(shù)器在重入時候使用,代碼如下

      static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

讀鎖的重入計數(shù)是通過線程本地變量ThreadLocal里面存儲了HoldCounter實現(xiàn)的

    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

接下來對幾個變量做一個簡單說明就開始分析幾個重要的方法

        當(dāng)前線程對自身重入計數(shù)器的一個引用
        private transient ThreadLocalHoldCounter readHolds;
        sync對象內(nèi)部緩存的最后一個獲得讀鎖的線程計數(shù)器對象引用目的提升查詢比較時候的性能
        private transient HoldCounter cachedHoldCounter;
        sync對象內(nèi)部緩存的第一個獲取讀鎖的線程
        private transient Thread firstReader = null;
        sync對象內(nèi)部緩存的第一個獲取讀鎖的線程重入次數(shù)
        private transient int firstReaderHoldCount;
       
        

兩個抽象方法定義要求公平鎖與非公平鎖去實現(xiàn)在嘗試獲得鎖時候是否進(jìn)行當(dāng)前線程的阻塞

        判斷讀鎖是否要阻塞
        abstract boolean readerShouldBlock();
        判斷寫鎖是否需要阻塞
        abstract boolean readerShouldBlock()
        公平鎖實現(xiàn)
        static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            判斷當(dāng)前阻塞隊列是否有等待節(jié)點
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            判斷當(dāng)前阻塞隊列是否有等待節(jié)點
            return hasQueuedPredecessors();
        }
    }
        非公平鎖實現(xiàn)
        static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            不阻塞嘗試進(jìn)行加鎖
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            判斷當(dāng)前是否有寫鎖,如果沒有寫鎖就不阻塞
            return apparentlyFirstQueuedIsExclusive();
        }
    }

下面仔細(xì)分解一下讀鎖跟寫鎖的加鎖與釋放鎖的相關(guān)代碼
從命名就可以看出來哪些是用于寫鎖的

 protected final boolean tryRelease(int releases) {
            //因為寫鎖是獨占模式,如果當(dāng)前線程不是鎖持有者則屬于異常狀態(tài)
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //因為支持重入所以不只是0跟1的關(guān)系
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            //當(dāng)state減到0的時候進(jìn)行競爭狀態(tài)的釋放,并修改當(dāng)前寫鎖線程持有者為空
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
        這部分是寫鎖的獲取,寫鎖的獲取稍微有些復(fù)雜
        protected final boolean tryAcquire(int acquires) {
          
            Thread current = Thread.currentThread();
            取出當(dāng)前state的數(shù)值
            int c = getState();
            通過與運算算出當(dāng)前獨占鎖下數(shù)值
            int w = exclusiveCount(c);
            如果是state不為0有可能有線程獲得了讀鎖,也有可能是有現(xiàn)成獲得寫鎖
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    這表示兩種情況下都失敗
                    1.有讀鎖存在
                    2.有寫鎖,但是當(dāng)前寫鎖持有線程不是當(dāng)前線程
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                state不為0并且當(dāng)前線程又為鎖持有線程
                鎖重入增加state的值
                setState(c + acquires);
                return true;
            }
            如果當(dāng)前state為0走入這個分支,判斷是否需要等待,
            或者嘗試修改競爭狀態(tài)失敗返回加鎖失敗否則設(shè)置當(dāng)前
            線程為鎖持有線程
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

下面是讀鎖的獲取與釋放邏輯相關(guān)的代碼
個人認(rèn)為讀鎖的獲取相對值得自信分解,因為里面會涉及一個鎖降級過程,并且還有一個令我想了挺長沒get到李大爺?shù)狞c的地方,就是一個死循環(huán)去獲取讀鎖。前提條件是目前無寫鎖未被持有

 protected final int tryAcquireShared(int unused) {
           
            Thread current = Thread.currentThread();
            int c = getState();
            當(dāng)前如果有寫鎖,并且寫鎖線程不是自己就進(jìn)入讀鎖獲取邏輯
            這里面是有含義的包含了鎖降級的支持,
            假設(shè)一個線程獲得了寫鎖,那么第一個條件是成立的,他會判斷持有線程是不是當(dāng)前線程,如果是當(dāng)前線程那么還是會執(zhí)行下面讀鎖獲取代碼。
            
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

這是那個困擾我地方,為什么跟上面方法一樣還要再來這么個方法,截止到目前我的理解是這樣的,之所以可以這么任性的死循環(huán)其實是因為讀鎖數(shù)量限制造成的,以往比如信號量,他雖然會通過共享模式提高并發(fā)線程數(shù),但是并發(fā)量會被嚴(yán)格限制,讀鎖比較特殊在源碼一開始就有一段話

This lock supports a maximum of 65535 recursive write locks
and 65535 read locks. Attempts to exceed these limits result in
*{@link Error} throws from locking methods.

正是因為這個近乎不限制并發(fā)讀數(shù)量的做法才允許這么任性的死循環(huán),因為這個并發(fā)獲得讀鎖其實是很快的,所以不需要進(jìn)入阻塞隊列進(jìn)行排隊,但是這里里面有一個前提是當(dāng)前沒有被寫鎖阻塞,因為如果寫鎖阻塞大量的循環(huán)是無效的依然會浪費性能,還是要去排隊的。代碼如下,就是為了在一次獲得


final int fullTryAcquireShared(Thread current) {
           /*
            * This code is in part redundant with that in
            * tryAcquireShared but is simpler overall by not
            * complicating tryAcquireShared with interactions between
            * retries and lazily reading hold counts.
            */
           HoldCounter rh = null;
           for (;;) {
               int c = getState();
               if (exclusiveCount(c) != 0) {
                   if (getExclusiveOwnerThread() != current)
                       return -1;
                   // else we hold the exclusive lock; blocking here
                   // would cause deadlock.
               } else if (readerShouldBlock()) {
                   // Make sure we're not acquiring read lock reentrantly
                   if (firstReader == current) {
                       // assert firstReaderHoldCount > 0;
                   } else {
                       if (rh == null) {
                           rh = cachedHoldCounter;
                           if (rh == null || rh.tid != getThreadId(current)) {
                               rh = readHolds.get();
                               if (rh.count == 0)
                                   readHolds.remove();
                           }
                       }
                       if (rh.count == 0)
                           return -1;
                   }
               }
               if (sharedCount(c) == MAX_COUNT)
                   throw new Error("Maximum lock count exceeded");
               if (compareAndSetState(c, c + SHARED_UNIT)) {
                   if (sharedCount(c) == 0) {
                       firstReader = current;
                       firstReaderHoldCount = 1;
                   } else if (firstReader == current) {
                       firstReaderHoldCount++;
                   } else {
                       if (rh == null)
                           rh = cachedHoldCounter;
                       if (rh == null || rh.tid != getThreadId(current))
                           rh = readHolds.get();
                       else if (rh.count == 0)
                           readHolds.set(rh);
                       rh.count++;
                       cachedHoldCounter = rh; // cache for release
                   }
                   return 1;
               }
           }
       }

另外還有兩個嘗試獲得鎖的方法一個是嘗試獲得寫鎖,一個是嘗試獲得讀鎖,基礎(chǔ)邏輯為不進(jìn)行是否應(yīng)該阻塞當(dāng)前線程判斷,而是直接進(jìn)行嘗試,加鎖成功則返回true否則返回false。這里面還是要特別注意讀鎖的獲取依然采用的循環(huán)方式,還是因為不限制讀鎖數(shù)量的原因

 final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
 final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                當(dāng)當(dāng)前沒有寫鎖時候就一直進(jìn)行循環(huán)添加當(dāng)前線程讀鎖進(jìn)行嘗試
                int c = getState();
                鎖降級
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

最后一個注意點就是condition的支持,寫鎖支持condition,讀鎖不支持
可以通過代碼就看的出來
讀鎖中獲得conditon方法

        
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }

寫鎖中獲得condition方法

      public Condition newCondition() {
            return sync.newCondition();
        }

其他方法就不進(jìn)行說明了,都是一些統(tǒng)計或者判斷的方法,避免沒有重點。
最后做一個總結(jié)吧

1、可重入讀寫鎖利用state變量標(biāo)記競爭狀態(tài)具體是高16位用于讀鎖分配計數(shù)低16位用于寫鎖重入計數(shù)
2、讀鎖的最大并發(fā)量是65535個讀并發(fā),寫鎖支持65535次重入。
3、寫鎖的獲取與重入跟ReetrantLock區(qū)別不算大,都是通過state進(jìn)行重入計數(shù)的
4、鎖關(guān)系總結(jié)如下,讀鎖被持有則寫鎖獲取失敗、寫鎖被持有則其他線程獲取讀鎖或者寫鎖失敗,但是本線程可以支持獲取讀鎖(鎖降級)。讀鎖與讀鎖是可以并發(fā)獲取的。
5、讀鎖的重入計數(shù)是通過ThreadLocal里面的HoldCounter進(jìn)行存儲的。為了提升對比效率做了一些對象的緩存firsterReader,firstReaderHoldCount、cachedHoldCounter

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容