并發(fā)九:ReentrantReadWriteLock實(shí)現(xiàn)分析

ReentrantReadWriteLock

ReentrantReadWriteLock具有ReentrantLock的特性,支持重入和公平性設(shè)置,但是對讀寫進(jìn)行了分離。
讀操作采用共享鎖,寫操作采用獨(dú)占鎖,即一個(gè)資源可以有多個(gè)線程同時(shí)進(jìn)行讀操作,但是只能有一個(gè)線程進(jìn)行寫操作。
在讀多寫少的情況下ReentrantReadWriteLock可以極大的提高吞吐量。

ReentrantReadWriteLock:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    /** 讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 同步器實(shí)例 */
    final Sync sync;
    /** 父類同步器 */
    abstract static class Sync extends AbstractQueuedSynchronizer {... ...}
    /** 非公平鎖同步器 */
    static final class NonfairSync extends Sync {... ...}
    /** 公平鎖同步器 */
    static final class FairSync extends Sync {
    /** 構(gòu)造 */
    public ReentrantReadWriteLock() {
        this(false);
    }
    /** 構(gòu)造 */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    /** 工廠方法獲取 寫鎖 */
    public ReentrantReadWriteLock.WriteLock writeLock() { 
        return writerLock; 
    }
    /** 工廠方法獲取 讀鎖 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { 
        return readerLock; 
    }
    ... ...
}

ReentrantReadWriteLock內(nèi)部實(shí)現(xiàn)了三個(gè)同步器,與ReentrantLock不同的是,NonfairSync和FairSync的加鎖方法都是調(diào)用父類Sync的tryAcquire方法,子類中只實(shí)現(xiàn)了獲取公平策略的方法writerShouldBlock和readerShouldBlock。
Sync:

abstract static class Sync extends AbstractQueuedSynchronizer {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = (1 << SHARED_SHIFT);
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    /** 共享數(shù)量 讀鎖 高16位*/
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }
    /** 獨(dú)占數(shù)量 寫鎖 低16位*/
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }
    /** 重入計(jì)數(shù)器 */
    static final class HoldCounter {
        int count = 0;
        final long tid = Thread.currentThread().getId();
    }
    /** 重入計(jì)數(shù)器ThreadLocal */
    static final class ThreadLocalHoldCounter 
                    extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    /** 重入計(jì)數(shù)器ThreadLocal實(shí)例 */
    private transient ThreadLocalHoldCounter readHolds;
    /** 最近一個(gè)成功獲取讀鎖的線程的計(jì)數(shù)。
    這省卻了ThreadLocal查找 緩存*/
    private transient HoldCounter cachedHoldCounter;
    /** 針對只有一個(gè)讀鎖的優(yōu)化處理 線程 */
    private transient Thread firstReader = null;
    /** 針對只有一個(gè)讀鎖的優(yōu)化處理 重入計(jì)數(shù)器 */
    private transient int firstReaderHoldCount;
    Sync() {
        // 重入計(jì)數(shù)器容器
        readHolds = new ThreadLocalHoldCounter();
        // 保證readHolds的可見性,因?yàn)閟tate 是volatile修飾的
        setState(getState()); 
    }
    // 讀公平策略 交由子類實(shí)現(xiàn)
    abstract boolean readerShouldBlock();
    // 寫公平策略 交由子類實(shí)現(xiàn)
    abstract boolean writerShouldBlock();
    ... ...
}
/**非公平鎖同步器*/
static final class NonfairSync extends Sync {
    //寫公平策略 ,非公平鎖中直接返回false
    final boolean writerShouldBlock() {
        return false; 
    }
   //讀公平策略 ,判斷同步隊(duì)列中隊(duì)頭等待的節(jié)點(diǎn)是否是獨(dú)占節(jié)點(diǎn),
   //也就判斷前面是否有寫鎖在等待被喚醒。
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}
/**公平鎖同步器*/
static final class FairSync extends Sync {
    //寫公平策略 ,判斷隊(duì)列中是否有等待的節(jié)點(diǎn)
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
   //讀公平策略 ,判斷隊(duì)列中是否有等待的節(jié)點(diǎn)
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

ReadLock和WriteLock使用同一個(gè)同步器,這樣就需要解決state不夠用的問題,因?yàn)閟tate既要標(biāo)示讀鎖的數(shù)量,又要標(biāo)示寫鎖的數(shù)量,所以將state變量一分為二,高位16位表示讀鎖的數(shù)量,低位16位表示寫鎖的數(shù)量。
讀鎖存在多個(gè),state的高16位記錄鎖的數(shù)量,重入次數(shù)存在每個(gè)持有線程的ThreadLocal中,即ThreadLocalHoldCounter。

寫鎖加鎖流程:

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState(); // s1
            int w = exclusiveCount(c);// 寫鎖數(shù)量
            if (c != 0) { // s2
                // 有讀鎖存在或者 獨(dú)占線程非當(dāng)前線程 
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)// 檢查重入次數(shù)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            } // s3
              // writerShouldBlock 子類實(shí)現(xiàn) 主要實(shí)現(xiàn)公平策略
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);//持有線程
            return true;
        }

s1:c!=0轉(zhuǎn)入s2,c==轉(zhuǎn)入s3

s2:c!=0說明存在鎖
w==0說明寫鎖的數(shù)量為0,那么一定存在讀鎖,有讀鎖存在不允許獲取寫鎖因?yàn)槿绻粋€(gè)線程在讀另外一個(gè)線程寫入,會(huì)出現(xiàn)數(shù)據(jù)不一致引起臟讀,返回false。
w不等于0說明存在寫鎖,只有線程重入才能獲取鎖,否則返回false,因?yàn)閷戞i是獨(dú)占鎖,不允許兩個(gè)線程同時(shí)寫。
如果重入次數(shù)大于MAX_COUNT,拋出Error,直接退出程序了。
進(jìn)入重入邏輯,將state設(shè)為c+1,返回true,獲取鎖成功。

s3:c==0說明不存在鎖,首次獲取寫鎖
調(diào)用公平性策略方法,如果此時(shí)鎖為公平鎖并且同步隊(duì)列中有等待的節(jié)點(diǎn),就直接返回false。
如果此時(shí)鎖為非公平鎖,獲取成功,返回true。

寫鎖解鎖邏輯與ReentrantLock一樣,這里不再累述。

讀鎖加鎖流程:

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            // 存在寫鎖(獨(dú)占鎖) 并且請求線程非寫鎖的持有線程
            // s1
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);// 讀鎖計(jì)數(shù)
            // s2
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                // s3
                if (r == 0) {// 只有一個(gè)讀鎖,不動(dòng)用readHolds
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {//重入
                    firstReaderHoldCount++;
                } else {//多個(gè)讀鎖,啟用ThreadLocal記錄重入次數(shù) s4
                    HoldCounter rh = cachedHoldCounter;// 訪問緩存
                    if (rh == null || rh.tid != current.getId())
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 自旋重試 s5
            return fullTryAcquireShared(current);
        }

s1: exclusiveCount(c)!=0 說明存在寫鎖并且持有該鎖的線程不是當(dāng)前線程,直接返回-1,獲取鎖失敗。因?yàn)槿绻粋€(gè)線程在寫,這時(shí)候另外一個(gè)線程讀取很有可能讀取不一致的臟數(shù)據(jù)。

s2: 調(diào)用子類的公平性策略&&r < MAX_COUNT&&CAS設(shè)置State,如果三項(xiàng)操作都返回true轉(zhuǎn)入s3,否則轉(zhuǎn)入s5。

s3: r == 0說明沒有讀鎖,是首次獲取讀鎖,直接將firstReader設(shè)置為當(dāng)前線程,重入次數(shù)firstReaderHoldCount設(shè)為1,獲取鎖成,返回1。
r!=0且firstReader等于當(dāng)前線程,說明是唯一的一個(gè)線程在重入,直接將firstReaderHoldCount累加,獲取鎖成,返回1。
r!=0且firstReader不等于當(dāng)前線程,轉(zhuǎn)入s4。

s4: cachedHoldCounter總是記錄最后一次獲取鎖的線程信息,這樣減少了查詢ThreadLocal的次數(shù),也提高了后續(xù)解鎖的效率。
如果cachedHoldCounter為當(dāng)前線程留下的并且重入次數(shù)count為0,先將cachedHoldCounter其放入ThreadLocal中。
如果cachedHoldCounter不是當(dāng)前線程留下的,從線程的ThreadLocal中獲取HoldCounter并將其賦給cachedHoldCounter。
累加cachedHoldCounter中的重入計(jì)數(shù)器。
獲取鎖成,返回1。

s5: fullTryAcquireShared邏輯與tryAcquireShared大致相同,使用for(;;)保證compareAndSetState(c, c + SHARED_UNIT)操作成功,因?yàn)榭赡苡衅渌€程在爭用,這里自旋等待其他線程操作完畢。

讀鎖解鎖方法:

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) { // s1
    if (firstReaderHoldCount == 1)
        firstReader = null;
    else
        firstReaderHoldCount--;
    } else { // s2
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != current.getId())
        rh = readHolds.get();
    int count = rh.count;
    if (count <= 1) {// 完全釋放讀鎖
        readHolds.remove();
        if (count <= 0)
        throw unmatchedUnlockException();
    }
    --rh.count;// 重入退出
    }
    // s3
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

s1: 如果firstReader(加鎖時(shí)的第一個(gè)線程)不是當(dāng)前線程轉(zhuǎn)入s3,當(dāng)前線程沒有重入直接firstReader=null,重入則將重入計(jì)數(shù)器firstReaderHoldCount減1,轉(zhuǎn)入s3。

s2、取出緩存計(jì)數(shù)器,如果cachedHoldCounter是當(dāng)前線留下的就從ThreadLock中取出,count <= 1說明這次釋放將是完全釋放,因此將重入計(jì)數(shù)器從ThreadLock中刪除。重入次數(shù)減1,轉(zhuǎn)入s3

s3、state的高位16位操作得到nextc,for(;;)循環(huán)中保障state設(shè)置成功,如果nextc為0說明讀鎖全部釋放。

鎖升級與降級

鎖升級是指持有讀鎖的線程,在讀鎖未釋放的時(shí)候再申請寫鎖
鎖降級是指持有寫鎖的線程,在寫鎖未釋放的時(shí)候再申請讀鎖
一個(gè)小栗子:

/** 鎖升級 */
public void upgrade() {
    try {
        readLock.lock();// 獲取讀鎖
        try {// 持有讀鎖
            writeLock.lock();// 再獲取寫鎖
        } finally {
            writeLock.unlock();// 釋放寫鎖
        }
    } finally {
        readLock.unlock();// 最后釋放讀鎖
    }
}
/** 鎖降級 */
public void downgrading() {
    try {
        writeLock.lock();// 獲取寫鎖
        try {// 持有寫鎖
            readLock.lock();// 再獲取讀鎖
        } finally {
            readLock.unlock();// 釋放讀鎖
        }
    } finally {
        writeLock.unlock();// 最后釋放寫鎖
    }
}

通過上面源碼的分析:
在"讀鎖加鎖流程s1處"可以看到持有寫鎖的線程可以再去獲取讀鎖。
在"寫鎖加鎖流程s2處"可以看到有讀鎖存在是不允許去獲取寫鎖的。

小結(jié)

  1. 讀鎖為共享鎖,可以運(yùn)行多個(gè)線程持有同一把讀鎖;寫鎖為獨(dú)占鎖,只允許一個(gè)線程持有,在讀多寫少的情況下ReentrantReadWriteLock可以極大的提高吞吐量
  2. 獲取讀鎖的條件:a、沒有寫鎖存在;b、有寫鎖存在,但是進(jìn)入讀鎖的線程是持有寫鎖的線程。
  3. 獲取寫鎖的條件,a、沒有寫鎖存在(重入除外);b、沒有讀鎖存在。
  4. ReentrantReadWriteLock支持鎖降級,不支持鎖升級。

碼字不易,轉(zhuǎn)載請保留原文連接http://www.itdecent.cn/p/2c5c19114463

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容