一、框架圖:

從中可以看出:
(01) ReentrantReadWriteLock實(shí)現(xiàn)了ReadWriteLock接口。ReadWriteLock是一個(gè)讀寫鎖的接口,提供了"獲取讀鎖的readLock()函數(shù)" 和 "獲取寫鎖的writeLock()函數(shù)"。
(02) ReentrantReadWriteLock中包含:sync對(duì)象,讀鎖readerLock和寫鎖writerLock。讀鎖ReadLock和寫鎖WriteLock都實(shí)現(xiàn)了Lock接口。讀鎖ReadLock和寫鎖WriteLock中也都分別包含了"Sync對(duì)象",它們的Sync對(duì)象和ReentrantReadWriteLock的Sync對(duì)象 是一樣的,就是通過sync,讀鎖和寫鎖實(shí)現(xiàn)了對(duì)同一個(gè)對(duì)象的訪問。
(03) 和"ReentrantLock"一樣,sync是Sync類型;而且,Sync也是一個(gè)繼承于AQS的抽象類。Sync也包括"公平鎖"FairSync和"非公平鎖"NonfairSync。sync對(duì)象是"FairSync"和"NonfairSync"中的一個(gè),默認(rèn)是"NonfairSync"。
二、介紹
ReadWriteLock,顧名思義,是讀寫鎖。它維護(hù)了一對(duì)相關(guān)的鎖 — — “讀取鎖”和“寫入鎖”,一個(gè)用于讀取操作,另一個(gè)用于寫入操作。
“讀取鎖”用于只讀操作,它是“共享鎖”,能同時(shí)被多個(gè)線程獲取。
“寫入鎖”用于寫入操作,它是“獨(dú)占鎖”,寫入鎖只能被一個(gè)線程鎖獲取。
注意:不能同時(shí)存在讀取鎖和寫入鎖!
ReadWriteLock是一個(gè)接口。ReentrantReadWriteLock是它的實(shí)現(xiàn)類,ReentrantReadWriteLock包括子類ReadLock和WriteLock。
三、特點(diǎn)
復(fù)用的state值
state表示持有鎖的數(shù)量,因?yàn)镽eentrantReadWriteLock分為“讀鎖”和“寫鎖”兩把鎖,所以它把低4位用來表示“寫鎖(獨(dú)占鎖)”的持有數(shù),其他位數(shù)表示“讀鎖(共享鎖)”的持有數(shù)。
代碼:
/*
*?Read?vs?write?count?extraction?constants?and?functions.
*?Lock?state?is?logically?divided?into?two?unsigned?shorts:
*?The?lower?one?representing?the?exclusive?(writer)?lock?hold?count,
*?and?the?upper?the?shared?(reader)?hold?count.
*/
static?final?intSHARED_SHIFT=16;
static?final?intSHARED_UNIT=?(1<
static?final?intMAX_COUNT=?(1<
static?final?intEXCLUSIVE_MASK=?(1<
/**?Returns?the?number?of?shared?holds?represented?in?count? */
static?intsharedCount(intc)? ? {returnc?>>>SHARED_SHIFT;}
/**?Returns?the?number?of?exclusive?holds?represented?in?count? */
static?intexclusiveCount(intc)?{returnc?&EXCLUSIVE_MASK;}
ThreadLocal變量,保持線程和獲取“共享鎖”的次數(shù)
代碼
static final classThreadLocalHoldCounter
extendsThreadLocal {
publicHoldCounterinitialValue() {
return newHoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transientThreadLocalHoldCounterreadHolds;
四、源碼分析
還是從lock()開始分析:
ReadLock中的lock():
public?voidlock()?{
sync.acquireShared(1);
}
AQS中的acquireShared():
public?final?voidacquireShared(intarg)?{
if(tryAcquireShared(arg)?<0)
doAcquireShared(arg);
}
ReentrantReadWriteLock中的tryAcquireShared(arg)
protected?final?inttryAcquireShared(intunused)?{
/*
*?Walkthrough:
*?1.?If?write?lock?held?by?another?thread,?fail.
*?2.?Otherwise,?this?thread?is?eligible?for
*? ? lock?wrt?state,?so?ask?if?it?should?block
*? ? because?of?queue?policy.?If?not,?try
*? ? to?grant?by?CASing?state?and?updating?count.
*? ? Note?that?step?does?not?check?for?reentrant
*? ? acquires,?which?is?postponed?to?full?version
*? ? to?avoid?having?to?check?hold?count?in
*? ? the?more?typical?non-reentrant?case.
*?3.?If?step?2?fails?either?because?thread
*? ? apparently?not?eligible?or?CAS?fails?or?count
*? ? saturated,?chain?to?version?with?full?retry?loop.
*/
Thread?current?=?Thread.currentThread();
intc?=?getState();
if(exclusiveCount(c)?!=0&& ? ? ? //如果是獨(dú)占鎖,而且不是當(dāng)前線程,直接返回-1
getExclusiveOwnerThread()?!=?current)
return-1;
intr?=sharedCount(c);
if(!readerShouldBlock()?&& ? ? ? ?//如果不需要“阻塞等待”,而且“讀取鎖”的計(jì)數(shù)小于“MAX_COUNT”,“讀取鎖”的狀態(tài)+1
r?
compareAndSetState(c,c?+SHARED_UNIT))?{
if(r?==0)?{
firstReader=?current;
firstReaderHoldCount=1;
}else?if(firstReader==?current)?{
firstReaderHoldCount++;
}else{
HoldCounter?rh?=cachedHoldCounter; ? ? ?//readHolds是一個(gè)ThreadLocal,記錄當(dāng)前線程的獲取“讀取鎖”的次數(shù),cacheHoldCounter是當(dāng)前線程的緩存,避免每次都從ThreadLocal中拿。這個(gè)次數(shù)當(dāng)然要+1
if(rh?==null||?rh.tid!=getThreadId(current))
cachedHoldCounter=?rh?=readHolds.get();
else?if(rh.count==0)
readHolds.set(rh);
rh.count++;
}
return1;
}
returnfullTryAcquireShared(current);
}
說明:tryAcquireShared()的作用是嘗試獲取“共享鎖”。
如果在嘗試獲取鎖時(shí),“不需要阻塞等待”并且“讀取鎖的共享計(jì)數(shù)小于MAX_COUNT”,則直接通過CAS函數(shù)更新“讀取鎖的共享計(jì)數(shù)”,以及將“當(dāng)前線程獲取讀取鎖的次數(shù)+1”。
否則,通過fullTryAcquireShared()獲取讀取鎖。
ReentrantReadWriteLock中的fullTryAcquireShared(thread)
final?intfullTryAcquireShared(Thread?current)?{
/*
*?This?code?is?in?part?redundant?with?that?in
*?tryAcquireShared?but?is?simpler?overall?by?not
*?complicating?tryAcquireShared?with?interactions?between
*?retries?and?lazily?reading?hold?counts.
*/
HoldCounter?rh?=null;
for(;;)?{
intc?=?getState();
if(exclusiveCount(c)?!=0)?{//如果是獨(dú)占鎖,且當(dāng)前線程不是“獨(dú)占鎖”的持有者,則什么都不干,直接返回-1
if(getExclusiveOwnerThread()?!=?current)
return-1;
//如果“需要阻塞等待”。
//(01)?當(dāng)“需要阻塞等待”的線程是第1個(gè)獲取鎖的線程的話,則繼續(xù)往下執(zhí)行。
//(02)?當(dāng)“需要阻塞等待”的線程獲取鎖的次數(shù)=0時(shí),則返回-1。
}else?if(readerShouldBlock())?{
//?Make?sure?we're?not?acquiring?read?lock?reentrantly
if(firstReader==?current)?{
//?assert?firstReaderHoldCount?>?0;
}else{
if(rh?==null)?{
rh?=cachedHoldCounter;
if(rh?==null||?rh.tid!=getThreadId(current))?{
rh?=readHolds.get();
if(rh.count==0)
readHolds.remove();
}
}
if(rh.count==0)
return-1;
}
}
//如果共享統(tǒng)計(jì)數(shù)超過MAX_COUNT,則拋出異常。
if(sharedCount(c)?==MAX_COUNT)
throw?newError("Maximum?lock?count?exceeded");
if(compareAndSetState(c,c?+SHARED_UNIT))?{
if(sharedCount(c)?==0)?{//更新state值,如果還沒有線程獲得“讀鎖”,那么就把當(dāng)前線程更新為firstReader,firstReaderHoldCounter值設(shè)為1
firstReader=?current;
firstReaderHoldCount=1;
}else?if(firstReader==?current)?{//如果當(dāng)前線程就是鎖的持有者,firstReadHoldCounter+1
firstReaderHoldCount++;
}else{//都不滿足的話,從緩存或者LocalHold中獲取當(dāng)前線程獲得“讀鎖”的次數(shù),+1
if(rh?==null)
rh?=cachedHoldCounter;
if(rh?==null||?rh.tid!=getThreadId(current))
rh?=readHolds.get();
else?if(rh.count==0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter=?rh;//?cache?for?release
}
return1;
}
}
}
說明:fullTryAcquireShared()會(huì)根據(jù)“是否需要阻塞等待”,“讀取鎖的共享計(jì)數(shù)是否超過限制”等等進(jìn)行處理。如果不需要阻塞等待,并且鎖的共享計(jì)數(shù)沒有超過限制,則通過CAS嘗試獲取鎖,并返回1。
doAcquireShared()定義在AQS函數(shù)中
privatevoiddoAcquireShared(intarg) {//addWaiter(Node.SHARED)的作用是,創(chuàng)建“當(dāng)前線程”對(duì)應(yīng)的節(jié)點(diǎn),并將該線程添加到CLH隊(duì)列中。finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {//獲取“node”的前一節(jié)點(diǎn)finalNode p =node.predecessor();//如果“當(dāng)前線程”是CLH隊(duì)列的表頭,則嘗試獲取共享鎖。if(p ==head) {intr =tryAcquireShared(arg);if(r >= 0) {
setHeadAndPropagate(node, r);
p.next=null;//help GCif(interrupted)
selfInterrupt();
failed=false;return;
}
}//如果“當(dāng)前線程”不是CLH隊(duì)列的表頭,則通過shouldParkAfterFailedAcquire()判斷是否需要等待,//需要的話,則通過parkAndCheckInterrupt()進(jìn)行阻塞等待。若阻塞等待過程中,線程被中斷過,則設(shè)置interrupted為true。if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted=true;
}
}finally{if(failed)
cancelAcquire(node);
}
}
說明:doAcquireShared()的作用是獲取共享鎖。
它會(huì)首先創(chuàng)建線程對(duì)應(yīng)的CLH隊(duì)列的節(jié)點(diǎn),然后將該節(jié)點(diǎn)添加到CLH隊(duì)列中。CLH隊(duì)列是管理獲取鎖的等待線程的隊(duì)列。
如果“當(dāng)前線程”是CLH隊(duì)列的表頭,則嘗試獲取共享鎖;否則,則需要通過shouldParkAfterFailedAcquire()判斷是否阻塞等待,需要的話,則通過parkAndCheckInterrupt()進(jìn)行阻塞等待。
doAcquireShared()會(huì)通過for循環(huán),不斷的進(jìn)行上面的操作;目的就是獲取共享鎖。需要注意的是:doAcquireShared()在每一次嘗試獲取鎖時(shí),是通過tryAcquireShared()來執(zhí)行的!