ReentrantLock是獨(dú)占鎖,只允許一個(gè)線程執(zhí)行;CountDownLatch,Semaphore等是共享鎖;它們分別利用了AQS的獨(dú)占與共享功能;那么如果在讀操作遠(yuǎn)多于寫操作的情況下該如何選擇?讀寫鎖,之前的文章中介紹了如何自己實(shí)現(xiàn)一個(gè)讀寫鎖,還實(shí)現(xiàn)了重入功能,讀讀,寫寫,讀寫,寫讀四種重入?,F(xiàn)在來看看JUC包下的ReentrantReadWriteLock的實(shí)現(xiàn)。
先來大致了解下ReentrantReadWriteLock:
- 讀鎖是個(gè)共享鎖,寫鎖是個(gè)獨(dú)占鎖。讀鎖同時(shí)被多個(gè)線程獲取,寫鎖只能被一個(gè)線程獲取。讀鎖與寫鎖不能同時(shí)存在。
- 一個(gè)線程可以多次重復(fù)獲取讀鎖和寫鎖
- 鎖降級(jí):獲取寫鎖的線程又獲取了讀鎖,之后釋放寫鎖,就完成了一次鎖降級(jí)。
- 鎖升級(jí):不支持升級(jí)。獲取讀鎖的線程去獲取寫鎖的化會(huì)造成死鎖。
- 重入數(shù):讀鎖和寫鎖的最大重入數(shù)為65535
- 公平與非公平兩種模式
AQS維護(hù)了一個(gè)int值,表示同步狀態(tài);對于ReentrantLock,state會(huì)在0與1之間變化,1表示已被占有后續(xù)線程入隊(duì)列等待,0表示free。對于CountDownLatch,會(huì)先將state賦予個(gè)大于0的值,在該值變?yōu)?后喚醒等待隊(duì)列中的線程。那么如何用它來即表示讀鎖又表示寫鎖呢?讀鎖我們是允許多個(gè)線程同步運(yùn)行的,我們還允許重入,那么拿什么來記錄每個(gè)線程讀鎖的重入數(shù)?
針對上面兩個(gè)問題,對于同步狀態(tài)status,高16位表示所有線程持有的讀鎖總數(shù),低16位為一個(gè)線程的寫鎖總數(shù),包括重入。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
采用ThreadLocal來記錄每個(gè)線程鎖持有的讀鎖數(shù)目。
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
- HoldCounter 靜態(tài)內(nèi)部類用來記錄一個(gè)線程的讀鎖重入數(shù),以及id;
- ThreadLocalHoldCounter繼承了ThreadLocal,實(shí)現(xiàn)了initialValue方法,作用是在沒有set前調(diào)用get的話initialValue會(huì)被調(diào)用,HoldCounter對象會(huì)被存儲(chǔ)到Entry里,并返回它。變量名為readHolds,它用來存儲(chǔ)/獲取線程的讀鎖數(shù)量。因?yàn)樽x鎖是共享的,我們利用同步狀態(tài)的高16位來記錄總數(shù),用threadlocal來記錄每個(gè)線程所持有的讀鎖數(shù)目。對于寫鎖來說它是獨(dú)占鎖,低16位代表的就是當(dāng)前線程持有的寫鎖數(shù)目。
- cachedHoldCounter:它是一種優(yōu)化的手段,為了避免頻繁的調(diào)用ThreadLocalHoldCounter的讀取,更改甚至刪除操作,于是緩存最新一個(gè)成功獲取鎖的線程的HoldCounter,意思是當(dāng)一個(gè)線程需要記錄值的時(shí)候會(huì)先檢查自己是否是cachedHoldCounter中緩存的那個(gè)線程,是的話就不用再從readHolds中獲取了,減少對ThreadLocal的操作。
- firstReader 與firstReaderHoldCount:代表首個(gè)獲取讀鎖的線程與其所持有的讀鎖數(shù),該讀鎖數(shù)不會(huì)存儲(chǔ)進(jìn)readHolds,這是種優(yōu)化,針對只有一個(gè)線程的情況,避免頻繁操作readHolds。
ReentrantReadWriteLock繼承了ReadWriteLock,這兩個(gè)方法分別返回讀鎖與寫鎖。ReentrantReadWriteLock內(nèi)部實(shí)現(xiàn)了兩個(gè)類:ReadLock&WriteLock分別實(shí)現(xiàn)讀鎖與寫鎖。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
同ReentrantLock一樣內(nèi)部實(shí)現(xiàn)了非公平與公平兩種同步器:NonfairSync &FairSync ,繼承自同一同步器Sync。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
只定義了writerShouldBlock & readerShouldBlock兩種方法,它們作用在獲取鎖的過程中,決定當(dāng)前線程是否該阻塞。
一 讀鎖
1.獲取讀鎖
public void lock() {
sync.acquireShared(1);
}
定位到AQS的acquireShared,該方法之前介紹過。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
來看看Sync重寫的tryAcquireShared方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
寫鎖不為零且持有寫鎖的并非本線程,則返回-1,之后在acquireShared中將線程的節(jié)點(diǎn)放入到等待隊(duì)列中。寫鎖不為零但是正是本線程持有的,則代表寫讀重入。之后在readerShouldBlock返回false與CAS操作成功后,更新HoldCounter 的值,這里會(huì)對之前提到的firstReader ,firstReaderHoldCount 或cachedHoldCounter進(jìn)行相應(yīng)的操作。
如果CAS失敗或者readerShouldBlock返回true,則會(huì)調(diào)用fullTryAcquireShared,該方法會(huì)繼續(xù)嘗試獲取讀鎖,可以看成是tryAcquireShared的升級(jí)版。
先來看看readerShouldBlock()方法:
公平模式下,根據(jù)隊(duì)列中當(dāng)前線程之前有沒有等待的線程來決定。
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
非公平模式
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
調(diào)用apparentlyFirstQueuedIsExclusive()
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
- 返回true代表等待隊(duì)列head.next節(jié)點(diǎn)是等待寫鎖的線程,該方法的目的是不讓寫鎖一直等待下去;比如在上一篇自己實(shí)現(xiàn)的讀寫鎖中,通過增加一個(gè)寫請求變量來防止寫?zhàn)囸I,讓寫鎖的優(yōu)先級(jí)高于讀鎖。這里有相似的目的。
- 這個(gè)方法是不可靠的,因?yàn)樵跈z測過程中隊(duì)列結(jié)構(gòu)是在變化的,;但是我們并不依賴于它的準(zhǔn)確表達(dá),它更多是一種探測,一種優(yōu)化,我們希望它來防止寫鎖的饑餓;而且并不是該方法返回了true,線程就會(huì)被放入阻塞隊(duì)列退出競爭,來看fullTryAcquireShared的邏輯
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
之前說fullTryAcquireShared是tryAcquireShared的升級(jí)版,它處理了CAS失敗和readerShouldBlock返回true的情況;
- 先檢查寫鎖是否被其他線程占用;
- 調(diào)用readerShouldBlock檢查當(dāng)前線程是否應(yīng)該進(jìn)入等待隊(duì)列,返回true也不代表該線程要進(jìn)入等待隊(duì)列,我們看它的處理邏輯:如果firstReader == current代表當(dāng)前只有你一個(gè)讀線程,那么不用等待可以獲取讀鎖;只有在rh.count == 0(意味著該線程沒有持有讀鎖)的情況下返回-1代表線程要進(jìn)入等待隊(duì)列。為什么?持有讀鎖的線程不能進(jìn)入同步隊(duì)列?
- 之后便是CAS,成功便更改HoldCounter值返回1,代表獲取讀鎖成功,否則循環(huán)再次檢查,總之不能輕易的將線程放入等待隊(duì)列,容易造成死鎖。
上面問題的解答:一個(gè)線程是不能隨便放入隊(duì)列中等待的,容易造成死鎖,看下面兩種情況:
- 如果一個(gè)線程持有讀鎖,重入失敗被放入等待隊(duì)列,若等待隊(duì)列中排在它前面的線程里有等待寫鎖的線程,那么就會(huì)造成死鎖,因?yàn)樽x鎖與寫鎖是互斥的。
- 假設(shè)一個(gè)線程持有寫鎖進(jìn)行“鎖降級(jí)申請”,被放入同步隊(duì)列,那么不僅之后的讀寫線程都會(huì)被放入隊(duì)列,隊(duì)列中之前有等待線程,無論等待的是讀或?qū)戞i都將造成死鎖。
回到fullTryAcquireShared
- 它先判斷是否有寫鎖,如果有且就是本線程就不會(huì)進(jìn)行readerShouldBlock判斷,直接CAS,這樣便解決了情況2的問題;
- 針對情況1線程持有讀鎖的情況,即使readerShouldBlock返回true,rh.count == 0不符合不會(huì)返回-1,也就不會(huì)將線程放入隊(duì)列。
總結(jié)就是fullTryAcquireShared,采用for循環(huán)方式讓線程不斷判斷與嘗試,且只有在一種情況下才會(huì)將線程放入隊(duì)列:readerShouldBlock返回true(原因可能是公平模式或者第一個(gè)等待線程(head.next)在等待寫鎖),當(dāng)前線程不是第一個(gè)讀線程且沒有持有讀鎖。
2.釋放讀鎖
public void unlock() {
sync.releaseShared(1);
}
定位到AQS的releaseShared,之前介紹過。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
來看看同步器Syn實(shí)現(xiàn)的tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
1.如果是firstReader ,就對它及firstReader進(jìn)行修改;2.如果不是,就對readHolds進(jìn)行修改;3. 自旋CAS修改status
返回true代表status == 0,表示既沒有讀鎖也沒有寫鎖。
3. tryLock
public boolean tryLock() {
return sync.tryReadLock();
}
調(diào)用了同步器Syn的tryReadLock
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
感覺與tryAcquireShared很像,不同在于tryReadLock只嘗試獲取讀鎖一次,成功就返回true,否則false;這是由于方法用途不同,所以設(shè)計(jì)自然不同。
二, 寫鎖
1,獲取寫鎖
public void lock() {
sync.acquire(1);
}
定位到AQS的acquire中,AQS文章里介紹過。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
來看看同步器Syn重寫的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
一 :c != 0下分兩種 1. w == 0 代表有讀鎖;2. w != 0 && current != getExclusiveOwnerThread() 代表有其他寫鎖;以上兩種返回false,線程要進(jìn)入等待隊(duì)列。否則就設(shè)置status值,返回true,獲取寫鎖成功。
二 :c == 0情況下要看writerShouldBlock的情況,返回false就會(huì)去CAS更改同步狀態(tài),成功就將AOS里的exclusiveOwnerThread設(shè)置位當(dāng)前線程,最后返回true;
注:情況一用setState更改同步狀態(tài),情況二用compareAndSetState?情況一執(zhí)行到setState這步說明當(dāng)前線程已持有寫鎖,是在重入,其他線程都會(huì)被排斥不同擔(dān)心線程安全問題,所以setState就可以,同步狀態(tài)status是volatile的。情況二里當(dāng)前即無讀鎖也無寫鎖,當(dāng)前線程始于其他線程在競爭,所以要利用CAS來保證原子性。
持有讀鎖線程不能申請寫鎖,即不能升級(jí),從tryAcquire可以看出持有讀鎖線程一定會(huì)返回false,也就是會(huì)被放入隊(duì)列中等待寫鎖,但是它持有的讀鎖將不會(huì)被釋放,那么寫鎖就不肯能獲取到,雖然不影響讀鎖的獲取,但所有寫鎖都將不能被獲取到。
來看看writerShouldBlock方法
非公平模式:直接返回false,也就是寫請求可以插隊(duì),即寫優(yōu)先級(jí)高;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
公平模式:考慮的是當(dāng)前隊(duì)列是否有等待的線程
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
2,釋放寫鎖
public void unlock() {
sync.release(1);
}
定位到AQS中的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
來看看同步器Syn重寫的tryRelease
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
注意返回值:返回true代表寫鎖個(gè)數(shù)為0,也就是寫鎖可用;返回false表示寫鎖仍被當(dāng)前線程占著,可能是因?yàn)楫?dāng)前線程重入了寫鎖。
3,tryLock
public boolean tryLock( ) {
return sync.tryWriteLock();
}
定位到同步器Syn
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryWriteLock方法看上去跟tryAcquire方法真的很像。唯一的區(qū)別在于,tryWriteLock忽略的writerShouldBlock方法;該方法的調(diào)用就是去搶寫鎖,搶不到返回false就行了。