并發(fā)編程之鎖(四)--ReentrantReadWriteLock

前言

上一篇中已經(jīng)分析了ReentrantLock,下面我們來看一下讀寫鎖ReentrantReadWriteLock。
在這之前,先看一下其結(jié)構(gòu)圖:


ReadLock/WriteLock

  • ReadLock
//使用ReentrantReadWriteLock的Sync對象
protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}

public void lock() {
    sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
    return sync.tryReadLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void unlock() {
    sync.releaseShared(1);
}
//不支持條件變量
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

public String toString() {
    int r = sync.getReadLockCount();
    return super.toString() +
        "[Read locks = " + r + "]";
}

  • WriteLock
protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
}

public void lock() {
    sync.acquire(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public void unlock() {
    sync.release(1);
}

public Condition newCondition() {
    return sync.newCondition();
}

public String toString() {
    Thread o = sync.getOwner();
    return super.toString() + ((o == null) ?
                               "[Unlocked]" :
                               "[Locked by thread " + o.getName() + "]");
}

public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
}

public int getHoldCount() {
    return sync.getWriteHoldCount();
}

WriteLock的代碼,類似ReadLock的代碼,差別在于獨(dú)占式獲取同步狀態(tài)。

Sync抽象類

Sync是ReentrantReadWriteLock的靜態(tài)內(nèi)部類,繼承自AbstractQueuedSynchronizer的抽象類。它使用AQS的state字來表示當(dāng)前鎖的持有數(shù)量,其中state的高16位表示讀狀態(tài),即獲取該讀鎖的次數(shù),低16位表示獲取到寫鎖的線程的可重入次數(shù)。

static final int SHARED_SHIFT   = 16; // 位數(shù)
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // 每個(gè)鎖的最大重入次數(shù),65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//寫鎖的標(biāo)記,用來計(jì)算寫鎖的重入次數(shù)

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//第一個(gè)獲得讀鎖的線程
private transient Thread firstReader = null;
//firstReader的重入次數(shù)
private transient int firstReaderHoldCount;
//當(dāng)前線程持有的可重入數(shù)量
private transient ThreadLocalHoldCounter readHolds;
//成功獲取讀鎖的的最后一個(gè)線程的計(jì)數(shù)器
private transient HoldCounter cachedHoldCounter;
/**
  * 計(jì)數(shù)器,主要存儲(chǔ)線程id和重入次數(shù)
  **/
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    // 避免 HoldCounter 和 ThreadLocal 互相綁定而導(dǎo)致 GC 難以釋放它們
    final long tid = getThreadId(Thread.currentThread());
}
static final long getThreadId(Thread thread) {
    return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
Sync() {
    //ThreadLocal子類,表示當(dāng)前線程持有的可重入讀鎖的數(shù)量
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}

abstract boolean readerShouldBlock();

abstract boolean writerShouldBlock();
//寫鎖釋放
protected final boolean tryRelease(int releases) {
    //1. 如果釋放的線程不為鎖的持有者,直接拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 寫鎖的重入次數(shù),如果為0,釋放持有的線程
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
//寫鎖獲取。針對寫鎖,因?yàn)橐_保寫鎖的操作對讀鎖是可見的。
//如果在存在讀鎖的情況下允許獲取寫鎖,那么那些已經(jīng)獲取讀鎖的其他線程可能就無法感知當(dāng)前寫線程的操作。
//因此只有等讀鎖完全釋放后,寫鎖才能夠被當(dāng)前線程所獲取,一旦寫鎖獲取了,所有其他讀、寫線程均會(huì)被阻塞。
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    //1. 計(jì)算寫鎖的重入次數(shù)
    int c = getState();
    int w = exclusiveCount(c);
    //2. 如果存在鎖
    if (c != 0) {
        //w == 0 表示僅存在讀鎖不存在寫鎖
        //2.1  如果只存在讀鎖或者存在寫鎖但是當(dāng)前線程不是已經(jīng)獲取寫鎖的線程,直接返回失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //2.2. 如果寫鎖可重入次數(shù)超出最大范圍,拋出異常
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //2.3. 設(shè)置狀態(tài),獲取成功
        setState(c + acquires);
        return true;
    }
    //3. 使用CAS設(shè)置狀態(tài),如果失敗返回失敗(#writerShouldBlock()在非公平鎖下永遠(yuǎn)是false,
    //在公平鎖下需要判斷是否有前驅(qū)節(jié)點(diǎn),上一篇中提到過 )
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
//讀鎖釋放
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //1. 如果想要釋放鎖的線程為第一個(gè)獲取讀鎖的線程
    if (firstReader == current) {
        // 將計(jì)數(shù)器至空或者減1
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        //2. 判斷最后一次獲取到讀鎖的線程是否是當(dāng)前線程,如果不是從readholds中獲取
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //3. 自旋,設(shè)置狀態(tài)
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

private IllegalMonitorStateException unmatchedUnlockException() {
    return new IllegalMonitorStateException(
        "attempt to unlock read lock, not locked by current thread");
}
//讀鎖獲取
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果存在寫鎖,且鎖的持有者不是當(dāng)前線程,直接返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //2. 獲取讀鎖重入次數(shù)
    int r = sharedCount(c);
    /*
     * readerShouldBlock():讀鎖是否需要等待(公平鎖原則)
     * r < MAX_COUNT:持有線程小于最大數(shù)(65535)
     * compareAndSetState(c, c + SHARED_UNIT):設(shè)置讀取鎖狀態(tài)
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //3. 如果重入次數(shù)為0,firstReader = current;如果firstReader == current, firstReaderHoldCount++;
        //否則從緩存中獲取
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
//獲取讀鎖的完整版本,處理CAS未命中和tryAcquireShared中未處理的重入讀取
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            //有寫鎖的前提下,線程持有者不是當(dāng)前線程
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        // 讀鎖需要阻塞,判斷是否當(dāng)前線程已經(jīng)獲取到讀鎖
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        //讀鎖超出最大范圍
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS設(shè)置讀鎖成功
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
//嘗試獲取寫鎖
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        //寫鎖的數(shù)量
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
//嘗試獲取讀鎖
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
    return new ConditionObject();
}

final Thread getOwner() {
    return ((exclusiveCount(getState()) == 0) ?
            null :
            getExclusiveOwnerThread());
}

final int getReadLockCount() {
    return sharedCount(getState());
}

final boolean isWriteLocked() {
    return exclusiveCount(getState()) != 0;
}

final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

final int getReadHoldCount() {
    if (getReadLockCount() == 0)
        return 0;

    Thread current = Thread.currentThread();
    if (firstReader == current)
        return firstReaderHoldCount;

    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == getThreadId(current))
        return rh.count;

    int count = readHolds.get().count;
    if (count == 0) readHolds.remove();
    return count;
}
//自定義序列化邏輯
private void readObject(java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
    s.defaultReadObject();
    readHolds = new ThreadLocalHoldCounter();
    setState(0); // reset to unlocked state
}

final int getCount() { return getState(); }

Sync的實(shí)現(xiàn)類

  • NofairSync實(shí)現(xiàn)類
final boolean writerShouldBlock() {
    return false; // writers can always barge
}
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    //頭節(jié)點(diǎn)不為空且下一個(gè)節(jié)點(diǎn)也不為空且是獨(dú)占鎖且下一個(gè)節(jié)點(diǎn)的線程也不為空
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

由于寫鎖是獨(dú)占排它鎖,所以在非公平鎖的情況下,需要調(diào)用 AQS 的 #apparentlyFirstQueuedIsExclusive()方法,判斷是否當(dāng)前寫鎖已經(jīng)被獲取。

  • FairSync實(shí)現(xiàn)類
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

ReentrantReadWriteLock

  • 構(gòu)造方法
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;

public ReentrantReadWriteLock() {
    this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

ReentrantReadWriteLock的默認(rèn)構(gòu)造方法初始化了非公平鎖的實(shí)現(xiàn),同時(shí)也初始化了readerLock,writerLock。

總結(jié)

1. Java中tid指的是什么?
線程的tid,這里不直接使用#Thread.getId()是因?yàn)?getId()方法不是final,容易被子類覆蓋。
2. 什么是鎖降級(jí)?
鎖降級(jí)就意味著寫鎖是可以降級(jí)為讀鎖的,但是需要遵循先獲取寫鎖、獲取讀鎖再釋放寫鎖的次序。注意如果當(dāng)前線程先獲取寫鎖,然后釋放寫鎖,再獲取讀鎖這個(gè)過程不能稱之為鎖降級(jí),鎖降級(jí)一定要遵循那個(gè)次序。
鎖降級(jí)中讀鎖的獲取釋放為必要?
肯定是必要的。假如當(dāng)前線程 A 不獲取讀鎖而是直接釋放了寫鎖,這個(gè)時(shí)候另外一個(gè)線程 B 獲取了寫鎖,那么這個(gè)線程 B 對數(shù)據(jù)的修改是不會(huì)對當(dāng)前線程 A 可見的。如果獲取了讀鎖,則線程B在獲取寫鎖過程中判斷如果有讀鎖還沒有釋放則會(huì)被阻塞,只有當(dāng)前線程 A 釋放讀鎖后,線程 B 才會(huì)獲取寫鎖成功。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容