java并發(fā)編程(十八)啥是讀寫鎖ReentrantReadWriteLock?

前面我們學(xué)習(xí)了AQS,ReentrantLock等,現(xiàn)在來學(xué)習(xí)一下什么是讀寫鎖ReentrantReadWriteLock。

當(dāng)讀操作遠(yuǎn)遠(yuǎn)高于寫操作時(shí),這時(shí)候可以使用【讀寫鎖】讓【讀-讀】可以并發(fā),提高性能。

本文還是基于源碼的形式,希望同學(xué)們能夠以本文為思路,自己跟蹤源碼一步步的debug進(jìn)去,加深理解。

一、初識(shí)ReentrantReadWriteLock

同樣的,先看下其類圖:

image.png
  • 實(shí)現(xiàn)了讀寫鎖接口ReadWriteLock
  • 有5個(gè)內(nèi)部類,與ReentrantLock相同的是FairSyncNonfairSyncSync,另外不同的是增加兩個(gè)內(nèi)部類,都實(shí)現(xiàn)了Lock接口:
    • WriteLock
    • ReadLock
  • Sync 增加了兩個(gè)內(nèi)部類 :
    • HoldCounter:持有鎖的計(jì)數(shù)器
    • ThreadLocalHoldCounter :維護(hù)HoldCounter的ThreadLocal

二、使用案例

通常會(huì)維護(hù)一個(gè)操作數(shù)據(jù)的容器類,內(nèi)部應(yīng)該封裝好數(shù)據(jù)的read和write方法,如下所示:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @description: 數(shù)據(jù)容器類
 * @author:weirx
 * @date:2022/1/13 15:29
 * @version:3.0
 */
public class DataContainer {

    /**
     * 初始化讀鎖和寫鎖
     */
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    protected void read(){
        readLock.lock();
        try {
            System.out.println("獲取讀鎖");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println("釋放讀鎖");
        }
    }

    protected void write(){
        writeLock.lock();
        try {
            System.out.println("獲取寫鎖");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println("釋放寫鎖");
        }
    }
}

簡單測試一下,分為讀讀、讀寫、寫寫。

  • 讀讀:
    public static void main(String[] args) {
        //初始化數(shù)據(jù)容器
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.read();
        }, "t2").start();
    }

結(jié)果,讀讀不互斥,同時(shí)獲取讀鎖,同時(shí)釋放:

獲取讀鎖
獲取讀鎖
釋放讀鎖
釋放讀鎖
  • 讀寫:
    public static void main(String[] args) {
        //初始化數(shù)據(jù)容器
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();
    }

結(jié)果,讀寫互斥,無論是先執(zhí)行read還是write方法,都會(huì)等到讀鎖或?qū)戞i被釋放之后,才會(huì)獲取下一把鎖:

獲取讀鎖 -- 第一個(gè)執(zhí)行
釋放讀鎖 -- 第二個(gè)執(zhí)行
獲取寫鎖 -- 第三個(gè)執(zhí)行
釋放寫鎖 -- 第四個(gè)執(zhí)行
  • 寫寫:
    public static void main(String[] args) {
        //初始化數(shù)據(jù)容器
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.write();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();
    }

結(jié)果,寫寫互斥,只有第一把寫鎖釋放后,才能獲取下一把寫鎖:

獲取寫鎖
釋放寫鎖
獲取寫鎖
釋放寫鎖

注意:

  • 鎖重入時(shí),持有讀鎖再去獲取寫鎖,會(huì)導(dǎo)致寫鎖一直等待
        protected void read(){
          readLock.lock();
          try {
              System.out.println("獲取讀鎖");
              TimeUnit.SECONDS.sleep(1);
              System.out.println("獲取寫鎖");
              writeLock.lock();
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              readLock.unlock();
              System.out.println("釋放讀鎖");
          }
      }
    
    結(jié)果:不會(huì)釋放
    獲取讀鎖
    獲取寫鎖
    
  • 鎖重入時(shí),持有寫鎖,可以再去獲取讀鎖。
     protected void write(){
          writeLock.lock();
          try {
              System.out.println("獲取寫鎖");
              TimeUnit.SECONDS.sleep(1);
              System.out.println("獲取讀鎖");
              readLock.lock();
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              writeLock.unlock();
              System.out.println("釋放寫鎖");
          }
      }
    
    結(jié)果:
    獲取寫鎖
    獲取讀鎖
    釋放寫鎖
    

三、源碼分析

我們根據(jù)前面的例子,從讀鎖的獲取到釋放,從寫鎖的獲取到釋放,依次查看源碼。

先注意一個(gè)事情,讀寫鎖是以不同的位數(shù)來區(qū)分獨(dú)占鎖和共享鎖的狀態(tài)的:

       /*
         * 讀和寫分為上行下兩個(gè)部分,低16位是獨(dú)占鎖狀態(tài),高16位是共享鎖狀態(tài)
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 返回以count表示的共享持有數(shù) */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** 返回以count表示的互斥保持?jǐn)?shù)  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

3.1 讀鎖分析

3.1.1 讀鎖獲取

從 readLock.lock(); 這里進(jìn)入分析過程:

        /**
        * 獲取讀鎖。
        * 如果寫鎖沒有被另一個(gè)線程持有,則獲取讀鎖并立即返回。
        * 如果寫鎖被另一個(gè)線程持有,那么當(dāng)前線程將被禁用以用于線程調(diào)度目的并處于休眠狀態(tài),直到獲得讀鎖為止
        */
        public void lock() {
            sync.acquireShared(1);
        }

如上的lock方法,是ReentrantReadWriteLock子類ReadLock的方法,而acquireShared方法是在AQS的子類Syn當(dāng)中定義的,這個(gè)方法嘗試以共享的方式獲取讀鎖,失敗則進(jìn)入等待隊(duì)列, 不斷重試,直到獲取讀鎖為止。

    public final void acquireShared(int arg) {
        // 被其他線程持有的話,就走AQS的doAcquireShared
        if (tryAcquireShared(arg) < 0)
            // 獲取共享鎖,失敗加入等待隊(duì)列,不可中斷的獲取,直到獲取為止
            doAcquireShared(arg);
    }

tryAcquireShared是在ReentrantReadWriteLock當(dāng)中實(shí)現(xiàn)的,我們直接看代碼:

        protected final int tryAcquireShared(int unused) {
            // 獲取當(dāng)前線程
            Thread current = Thread.currentThread();
            // 獲取當(dāng)前鎖狀態(tài)
            int c = getState();
            // 獨(dú)占鎖統(tǒng)計(jì)不等于0 且 持有者不是當(dāng)前線程,就返回 -1 ,換句話說,被其他線程持有
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 共享鎖數(shù)量
            int r = sharedCount(c);
            // 返回fase才有資格獲取讀鎖
            if (!readerShouldBlock() &&
                // 持有數(shù)小于默認(rèn)值
                r < MAX_COUNT &&
                // CAS 設(shè)置鎖狀態(tài)
                compareAndSetState(c, c + SHARED_UNIT)) {
                // 持有共享鎖為0
                if (r == 0) {
                    // 第一個(gè)持有者是當(dāng)前線程
                    firstReader = current;
                    // 持有總數(shù)是 1 
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 持有鎖的是當(dāng)前線程本身,就把技術(shù) + 1
                    firstReaderHoldCount++;
                } else {
                    // 獲取緩存計(jì)數(shù)
                    HoldCounter rh = cachedHoldCounter;
                    // 如果是null 或者 持有線程的id不是當(dāng)前線程
                    if (rh == null || rh.tid != getThreadId(current))
                        // 賦值給緩存
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        // rh不是null ,且是當(dāng)前線程,就把讀鎖持有者設(shè)為緩存中的值
                        readHolds.set(rh);
                    // 將其 + 1
                    rh.count++;
                }
                return 1;
            }
            // 想要獲取讀鎖的線程應(yīng)該被阻塞,保底工作,處理 CAS 未命中和在 tryAcquireShared 中未處理的重入讀取
            return fullTryAcquireShared(current);
        }

從上面的源碼我們可以看得出來,寫鎖和讀鎖之間是互斥的。

3.1.2 讀鎖釋放

直接看關(guān)鍵部分

    /**
      * 以共享模式釋放鎖,tryReleaseShared返回true,則釋放
      */
    public final boolean releaseShared(int arg) {
        // 釋放鎖
        if (tryReleaseShared(arg)) {
            // 喚醒隊(duì)列的下一個(gè)線程
            doReleaseShared();
            return true;
        }
        return false;
    }

看看讀寫鎖的tryReleaseShared實(shí)現(xiàn):

        protected final boolean tryReleaseShared(int unused) {
            //。。。省略。。。
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // 讀鎖的計(jì)數(shù)不會(huì)影響其它獲取讀鎖線程, 但會(huì)影響其它獲取寫鎖線程
                    // 計(jì)數(shù)為 0 才是真正釋放
                    return nextc == 0;
            }
        }

如果上述方法釋放成功,則走下面AQS繼承來的方法:

    private void doReleaseShared() {
        // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個(gè)節(jié)點(diǎn) unpark
        // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
              // 如果有其它線程也在釋放讀鎖,那么需要將 waitStatus 先改為 0
              // 防止 unparkSuccessor 被多次執(zhí)行
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 如果已經(jīng)是 0 了,改為 -3,用來解決傳播性
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

3.2 寫鎖分析

3.2.1 獲取鎖

    public final void acquire(int arg) {
        // 嘗試獲得寫鎖失敗
        if (!tryAcquire(arg) &&
                        // 將當(dāng)前線程關(guān)聯(lián)到一個(gè) Node 對(duì)象上, 模式為獨(dú)占模式
                        // 進(jìn)入 AQS 隊(duì)列阻塞
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) {
            selfInterrupt();
        }
    }

讀寫鎖的上鎖方法:tryAcquire

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
           // 獲得低 16 位, 代表寫鎖的 state 計(jì)數(shù)
            int w = exclusiveCount(c);
            if (c != 0) {
                // 如果寫鎖是0 或者 當(dāng)前線程不等于獨(dú)占線程,獲取失敗
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 寫鎖計(jì)數(shù)超過低 16 位, 報(bào)異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 寫鎖重入, 獲得鎖成功
                setState(c + acquires);
                return true;
            }
            // 寫鎖應(yīng)該阻塞
            if (writerShouldBlock() ||
                //    更改計(jì)數(shù)失敗  
                !compareAndSetState(c, c + acquires))
                // 獲取鎖失敗
                return false;
            // 設(shè)置當(dāng)前線程獨(dú)占鎖
            setExclusiveOwnerThread(current);
            return true;
        }

3.2.2 釋放鎖

release:

    public final boolean release(int arg) {
        // 嘗試釋放寫鎖成功
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease:

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        // 因?yàn)榭芍厝氲脑? 寫鎖計(jì)數(shù)為 0, 才算釋放成功
        boolean free = exclusiveCount(nextc) == 0;
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(nextc);
        return free;
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容