上一篇文章中介紹了AQS同步隊列,這篇文章從源碼來解析并發(fā)包中基于AQS實現(xiàn)的鎖。
獨占鎖ReentrantLock
ReentrantLock是一個可重入的獨占鎖,獨占鎖說明同一個時間,只有一個線程占用,用的場景非常多,也是最常見的一個中鎖,其state在0時表示當前鎖沒有別線程占有,其他值則表示重入次數(shù)。首先看一下ReentrantLock的類圖:
從圖中可以看出,其內(nèi)部用的同步鎖Sync也是繼承自AQS,所以我們大膽推測,其原理與AQS一本相同。
我們注意到,ReentrantLock有一個構(gòu)造函數(shù),也就是入?yún)⑹欠窆芥i,從Sync子類也可看出,ReentrantLock提供了公平和非公平的實現(xiàn)。下面我們跟著函數(shù)源碼去了解其原理
獲取鎖lock
當一個線程調(diào)用lock方法,其嘗試獲取鎖
public void lock() {
sync.lock();// 調(diào)用同步器的lock方法
}
其實現(xiàn)是由子類來實現(xiàn)的:
final void lock() { // 非公平鎖
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final void lock() { // 公平鎖
acquire(1); // 這個方法來自AQS,忘記的可以往前翻翻
}
如上,公平鎖和非公平鎖不同之處,在于非公平鎖會首先直接取用CAS去搶占,搶占不到再調(diào)用acquire(1).這里怎么理解呢?原因是,非公平的概念是不管先來后到,能拿到鎖的就是好漢,所以,非公平鎖會直接去拿。后面的tryAcquire也是同樣的道理。
-
tryAcquire
AQS中的acquire會首先調(diào)用子類的tryAcquire來判斷是否拿到鎖,沒拿到再放入等待循環(huán)請求或等待。
重點關(guān)注子類的實現(xiàn)。這里有一點要說明:非公平鎖的tryAcquire放在了Sync類里實現(xiàn),而公平鎖則由其子類FairSync來實現(xiàn)。原因是無論公平鎖還是非公平鎖,調(diào)用tryLock時,調(diào)用的是同一個方法也即是父類Sync的nonfairTryAcquire方法,也就是說無論公平還是非公平鎖,使用tryLock函數(shù)時都是采用的非公平策略,線程空閑則直接獲取,否則失敗。
我們首先關(guān)注下nonfairTryAcquire方法,它是一個非阻塞的方法,如果獲取到則返回true,獲取不到則返回false。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 這里直接cas和非公平鎖lock方法邏輯一樣 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果不是0,則判斷是否是同一個線程進行重入 else if (current == getExclusiveOwnerThread()) { // zhon int nextc = c + acquires; // 可重入此時增加 if (nextc < 0) // overflow 說明次數(shù)溢出,超出限制 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }再來看下公平策略下的搶占方式:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 其他代碼都是一樣的,只有這里稍有不同,也就是判斷前驅(qū)節(jié)點是否存在,不存在才能搶占,也就是先來后到不能插隊。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }回到acquire,如果搶占失敗則執(zhí)行AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))系列方法放入同步隊列并等待。這里不再詳述。
獲取鎖unlock
釋放鎖unlock() 調(diào)用的是sync.release(1);,這個也是AQS的方法,其也是先調(diào)用子類的tryRelease(1),如果成功,則判斷是否需要喚醒后繼節(jié)點,如果是則喚醒。我們重點講下子類的tryRelease方法,它的實現(xiàn)實在Sync里的,所以公平和非公平策略都一樣。
- tryRelease
可以看到,這里釋放主要是修改狀態(tài)值,如果狀態(tài)值達到0了,則需要進行下一步也就是喚醒protected final boolean tryRelease(int releases) { int c = getState() - releases; // 狀態(tài)值減1,也即是可重入次數(shù)。 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 如果狀態(tài)值為0,代表該線程已經(jīng)完全釋放鎖了。 free = true; setExclusiveOwnerThread(null); // 清空當前鎖的線程 } setState(c); return free;// 返回釋放結(jié)果 }
案例介紹
CopyOnWriteList
讀寫鎖ReentrantReadWriteLock
ReentrantLock鎖可以解決大部分并發(fā)問題,但是它是一個獨占鎖,也就是同一時刻只有一個線程持有,這在實際場景中可能性能不佳。其中讀多寫少的場景就無法滿足,因此ReentrantReadWriteLock就產(chǎn)生了,這是一個單線程寫,多線程多的實現(xiàn)。
其類圖如下:
其類圖比較復雜,但是其實核心還是Sync,只是,讀寫鎖包含兩把鎖readerLock和writerLock來控制讀寫
,其sync作用只是用來作為參數(shù)構(gòu)造這兩把鎖。同獨占鎖一樣,讀寫鎖也支持公平和非公平策略,所以其也有帶參數(shù)的構(gòu)造函數(shù)。
接下來我們跟著代碼看
結(jié)構(gòu)
CopyOnWriteList 結(jié)構(gòu)其實很簡單
1、屬性主要是readerLock和writerLock,而它們的核心是Sync。
2、方法基本都是getXXX方法
3、ReadLock、WriteLock也很簡單
-
構(gòu)造函數(shù)
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }從函數(shù)里面看出來,readerLock和writerLock以同一個sync(從這可以看出來這個類非常重要)作為參數(shù)進行構(gòu)建,F(xiàn)airSync和NonfairSync方法大都來自父類Sync,只有writerShouldBlock和readerShouldBlock稍有不同。
-
ReadLock
// 忽略相似的響應(yīng)超時和中斷的lock方法 public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); // 共享鎖的方式 } public boolean tryLock() { return sync.tryReadLock(); // 讀鎖 } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); // 共享鎖不支持條件隊列 } } -
WriteLock
// 忽略相似的響應(yīng)超時和中斷的lock方法 public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); // 獨占方式 } public boolean tryLock() { return sync.tryWriteLock(); // 寫鎖 } public void unlock() { sync.release(1); } public Condition newCondition() { throw new UnsupportedOperationException(); } }
看了上面的代碼發(fā)現(xiàn),兩個鎖都很簡單,一個就是獨占方式用來寫數(shù)據(jù),此操作不支持同時,一個就是共享方式來實現(xiàn)多線程讀,實現(xiàn)的邏輯都是其引用變量sync來實現(xiàn)的,很容易理解。關(guān)鍵的是它們的核心公共參數(shù)Sync。
核心類Sync
但其實Sync說難也不難,一共就400行代碼,接下來我們一起逐行來分析源碼
屬性
- static final int SHARED_SHIFT = 16;
- static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1左移16位=1*2^16=65536
- static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 65535
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 16個1
- static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 共享鎖數(shù)量
- static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 獨占鎖可重入次數(shù)
注意 c>>>SHARED_SHIFT c是一個4字節(jié)32位整型 >>>表示無符號右移,右移16位,其實就是獲取其高16位的值
再看 c & EXCLUSIVE_MASK,這是一個短連接,也就是獲取其后面16位。我們知道AQS將由此可見,讀寫鎖采用將int的前16位作為讀的狀態(tài),后16位作為寫的狀態(tài)(可結(jié)合后文中代碼理解)。
這些變量的作用是什么呢?
獨占鎖方法
寫鎖中的lock調(diào)用的是Sync中的acquire,這個方法來自于AQS,所以再重新回憶一下aquire(1)對應(yīng)的AQS方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看出它跟其他所不同之處還是tryAcquire方法,釋放unlock同樣使用了AQS的release(1),其子類核心是tryReleaseShared。
- tryAcquire
再次結(jié)合AQS回一下整個過程,首先去嘗試獲取鎖,如果成功則返回,如果失敗則加入同步隊列等待喚醒(park),或自旋獲取鎖(頭結(jié)點的下個節(jié)點)。這里的寫鎖與前面講到的獨占鎖幾乎一樣。protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //獲取狀態(tài)值的獨占狀態(tài)值,也就是可重入個數(shù) if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 走到這里說明是該線程占有鎖,且可重入次數(shù)沒有達到限制,重入次數(shù)+1 setState(c + acquires); return true; } // 走到這里說明鎖還沒有被占有,則判斷以下兩個條件: // 1. 寫的時候是否等待,公平鎖則需要等待前驅(qū)節(jié)點,非公平鎖不需要直接return false // 2. 嘗試CAS,如果失敗,說明其他線程拿到鎖了 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //走到這,說明當前線程競爭鎖成功。 setExclusiveOwnerThread(current); return true; } - tryRelease
這一段也不再贅述,邏輯與獨占鎖一模一樣protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } - tryWriteLock()
還是與獨占鎖對比(這兩個幾乎一樣),上面說了,獨占鎖中的trylock對于公平和非公平?jīng)]有區(qū)別都是用非公平的策略去實現(xiàn)的。這里同樣如此
有一點要注意ReentrantLock中trylock和lock共用了nofairTryAquire方法,但公平和非公平的lock方法不一致。final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); // 不再有前驅(qū)節(jié)點的判斷 return true; }
而這里是ReentrantReadWriteLock中的寫鎖正好相反,也就是公平和非公平的lock方法共用邏輯,但是tryWriteLock和lock的方法不一致。
但其實原理是一樣的。
共享鎖方法
因為我們文中前面部分已經(jīng)講過獨占鎖,所以寫鎖再理解起來就非常簡單了,但是讀鎖的具體實現(xiàn)我們還沒講過,不過原理大體是差不多的。
記得我們在AQS源碼解析文章中講過,共享鎖和獨占所得區(qū)別:
- 狀態(tài)操作不一樣,共享鎖是判斷state>0 ,則表示還可以被持有
- 獲取到鎖后以及釋放時的操作不一樣,共享鎖還會嘗試去擴散通知其他節(jié)點
- 共享鎖沒有條件隊列
我們接下來跟診源碼看一下具體實現(xiàn)
- tryAcquireShared
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState();// 獲取當前狀態(tài)和當前線程 // 判斷寫鎖是否被占用,記住寫的時候是不能讀的。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 三個條件: 1.讀的時候是否阻塞 2.判斷是否達到共享數(shù)量最大值 3.CAS成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) {// 如果為0則說明該線程是第一個讀線程,則記錄 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 到這說明不是第一個讀線程 HoldCounter rh = cachedHoldCounter; // 不是當前線程緩存值或者沒有緩存則設(shè)置緩存 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // // 7. 為什么要 count == 0 時進行 ThreadLocal.set? 因為上面tryReleaseShared方法 中當 count == 0 時, 進行了ThreadLocal.remove readHolds.set(rh); rh.count++; } return 1; } // 到這說明沒有獲取到鎖 return fullTryAcquireShared(current); // 自旋獲取鎖 } - fullTryAcquireShared
這個方法其實是 tryAcquireShared 的冗余(redundant)方法, 主要補足 readerShouldBlock 導致的獲取等待 和 CAS 修改 AQS 中 state 值失敗進行的修補工作
這段代碼我們重點看下什么情況進入重復循環(huán),這才是這個方法的意義,首先下列條件會返回-1:final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) // 到這里說明有線程正在寫,直接返回加入AQS隊列等待 return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 公平鎖表示如果有前驅(qū)節(jié)點,非公平鎖表示頭節(jié)點是否是寫鎖 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 到這說明不是第一個線程 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 到這里說明aqs 同步隊列里面有 獲取 readLock 的node 或 head.next 是獲取 writeLock 的節(jié)點 if (rh.count == 0) // 為真說明不是重入,且cas失敗 return -1; } } // 到這里說明不需要等待,只要能cas成功就行 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }- 如果有寫鎖或者頭節(jié)點的下一個節(jié)點想獲取寫鎖
- readerShouldBlock(非公平模式下,表示頭結(jié)點的下一個節(jié)點是獨占模式,這種設(shè)計是為了給寫鎖給予非常高的優(yōu)先級) 且不是該鎖重入
- readerShouldBlock(公平模式下,還有前驅(qū)節(jié)點無論讀寫) 且不是該鎖重入
上面三種情況都不會循環(huán)還是進入AQS同步隊列等待,只有在非block且當前沒有寫線程的時候,cas失敗才會去進入loop,這也即是這個函數(shù)的目的。
- tryReleaseShared
這個邏輯也比較簡單,和共享鎖相比,多了些操作重入次數(shù)的邏輯。protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; // 釋放 else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 自旋cas直到釋放 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
鎖升級/鎖降級
同一個線程中,在沒有釋放讀鎖的情況下,就去申請寫鎖,這屬于鎖升級,同一個線程中,在沒有釋放寫鎖的情況下,就去申請讀鎖,這屬于鎖降級,ReentrantReadWriteLock只支持降級,升級會導致死鎖。
案例:
public class ReentrantReadWriteLockTest {
public static void main(String[] args) throws Exception {
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
log.info("readWriteLock.readLock().lock() begin");
readWriteLock.readLock().lock();
log.info("readWriteLock.readLock().lock() over");
new Thread() {
@Override
public void run() {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
log.info("readLock().lock() begin:" );
readWriteLock.readLock().lock(); // 獲取過一次就能再次獲取, 但是若其他沒有獲取的線程因為 syn queue里面 head.next 是獲取write的線程, 則到
// syn queue 里面進行等待
log.info("readLock().lock() over:" );
}
}.start();
log.info("readWriteLock.writeLock().lock() begin");
readWriteLock.writeLock().lock();
log.info("readWriteLock.writeLock().lock() over");
}
}
如上輸出為:
readWriteLock.readLock().lock() begin
readWriteLock.readLock().lock() over
readWriteLock.writeLock().lock() begin
readLock().lock() begin:
可以看出讀鎖和寫鎖都阻塞了,發(fā)生了死鎖,原因是:
- main線程中首先占用了讀鎖,然后再去獲取寫鎖,這個時候會在tryAcquire中根據(jù)條件,c != 0&&w==0,表示當前有線程在讀,也就是自己main線程在讀,因此進入AQS隊列中等待。這就是升級會導致死鎖
- thread線程中嘗試去獲取讀線程,但發(fā)現(xiàn)AQS隊列中頭結(jié)點的下一個節(jié)點是寫鎖,因此也加入AQS中等待。
StampedLock
StampedLock這個類是在1.8版本加入的