ReentrantReadWriteLock可重入讀寫鎖,先從功能以及具體實現(xiàn)有一個簡單的了解
一、兩把鎖
writerLock,readerLock分為讀鎖跟寫鎖,他們之間的共存與互斥進(jìn)行一個簡單羅列
寫寫互斥、讀寫互斥、寫讀可降級,讀寫不可升級、讀讀可并行
這是我的總結(jié),另外競爭狀態(tài)同樣是使用state變量進(jìn)行的,通過高16進(jìn)行共享鎖也就是讀讀線程之間的共享模式調(diào)度,低16位用于寫鎖的阻塞模式。
也就是說寫鎖重入次數(shù)是int最大取值范圍的差不多一半,不過這也夠用了。
二、同步器代碼
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
這幾個成員屬性主要是做state值的切分與轉(zhuǎn)換的,
SHARED_SHIFT =16 等于是將int的state 4字節(jié) 32位進(jìn)行切分,16位
SHARED_UNIT 左移位16位也就是實際share模式(讀鎖時候state的)存儲時候是以65536為底,
MAX_COUNT 讀鎖或者寫鎖最大的資源數(shù),65535
EXCLUSIVE_MASK 字面意思獨占模式的掩碼,其實就是為了方便進(jìn)行獨占模式進(jìn)行位&運算
sharedCount(int c) 通過無符號右移操作得出目前share資源的剩余數(shù)量
exclusiveCount(int c) 通過掩碼的與運算,"抹掉"高位的讀鎖占用的變量值,不得不說你大爺還是你大爺,如果是我也就是加加減減那么用。
用于存儲讀鎖的計數(shù)器在重入時候使用,代碼如下
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
讀鎖的重入計數(shù)是通過線程本地變量ThreadLocal里面存儲了HoldCounter實現(xiàn)的
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
接下來對幾個變量做一個簡單說明就開始分析幾個重要的方法
當(dāng)前線程對自身重入計數(shù)器的一個引用
private transient ThreadLocalHoldCounter readHolds;
sync對象內(nèi)部緩存的最后一個獲得讀鎖的線程計數(shù)器對象引用目的提升查詢比較時候的性能
private transient HoldCounter cachedHoldCounter;
sync對象內(nèi)部緩存的第一個獲取讀鎖的線程
private transient Thread firstReader = null;
sync對象內(nèi)部緩存的第一個獲取讀鎖的線程重入次數(shù)
private transient int firstReaderHoldCount;
兩個抽象方法定義要求公平鎖與非公平鎖去實現(xiàn)在嘗試獲得鎖時候是否進(jìn)行當(dāng)前線程的阻塞
判斷讀鎖是否要阻塞
abstract boolean readerShouldBlock();
判斷寫鎖是否需要阻塞
abstract boolean readerShouldBlock()
公平鎖實現(xiàn)
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
判斷當(dāng)前阻塞隊列是否有等待節(jié)點
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
判斷當(dāng)前阻塞隊列是否有等待節(jié)點
return hasQueuedPredecessors();
}
}
非公平鎖實現(xiàn)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
不阻塞嘗試進(jìn)行加鎖
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
判斷當(dāng)前是否有寫鎖,如果沒有寫鎖就不阻塞
return apparentlyFirstQueuedIsExclusive();
}
}
下面仔細(xì)分解一下讀鎖跟寫鎖的加鎖與釋放鎖的相關(guān)代碼
從命名就可以看出來哪些是用于寫鎖的
protected final boolean tryRelease(int releases) {
//因為寫鎖是獨占模式,如果當(dāng)前線程不是鎖持有者則屬于異常狀態(tài)
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//因為支持重入所以不只是0跟1的關(guān)系
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//當(dāng)state減到0的時候進(jìn)行競爭狀態(tài)的釋放,并修改當(dāng)前寫鎖線程持有者為空
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
這部分是寫鎖的獲取,寫鎖的獲取稍微有些復(fù)雜
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
取出當(dāng)前state的數(shù)值
int c = getState();
通過與運算算出當(dāng)前獨占鎖下數(shù)值
int w = exclusiveCount(c);
如果是state不為0有可能有線程獲得了讀鎖,也有可能是有現(xiàn)成獲得寫鎖
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
這表示兩種情況下都失敗
1.有讀鎖存在
2.有寫鎖,但是當(dāng)前寫鎖持有線程不是當(dāng)前線程
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
state不為0并且當(dāng)前線程又為鎖持有線程
鎖重入增加state的值
setState(c + acquires);
return true;
}
如果當(dāng)前state為0走入這個分支,判斷是否需要等待,
或者嘗試修改競爭狀態(tài)失敗返回加鎖失敗否則設(shè)置當(dāng)前
線程為鎖持有線程
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
下面是讀鎖的獲取與釋放邏輯相關(guān)的代碼
個人認(rèn)為讀鎖的獲取相對值得自信分解,因為里面會涉及一個鎖降級過程,并且還有一個令我想了挺長沒get到李大爺?shù)狞c的地方,就是一個死循環(huán)去獲取讀鎖。前提條件是目前無寫鎖未被持有
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
當(dāng)前如果有寫鎖,并且寫鎖線程不是自己就進(jìn)入讀鎖獲取邏輯
這里面是有含義的包含了鎖降級的支持,
假設(shè)一個線程獲得了寫鎖,那么第一個條件是成立的,他會判斷持有線程是不是當(dāng)前線程,如果是當(dāng)前線程那么還是會執(zhí)行下面讀鎖獲取代碼。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
這是那個困擾我地方,為什么跟上面方法一樣還要再來這么個方法,截止到目前我的理解是這樣的,之所以可以這么任性的死循環(huán)其實是因為讀鎖數(shù)量限制造成的,以往比如信號量,他雖然會通過共享模式提高并發(fā)線程數(shù),但是并發(fā)量會被嚴(yán)格限制,讀鎖比較特殊在源碼一開始就有一段話
This lock supports a maximum of 65535 recursive write locks
and 65535 read locks. Attempts to exceed these limits result in
*{@link Error} throws from locking methods.
正是因為這個近乎不限制并發(fā)讀數(shù)量的做法才允許這么任性的死循環(huán),因為這個并發(fā)獲得讀鎖其實是很快的,所以不需要進(jìn)入阻塞隊列進(jìn)行排隊,但是這里里面有一個前提是當(dāng)前沒有被寫鎖阻塞,因為如果寫鎖阻塞大量的循環(huán)是無效的依然會浪費性能,還是要去排隊的。代碼如下,就是為了在一次獲得
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
另外還有兩個嘗試獲得鎖的方法一個是嘗試獲得寫鎖,一個是嘗試獲得讀鎖,基礎(chǔ)邏輯為不進(jìn)行是否應(yīng)該阻塞當(dāng)前線程判斷,而是直接進(jìn)行嘗試,加鎖成功則返回true否則返回false。這里面還是要特別注意讀鎖的獲取依然采用的循環(huán)方式,還是因為不限制讀鎖數(shù)量的原因
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
當(dāng)當(dāng)前沒有寫鎖時候就一直進(jìn)行循環(huán)添加當(dāng)前線程讀鎖進(jìn)行嘗試
int c = getState();
鎖降級
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
最后一個注意點就是condition的支持,寫鎖支持condition,讀鎖不支持
可以通過代碼就看的出來
讀鎖中獲得conditon方法
public Condition newCondition() {
throw new UnsupportedOperationException();
}
寫鎖中獲得condition方法
public Condition newCondition() {
return sync.newCondition();
}
其他方法就不進(jìn)行說明了,都是一些統(tǒng)計或者判斷的方法,避免沒有重點。
最后做一個總結(jié)吧
1、可重入讀寫鎖利用state變量標(biāo)記競爭狀態(tài)具體是高16位用于讀鎖分配計數(shù)低16位用于寫鎖重入計數(shù)
2、讀鎖的最大并發(fā)量是65535個讀并發(fā),寫鎖支持65535次重入。
3、寫鎖的獲取與重入跟ReetrantLock區(qū)別不算大,都是通過state進(jìn)行重入計數(shù)的
4、鎖關(guān)系總結(jié)如下,讀鎖被持有則寫鎖獲取失敗、寫鎖被持有則其他線程獲取讀鎖或者寫鎖失敗,但是本線程可以支持獲取讀鎖(鎖降級)。讀鎖與讀鎖是可以并發(fā)獲取的。
5、讀鎖的重入計數(shù)是通過ThreadLocal里面的HoldCounter進(jìn)行存儲的。為了提升對比效率做了一些對象的緩存firsterReader,firstReaderHoldCount、cachedHoldCounter