六、并發(fā)包中其他鎖

上一篇文章中介紹了AQS同步隊列,這篇文章從源碼來解析并發(fā)包中基于AQS實現(xiàn)的鎖。


獨占鎖ReentrantLock

ReentrantLock是一個可重入的獨占鎖,獨占鎖說明同一個時間,只有一個線程占用,用的場景非常多,也是最常見的一個中鎖,其state在0時表示當前鎖沒有別線程占有,其他值則表示重入次數(shù)。首先看一下ReentrantLock的類圖:

ReentrantLock類圖

從圖中可以看出,其內(nèi)部用的同步鎖Sync也是繼承自AQS,所以我們大膽推測,其原理與AQS一本相同。
我們注意到,ReentrantLock有一個構(gòu)造函數(shù),也就是入?yún)⑹欠窆芥i,從Sync子類也可看出,ReentrantLock提供了公平和非公平的實現(xiàn)。下面我們跟著函數(shù)源碼去了解其原理

獲取鎖lock

當一個線程調(diào)用lock方法,其嘗試獲取鎖

public void lock() {
    sync.lock();// 調(diào)用同步器的lock方法
}

其實現(xiàn)是由子類來實現(xiàn)的:

final void lock() {  // 非公平鎖
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
final void lock() {  // 公平鎖
    acquire(1);  // 這個方法來自AQS,忘記的可以往前翻翻
}

如上,公平鎖和非公平鎖不同之處,在于非公平鎖會首先直接取用CAS去搶占,搶占不到再調(diào)用acquire(1).這里怎么理解呢?原因是,非公平的概念是不管先來后到,能拿到鎖的就是好漢,所以,非公平鎖會直接去拿。后面的tryAcquire也是同樣的道理。

  • tryAcquire
    AQS中的acquire會首先調(diào)用子類的tryAcquire來判斷是否拿到鎖,沒拿到再放入等待循環(huán)請求或等待。
    重點關(guān)注子類的實現(xiàn)。

    這里有一點要說明:非公平鎖的tryAcquire放在了Sync類里實現(xiàn),而公平鎖則由其子類FairSync來實現(xiàn)。原因是無論公平鎖還是非公平鎖,調(diào)用tryLock時,調(diào)用的是同一個方法也即是父類Sync的nonfairTryAcquire方法,也就是說無論公平還是非公平鎖,使用tryLock函數(shù)時都是采用的非公平策略,線程空閑則直接獲取,否則失敗。

    我們首先關(guān)注下nonfairTryAcquire方法,它是一個非阻塞的方法,如果獲取到則返回true,獲取不到則返回false。

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {      // 這里直接cas和非公平鎖lock方法邏輯一樣
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果不是0,則判斷是否是同一個線程進行重入
        else if (current == getExclusiveOwnerThread()) {
            // zhon
            int nextc = c + acquires; // 可重入此時增加
            if (nextc < 0) // overflow  說明次數(shù)溢出,超出限制
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    再來看下公平策略下的搶占方式:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 其他代碼都是一樣的,只有這里稍有不同,也就是判斷前驅(qū)節(jié)點是否存在,不存在才能搶占,也就是先來后到不能插隊。
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    回到acquire,如果搶占失敗則執(zhí)行AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))系列方法放入同步隊列并等待。這里不再詳述。

獲取鎖unlock

釋放鎖unlock() 調(diào)用的是sync.release(1);,這個也是AQS的方法,其也是先調(diào)用子類的tryRelease(1),如果成功,則判斷是否需要喚醒后繼節(jié)點,如果是則喚醒。我們重點講下子類的tryRelease方法,它的實現(xiàn)實在Sync里的,所以公平和非公平策略都一樣。

  1. tryRelease
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 狀態(tài)值減1,也即是可重入次數(shù)。
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {       // 如果狀態(tài)值為0,代表該線程已經(jīng)完全釋放鎖了。
            free = true;
            setExclusiveOwnerThread(null); // 清空當前鎖的線程
        }
        setState(c);
        return free;// 返回釋放結(jié)果
    }
    
    可以看到,這里釋放主要是修改狀態(tài)值,如果狀態(tài)值達到0了,則需要進行下一步也就是喚醒

案例介紹

CopyOnWriteList

讀寫鎖ReentrantReadWriteLock

ReentrantLock鎖可以解決大部分并發(fā)問題,但是它是一個獨占鎖,也就是同一時刻只有一個線程持有,這在實際場景中可能性能不佳。其中讀多寫少的場景就無法滿足,因此ReentrantReadWriteLock就產(chǎn)生了,這是一個單線程寫,多線程多的實現(xiàn)。
其類圖如下:

讀寫鎖.PNG

其類圖比較復雜,但是其實核心還是Sync,只是,讀寫鎖包含兩把鎖readerLock和writerLock來控制讀寫
,其sync作用只是用來作為參數(shù)構(gòu)造這兩把鎖。同獨占鎖一樣,讀寫鎖也支持公平和非公平策略,所以其也有帶參數(shù)的構(gòu)造函數(shù)。
接下來我們跟著代碼看

結(jié)構(gòu)

CopyOnWriteList 結(jié)構(gòu)其實很簡單
1、屬性主要是readerLock和writerLock,而它們的核心是Sync。
2、方法基本都是getXXX方法
3、ReadLock、WriteLock也很簡單

  • 構(gòu)造函數(shù)

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    

    從函數(shù)里面看出來,readerLock和writerLock以同一個sync(從這可以看出來這個類非常重要)作為參數(shù)進行構(gòu)建,F(xiàn)airSync和NonfairSync方法大都來自父類Sync,只有writerShouldBlock和readerShouldBlock稍有不同。

  • ReadLock

    // 忽略相似的響應(yīng)超時和中斷的lock方法
    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
        public void lock() {
            sync.acquireShared(1);  // 共享鎖的方式
        }
        public boolean tryLock() {
            return sync.tryReadLock();  // 讀鎖
        }
        public void unlock() {
            sync.releaseShared(1);
        }
    
        public Condition newCondition() {
            throw new UnsupportedOperationException(); // 共享鎖不支持條件隊列
        }
    }
    
  • WriteLock

    // 忽略相似的響應(yīng)超時和中斷的lock方法
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
        public void lock() {
            sync.acquire(1);  // 獨占方式
        }
        public boolean tryLock() {
            return sync.tryWriteLock(); // 寫鎖
        }
        public void unlock() {
             sync.release(1);
        }
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
    }
    

看了上面的代碼發(fā)現(xiàn),兩個鎖都很簡單,一個就是獨占方式用來寫數(shù)據(jù),此操作不支持同時,一個就是共享方式來實現(xiàn)多線程讀,實現(xiàn)的邏輯都是其引用變量sync來實現(xiàn)的,很容易理解。關(guān)鍵的是它們的核心公共參數(shù)Sync。

核心類Sync

但其實Sync說難也不難,一共就400行代碼,接下來我們一起逐行來分析源碼

屬性

  • static final int SHARED_SHIFT = 16;
  • static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1左移16位=1*2^16=65536
  • static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 65535
  • static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 16個1
  • static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 共享鎖數(shù)量
  • static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 獨占鎖可重入次數(shù)

注意 c>>>SHARED_SHIFT c是一個4字節(jié)32位整型 >>>表示無符號右移,右移16位,其實就是獲取其高16位的值
再看 c & EXCLUSIVE_MASK,這是一個短連接,也就是獲取其后面16位。我們知道AQS將由此可見,讀寫鎖采用將int的前16位作為讀的狀態(tài),后16位作為寫的狀態(tài)(可結(jié)合后文中代碼理解)。

這些變量的作用是什么呢?

獨占鎖方法

寫鎖中的lock調(diào)用的是Sync中的acquire,這個方法來自于AQS,所以再重新回憶一下aquire(1)對應(yīng)的AQS方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&    
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

可以看出它跟其他所不同之處還是tryAcquire方法,釋放unlock同樣使用了AQS的release(1),其子類核心是tryReleaseShared。

  • tryAcquire
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);  //獲取狀態(tài)值的獨占狀態(tài)值,也就是可重入個數(shù)
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 走到這里說明是該線程占有鎖,且可重入次數(shù)沒有達到限制,重入次數(shù)+1
            setState(c + acquires);
            return true;
        }
        // 走到這里說明鎖還沒有被占有,則判斷以下兩個條件:
        // 1. 寫的時候是否等待,公平鎖則需要等待前驅(qū)節(jié)點,非公平鎖不需要直接return false
        // 2. 嘗試CAS,如果失敗,說明其他線程拿到鎖了 
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        //走到這,說明當前線程競爭鎖成功。
        setExclusiveOwnerThread(current);
        return true;
    }
    
    再次結(jié)合AQS回一下整個過程,首先去嘗試獲取鎖,如果成功則返回,如果失敗則加入同步隊列等待喚醒(park),或自旋獲取鎖(頭結(jié)點的下個節(jié)點)。這里的寫鎖與前面講到的獨占鎖幾乎一樣。
  • tryRelease
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);   
        return free;
    }
    
    這一段也不再贅述,邏輯與獨占鎖一模一樣
  • tryWriteLock()
    還是與獨占鎖對比(這兩個幾乎一樣),上面說了,獨占鎖中的trylock對于公平和非公平?jīng)]有區(qū)別都是用非公平的策略去實現(xiàn)的。這里同樣如此
    final boolean tryWriteLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c != 0) {
            int w = exclusiveCount(c);
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
        }
        if (!compareAndSetState(c, c + 1))
            return false;
        setExclusiveOwnerThread(current); // 不再有前驅(qū)節(jié)點的判斷
        return true;
    }
    
    
    有一點要注意ReentrantLock中trylock和lock共用了nofairTryAquire方法,但公平和非公平的lock方法不一致。
    而這里是ReentrantReadWriteLock中的寫鎖正好相反,也就是公平和非公平的lock方法共用邏輯,但是tryWriteLock和lock的方法不一致。
    但其實原理是一樣的。

共享鎖方法

因為我們文中前面部分已經(jīng)講過獨占鎖,所以寫鎖再理解起來就非常簡單了,但是讀鎖的具體實現(xiàn)我們還沒講過,不過原理大體是差不多的。
記得我們在AQS源碼解析文章中講過,共享鎖和獨占所得區(qū)別:

  1. 狀態(tài)操作不一樣,共享鎖是判斷state>0 ,則表示還可以被持有
  2. 獲取到鎖后以及釋放時的操作不一樣,共享鎖還會嘗試去擴散通知其他節(jié)點
  3. 共享鎖沒有條件隊列

我們接下來跟診源碼看一下具體實現(xiàn)

  • tryAcquireShared
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();// 獲取當前狀態(tài)和當前線程
        // 判斷寫鎖是否被占用,記住寫的時候是不能讀的。
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);  
        // 三個條件: 1.讀的時候是否阻塞 2.判斷是否達到共享數(shù)量最大值 3.CAS成功
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) { 
            if (r == 0) {// 如果為0則說明該線程是第一個讀線程,則記錄
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                // 到這說明不是第一個讀線程
                HoldCounter rh = cachedHoldCounter; // 不是當前線程緩存值或者沒有緩存則設(shè)置緩存
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    // // 7. 為什么要 count == 0 時進行 ThreadLocal.set? 因為上面tryReleaseShared方法 中當 count == 0 時, 進行了ThreadLocal.remove
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        // 到這說明沒有獲取到鎖
        return fullTryAcquireShared(current); // 自旋獲取鎖
    }
    
  • fullTryAcquireShared
    這個方法其實是 tryAcquireShared 的冗余(redundant)方法, 主要補足 readerShouldBlock 導致的獲取等待 和 CAS 修改 AQS 中 state 值失敗進行的修補工作
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    // 到這里說明有線程正在寫,直接返回加入AQS隊列等待
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                // 公平鎖表示如果有前驅(qū)節(jié)點,非公平鎖表示頭節(jié)點是否是寫鎖
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    // 到這說明不是第一個線程
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    // 到這里說明aqs 同步隊列里面有 獲取 readLock 的node 或 head.next 是獲取 writeLock 的節(jié)點
                    if (rh.count == 0) // 為真說明不是重入,且cas失敗
                        return -1;
                }
            }
            // 到這里說明不需要等待,只要能cas成功就行
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    
    這段代碼我們重點看下什么情況進入重復循環(huán),這才是這個方法的意義,首先下列條件會返回-1:
    1. 如果有寫鎖或者頭節(jié)點的下一個節(jié)點想獲取寫鎖
    2. readerShouldBlock(非公平模式下,表示頭結(jié)點的下一個節(jié)點是獨占模式,這種設(shè)計是為了給寫鎖給予非常高的優(yōu)先級) 且不是該鎖重入
    3. readerShouldBlock(公平模式下,還有前驅(qū)節(jié)點無論讀寫) 且不是該鎖重入

上面三種情況都不會循環(huán)還是進入AQS同步隊列等待,只有在非block且當前沒有寫線程的時候,cas失敗才會去進入loop,這也即是這個函數(shù)的目的。

  • tryReleaseShared
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1) 
                firstReader = null;  // 釋放
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        // 自旋cas直到釋放
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    
    這個邏輯也比較簡單,和共享鎖相比,多了些操作重入次數(shù)的邏輯。

鎖升級/鎖降級

同一個線程中,在沒有釋放讀鎖的情況下,就去申請寫鎖,這屬于鎖升級,同一個線程中,在沒有釋放寫鎖的情況下,就去申請讀鎖,這屬于鎖降級,ReentrantReadWriteLock只支持降級,升級會導致死鎖。
案例:

public class ReentrantReadWriteLockTest {
    public static void main(String[] args) throws Exception {

        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        log.info("readWriteLock.readLock().lock() begin");
        readWriteLock.readLock().lock();
        log.info("readWriteLock.readLock().lock() over");

        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                log.info("readLock().lock() begin:" );
                readWriteLock.readLock().lock(); // 獲取過一次就能再次獲取, 但是若其他沒有獲取的線程因為 syn queue里面 head.next 是獲取write的線程, 則到
                                                 // syn queue 里面進行等待
                log.info("readLock().lock() over:" );
            }
        }.start();
        log.info("readWriteLock.writeLock().lock() begin");
        readWriteLock.writeLock().lock();
        log.info("readWriteLock.writeLock().lock() over");
    }
}

如上輸出為:

readWriteLock.readLock().lock() begin
readWriteLock.readLock().lock() over
readWriteLock.writeLock().lock() begin
readLock().lock() begin:

可以看出讀鎖和寫鎖都阻塞了,發(fā)生了死鎖,原因是:

  1. main線程中首先占用了讀鎖,然后再去獲取寫鎖,這個時候會在tryAcquire中根據(jù)條件,c != 0&&w==0,表示當前有線程在讀,也就是自己main線程在讀,因此進入AQS隊列中等待。這就是升級會導致死鎖
  2. thread線程中嘗試去獲取讀線程,但發(fā)現(xiàn)AQS隊列中頭結(jié)點的下一個節(jié)點是寫鎖,因此也加入AQS中等待。

StampedLock

StampedLock這個類是在1.8版本加入的

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容