java并發(fā)之ReentrantReadWriteLock
知識(shí)導(dǎo)讀
- 讀寫鎖內(nèi)部維護(hù)了兩個(gè)分離的鎖,讀鎖和寫鎖,兩個(gè)鎖共用一個(gè)AQS實(shí)現(xiàn)。state的高16位記錄讀鎖資源占用,低16位記錄寫鎖資源占用。讀鎖基于AQS的共享模式實(shí)現(xiàn),寫鎖基于AQS的獨(dú)占模式實(shí)現(xiàn)
- 讀鎖和寫鎖都是可重入,提供公平模式和非公平模式的。非公平模式的讀鎖要優(yōu)先等待隊(duì)列中頭部是寫鎖的線程去獲取寫鎖。
- 當(dāng)一個(gè)線程持有寫鎖的時(shí)候,允許當(dāng)前線程再添加讀鎖,鎖降級(jí);當(dāng)一個(gè)線程持有讀鎖的時(shí)候不允許再添加寫鎖,即不允許鎖升級(jí)。鎖釋放的時(shí)候需要按照加鎖順序釋放
原理
- ReentrantReadWriteLock持有兩把鎖,readerLock和writerLock
- readerLock是基于AQS共享模式的可重入、可共享的鎖。提供了公平和非公平兩種加鎖模式
- writerLock是基于AQS獨(dú)占模式的可重入鎖。提供了公平和非公平兩種加鎖模式
- ReentrantReadWriteLock的內(nèi)部類Sync繼承了AQS,覆寫實(shí)現(xiàn)了獨(dú)占模式和共享模式獲取同步狀態(tài)和釋放資源的邏輯,readerLock和writerLock是基于同一個(gè)Sync實(shí)例實(shí)現(xiàn)的。Sync中state變量的高16位記錄讀鎖的資源獲取情況,state變量的低16位記錄寫鎖的資源獲取情況。
- 當(dāng)一個(gè)線程持有寫鎖的時(shí)候,允許當(dāng)前線程再添加讀鎖,鎖降級(jí);當(dāng)一個(gè)線程持有讀鎖的時(shí)候不允許再添加寫鎖,即不允許鎖升級(jí)。鎖釋放的時(shí)候需要按照加鎖順序釋放
- 添加讀鎖后不允許添加寫鎖,個(gè)人感覺(jué)原因是讀鎖與讀鎖是可以共存的,當(dāng)加了讀鎖后再加寫鎖需要其他線程持有的讀鎖全部釋放,單純的只看添加讀鎖的是否是本線程不行。不過(guò)也能做啊,為啥不支持,還是想不通,判斷一下讀鎖的state數(shù)量和firstReaderHoldCount數(shù)一致就能判斷出當(dāng)前讀鎖是否只被當(dāng)前線程持有
- 獲取讀、寫鎖失敗被阻塞的線程都會(huì)添加到同一個(gè)隊(duì)列中,當(dāng)讀鎖很多的時(shí)候,容易反生寫鎖饑餓的情況
注意:state變量是一個(gè)int值,高16位和低16位分別表示讀鎖和寫鎖的資源數(shù)量,所以讀、寫鎖的資源都有個(gè)最大值,2的16次方-1
鎖降級(jí)實(shí)現(xiàn)
class CashData {
Object obj;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public Object load() {
rwl.readLock().lock();
if (!cacheValid) {
// 釋放讀鎖
rwl.readLock().unlock();
// 要進(jìn)行賦值,添加寫鎖
rwl.writeLock().lock();
try {
if (!cacheValid) {
cacheValid = true;
obj = 1;
}
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
rwl.readLock().unlock();
return obj;
}
}
源碼分析
基本構(gòu)造
ReentrantReadWriteLock內(nèi)部持有兩個(gè)鎖實(shí)現(xiàn)。api都是調(diào)用這兩個(gè)鎖的api,ReentrantReadWriteLock只是一個(gè)外層包裝。在創(chuàng)建ReentrantReadWriteLock實(shí)例的時(shí)候會(huì)初始化一個(gè)Sync實(shí)例,讀鎖和寫鎖都是基于Sync實(shí)現(xiàn)。
private final ReentrantReadWriteLock.ReadLock readerLock;//讀鎖
private final ReentrantReadWriteLock.WriteLock writerLock;//寫鎖
//AQS實(shí)現(xiàn)類,基于該類實(shí)現(xiàn)
final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
//創(chuàng)建讀、寫鎖的時(shí)候會(huì)使用 同一個(gè)Sync來(lái)創(chuàng)建
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
ReadLock實(shí)現(xiàn)了Lock接口,基于ReentrantReadWriteLock的內(nèi)部類Sync的共享模式模式實(shí)現(xiàn),每次加鎖需要修改state的高16位值+1
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
//使用AQS的共享模式模式
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
WriteLock實(shí)現(xiàn)了Lock接口,基于ReentrantReadWriteLock的內(nèi)部類Sync的獨(dú)占模式實(shí)現(xiàn),每次加鎖需要修改state的低16位值+1
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
//使用AQS的獨(dú)占模式
public void lock() {
sync.acquire(1);
}
//使用AQS的獨(dú)占模式
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//使用AQS的獨(dú)占模式
public boolean tryLock( ) {
return sync.tryWriteLock();
}
//使用AQS的獨(dú)占模式
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//使用AQS的獨(dú)占模式
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
Sync實(shí)現(xiàn)
ReentrantReadWriteLock中的讀、寫鎖各個(gè)方法的實(shí)現(xiàn)都依賴內(nèi)部類Sync。Sync繼承了AQS,覆寫了AQS的獨(dú)占模式方法和共享模式方法
Sync內(nèi)部構(gòu)造
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//獲取當(dāng)前讀鎖數(shù)量(共享+重入),高16位的數(shù)值
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//獲取當(dāng)前寫鎖數(shù)量(重入),低16位的數(shù)值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}
獨(dú)占模式(寫鎖實(shí)現(xiàn))
獨(dú)占模式加鎖
Sync覆寫AQS的tryAcquire方法,只允許一個(gè)線程獲取寫鎖的同步狀態(tài)成功
- 如果當(dāng)前存在讀鎖,不允許添加寫鎖,無(wú)論是否持有讀鎖的線程是否為本線程,返回獲取同步狀態(tài)失敗,當(dāng)前線程入隊(duì)自旋阻塞
- 如果當(dāng)前有其他線程持有寫鎖,返回獲取同步狀態(tài)失敗,當(dāng)前線程入隊(duì)自旋阻塞
- 如果持有寫鎖的是當(dāng)前線程,重入獨(dú)占鎖,直接設(shè)置state值
- 如果寫鎖的數(shù)量超過(guò)允許的最大值,則返回獲取同步狀態(tài)失敗
- CAS設(shè)置state值
- CAS設(shè)置state失敗,則返回獲取同步狀態(tài)失敗
- CAS設(shè)置state成功,則獲取同步狀態(tài)成功,設(shè)置持有寫鎖的線程為當(dāng)前線程,返回true,當(dāng)前線程繼續(xù)執(zhí)行
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);//獲取當(dāng)前寫鎖數(shù)量
if (c != 0) {
//有其他線程持有讀鎖,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//重入
setState(c + acquires);
return true;
}
//公平鎖和非公平鎖邏輯 由writerShouldBlock方法來(lái)判斷
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//獲取同步狀態(tài)成功,設(shè)置持有鎖的線程為當(dāng)前線程
setExclusiveOwnerThread(current);
return true;
}
非公平鎖NonfairSync的writerShouldBlock方法直接返回false,可以直接CAS設(shè)置state值來(lái)獲取同步狀態(tài)
公平鎖FairSync的writerShouldBlock調(diào)用AQS的hasQueuedPredecessors方法來(lái)判斷,跟ReenTrantLock一樣,判斷AQS同步隊(duì)列中是否存在排在該線程之前的的節(jié)點(diǎn),保證先入隊(duì)的先執(zhí)行
獨(dú)占模式釋放鎖
Sync覆寫AQS的tryRelease方法,定義釋放資源的邏輯。由于是獨(dú)占模式,只會(huì)有一個(gè)線程同時(shí)執(zhí)行該方法,不會(huì)存在并發(fā)問(wèn)題
- 首先判斷如果不是持有寫鎖的線程,調(diào)用該方法直接拋出異常
- state的低16位存儲(chǔ)寫鎖數(shù),沒(méi)releae一次,低16位減去1,當(dāng)?shù)?6位數(shù)值為0的時(shí)候,釋放寫鎖成功,修改持有寫鎖的線程為null
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//判斷寫鎖數(shù)量是否為0,0的時(shí)候修改持有寫鎖的線程為null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
共享模式(讀鎖實(shí)現(xiàn))
共享模式加鎖
Sync覆寫AQS的tryAcquireShared方法,定義獲取同步狀態(tài)的邏輯
- 存在寫鎖,如果持有寫鎖的是當(dāng)前線程則允許添加讀鎖,如果持有寫鎖的是其他線程則不允許添加讀鎖
- 調(diào)用readerShouldBlock來(lái)判斷是否公平與非公平
- 判斷讀鎖數(shù)量不要超過(guò)最大值,不能超過(guò)16位數(shù)值的最大值
- 存在并發(fā)則CAS設(shè)置state的值,如果成功則獲取讀鎖的同步狀態(tài)成功,返回1
- 如果獲取讀鎖的同步狀態(tài)失敗,調(diào)用fullTryAcquireShared方法,循環(huán)再次嘗試一遍
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//其他線程持有寫鎖,直接返回-1,獲取同步狀態(tài)失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//獲取state的高16位數(shù)值(讀鎖數(shù)量)
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//讀鎖數(shù)量為0,第一次加讀鎖,設(shè)置firstReader為當(dāng)前線程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //讀鎖重入,如果當(dāng)前線程是第一個(gè)持有該讀鎖的線程,計(jì)數(shù)器+1
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
//成功獲取同步狀態(tài),返回一個(gè)正整數(shù)或者0
return 1;
}
//獲取同步狀態(tài)失敗
return fullTryAcquireShared(current);
}
fullTryAcquireShared方法實(shí)現(xiàn)基本與tryAcquireShared方法中的邏輯一樣,多了一層CAS設(shè)置失敗后再for循環(huán)重試
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
公平鎖FairSync的readerShouldBlock調(diào)用AQS的hasQueuedPredecessors方法來(lái)判斷,跟ReenTrantLock一樣,判斷AQS同步隊(duì)列中是否存在排在該線程之前的的節(jié)點(diǎn),保證先入隊(duì)的先執(zhí)行
非公平鎖NonfairSync的readerShouldBlock實(shí)現(xiàn),如果AQS同步隊(duì)列中第一個(gè)是等待寫鎖的線程則返回true,當(dāng)前獲取讀鎖的線程先阻塞,優(yōu)先讓等待隊(duì)列的線程去獲取寫鎖
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
共享模式釋放鎖
Sync覆寫AQS的tryReleaseShared方法,定義釋放資源的邏輯。當(dāng)前是共享模式,會(huì)有多個(gè)線程同時(shí)獲取執(zhí)行權(quán),所以該方法會(huì)存在多線程并發(fā)執(zhí)行的情況
- 修改firstReader和firstReaderHoldCount變量值,感覺(jué)是為了查詢方便
- for循環(huán)加CAS修改state的高16位值,當(dāng)state的高16位為0的時(shí)候,釋放成功返回true
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}