J.U.C源碼閱讀之ReentrantReadWriteLock

一、框架圖:

框架圖

從中可以看出:

(01) ReentrantReadWriteLock實(shí)現(xiàn)了ReadWriteLock接口。ReadWriteLock是一個(gè)讀寫鎖的接口,提供了"獲取讀鎖的readLock()函數(shù)" 和 "獲取寫鎖的writeLock()函數(shù)"。

(02) ReentrantReadWriteLock中包含:sync對(duì)象,讀鎖readerLock和寫鎖writerLock。讀鎖ReadLock和寫鎖WriteLock都實(shí)現(xiàn)了Lock接口。讀鎖ReadLock和寫鎖WriteLock中也都分別包含了"Sync對(duì)象",它們的Sync對(duì)象和ReentrantReadWriteLock的Sync對(duì)象 是一樣的,就是通過sync,讀鎖和寫鎖實(shí)現(xiàn)了對(duì)同一個(gè)對(duì)象的訪問。

(03) 和"ReentrantLock"一樣,sync是Sync類型;而且,Sync也是一個(gè)繼承于AQS的抽象類。Sync也包括"公平鎖"FairSync和"非公平鎖"NonfairSync。sync對(duì)象是"FairSync"和"NonfairSync"中的一個(gè),默認(rèn)是"NonfairSync"。

二、介紹

ReadWriteLock,顧名思義,是讀寫鎖。它維護(hù)了一對(duì)相關(guān)的鎖 — — “讀取鎖”和“寫入鎖”,一個(gè)用于讀取操作,另一個(gè)用于寫入操作。

“讀取鎖”用于只讀操作,它是“共享鎖”,能同時(shí)被多個(gè)線程獲取。

“寫入鎖”用于寫入操作,它是“獨(dú)占鎖”,寫入鎖只能被一個(gè)線程鎖獲取。

注意:不能同時(shí)存在讀取鎖和寫入鎖!

ReadWriteLock是一個(gè)接口。ReentrantReadWriteLock是它的實(shí)現(xiàn)類,ReentrantReadWriteLock包括子類ReadLock和WriteLock。

三、特點(diǎn)

復(fù)用的state值

state表示持有鎖的數(shù)量,因?yàn)镽eentrantReadWriteLock分為“讀鎖”和“寫鎖”兩把鎖,所以它把低4位用來表示“寫鎖(獨(dú)占鎖)”的持有數(shù),其他位數(shù)表示“讀鎖(共享鎖)”的持有數(shù)。

代碼:

/*

*?Read?vs?write?count?extraction?constants?and?functions.

*?Lock?state?is?logically?divided?into?two?unsigned?shorts:

*?The?lower?one?representing?the?exclusive?(writer)?lock?hold?count,

*?and?the?upper?the?shared?(reader)?hold?count.

*/

static?final?intSHARED_SHIFT=16;

static?final?intSHARED_UNIT=?(1<

static?final?intMAX_COUNT=?(1<

static?final?intEXCLUSIVE_MASK=?(1<

/**?Returns?the?number?of?shared?holds?represented?in?count? */

static?intsharedCount(intc)? ? {returnc?>>>SHARED_SHIFT;}

/**?Returns?the?number?of?exclusive?holds?represented?in?count? */

static?intexclusiveCount(intc)?{returnc?&EXCLUSIVE_MASK;}

ThreadLocal變量,保持線程和獲取“共享鎖”的次數(shù)

代碼

static final classThreadLocalHoldCounter

extendsThreadLocal {

publicHoldCounterinitialValue() {

return newHoldCounter();

}

}

/**

* The number of reentrant read locks held by current thread.

* Initialized only in constructor and readObject.

* Removed whenever a thread's read hold count drops to 0.

*/

private transientThreadLocalHoldCounterreadHolds;

四、源碼分析

還是從lock()開始分析:

ReadLock中的lock():

public?voidlock()?{

sync.acquireShared(1);

}

AQS中的acquireShared():

public?final?voidacquireShared(intarg)?{

if(tryAcquireShared(arg)?<0)

doAcquireShared(arg);

}

ReentrantReadWriteLock中的tryAcquireShared(arg)

protected?final?inttryAcquireShared(intunused)?{

/*

*?Walkthrough:

*?1.?If?write?lock?held?by?another?thread,?fail.

*?2.?Otherwise,?this?thread?is?eligible?for

*? ? lock?wrt?state,?so?ask?if?it?should?block

*? ? because?of?queue?policy.?If?not,?try

*? ? to?grant?by?CASing?state?and?updating?count.

*? ? Note?that?step?does?not?check?for?reentrant

*? ? acquires,?which?is?postponed?to?full?version

*? ? to?avoid?having?to?check?hold?count?in

*? ? the?more?typical?non-reentrant?case.

*?3.?If?step?2?fails?either?because?thread

*? ? apparently?not?eligible?or?CAS?fails?or?count

*? ? saturated,?chain?to?version?with?full?retry?loop.

*/

Thread?current?=?Thread.currentThread();

intc?=?getState();

if(exclusiveCount(c)?!=0&& ? ? ? //如果是獨(dú)占鎖,而且不是當(dāng)前線程,直接返回-1

getExclusiveOwnerThread()?!=?current)

return-1;

intr?=sharedCount(c);

if(!readerShouldBlock()?&& ? ? ? ?//如果不需要“阻塞等待”,而且“讀取鎖”的計(jì)數(shù)小于“MAX_COUNT”,“讀取鎖”的狀態(tài)+1

r?

compareAndSetState(c,c?+SHARED_UNIT))?{

if(r?==0)?{

firstReader=?current;

firstReaderHoldCount=1;

}else?if(firstReader==?current)?{

firstReaderHoldCount++;

}else{

HoldCounter?rh?=cachedHoldCounter; ? ? ?//readHolds是一個(gè)ThreadLocal,記錄當(dāng)前線程的獲取“讀取鎖”的次數(shù),cacheHoldCounter是當(dāng)前線程的緩存,避免每次都從ThreadLocal中拿。這個(gè)次數(shù)當(dāng)然要+1

if(rh?==null||?rh.tid!=getThreadId(current))

cachedHoldCounter=?rh?=readHolds.get();

else?if(rh.count==0)

readHolds.set(rh);

rh.count++;

}

return1;

}

returnfullTryAcquireShared(current);

}

說明:tryAcquireShared()的作用是嘗試獲取“共享鎖”。

如果在嘗試獲取鎖時(shí),“不需要阻塞等待”并且“讀取鎖的共享計(jì)數(shù)小于MAX_COUNT”,則直接通過CAS函數(shù)更新“讀取鎖的共享計(jì)數(shù)”,以及將“當(dāng)前線程獲取讀取鎖的次數(shù)+1”。

否則,通過fullTryAcquireShared()獲取讀取鎖。

ReentrantReadWriteLock中的fullTryAcquireShared(thread)

final?intfullTryAcquireShared(Thread?current)?{

/*

*?This?code?is?in?part?redundant?with?that?in

*?tryAcquireShared?but?is?simpler?overall?by?not

*?complicating?tryAcquireShared?with?interactions?between

*?retries?and?lazily?reading?hold?counts.

*/

HoldCounter?rh?=null;

for(;;)?{

intc?=?getState();

if(exclusiveCount(c)?!=0)?{//如果是獨(dú)占鎖,且當(dāng)前線程不是“獨(dú)占鎖”的持有者,則什么都不干,直接返回-1

if(getExclusiveOwnerThread()?!=?current)

return-1;

//如果“需要阻塞等待”。

//(01)?當(dāng)“需要阻塞等待”的線程是第1個(gè)獲取鎖的線程的話,則繼續(xù)往下執(zhí)行。

//(02)?當(dāng)“需要阻塞等待”的線程獲取鎖的次數(shù)=0時(shí),則返回-1。

}else?if(readerShouldBlock())?{

//?Make?sure?we're?not?acquiring?read?lock?reentrantly

if(firstReader==?current)?{

//?assert?firstReaderHoldCount?>?0;

}else{

if(rh?==null)?{

rh?=cachedHoldCounter;

if(rh?==null||?rh.tid!=getThreadId(current))?{

rh?=readHolds.get();

if(rh.count==0)

readHolds.remove();

}

}

if(rh.count==0)

return-1;

}

}

//如果共享統(tǒng)計(jì)數(shù)超過MAX_COUNT,則拋出異常。

if(sharedCount(c)?==MAX_COUNT)

throw?newError("Maximum?lock?count?exceeded");

if(compareAndSetState(c,c?+SHARED_UNIT))?{

if(sharedCount(c)?==0)?{//更新state值,如果還沒有線程獲得“讀鎖”,那么就把當(dāng)前線程更新為firstReader,firstReaderHoldCounter值設(shè)為1

firstReader=?current;

firstReaderHoldCount=1;

}else?if(firstReader==?current)?{//如果當(dāng)前線程就是鎖的持有者,firstReadHoldCounter+1

firstReaderHoldCount++;

}else{//都不滿足的話,從緩存或者LocalHold中獲取當(dāng)前線程獲得“讀鎖”的次數(shù),+1

if(rh?==null)

rh?=cachedHoldCounter;

if(rh?==null||?rh.tid!=getThreadId(current))

rh?=readHolds.get();

else?if(rh.count==0)

readHolds.set(rh);

rh.count++;

cachedHoldCounter=?rh;//?cache?for?release

}

return1;

}

}

}

說明:fullTryAcquireShared()會(huì)根據(jù)“是否需要阻塞等待”,“讀取鎖的共享計(jì)數(shù)是否超過限制”等等進(jìn)行處理。如果不需要阻塞等待,并且鎖的共享計(jì)數(shù)沒有超過限制,則通過CAS嘗試獲取鎖,并返回1。

doAcquireShared()定義在AQS函數(shù)中

privatevoiddoAcquireShared(intarg) {//addWaiter(Node.SHARED)的作用是,創(chuàng)建“當(dāng)前線程”對(duì)應(yīng)的節(jié)點(diǎn),并將該線程添加到CLH隊(duì)列中。finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {//獲取“node”的前一節(jié)點(diǎn)finalNode p =node.predecessor();//如果“當(dāng)前線程”是CLH隊(duì)列的表頭,則嘗試獲取共享鎖。if(p ==head) {intr =tryAcquireShared(arg);if(r >= 0) {

setHeadAndPropagate(node, r);

p.next=null;//help GCif(interrupted)

selfInterrupt();

failed=false;return;

}

}//如果“當(dāng)前線程”不是CLH隊(duì)列的表頭,則通過shouldParkAfterFailedAcquire()判斷是否需要等待,//需要的話,則通過parkAndCheckInterrupt()進(jìn)行阻塞等待。若阻塞等待過程中,線程被中斷過,則設(shè)置interrupted為true。if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

interrupted=true;

}

}finally{if(failed)

cancelAcquire(node);

}

}

說明:doAcquireShared()的作用是獲取共享鎖。

它會(huì)首先創(chuàng)建線程對(duì)應(yīng)的CLH隊(duì)列的節(jié)點(diǎn),然后將該節(jié)點(diǎn)添加到CLH隊(duì)列中。CLH隊(duì)列是管理獲取鎖的等待線程的隊(duì)列。

如果“當(dāng)前線程”是CLH隊(duì)列的表頭,則嘗試獲取共享鎖;否則,則需要通過shouldParkAfterFailedAcquire()判斷是否阻塞等待,需要的話,則通過parkAndCheckInterrupt()進(jìn)行阻塞等待。

doAcquireShared()會(huì)通過for循環(huán),不斷的進(jìn)行上面的操作;目的就是獲取共享鎖。需要注意的是:doAcquireShared()在每一次嘗試獲取鎖時(shí),是通過tryAcquireShared()來執(zhí)行的!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容