深入理解讀寫鎖ReentrantReadWriteLock

特點

  1. 讀讀共享
  2. 讀寫互斥
  3. 寫寫互斥

結(jié)構(gòu)

類圖

鎖的狀態(tài)表示

state

繼承AQS的類都需要使用state變量代表鎖的資源占用情況

image

高16位 表示讀鎖的上鎖次數(shù)(由于讀讀共享,可能由多個線程累加),

低16位 表示寫鎖的重入次數(shù), 由于寫寫互斥,讀寫互斥, 只能有一個線程重入累加而得

讀鎖 : ReadLock

lock() :加鎖

/**
 * Acquires the read lock.
 *
 * <p>Acquires the read lock if the write lock is not held by
 * another thread and returns immediately.
 *
 * <p>If the write lock is held by another thread then
 * the current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the read lock has been acquired.
 */
public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

Sync.tryAcquireShared():

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    // 鎖狀態(tài)
    int c = getState();
    /**
    *   exclusiveCount(c)
    *
    *   寫鎖的重入的次數(shù)  state & 11111 1111 1111 1111  
    *   static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    *   
    **/
    // 如果此時有寫鎖,寫鎖重入次數(shù)應(yīng)該 > 0 ,并且不是當前線程,由于讀寫互斥,寫寫互斥,不管當前方法是讀寫還是寫鎖,這種情況都會加鎖失敗
    // 如果是當前線程持有的寫鎖則 把寫鎖降級為讀鎖 
    // 如果當前線程并沒有寫鎖, 則直接 獲取讀鎖
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 獲取讀鎖上鎖次數(shù)
    // state 右移 16位,把 用于 表示寫鎖的加鎖次數(shù) 的低16位 給 抵消掉,剩下的就是 高16位
    int r = sharedCount(c);
    // 如果隊列中沒有線程在排隊
    // 這里會有公平非公平之分
    // 公平鎖 會判斷 隊列是否有線程在排隊
    // 非公平鎖 只判斷隊頭 是不是 寫鎖,是寫鎖 才會 等待
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        // cas
        // 讀鎖上鎖,直接  state + 1 0000 0000 0000 0000
                // SHARED_UNIT 是 1 左移 16位
                // 這樣就可以對表示讀鎖上鎖次數(shù)的高16位 + 1
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果上鎖次數(shù)為0,當前鎖沒有線程持有其讀鎖
        if (r == 0) {
            // 第一次上讀鎖的線程設(shè)置為當前線程
            firstReader = current;
            firstReaderHoldCount = 1;
            // 第一次上讀鎖的線程重入
        } else if (firstReader == current) {
            // 首次上讀鎖 線程重入次數(shù) + 1
            firstReaderHoldCount++;
        } else {
            // HoldCounter 主要有線程id和這個線程加鎖次數(shù)count, 用于統(tǒng)計某個線程  加鎖的次數(shù)
            // 從緩存拿HoldCounter對象
            HoldCounter rh = cachedHoldCounter;
            // 如果緩存等于空 或者  緩存的線程id和當前線程id不相等
            if (rh == null || rh.tid != getThreadId(current))
                // 從線程的threadLocal里HoldCounter拿對象,如果沒有,會初始化HoldCounter
                // 里面的count = 0,并且賦值給cachedHoldCounter
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            // 對當前線程的HoldCounter的加鎖次數(shù)進行 ++
            rh.count++;
        }
        return 1;
    }
    // 感覺跟上面代碼邏輯差不多 : cas 失敗,再來一遍
    return fullTryAcquireShared(current);
}

doAcquireShared :

自旋 ,自旋失敗 入隊,阻塞

private void doAcquireShared(int arg) {
    // 創(chuàng)建Node節(jié)點,節(jié)點類型為SHARED,持有當前線程
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 1. 如果當前節(jié)點的前驅(qū)節(jié)點時頭結(jié)點
            // 2. 如果阻塞的線程被喚醒,由于是無限循環(huán),會繼續(xù)走到這里,頭結(jié)點永遠是空Node,這時會繼續(xù)去拿鎖。由于是讀鎖,可以接著上鎖
            if (p == head) {
                // 1. 自旋一次
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 1. 自旋成功,設(shè)置新的Node為head,并喚醒后一個線程
                    // 2. 第二次循環(huán)進來會接著設(shè)置新的Node為head,并喚醒下一個等待讀鎖的線程
                    // 直到Node的狀態(tài)不是Shared為止
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 1. 把前驅(qū)節(jié)點設(shè)置為Signal狀態(tài),阻塞自身節(jié)點
            // 2. 如果線程被叫醒,會繼續(xù)循環(huán)
            // 3. 以此類推,一直喚醒等待讀鎖的線程,直到遇到不是SHARED狀態(tài)的節(jié)點。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

讀鎖 Node 結(jié)構(gòu) :

{thread:線程id,nextWaiter = new Node() },沒有waitStatus

setHeadAndPropagate

設(shè)置新的頭節(jié)點,并釋放后面的SHARED狀態(tài)的節(jié)點

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果當前節(jié)點的后面一個節(jié)點是Shared狀態(tài)
        if (s == null || s.isShared())
            // 釋放后 一個節(jié)點
            doReleaseShared();
    }
}

doReleaseShared : 釋放下一個線程(head.next)

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                                    // 叫醒下一個線程
                  unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

unparkSuccessor(h) :喚醒下一個線程

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 找到最近一個非CANCELLED(1)狀態(tài)的線程
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒它
        LockSupport.unpark(s.thread);
}

總結(jié)

  1. 讀鎖加鎖時,如果有寫鎖持有鎖則上鎖失敗,進入隊列排隊, 如果是讀鎖則可以共享這把鎖,不用排隊,只是多了一個計數(shù)的步驟。
  2. 首次加鎖失敗,入隊 后,如果自己是head, 會搶鎖一次,搶鎖成功,會喚醒隊列后面所有SHARED節(jié)點

unlock() : 解鎖

讀鎖狀態(tài) - 1,并判斷讀鎖是否完全釋放

public final boolean releaseShared(int arg) {
    // 讀鎖狀態(tài) - 1。并判斷是否完全釋放
    if (tryReleaseShared(arg)) {
        // 如果讀鎖完全釋放,那么喚醒下一個 等待寫鎖的線程 醒來搶寫鎖
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared(int unused)

state讀鎖狀態(tài)-1,其他讀鎖上鎖次數(shù) -1

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 如果當前線程是第一個上讀鎖的線程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        // 如果第一次上讀鎖的線程只上過一次讀鎖
        if (firstReaderHoldCount == 1)
            // 清空第一次上讀鎖的線程
            firstReader = null;
        else
            // 第一次上讀鎖的線程上鎖次數(shù) --
            firstReaderHoldCount--;
    } else {
        // 否則,從ThreadLocal中取出上讀鎖次數(shù)-1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        // 高16讀鎖狀態(tài)位 - 1   =  (- 1 << 16 : 1 0000 0000 0000 0000)
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            // 返回 是不是  讀鎖完全釋放。 讀鎖狀態(tài)為 = 0
            return nextc == 0;
    }
}

doReleaseShared() : 釋放下一個線程(head.next)

這個方法和讀鎖獲取到鎖之后,喚醒隊列中下一個等待讀鎖的線程是一樣的。如果下一個是SHARED狀態(tài)的話,會調(diào) doReleaseShared()。

而這里,如果讀鎖全部釋放,就會喚醒 隊列中 下一個等待寫鎖的 線程來獲取寫鎖。

總結(jié)

讀鎖被喚醒時, 如果當前被持有鎖是讀鎖,則自己也可以共享這把鎖,并且喚醒隊列中自己后面的一個狀態(tài)SHARED的節(jié)點來搶鎖。 以此類推,隊列中最近的的所有等待讀鎖的線程都會被喚醒,并共享這把鎖。

比如 W 表示寫鎖,R表示 讀鎖 隊列中R1,R2,R3,R4,W1,R5 :

R1被喚醒, 搶到讀鎖時,會喚醒 R2,R3,R4 來共享鎖,但是 R5不會,因為中間隔著一把寫鎖W1

寫鎖 : WriteLock

lock() :加鎖

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    // 嘗試加鎖
    if (!tryAcquire(arg) &&
        // 嘗試加鎖失敗,入隊等待
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg) :嘗試加鎖

這個方法和ReentrantLock實現(xiàn)的方法有一點點不一樣 :

  1. 只要有讀鎖或者寫鎖(不是自己重入)就加鎖失敗

  2. cas更改state失敗表示加鎖失敗

  3. 如果是公平鎖隊列中有線程在排隊,則加鎖失敗

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    // 獲取到state
    int c = getState();
    // 獲取寫鎖上鎖次數(shù)
    int w = exclusiveCount(c);
    // 如果有讀鎖或者寫鎖
    if (c != 0) {
        // w==0表示沒有寫鎖,但是c!=0,所以只有讀鎖
        // 或者 : current != getExclusiveOwnerThread()  有寫鎖,但是別的線程上的,不是自己重入的情況
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 第一個判斷 沒return false的情況下,那么就是  沒有讀鎖,并且 有寫鎖,但是是 當前線程重入
        // 自己重入, 寫狀態(tài) + 1
        setState(c + acquires);
        return true;
    }
    // 能走到這,就說明 c==0,因為 c!=0 在上面這個判斷中 是肯定會返回的
    // writerShouldBlock() 判斷是否應(yīng)該排隊,公平鎖返回的是 隊列中是否有線程,非公平鎖恒返回false
    if (writerShouldBlock() ||
        // cas失敗
        !compareAndSetState(c, c + acquires))
        return false;
    // 否則加鎖成功
    setExclusiveOwnerThread(current);
    return true;
}

加鎖失敗,則入隊睡眠

這個和 ReentrantLock是一樣的

寫鎖 Node 結(jié)構(gòu) :和ReentrantLock的鎖結(jié)構(gòu)一樣

總結(jié)

當(讀鎖 不存在 并且 沒有寫鎖 ) 或者 寫鎖 只有 自己重入才能加鎖成功

unlock():解鎖

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    // 嘗試解鎖,返回是否釋放鎖的結(jié)果
    if (tryRelease(arg)) {
        Node h = head;
        // 如果寫鎖已經(jīng)完全釋放,喚醒隊列中的頭節(jié)點.next
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease(int releases)

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 低16位寫鎖狀態(tài)位 減 1
    int nextc = getState() - releases;
    // 是否完全釋放,低16位 == 0 : 重入鎖需要解鎖多次
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 完全釋放,設(shè)置為無鎖狀態(tài)
        setExclusiveOwnerThread(null);
    // 重新設(shè)置寫鎖狀態(tài)位
    setState(nextc);
    return free;
}

總結(jié)

寫鎖解鎖 減 低16位寫狀態(tài)就行,重入鎖需多次解鎖,直至完全釋放,才會喚醒隊列中等待的線程。

鎖降級

在ReentrantReadWriteLock 是支持鎖降級的。但不支持鎖的升級。

鎖降級

從寫鎖 降級到讀鎖 :

為什么可以 ?

因為是從寫鎖降級到讀鎖,寫鎖只能由一個線程持有,此時降級為讀鎖,其他線程上讀鎖或者寫鎖,因為此時寫鎖的狀態(tài)還沒釋放。所以自始至終只可能存在同一個線程在同時持有讀鎖和者寫鎖。

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    // 是否有寫鎖
    if (exclusiveCount(c) != 0 &&
        // 有寫鎖,但是 寫鎖持有線程就是當前線程,鎖降級到讀鎖
        getExclusiveOwnerThread() != current)
        return -1;
    // 下面就是上讀鎖 邏輯了
    int r = sharedCount(c);
    // ................ 省略部分代碼
    return fullTryAcquireShared(current);
}

為什么不支持鎖升級

可能會造成死鎖。

鎖升級 : 是讀鎖升級到寫鎖

假設(shè)有 A,B 和 C 三個線程,它們都已持有讀鎖。假設(shè)線程 A 嘗試從讀鎖升級到寫鎖。那么它必須等待 B 和 C 釋放掉已經(jīng)獲取到的讀鎖。如果隨著時間推移,B 和 C 逐漸釋放了它們的讀鎖,此時線程 A 確實是可以成功升級并獲取寫鎖。

但是我們考慮一種特殊情況。假設(shè)線程 A 和 B 都想升級到寫鎖,那么對于線程 A 而言,它需要等待其他所有線程,包括線程 B 在內(nèi)釋放讀鎖。而線程 B 也需要等待所有的線程,包括線程 A 釋放讀鎖。這就是一種非常典型的死鎖的情況。誰都愿不愿意率先釋放掉自己手中的鎖。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容