轉(zhuǎn)-Java并發(fā)之AQS詳解

Java并發(fā)之AQS詳解

一、概述

  談到并發(fā),不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!

  類如其名,抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現(xiàn)都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。

  以下是本文的目錄大綱:

概述

框架

源碼詳解

簡單應(yīng)用

  若有不正之處,請諒解和批評指正,不勝感激。

請尊重作者勞動成果,轉(zhuǎn)載請標(biāo)明原文鏈接:http://www.cnblogs.com/waterystone/p/4920797.html

手機(jī)版可訪問:https://mp.weixin.qq.com/s/eyZyzk8ZzjwzZYN4a4H5YA

二、框架

  它維護(hù)了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進(jìn)入此隊列)。這里volatile是核心關(guān)鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:

getState()

setState()

compareAndSetState()

  AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)。

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(hù)(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:

isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。

tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。

tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。

tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true,否則返回false。

  以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()時,會調(diào)用tryAcquire()獨占該鎖并將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機(jī)會獲取該鎖。當(dāng)然,釋放鎖之前,A線程自己是可以重復(fù)獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的。

  再以CountDownLatch以例,任務(wù)分為N個子線程去執(zhí)行,state也初始化為N(注意N要與線程個數(shù)一致)。這N個子線程是并行執(zhí)行的,每個子線程執(zhí)行完后countDown()一次,state會CAS減1。等到所有子線程都執(zhí)行完后(即state=0),會unpark()主調(diào)用線程,然后主調(diào)用線程就會從await()函數(shù)返回,繼續(xù)后余動作。

  一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現(xiàn)tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現(xiàn)獨占和共享兩種方式,如ReentrantReadWriteLock。

三、源碼詳解

  本節(jié)開始講解AQS的源碼實現(xiàn)。依照acquire-release、acquireShared-releaseShared的次序來。

3.1 acquire(int)

此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進(jìn)入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當(dāng)然不僅僅只限于lock()。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了。下面是acquire()的源碼:

1publicfinalvoidacquire(int arg) {2if(!tryAcquire(arg) &&3? ? ? ? acquireQueued(addWaiter(Node.EXCLUSIVE), arg))4? ? ? ? selfInterrupt();5}


  函數(shù)流程如下:

tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

addWaiter()將該線程加入等待隊列的尾部,并標(biāo)記為獨占模式;

acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。

如果線程在等待過程中被中斷過,它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。

這時單憑這4個抽象的函數(shù)來看流程還有點朦朧,不要緊,看完接下來的分析后,你就會明白了。就像《大話西游》里唐僧說的:等你明白了舍生取義的道理,你自然會回來和我唱這首歌的。

3.1.1 tryAcquire(int)

  此方法嘗試去獲取獨占資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,還是那句話,當(dāng)然不僅僅只限于tryLock()。如下是tryAcquire()的源碼:

1protectedbooleantryAcquire(int arg) {2thrownew UnsupportedOperationException();3}


什么?直接throw異常?說好的功能呢?好吧,還記得概述里講的AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現(xiàn)嗎?就是這里了?。?!AQS這里只定義了一個接口,具體資源的獲取交由自定義同步器去實現(xiàn)了(通過state的get/set/CAS)?。?!至于能不能重入,能不能加塞,那就看具體的自定義同步器怎么去設(shè)計了!??!當(dāng)然,自定義同步器在進(jìn)行資源訪問時要考慮線程安全的影響。

  這里之所以沒有定義成abstract,是因為獨占模式下只用實現(xiàn)tryAcquire-tryRelease,而共享模式下只用實現(xiàn)tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個模式也要去實現(xiàn)另一模式下的接口。說到底,Doug Lea還是站在咱們開發(fā)者的角度,盡量減少不必要的工作量。

3.1.2 addWaiter(Node)

  此方法用于將當(dāng)前線程加入到等待隊列的隊尾,并返回當(dāng)前線程所在的結(jié)點。還是上源碼吧:

1private Node addWaiter(Node mode) { 2//以給定模式構(gòu)造結(jié)點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享) 3Node node =new Node(Thread.currentThread(), mode); 4 5//嘗試快速方式直接放到隊尾。 6Node pred = tail; 7if(pred !=null) { 8node.prev = pred; 9if (compareAndSetTail(pred, node)) {10pred.next = node;11return node;12? ? ? ? }13? ? }1415//上一步失敗則通過enq入隊。16? ? enq(node);17return node;18}

?不用再說了,直接看注釋吧。這里我們說下Node。Node結(jié)點是對每一個訪問同步代碼的線程的封裝,其包含了需要同步的線程本身以及線程的狀態(tài),如是否被阻塞,是否等待喚醒,是否已經(jīng)被取消等。變量waitStatus則表示當(dāng)前被封裝成Node結(jié)點的等待狀態(tài),共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

CANCELLED:值為1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結(jié)點,其結(jié)點的waitStatus為CANCELLED,即結(jié)束狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點將不會再變化。

SIGNAL:值為-1,被標(biāo)識為該等待喚醒狀態(tài)的后繼結(jié)點,當(dāng)其前繼結(jié)點的線程釋放了同步鎖或被取消,將會通知該后繼結(jié)點的線程執(zhí)行。說白了,就是處于喚醒狀態(tài),只要前繼結(jié)點釋放鎖,就會通知標(biāo)識為SIGNAL狀態(tài)的后繼結(jié)點的線程執(zhí)行。

CONDITION:值為-2,與Condition相關(guān),該標(biāo)識的結(jié)點處于等待隊列中,結(jié)點的線程等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點將從等待隊列轉(zhuǎn)移到同步隊列中,等待獲取同步鎖。

PROPAGATE:值為-3,與共享模式相關(guān),在共享模式中,該狀態(tài)標(biāo)識結(jié)點的線程處于可運(yùn)行狀態(tài)。

0狀態(tài):值為0,代表初始化狀態(tài)。

AQS在判斷狀態(tài)時,通過用waitStatus>0表示取消狀態(tài),而waitStatus<0表示有效狀態(tài)。

3.1.2.1 enq(Node)

?  此方法用于將node加入隊尾。源碼如下:

1privateNode enq(final Node node) { 2//CAS"自旋",直到成功加入隊尾 3for (;;) { 4Node t = tail; 5if(t ==null) {// 隊列為空,創(chuàng)建一個空的標(biāo)志結(jié)點作為head結(jié)點,并將tail也指向它。 6if(compareAndSetHead(new Node())) 7tail = head; 8}else{//正常流程,放入隊尾 9node.prev = t;10if (compareAndSetTail(t, node)) {11t.next = node;12return t;13? ? ? ? ? ? }14? ? ? ? }15? ? }16}


如果你看過AtomicInteger.getAndIncrement()函數(shù)源碼,那么相信你一眼便看出這段代碼的精華。CAS自旋volatile變量,是一種很經(jīng)典的用法。還不太了解的,自己去百度一下吧。

3.1.3?acquireQueued(Node, int)

OK,通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經(jīng)被放入等待隊列尾部了。聰明的你立刻應(yīng)該能想到該線程下一部該干什么了吧:進(jìn)入等待狀態(tài)休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源,然后就可以去干自己想干的事了。沒錯,就是這樣!是不是跟醫(yī)院排隊拿號有點相似~~acquireQueued()就是干這件事:在等待隊列中排隊拿號(中間沒其它事干可以休息),直到拿到號后再返回。這個函數(shù)非常關(guān)鍵,還是上源碼吧:

1finalbooleanacquireQueued(finalNode node,int arg) { 2booleanfailed =true;//標(biāo)記是否成功拿到資源 3try { 4booleaninterrupted =false;//標(biāo)記等待過程中是否被中斷過 5 6//又是一個“自旋”! 7for (;;) { 8finalNode p = node.predecessor();//拿到前驅(qū) 9//如果前驅(qū)是head,即該結(jié)點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當(dāng)然也可能被interrupt了)。10if(p == head && tryAcquire(arg)) {11setHead(node);//拿到資源后,將head指向該結(jié)點。所以head所指的標(biāo)桿結(jié)點,就是當(dāng)前獲取到資源的那個結(jié)點或null。12p.next =null;// setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結(jié)點。也就意味著之前拿完資源的結(jié)點出隊了!13failed =false;14returninterrupted;//返回等待過程中是否被中斷過15? ? ? ? ? ? }1617//如果自己可以休息了,就進(jìn)入waiting狀態(tài),直到被unpark()18if(shouldParkAfterFailedAcquire(p, node) &&19? ? ? ? ? ? ? ? parkAndCheckInterrupt())20interrupted =true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標(biāo)記為true21? ? ? ? }22}finally {23if (failed)24? ? ? ? ? ? cancelAcquire(node);25? ? }26}


到這里了,我們先不急著總結(jié)acquireQueued()的函數(shù)流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體干些什么。

3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于檢查狀態(tài),看看自己是否真的可以去休息了(進(jìn)入waiting狀態(tài),如果線程狀態(tài)轉(zhuǎn)換不熟,可以參考本人上一篇寫的Thread詳解),萬一隊列前邊的線程都放棄了只是瞎站著,那也說不定,對吧!

1privatestaticboolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2intws = pred.waitStatus;//拿到前驅(qū)的狀態(tài) 3if(ws == Node.SIGNAL) 4//如果已經(jīng)告訴前驅(qū)拿完號后通知自己一下,那就可以安心休息了 5returntrue; 6if(ws > 0) { 7/* 8? ? ? ? * 如果前驅(qū)放棄了,那就一直往前找,直到找到最近一個正常等待的狀態(tài),并排在它的后邊。 9? ? ? ? * 注意:那些放棄的結(jié)點,由于被自己“加塞”到它們前邊,它們相當(dāng)于形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)!10*/11do {12node.prev = pred = pred.prev;13}while(pred.waitStatus > 0);14pred.next = node;15}else {16//如果前驅(qū)正常,那就把前驅(qū)的狀態(tài)設(shè)置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!17? ? ? ? compareAndSetWaitStatus(pred, ws, Node.SIGNAL);18? ? }19returnfalse;20}


整個流程中,如果前驅(qū)結(jié)點的狀態(tài)不是SIGNAL,那么自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機(jī)會輪到自己拿號。

3.1.3.2 parkAndCheckInterrupt()

  如果線程找好安全休息點后,那就可以安心去休息了。此方法就是讓線程去休息,真正進(jìn)入等待狀態(tài)。

1privatefinalboolean parkAndCheckInterrupt() {2LockSupport.park(this);//調(diào)用park()使線程進(jìn)入waiting狀態(tài)3returnThread.interrupted();//如果被喚醒,查看自己是不是被中斷的。4}

park()會讓當(dāng)前線程進(jìn)入waiting狀態(tài)。在此狀態(tài)下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。(再說一句,如果線程狀態(tài)轉(zhuǎn)換不熟,可以參考本人寫的Thread詳解)。需要注意的是,Thread.interrupted()會清除當(dāng)前線程的中斷標(biāo)記位。

3.1.3.3 小結(jié)

  OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現(xiàn)在讓我們再回到acquireQueued(),總結(jié)下該函數(shù)的具體流程:

結(jié)點進(jìn)入隊尾后,檢查狀態(tài),找到安全休息點;

調(diào)用park()進(jìn)入waiting狀態(tài),等待unpark()或interrupt()喚醒自己;

被喚醒后,看自己是不是有資格能拿到號。如果拿到,head指向當(dāng)前結(jié)點,并返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續(xù)流程1。


3.1.4 小結(jié)

  OKOK,acquireQueued()分析完之后,我們接下來再回到acquire()!再貼上它的源碼吧:

1publicfinalvoidacquire(int arg) {2if(!tryAcquire(arg) &&3? ? ? ? acquireQueued(addWaiter(Node.EXCLUSIVE), arg))4? ? ? ? selfInterrupt();5}

再來總結(jié)下它的流程吧:

調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標(biāo)記為獨占模式;

acquireQueued()使線程在等待隊列中休息,有機(jī)會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。

如果線程在等待過程中被中斷過,它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。

由于此函數(shù)是重中之重,我再用流程圖總結(jié)一下:

至此,acquire()的流程終于算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()源碼吧,整個函數(shù)就是一條acquire(1)?。?!


3.2 release(int)

?  上一小節(jié)已經(jīng)把a(bǔ)cquire()說完了,這一小節(jié)就來講講它的反操作release()吧。此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。這也正是unlock()的語義,當(dāng)然不僅僅只限于unlock()。下面是release()的源碼:

1publicfinalbooleanrelease(int arg) {2if (tryRelease(arg)) {3Node h = head;//找到頭結(jié)點4if(h !=null&& h.waitStatus != 0)5unparkSuccessor(h);//喚醒等待隊列里的下一個線程6returntrue;7? ? }8returnfalse;9}


邏輯并不復(fù)雜。它調(diào)用tryRelease()來釋放資源。有一點需要注意的是,它是根據(jù)tryRelease()的返回值來判斷該線程是否已經(jīng)完成釋放掉資源了!所以自定義同步器在設(shè)計tryRelease()的時候要明確這一點??!

3.2.1 tryRelease(int)

  此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:

1protectedbooleantryRelease(int arg) {2thrownew UnsupportedOperationException();3}


跟tryAcquire()一樣,這個方法是需要獨占模式的自定義同步器去實現(xiàn)的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那么它肯定已經(jīng)拿到獨占資源了,直接減掉相應(yīng)量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經(jīng)提到了,release()是根據(jù)tryRelease()的返回值來判斷該線程是否已經(jīng)完成釋放掉資源了!所以自義定同步器在實現(xiàn)時,如果已經(jīng)徹底釋放資源(state=0),要返回true,否則返回false。

3.2.2 unparkSuccessor(Node)

  此方法用于喚醒等待隊列中下一個線程。下面是源碼:

1privatevoid unparkSuccessor(Node node) { 2//這里,node一般為當(dāng)前線程所在的結(jié)點。 3intws = node.waitStatus; 4if(ws < 0)//置零當(dāng)前線程所在的結(jié)點狀態(tài),允許失敗。 5compareAndSetWaitStatus(node, ws, 0); 6 7Node s = node.next;//找到下一個需要喚醒的結(jié)點s 8if(s ==null|| s.waitStatus > 0) {//如果為空或已取消 9s =null;10for(Node t = tail; t !=null&& t != node; t = t.prev)11if(t.waitStatus <= 0)//從這里可以看出,<=0的結(jié)點,都是還有效的結(jié)點。12s = t;13? ? }14if(s !=null)15LockSupport.unpark(s.thread);//喚醒16}


這個函數(shù)并不復(fù)雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這里我們也用s來表示吧。此時,再和acquireQueued()聯(lián)系起來,s被喚醒后,進(jìn)入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關(guān)系,它會再進(jìn)入shouldParkAfterFailedAcquire()尋找一個安全點。這里既然s已經(jīng)是等待隊列中最前邊的那個未放棄線程了,那么通過shouldParkAfterFailedAcquire()的調(diào)整,s也必然會跑到head的next結(jié)點,下一次自旋p==head就成立啦),然后s把自己設(shè)置成head標(biāo)桿結(jié)點,表示自己已經(jīng)獲取到資源了,acquire()也返回了??!And then, DO what you WANT!

3.2.3 小結(jié)

  release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。

3.3 acquireShared(int)

  此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進(jìn)入等待隊列,直到獲取到資源為止,整個過程忽略中斷。下面是acquireShared()的源碼:

1publicfinalvoidacquireShared(int arg) {2if(tryAcquireShared(arg) < 0)3? ? ? ? doAcquireShared(arg);4}


  這里tryAcquireShared()依然需要自定義同步器去實現(xiàn)。但是AQS已經(jīng)把其返回值的語義定義好了:負(fù)值代表獲取失敗;0代表獲取成功,但沒有剩余資源;正數(shù)表示獲取成功,還有剩余資源,其他線程還可以去獲取。所以這里acquireShared()的流程就是:

tryAcquireShared()嘗試獲取資源,成功則直接返回;

失敗則通過doAcquireShared()進(jìn)入等待隊列,直到獲取到資源為止才返回。

3.3.1 doAcquireShared(int)

  此方法用于將當(dāng)前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回。下面是doAcquireShared()的源碼:

1privatevoiddoAcquireShared(int arg) { 2finalNode node = addWaiter(Node.SHARED);//加入隊列尾部 3booleanfailed =true;//是否成功標(biāo)志 4try { 5booleaninterrupted =false;//等待過程中是否被中斷過的標(biāo)志 6for (;;) { 7finalNode p = node.predecessor();//前驅(qū) 8if(p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的 9intr = tryAcquireShared(arg);//嘗試獲取資源10if(r >= 0) {//成功11setHeadAndPropagate(node, r);//將head指向自己,還有剩余資源可以再喚醒之后的線程12p.next =null;// help GC13if(interrupted)//如果等待過程中被打斷過,此時將中斷補(bǔ)上。14? ? ? ? ? ? ? ? ? ? ? ? selfInterrupt();15failed =false;16return;17? ? ? ? ? ? ? ? }18? ? ? ? ? ? }1920//判斷狀態(tài),尋找安全點,進(jìn)入waiting狀態(tài),等著被unpark()或interrupt()21if(shouldParkAfterFailedAcquire(p, node) &&22? ? ? ? ? ? ? ? parkAndCheckInterrupt())23interrupted =true;24? ? ? ? }25}finally {26if (failed)27? ? ? ? ? ? cancelAcquire(node);28? ? }29}


  有木有覺得跟acquireQueued()很相似?對,其實流程并沒有太大區(qū)別。只不過這里將補(bǔ)中斷的selfInterrupt()放到doAcquireShared()里了,而獨占模式是放到acquireQueued()之外,其實都一樣,不知道Doug Lea是怎么想的。

  跟獨占模式比,還有一點需要注意的是,這里只有線程是head.next時(“老二”),才會去嘗試獲取資源,有剩余的話還會喚醒之后的隊友。那么問題就來了,假如老大用完后釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續(xù)park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨占模式,同一時刻只有一個線程去執(zhí)行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執(zhí)行的,現(xiàn)在因為老二的資源需求量大,而把后面量小的老三和老四也都卡住了。當(dāng)然,這并不是問題,只是AQS保證嚴(yán)格按照入隊順序喚醒罷了(保證公平,但降低了并發(fā))。


3.3.1.1 setHeadAndPropagate(Node, int)

1privatevoidsetHeadAndPropagate(Node node,int propagate) { 2Node h = head;? 3setHead(node);//head指向自己 4//如果還有剩余量,繼續(xù)喚醒下一個鄰居線程 5if(propagate > 0 || h ==null|| h.waitStatus < 0) { 6Node s = node.next; 7if(s ==null|| s.isShared()) 8? ? ? ? ? ? doReleaseShared(); 9? ? }10}


  此方法在setHead()的基礎(chǔ)上多了一步,就是自己蘇醒的同時,如果條件符合(比如還有剩余資源),還會去喚醒后繼結(jié)點,畢竟是共享模式!

  doReleaseShared()我們留著下一小節(jié)的releaseShared()里來講。


3.3.2 小結(jié)

  OK,至此,acquireShared()也要告一段落了。讓我們再梳理一下它的流程:

tryAcquireShared()嘗試獲取資源,成功則直接返回;

失敗則通過doAcquireShared()進(jìn)入等待隊列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個等待過程也是忽略中斷的。

其實跟acquire()的流程大同小異,只不過多了個自己拿到資源后,還會去喚醒后繼隊友的操作(這才是共享嘛)

3.4?releaseShared()

  上一小節(jié)已經(jīng)把a(bǔ)cquireShared()說完了,這一小節(jié)就來講講它的反操作releaseShared()吧。此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。下面是releaseShared()的源碼:

1publicfinalbooleanreleaseShared(int arg) {2if(tryReleaseShared(arg)) {//嘗試釋放資源3doReleaseShared();//喚醒后繼結(jié)點4returntrue;5? ? }6returnfalse;7}


此方法的流程也比較簡單,一句話:釋放掉資源后,喚醒后繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會返回true去喚醒其他線程,這主要是基于獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質(zhì)就是控制一定量的線程并發(fā)執(zhí)行,那么擁有資源的線程在釋放掉部分資源時就可以喚醒后繼等待結(jié)點。例如,資源總量是13,A(5)和B(7)分別獲取到資源并發(fā)運(yùn)行,C(4)來時只剩1個資源就需要等待。A在運(yùn)行過程中釋放掉2個資源量,然后tryReleaseShared(2)返回true喚醒C,C一看只有3個仍不夠繼續(xù)等待;隨后B又釋放2個,tryReleaseShared(2)返回true喚醒C,C一看有5個夠自己用了,然后C就可以跟A和B一起運(yùn)行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在完全釋放掉資源(state=0)才返回true,所以自定義同步器可以根據(jù)需要決定tryReleaseShared()的返回值。

3.4.1?doReleaseShared()

  此方法主要用于喚醒后繼。下面是它的源碼:

1privatevoid doReleaseShared() { 2for (;;) { 3Node h = head; 4if(h !=null&& h != tail) { 5intws = h.waitStatus; 6if(ws == Node.SIGNAL) { 7if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 8continue; 9unparkSuccessor(h);//喚醒后繼10? ? ? ? ? ? }11elseif(ws == 0 &&12!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))13continue;14? ? ? ? }15if(h == head)// head發(fā)生變化16break;17? ? }18}



3.5 小結(jié)

  本節(jié)我們詳解了獨占和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認(rèn)識了。值得注意的是,acquire()和acquireShared()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應(yīng)中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這里相應(yīng)的源碼跟acquire()和acquireShared()差不多,這里就不再詳解了。


四、簡單應(yīng)用

  通過前邊幾個章節(jié)的學(xué)習(xí),相信大家已經(jīng)基本理解AQS的原理了。這里再將“框架”一節(jié)中的一段話復(fù)制過來:

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(hù)(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:

isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。

tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。

tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。

tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true,否則返回false。

  OK,下面我們就以AQS源碼里的Mutex為例,講一下AQS的簡單應(yīng)用。

4.1 Mutex(互斥鎖)

  Mutex是一個不可重入的互斥鎖實現(xiàn)。鎖資源(AQS里的state)只有兩種狀態(tài):0表示未鎖定,1表示鎖定。下邊是Mutex的核心源碼:

1classMuteximplements Lock, java.io.Serializable { 2// 自定義同步器 3privatestaticclassSyncextends AbstractQueuedSynchronizer { 4// 判斷是否鎖定狀態(tài) 5protectedboolean isHeldExclusively() { 6returngetState() == 1; 7? ? ? ? } 8 9// 嘗試獲取資源,立即返回。成功則返回true,否則false。10publicbooleantryAcquire(int acquires) {11assertacquires == 1;// 這里限定只能為1個量12if(compareAndSetState(0, 1)) {//state為0才設(shè)置為1,不可重入!13setExclusiveOwnerThread(Thread.currentThread());//設(shè)置為當(dāng)前線程獨占資源14returntrue;15? ? ? ? ? ? }16returnfalse;17? ? ? ? }1819// 嘗試釋放資源,立即返回。成功則為true,否則false。20protectedbooleantryRelease(int releases) {21assertreleases == 1;// 限定為1個量22if(getState() == 0)//既然來釋放,那肯定就是已占有狀態(tài)了。只是為了保險,多層判斷!23thrownew IllegalMonitorStateException();24setExclusiveOwnerThread(null);25setState(0);//釋放資源,放棄占有狀態(tài)26returntrue;27? ? ? ? }28? ? }2930// 真正同步類的實現(xiàn)都依賴?yán)^承于AQS的自定義同步器!31privatefinalSync sync =new Sync();3233//lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。34publicvoid lock() {35sync.acquire(1);36? ? }3738//tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。39publicboolean tryLock() {40returnsync.tryAcquire(1);41? ? }4243//unlock<-->release。兩者語文一樣:釋放資源。44publicvoid unlock() {45sync.release(1);46? ? }4748//鎖是否占有狀態(tài)49publicboolean isLocked() {50return sync.isHeldExclusively();51? ? }52}


  同步類在實現(xiàn)時一般都將自定義同步器(sync)定義為內(nèi)部類,供自己使用;而同步類自己(Mutex)則實現(xiàn)某個接口,對外服務(wù)。當(dāng)然,接口的實現(xiàn)要直接依賴sync,它們在語義上也存在某種對應(yīng)關(guān)系??!而sync只用實現(xiàn)資源state的獲取-釋放方式tryAcquire-tryRelelase,至于線程的排隊、等待、喚醒等,上層的AQS都已經(jīng)實現(xiàn)好了,我們不用關(guān)心。

  除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現(xiàn)方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了!

  OK,至此,整個AQS的講解也要落下帷幕了。希望本文能夠?qū)W(xué)習(xí)Java并發(fā)編程的同學(xué)有所借鑒,中間寫的有不對的地方,也歡迎討論和指正~

作者:水巖

出處:http://www.cnblogs.com/waterystone

本博客中未標(biāo)明轉(zhuǎn)載的文章歸作者水巖和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責(zé)任的權(quán)利。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容