Java AbstractQueuedSynchronizer(AQS)淺析之二

上一篇文章Java AbstractQueuedSynchronizer(AQS)淺析之一
我們分析了AQS在ReentrantLock中的運(yùn)用。注意:ReentrantLock是以獨(dú)享模式獲取鎖和釋放鎖的,今天這篇文章,我們看一看AQS在ReentrantReadWriteLock中的應(yīng)用。

ReentrantReadWriteLock類結(jié)構(gòu)

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    
    /** 讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 實(shí)現(xiàn)所有的同步機(jī)制 */
    final Sync sync;
    
    public ReentrantReadWriteLock() {
        //默認(rèn)是非公平鎖
        this(false);
    }
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    
    /**
     * ReentrantReadWriteLock的同步實(shí)現(xiàn),子類有公平和非公平版本。
     */
    abstract static class Sync extends AbstractQueuedSynchronizer{
        
        static final class HoldCounter {
            
        }
        
        static final class ThreadLocalHoldCounter
                    extends ThreadLocal<HoldCounter> {
            
        }
            
    }
    
    /**
     * 非公平版本
     */
    static final class NonfairSync extends Sync{
        
    }
    
    /**
     * 公平版本
     */
    static final class FairSync extends Sync {
        
    }
    
    
    /**
     * 讀鎖
     */
    public static class ReadLock implements Lock, java.io.Serializable {
        
    }
    
    /**
     * 寫鎖
     */
    public static class WriteLock implements Lock, java.io.Serializable {
        
    }
    
}
ReentrantReadWriteLock.png

上圖中:綠色虛線表示實(shí)現(xiàn)接口,藍(lán)色實(shí)線表示繼承類,紅色帶加號的實(shí)線表示內(nèi)部類。

ReentrantReadWriteLock.ReadLock獲取的是讀鎖(共享鎖)
ReentrantReadWriteLock.WriteLock獲取的是寫鎖(獨(dú)享鎖)

本篇以非公平的讀寫鎖進(jìn)行分析

ReentrantReadWriteLock.ReadLock

ReadLock的lock()方法

public void lock() {
     //調(diào)用AQS的acquireShared(int arg)方法
     sync.acquireShared(1);
}

AQS的acquireShared(int arg)方法

public final void acquireShared(int arg) {
   //先嘗試獲取鎖
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

AQS沒有實(shí)現(xiàn)tryAcquireShared(int arg)方法,我們看其子類ReentrantReadWriteLock.Sync的實(shí)現(xiàn)。

protected final int tryAcquireShared(int unused) {
    /*
     * 流程:
     * 1. 如果寫鎖被別的線程持有,失敗。
     * 2. 否則,當(dāng)前線程有資格獲取讀鎖,所以詢問是否由于排隊(duì)策略需要阻塞。 如果不需要阻塞的話,
     * 嘗試通過CAS修改state來獲取鎖并更新鎖計(jì)數(shù)。請注意該步驟不進(jìn)行重入獲取鎖的檢查,
     * 這個(gè)檢查推遲到獲取鎖的完整版本方法(fullTryAcquireShared(Thread current))中來避免
     * 在這個(gè)更典型的非重入情況下必須檢查鎖計(jì)數(shù)。
     * 3. 如果步驟2由于當(dāng)前線程沒有資格獲取鎖或者CAS修改state失敗或者讀鎖的數(shù)量已經(jīng)飽和而失敗
     * 就使用具有完整重試循環(huán)的版本的方法來獲取讀鎖,即調(diào)用`fullTryAcquireShared(Thread current)`方法。
     */
    Thread current = Thread.currentThread();
   //獲取同步狀態(tài)值
    int c = getState();
    //注釋1處,如果存在寫鎖,并且鎖的持有者不是當(dāng)前線程,返回-1
    if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
        return -1;
    //獲取共享鎖的數(shù)量
    int r = sharedCount(c);
    //注釋2處,走到注釋2處說明沒有寫鎖,或者寫鎖的持有者是當(dāng)前線程。此時(shí)我們是可以獲取讀鎖的。
    if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {//注釋3處
            //r==0,說明當(dāng)前線程是第一個(gè)獲取讀鎖的線程
            firstReader = current;
            //當(dāng)前線程,持有鎖的數(shù)量從0到1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            //如果第一個(gè)獲取讀鎖的線程就是當(dāng)前線程,增加當(dāng)前線程持有的讀鎖的數(shù)量
            firstReaderHoldCount++;
        } else {
            //注釋4處
            HoldCounter rh = cachedHoldCounter;
            //如果rh為null或者rh的線程id不是當(dāng)前線程的線程id
            if (rh == null || rh.tid != getThreadId(current))
                //獲取當(dāng)前線程的HoldCounter對象,賦值給cachedHoldCounter
                cachedHoldCounter = rh = readHolds.get();
            //如果此條件滿足,說明上一個(gè)成功獲取讀鎖的線程就是當(dāng)前線程,但是現(xiàn)在把讀鎖釋放了,所以讀鎖計(jì)數(shù)為0
            else if (rh.count == 0)
                //設(shè)置當(dāng)前線程的HoldCounter對象
                readHolds.set(rh);
           //當(dāng)前線程的HoldCounter對象的讀鎖數(shù)量加1
            rh.count++;
        }
        //成功獲得讀鎖返回1
        return 1;
    }
    return fullTryAcquireShared(current);
}

注釋1處,如果存在寫鎖,并且鎖的持有者不是當(dāng)前線程,返回-1,表示獲取讀鎖失敗。

注釋2處,走到注釋2處說明沒有寫鎖,或者寫鎖的持有者是當(dāng)前線程。此時(shí)我們是可以獲取讀鎖的。

注釋2處,如果3個(gè)條件都滿足

  1. !readerShouldBlock():當(dāng)前線程不應(yīng)該阻塞。
  2. 讀鎖的數(shù)量小于MAX_COUNT(65535)。
  3. 以CAS的方式更新state成功。

條件1,當(dāng)前線程不應(yīng)該阻塞。那么什么時(shí)候當(dāng)前獲取讀鎖的線程應(yīng)該阻塞呢?看源碼

ReentrantReadWriteLock.NonfairSync的readerShouldBlock()方法

final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

AQS的apparentlyFirstQueuedIsExclusive()方法

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

從上面的代碼可以看出,只有當(dāng)?shù)却?duì)列不為空,并且head的后繼節(jié)點(diǎn)有效且在等待以獨(dú)享模式獲取鎖的時(shí)候,當(dāng)前線程才會(huì)阻塞。

條件2,共享鎖的數(shù)量小于MAX_COUNT(65535)。

這里我們要提一下AQS的state字段(int類型,32位),該字段用來描述鎖被獲取的次數(shù)。

/**
 * The synchronization state.
 */
private volatile int state;

在獨(dú)享鎖中這個(gè)值通常是0或者1(如果是重入鎖的話state值就是獲取鎖的次數(shù)),在共享鎖中state就是共享鎖的數(shù)量。但是在ReentrantReadWriteLock中有讀、寫兩把鎖,所以需要在一個(gè)整型變量state上分別表示讀鎖(共享鎖)和寫鎖(獨(dú)享鎖)的數(shù)量(或者也可以叫狀態(tài))。于是將state變量“按位切割”切分成了兩個(gè)部分,高16位表示讀鎖狀態(tài)(讀鎖數(shù)量),低16位表示寫鎖狀態(tài)(寫鎖數(shù)量)。如下圖所示:


read_write_bit.png

我們再看一下ReentrantReadWriteLock.Sync類中獲取獨(dú)享鎖和共享鎖的方法就明白了

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

    /*
     * 讀鎖和寫鎖的提取常量和函數(shù)
     * 鎖的狀態(tài)邏輯上分為兩個(gè)無符號的short類型(16位)
     * 低16位代表獨(dú)享鎖的數(shù)量,高16位代表共享鎖的數(shù)量。
     * 
     */
    static final int SHARED_SHIFT   = 16;//16位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);//65536
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//65535
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//65535

    /** 無符號右移16位,獲取共享鎖的數(shù)量 */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    /** 獲取獨(dú)享鎖的數(shù)量 */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

條件3,以CAS的方式更新state成功。

compareAndSetState(c, c + SHARED_UNIT)

注意我們每獲取一次讀鎖要加65536(2的16次方)。

如果注釋2中的3個(gè)條件都為true,說明我們成功獲取了讀鎖。那么進(jìn)入if代碼塊中。

注釋3處,這里出現(xiàn)了兩個(gè)變量

//第一個(gè)獲取讀鎖的線程,將讀鎖數(shù)量從0改變到1,并且還沒有釋放讀鎖。
private transient Thread firstReader = null;
//第一個(gè)獲取讀鎖的線程持有的讀鎖的數(shù)量
private transient int firstReaderHoldCount;

注釋4處,有多了幾個(gè)陌生的變量和類

ReentrantReadWriteLock.Sync.HoldCounter

/**
 * 每個(gè)線程的讀鎖計(jì)數(shù)器,保存在線程本地變量中
 */
static final class HoldCounter {
  //線程持有讀鎖的數(shù)量
   int count = 0;
   // 線程id
   final long tid = getThreadId(Thread.currentThread());
}

這個(gè)類為每個(gè)線程保存持有讀鎖的數(shù)量。該類的實(shí)例保存在ThreadLocal中。

ReentrantReadWriteLock.Sync的成員變量

//保存上一個(gè)成功獲取讀鎖的線程的HoldCounter對象
private transient HoldCounter cachedHoldCounter;

ReentrantReadWriteLock.Sync.ThreadLocalHoldCounter

static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

這個(gè)類就是用來保存HoldCounter的。注意該類的initialValue方法返回一個(gè)新的HoldCounter對象。

在注釋2處,如果3個(gè)條件都滿足說明成功獲取了讀鎖,返回1 。如果獲取失敗就調(diào)用fullTryAcquireShared(Thread current)方法來獲取讀鎖。

ReentrantReadWriteLock.Sync的fullTryAcquireShared(Thread current)方法

/**
 * 獲取讀鎖的完整版本,會(huì)解決tryAcquireShared方法中未處理的CAS失敗和可重入獲取讀鎖的問題。
 */
final int fullTryAcquireShared(Thread current) {
  
    HoldCounter rh = null;
    for (;;) {//無限循環(huán)
        //獲取state
        int c = getState();
        //注釋1處,如果寫鎖的數(shù)量不為0
        if (exclusiveCount(c) != 0) {
            //如果不是當(dāng)前線程持有互斥鎖返回-1
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else 表示我們持有寫鎖;阻塞在這里會(huì)導(dǎo)致死鎖。 
        } else if (readerShouldBlock()) {//注釋2處,如果當(dāng)前線程應(yīng)該阻塞
            //注釋3處, 當(dāng)前線程持有讀鎖
            if (firstReader == current) {
                //斷言 firstReaderHoldCount > 0;
            } else {
                //注釋4處
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        //獲取當(dāng)前線程的HoldCounter
                        rh = readHolds.get();
                        if (rh.count == 0) 
                            //從線程本地變量readHolds中移除當(dāng)前線程
                            readHolds.remove();
                    }
                }
                //當(dāng)前線程的鎖計(jì)數(shù)為0,沒有持有讀鎖,返回-1。
                if (rh.count == 0)
                    return -1;
            }
        }
        //如果讀鎖的數(shù)量等于MAX_COUNT,拋出異常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //注釋5處,如果以CAS的方式獲取更新state成功,說明當(dāng)前線程成功獲取讀鎖,最后返回1。
        if (compareAndSetState(c, c + SHARED_UNIT)) {  
            if (sharedCount(c) == 0) {
                //注釋6處,當(dāng)前線程是第一個(gè)獲取讀鎖的線程,為firstReader和firstReaderHoldCount賦值
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                //注釋7處,當(dāng)前線程再次獲取鎖,更新firstReaderHoldCount值
                firstReaderHoldCount++;
            } else { //注釋8處
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    //獲取當(dāng)前線程的HoldCounter
                    rh = readHolds.get();
                else if (rh.count == 0)
                    //當(dāng)前線程第一次獲取讀鎖
                    readHolds.set(rh);
                //重入獲取鎖,增加當(dāng)前線程持有的鎖數(shù)量
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            //成功獲取鎖,返回1
            return 1;
        }
    }
}

注釋1處,如果寫鎖的數(shù)量不為0,如果不是當(dāng)前線程持有寫鎖,則獲取失敗,返回-1。

注釋2處,沒有寫鎖并且當(dāng)前線程需要阻塞

注釋3處,firstReader == current說明當(dāng)前線程持有讀鎖并且還沒有釋放鎖。

如果注釋3條件不滿足,進(jìn)入注釋4處,獲取當(dāng)前線程的鎖計(jì)數(shù)器,如果當(dāng)前線程的鎖計(jì)數(shù)為0,說明當(dāng)前線程沒有持有讀鎖,就從線程本地變量中移除當(dāng)前線程的鎖計(jì)數(shù)器。

注釋5處,如果以CAS的方式獲取更新state成功,說明當(dāng)前線程成功獲取讀鎖。最后返回1。

注釋6處,如果當(dāng)前線程是第一個(gè)獲取讀鎖的線程,為firstReader和firstReaderHoldCount賦值。

注釋7處,如果當(dāng)前線程是第一個(gè)獲取讀鎖的線程再次獲取鎖,更新firstReaderHoldCount值。

注釋8處,創(chuàng)建當(dāng)前線程的鎖計(jì)數(shù)器保存在線程本地變量中。

如果獲取鎖失敗,就會(huì)調(diào)用AQS的doAcquireShared(int arg)方法

public final void acquireShared(int arg) {
    //tryAcquireShared(arg)方法返回值小于0說明獲取鎖失敗
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

AQS的doAcquireShared(int arg)方法

/**
 * 以共享不可中斷的模式獲取鎖
 */
private void doAcquireShared(int arg) {
    //以共享模式將節(jié)點(diǎn)添加到等待隊(duì)列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//無限循環(huán)
            final Node p = node.predecessor();
                if (p == head) {
                   //再次嘗試獲取鎖
                   int r = tryAcquireShared(arg);
                   if (r >= 0) {//獲取鎖成功
                       //注釋1處,這個(gè)方法有點(diǎn)意思
                       setHeadAndPropagate(node, r);
                       p.next = null; // help GC
                       if (interrupted)
                           selfInterrupt();
                       failed = false;
                       return;
                   }
               }
               //阻塞線程,等待被喚醒
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這個(gè)過程可以參考上一篇文章Java AbstractQueuedSynchronizer(AQS)淺析之一中的acquireQueued(final Node node, int arg)方法分析過程,這里再簡單說一下。

這是一個(gè)無限循環(huán),

  1. 以共享模式添加當(dāng)前線程到等待隊(duì)列。
  2. 添加成功以后再次嘗試獲取鎖,如果獲取成功,就返回。
  3. 獲取失敗,阻塞當(dāng)前線程,等待被喚醒。
  4. 線程被喚醒后,繼續(xù)循環(huán)步驟1,2,3。

我們看下注釋1處。當(dāng)前節(jié)點(diǎn)在獲取讀鎖以后,如果還有剩余資源(tryAcquireShared(arg)方法返回值大于0),或者當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)以共享模式等待獲取鎖(就是獲取讀鎖),就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)。

AQS的setHeadAndPropagate(Node node, int propagate)

/**
 * 設(shè)置等待隊(duì)列的頭并且檢查當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)是否以共享模式等待獲取鎖,如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
 * 是以共享模式等待獲取鎖,則當(dāng)propagate > 0或設(shè)置了PROPAGATE狀態(tài)的時(shí)候就喚醒后繼節(jié)點(diǎn)。
 *
 */
 private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    //將自己設(shè)置為head
    setHead(node);
    //嘗試喚醒下一個(gè)隊(duì)列中的節(jié)點(diǎn)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //后繼節(jié)點(diǎn)為null或者以共享模式等待獲取鎖
        if (s == null || s.isShared())
            //這個(gè)方法后面再說
            doReleaseShared();
    }
}

現(xiàn)在ReadLock獲取鎖的過程分析完了,接下來看看ReadLock釋放鎖的過程。

ReadLock的unlock()方法

public void unlock() {
   sync.releaseShared(1);
}

AQS的releaseShared(int arg)方法

public final boolean releaseShared(int arg) {
   //注釋1處,
   if (tryReleaseShared(arg)) {
       doReleaseShared();
       return true;
   }
   return false;
}

注釋1處,嘗試釋放讀鎖,AQS沒有實(shí)現(xiàn)tryReleaseShared(int arg)方法,我們直接看ReentrantReadWriteLock.Sync的實(shí)現(xiàn)。

ReentrantReadWriteLock.Sync的的tryReleaseShared(int arg)方法

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        //如果第一個(gè)讀線程是當(dāng)前線程,那么firstReaderHoldCount肯定大于0
        
        //注釋1處,
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {//注釋2處,
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            //獲取當(dāng)前線程的鎖計(jì)數(shù)
            rh = readHolds.get();
        int count = rh.count;
        //鎖計(jì)數(shù)等于1,說明當(dāng)前線程只獲取了一次讀鎖, 本次釋放以后,就不再持有鎖
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)//count <= 0表明當(dāng)前線程沒有持有讀鎖,所以不能釋放。
                throw unmatchedUnlockException();
        }
        //當(dāng)前線程count減去1
        --rh.count;
    }
    //注釋3處,
    for (;;) {//CAS可能失敗,所以需要循環(huán)
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 釋放讀鎖對讀線程沒有作用,但是會(huì)影響寫線程,
            // 但是如果讀鎖和寫鎖都被釋放的情況下,寫線程可以獲取寫鎖。
            return nextc == 0;
    }
}

注釋1處,如果firstReaderHoldCount==1,說明當(dāng)前線程只獲取了一次讀鎖,所以當(dāng)釋放鎖以后,當(dāng)前線程就不再持有讀鎖了,應(yīng)該將firstReader置為null。否則就將firstReaderHoldCount減去1。

注釋2處,獲取當(dāng)前線程的鎖計(jì)數(shù),如果鎖計(jì)數(shù)等于1,說明當(dāng)前線程只獲取了一次讀鎖, 本次釋放以后,就不再持有鎖。 如果鎖計(jì)數(shù)<= 0表明當(dāng)前線程沒有持有讀鎖,所以不能釋放。最后將當(dāng)前線程的鎖計(jì)數(shù)減去1。

注釋3處,使用CAS的方式修改讀鎖狀態(tài)。因?yàn)镃AS可能會(huì)失敗,所以在for循環(huán)里進(jìn)行修改。修改成功后返回讀鎖的數(shù)量,如果為0,說明讀鎖完全釋放了,這個(gè)時(shí)候是可以獲取寫鎖的。

AQS的doReleaseShared方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;// loop to recheck cases
                  //喚醒后繼節(jié)點(diǎn)
                  unparkSuccessor(h);
            }
            else if (ws == 0 &&
                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

該方法就是為了喚醒后繼節(jié)點(diǎn)。

ReentrantReadWriteLock.WriteLock

WriteLock的lock()方法

public void lock() {
    sync.acquire(1);
}

AQS的acquire(int arg)方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Java AbstractQueuedSynchronizer(AQS)淺析之一這篇文章中已經(jīng)分析過了這個(gè)方法,現(xiàn)在我們只看tryAcquire(arg)方法的具體實(shí)現(xiàn)。

ReentrantReadWriteLock.Sync的tryAcquire(arg)方法

protected final boolean tryAcquire(int acquires) {
    /*
     * 流程:
     * 1. 如果存在讀鎖或?qū)戞i,并且讀鎖和寫鎖的持有者不是當(dāng)前線程,則失敗。
     * 2. 如果寫鎖數(shù)量超過最大數(shù)量,失敗。
     * 3. 否則,當(dāng)前線程有資格獲取鎖。
     */
    Thread current = Thread.currentThread();
    //獲取同步狀態(tài)值
    int c = getState();
    //獲取寫鎖數(shù)量
    int w = exclusiveCount(c);
    //存在鎖
    if (c != 0) {
        // 注釋1處,
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //如果要獲取寫鎖的數(shù)量加上現(xiàn)有寫鎖的數(shù)量大于65535,拋出異常。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 注釋2處,
        setState(c + acquires);
        return true;
    }
   //注釋3處,非公平策略的寫鎖,writerShouldBlock總是返回false
    if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
    //注釋4處
    setExclusiveOwnerThread(current);
    return true;
}

注釋1處,存在鎖,如果當(dāng)前不存在寫鎖或者當(dāng)前線程沒持有寫鎖,獲取失敗。

注釋2處,如果走到注釋2處,說明當(dāng)前線程已經(jīng)持有寫鎖,是再次獲取寫鎖,所以我們可以直接修改同步狀態(tài)值,不需要CAS,也是美滋滋。

注釋3處,如果不存鎖,如果當(dāng)前線程應(yīng)該阻塞或者CAS更新同步狀態(tài)值失敗,就返回false。

注釋4處,將當(dāng)前線程標(biāo)記為寫鎖的持有者,返回true。

對與AQS的acquire(int arg)方法中的其他步驟這里就不再贅述了,可以參考Java AbstractQueuedSynchronizer(AQS)淺析之一。

WriteLock的unLock()方法

public void unlock() {
    sync.release(1);
}

AQS的release(int arg)方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //喚醒后繼節(jié)點(diǎn)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

只看tryRelease(int releases)方法

ReentrantReadWriteLock.Sync的tryRelease(int releases) 方法

protected final boolean tryRelease(int releases) {
   //注釋1處,如果不是寫鎖的持有者,拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //注釋2處,
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    //注釋3處
    setState(nextc);
    return free;
}

注釋1處,如果不是寫鎖的持有者,拋出異常。

注釋2處,如果剩余寫鎖數(shù)量為0,說明已經(jīng)完全釋放完畢,將寫鎖的持有者線程置為null。
注釋3處,重新為同步狀態(tài)state賦值,返回。

鎖降級

如果當(dāng)前線程持有寫鎖,然后獲取了讀鎖,然后釋放了寫鎖,就從寫鎖降級到了讀鎖。但是從一個(gè)讀鎖升級到寫鎖是不可能的。一個(gè)鎖降級的例子

class CachedData {

    String data;
    volatile boolean cacheValid;

    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // 在獲取寫鎖之前必須釋放讀鎖
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // 重新檢查狀態(tài),因?yàn)榱硪粋€(gè)線程可能在執(zhí)行此操作之前已獲得寫鎖定并更改了狀態(tài)。
                if (!cacheValid) {
                    //修改數(shù)據(jù)
                    System.out.println("修改數(shù)據(jù)");
                    data = "Hello world";
                    cacheValid = true;
                }
                // 通過在釋放寫鎖之前獲取讀鎖來實(shí)現(xiàn)降級
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); // 釋放寫鎖但仍持有讀鎖
            }
        }

        try {
            //讀數(shù)據(jù)
            System.out.println("讀取數(shù)據(jù)");
            System.out.println(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

參考鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容