java并發(fā)之ReentrantReadWriteLock

java并發(fā)之ReentrantReadWriteLock

知識(shí)導(dǎo)讀

  • 讀寫鎖內(nèi)部維護(hù)了兩個(gè)分離的鎖,讀鎖和寫鎖,兩個(gè)鎖共用一個(gè)AQS實(shí)現(xiàn)。state的高16位記錄讀鎖資源占用,低16位記錄寫鎖資源占用。讀鎖基于AQS的共享模式實(shí)現(xiàn),寫鎖基于AQS的獨(dú)占模式實(shí)現(xiàn)
  • 讀鎖和寫鎖都是可重入,提供公平模式和非公平模式的。非公平模式的讀鎖要優(yōu)先等待隊(duì)列中頭部是寫鎖的線程去獲取寫鎖。
  • 當(dāng)一個(gè)線程持有寫鎖的時(shí)候,允許當(dāng)前線程再添加讀鎖,鎖降級(jí);當(dāng)一個(gè)線程持有讀鎖的時(shí)候不允許再添加寫鎖,即不允許鎖升級(jí)。鎖釋放的時(shí)候需要按照加鎖順序釋放

原理

  1. ReentrantReadWriteLock持有兩把鎖,readerLock和writerLock
    • readerLock是基于AQS共享模式的可重入、可共享的鎖。提供了公平和非公平兩種加鎖模式
    • writerLock是基于AQS獨(dú)占模式的可重入鎖。提供了公平和非公平兩種加鎖模式
  2. ReentrantReadWriteLock的內(nèi)部類Sync繼承了AQS,覆寫實(shí)現(xiàn)了獨(dú)占模式和共享模式獲取同步狀態(tài)和釋放資源的邏輯,readerLock和writerLock是基于同一個(gè)Sync實(shí)例實(shí)現(xiàn)的。Sync中state變量的高16位記錄讀鎖的資源獲取情況,state變量的低16位記錄寫鎖的資源獲取情況。
  3. 當(dāng)一個(gè)線程持有寫鎖的時(shí)候,允許當(dāng)前線程再添加讀鎖,鎖降級(jí);當(dāng)一個(gè)線程持有讀鎖的時(shí)候不允許再添加寫鎖,即不允許鎖升級(jí)。鎖釋放的時(shí)候需要按照加鎖順序釋放
  4. 添加讀鎖后不允許添加寫鎖,個(gè)人感覺(jué)原因是讀鎖與讀鎖是可以共存的,當(dāng)加了讀鎖后再加寫鎖需要其他線程持有的讀鎖全部釋放,單純的只看添加讀鎖的是否是本線程不行。不過(guò)也能做啊,為啥不支持,還是想不通,判斷一下讀鎖的state數(shù)量和firstReaderHoldCount數(shù)一致就能判斷出當(dāng)前讀鎖是否只被當(dāng)前線程持有
  5. 獲取讀、寫鎖失敗被阻塞的線程都會(huì)添加到同一個(gè)隊(duì)列中,當(dāng)讀鎖很多的時(shí)候,容易反生寫鎖饑餓的情況

注意:state變量是一個(gè)int值,高16位和低16位分別表示讀鎖和寫鎖的資源數(shù)量,所以讀、寫鎖的資源都有個(gè)最大值,2的16次方-1

鎖降級(jí)實(shí)現(xiàn)

class CashData {
    Object obj;
    volatile boolean cacheValid;
    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public Object load() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // 釋放讀鎖
            rwl.readLock().unlock();
            // 要進(jìn)行賦值,添加寫鎖
            rwl.writeLock().lock();
            try {
                if (!cacheValid) {
                    cacheValid = true;
                    obj = 1;
                }
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }
        rwl.readLock().unlock();
        return obj;
    }
}

源碼分析

基本構(gòu)造

ReentrantReadWriteLock內(nèi)部持有兩個(gè)鎖實(shí)現(xiàn)。api都是調(diào)用這兩個(gè)鎖的api,ReentrantReadWriteLock只是一個(gè)外層包裝。在創(chuàng)建ReentrantReadWriteLock實(shí)例的時(shí)候會(huì)初始化一個(gè)Sync實(shí)例,讀鎖和寫鎖都是基于Sync實(shí)現(xiàn)。

private final ReentrantReadWriteLock.ReadLock readerLock;//讀鎖
private final ReentrantReadWriteLock.WriteLock writerLock;//寫鎖
//AQS實(shí)現(xiàn)類,基于該類實(shí)現(xiàn)
final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    //創(chuàng)建讀、寫鎖的時(shí)候會(huì)使用 同一個(gè)Sync來(lái)創(chuàng)建
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

ReadLock實(shí)現(xiàn)了Lock接口,基于ReentrantReadWriteLock的內(nèi)部類Sync的共享模式模式實(shí)現(xiàn),每次加鎖需要修改state的高16位值+1

public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
     //使用AQS的共享模式模式
    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

WriteLock實(shí)現(xiàn)了Lock接口,基于ReentrantReadWriteLock的內(nèi)部類Sync的獨(dú)占模式實(shí)現(xiàn),每次加鎖需要修改state的低16位值+1

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    //使用AQS的獨(dú)占模式
    public void lock() {
        sync.acquire(1);
    }
        //使用AQS的獨(dú)占模式
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
        //使用AQS的獨(dú)占模式
    public boolean tryLock( ) {
        return sync.tryWriteLock();
    }
    //使用AQS的獨(dú)占模式
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    //使用AQS的獨(dú)占模式
    public void unlock() {
        sync.release(1);
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

Sync實(shí)現(xiàn)

ReentrantReadWriteLock中的讀、寫鎖各個(gè)方法的實(shí)現(xiàn)都依賴內(nèi)部類Sync。Sync繼承了AQS,覆寫了AQS的獨(dú)占模式方法和共享模式方法

Sync內(nèi)部構(gòu)造

abstract static class Sync extends AbstractQueuedSynchronizer {
  static final int SHARED_SHIFT   = 16;
  static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
  static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  //獲取當(dāng)前讀鎖數(shù)量(共享+重入),高16位的數(shù)值
  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
  //獲取當(dāng)前寫鎖數(shù)量(重入),低16位的數(shù)值
  static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
  Sync() {
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
  }
}

獨(dú)占模式(寫鎖實(shí)現(xiàn))

獨(dú)占模式加鎖

Sync覆寫AQS的tryAcquire方法,只允許一個(gè)線程獲取寫鎖的同步狀態(tài)成功

  1. 如果當(dāng)前存在讀鎖,不允許添加寫鎖,無(wú)論是否持有讀鎖的線程是否為本線程,返回獲取同步狀態(tài)失敗,當(dāng)前線程入隊(duì)自旋阻塞
  2. 如果當(dāng)前有其他線程持有寫鎖,返回獲取同步狀態(tài)失敗,當(dāng)前線程入隊(duì)自旋阻塞
  3. 如果持有寫鎖的是當(dāng)前線程,重入獨(dú)占鎖,直接設(shè)置state值
  4. 如果寫鎖的數(shù)量超過(guò)允許的最大值,則返回獲取同步狀態(tài)失敗
  5. CAS設(shè)置state值
    1. CAS設(shè)置state失敗,則返回獲取同步狀態(tài)失敗
    2. CAS設(shè)置state成功,則獲取同步狀態(tài)成功,設(shè)置持有寫鎖的線程為當(dāng)前線程,返回true,當(dāng)前線程繼續(xù)執(zhí)行
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);//獲取當(dāng)前寫鎖數(shù)量
    if (c != 0) {
        //有其他線程持有讀鎖,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //重入
        setState(c + acquires);
        return true;
    }
    //公平鎖和非公平鎖邏輯 由writerShouldBlock方法來(lái)判斷
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    //獲取同步狀態(tài)成功,設(shè)置持有鎖的線程為當(dāng)前線程
    setExclusiveOwnerThread(current);
    return true;
}

非公平鎖NonfairSync的writerShouldBlock方法直接返回false,可以直接CAS設(shè)置state值來(lái)獲取同步狀態(tài)
公平鎖FairSync的writerShouldBlock調(diào)用AQS的hasQueuedPredecessors方法來(lái)判斷,跟ReenTrantLock一樣,判斷AQS同步隊(duì)列中是否存在排在該線程之前的的節(jié)點(diǎn),保證先入隊(duì)的先執(zhí)行

獨(dú)占模式釋放鎖

Sync覆寫AQS的tryRelease方法,定義釋放資源的邏輯。由于是獨(dú)占模式,只會(huì)有一個(gè)線程同時(shí)執(zhí)行該方法,不會(huì)存在并發(fā)問(wèn)題

  1. 首先判斷如果不是持有寫鎖的線程,調(diào)用該方法直接拋出異常
  2. state的低16位存儲(chǔ)寫鎖數(shù),沒(méi)releae一次,低16位減去1,當(dāng)?shù)?6位數(shù)值為0的時(shí)候,釋放寫鎖成功,修改持有寫鎖的線程為null
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //判斷寫鎖數(shù)量是否為0,0的時(shí)候修改持有寫鎖的線程為null
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

共享模式(讀鎖實(shí)現(xiàn))

共享模式加鎖

Sync覆寫AQS的tryAcquireShared方法,定義獲取同步狀態(tài)的邏輯

  1. 存在寫鎖,如果持有寫鎖的是當(dāng)前線程則允許添加讀鎖,如果持有寫鎖的是其他線程則不允許添加讀鎖
  2. 調(diào)用readerShouldBlock來(lái)判斷是否公平與非公平
  3. 判斷讀鎖數(shù)量不要超過(guò)最大值,不能超過(guò)16位數(shù)值的最大值
  4. 存在并發(fā)則CAS設(shè)置state的值,如果成功則獲取讀鎖的同步狀態(tài)成功,返回1
  5. 如果獲取讀鎖的同步狀態(tài)失敗,調(diào)用fullTryAcquireShared方法,循環(huán)再次嘗試一遍
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //其他線程持有寫鎖,直接返回-1,獲取同步狀態(tài)失敗
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //獲取state的高16位數(shù)值(讀鎖數(shù)量)
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //讀鎖數(shù)量為0,第一次加讀鎖,設(shè)置firstReader為當(dāng)前線程
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { //讀鎖重入,如果當(dāng)前線程是第一個(gè)持有該讀鎖的線程,計(jì)數(shù)器+1
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
       //成功獲取同步狀態(tài),返回一個(gè)正整數(shù)或者0
        return 1;
    }
    //獲取同步狀態(tài)失敗 
    return fullTryAcquireShared(current);
}

fullTryAcquireShared方法實(shí)現(xiàn)基本與tryAcquireShared方法中的邏輯一樣,多了一層CAS設(shè)置失敗后再for循環(huán)重試

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            if (firstReader == current) {
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

公平鎖FairSync的readerShouldBlock調(diào)用AQS的hasQueuedPredecessors方法來(lái)判斷,跟ReenTrantLock一樣,判斷AQS同步隊(duì)列中是否存在排在該線程之前的的節(jié)點(diǎn),保證先入隊(duì)的先執(zhí)行

非公平鎖NonfairSync的readerShouldBlock實(shí)現(xiàn),如果AQS同步隊(duì)列中第一個(gè)是等待寫鎖的線程則返回true,當(dāng)前獲取讀鎖的線程先阻塞,優(yōu)先讓等待隊(duì)列的線程去獲取寫鎖

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}
共享模式釋放鎖

Sync覆寫AQS的tryReleaseShared方法,定義釋放資源的邏輯。當(dāng)前是共享模式,會(huì)有多個(gè)線程同時(shí)獲取執(zhí)行權(quán),所以該方法會(huì)存在多線程并發(fā)執(zhí)行的情況

  1. 修改firstReader和firstReaderHoldCount變量值,感覺(jué)是為了查詢方便
  2. for循環(huán)加CAS修改state的高16位值,當(dāng)state的高16位為0的時(shí)候,釋放成功返回true
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容