
CountDownLatch 介紹
CountDownLatch是一個(gè)同步協(xié)助類(lèi),允許一個(gè)或多個(gè)線程等待,直到其他線程完成操作集。
CountDownLatch使用給定的計(jì)數(shù)值(count)初始化。await方法會(huì)阻塞直到當(dāng)前的計(jì)數(shù)值(count)由于countDown方法的調(diào)用達(dá)到0,在這之后(即,count為0之后)所有等待的線程都會(huì)被釋放,并且隨后對(duì)await方法的調(diào)用都會(huì)立即返回。這是一個(gè)一次性現(xiàn)象 ———— count不會(huì)被重置。如果你需要一個(gè)重置count的版本,那么請(qǐng)考慮使用CyclicBarrier。
CountDownLatch是一個(gè)通用的同步工具,它能用于許多用途。一個(gè)使用’1’計(jì)數(shù)值初始化的CountDownLatch服務(wù)作為一個(gè)簡(jiǎn)單的開(kāi)關(guān)門(mén):所有執(zhí)行await的線程等待在門(mén)口,直到某個(gè)執(zhí)行countDown方法的線程將門(mén)打開(kāi)。一個(gè)使用‘N(count)’初始化的CountDownLatch能被用于使一個(gè)線程等待,直到N個(gè)線程完成某些動(dòng)作,或者某些動(dòng)作已經(jīng)完成N次。
CountDownLatch一個(gè)很有用的性質(zhì)是,它不要求你在可以繼續(xù)進(jìn)行之前調(diào)用countDown方法等待count到達(dá)0,它只是簡(jiǎn)單的防止任何線程超過(guò)await方法直到所有的線程都可以通過(guò)。
也就是說(shuō),你可以在任意時(shí)刻調(diào)用await,如果當(dāng)前的count值非0,那么線程會(huì)等待直到count為0時(shí)才會(huì)繼續(xù)往下執(zhí)行,否則如果count值為0,await方法會(huì)立即返回,你可以不被阻塞的繼續(xù)往下執(zhí)行。
內(nèi)存一致性作用:直到count到達(dá)0,一個(gè)線程調(diào)用countDown()方法之前的動(dòng)作 happen-before 從另一個(gè)線程相應(yīng)的await()方法返回之后的動(dòng)作。
比如,threadB.await()、threadA.countDown(),那么threadA執(zhí)行countDown()之前的動(dòng)作,對(duì)于threadB的await()方法之后的動(dòng)作都可見(jiàn)(當(dāng)count為0時(shí),threadB會(huì)從await()方法的阻塞中結(jié)束而繼續(xù)往下執(zhí)行)。
AbstractQueuedSynchronizer
因?yàn)镃ountDownLatch是使用AbstractQueuedSynchronizer(AQS)的state來(lái)實(shí)現(xiàn)其同步控制的。CountDownLatch使用的是共享鎖模式,由于AQS除了共享鎖模式還有排他鎖模式,本文僅對(duì)CountDownLatch涉及到的共享鎖模式部分的內(nèi)容進(jìn)行介紹,關(guān)于排他鎖模式的部分會(huì)在 ReentrantLock 源碼淺析一文中介紹。
AQS提供一個(gè)框架用于實(shí)現(xiàn)依賴于先進(jìn)先出(FIFO)等待隊(duì)列的阻塞鎖和同步器(信號(hào)量,事件等)。這個(gè)類(lèi)被設(shè)計(jì)與作為一個(gè)有用的基類(lèi),一個(gè)依賴單一原子值為代表狀態(tài)的多種同步器的基類(lèi)。子類(lèi)必須將修改這個(gè)狀態(tài)值的方法定義為受保護(hù)的方法,并且該方法會(huì)根據(jù)對(duì)象(即,AbstractQueuedSynchronizer子類(lèi))被獲取和釋放的方式來(lái)定義這個(gè)狀態(tài)。根據(jù)這些,這個(gè)類(lèi)的其他方法實(shí)現(xiàn)所有排隊(duì)和阻塞的機(jī)制。子類(lèi)能夠維護(hù)其他的狀態(tài)屬性,但是只有使用『getState』方法、『setState』方法以及『compareAndSetState』方法來(lái)原子性的修改 int 狀態(tài)值的操作才能遵循相關(guān)同步性。
等待隊(duì)列節(jié)點(diǎn)類(lèi) ——— Node
等待隊(duì)列是一個(gè)CLH鎖隊(duì)列的變體。CLH通常被用于自旋鎖(CLH鎖是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請(qǐng)線程只在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。)。我們用它來(lái)代替阻塞同步器,但是使用相同的基本策略,該策略是持有一些關(guān)于一個(gè)線程在它前驅(qū)節(jié)點(diǎn)的控制信息。一個(gè)“status”字段在每個(gè)節(jié)點(diǎn)中用于保持追蹤是否一個(gè)線程需要被阻塞。一個(gè)節(jié)點(diǎn)會(huì)得到通知當(dāng)它的前驅(qū)節(jié)點(diǎn)被釋放時(shí)。隊(duì)列中的每一個(gè)節(jié)點(diǎn)都作為一個(gè)持有單一等待線程的特定通知風(fēng)格的監(jiān)視器。狀態(tài)字段不會(huì)控制線程是否被授予鎖等。一個(gè)線程可能?chē)L試去獲取鎖如果它在隊(duì)列的第一個(gè)。但是首先這并不保證成功,它只是給與了競(jìng)爭(zhēng)的權(quán)力(也就是說(shuō),隊(duì)列中第一個(gè)線程嘗試獲取鎖時(shí),并不保證一定能得到鎖,它只是有競(jìng)爭(zhēng)鎖的權(quán)力而已)。所以當(dāng)前被釋放的競(jìng)爭(zhēng)者線程可能需要重新等待獲取鎖。
(這里說(shuō)的"隊(duì)列中的第一個(gè)的線程"指的時(shí),從隊(duì)列頭開(kāi)始往下的節(jié)點(diǎn)中,第一個(gè)node.thread != null的線程。因?yàn)?,AQS隊(duì)列的head節(jié)點(diǎn)是一個(gè)虛節(jié)點(diǎn),不是有個(gè)有效的等待節(jié)點(diǎn),因此head節(jié)點(diǎn)的thread是為null的。)
為了排隊(duì)進(jìn)入一個(gè)CLH鎖,你可以原子性的拼接節(jié)點(diǎn)到隊(duì)列中作為一個(gè)新的隊(duì)尾;對(duì)于出隊(duì),你只要設(shè)置頭字段。(即,入隊(duì)操作時(shí)新的節(jié)點(diǎn)會(huì)排在CLH鎖隊(duì)列的隊(duì)尾,而出隊(duì)操作就是將待出隊(duì)的node設(shè)置為head。由此可見(jiàn),在AQS中維護(hù)的這個(gè)等待隊(duì)列,head是一個(gè)無(wú)效的節(jié)點(diǎn)。初始化時(shí)head是一個(gè)new Node()節(jié)點(diǎn);在后期的操作中,需要出隊(duì)的節(jié)點(diǎn)就會(huì)設(shè)置到head中。)
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+
插入到一個(gè)CLH隊(duì)列的請(qǐng)求只是一個(gè)對(duì)“tail”的單個(gè)原子操作,所以有一個(gè)簡(jiǎn)單的從未入隊(duì)到入隊(duì)的原子分割點(diǎn)。類(lèi)似的,出隊(duì)調(diào)用只需要修改“head”。然而,節(jié)點(diǎn)需要更多的工作來(lái)確定他們的后繼者是誰(shuí),部分是為了處理由于超時(shí)和中斷而導(dǎo)致的可能的取消。
(也就是說(shuō),一個(gè)node的后繼節(jié)點(diǎn)不一定就是node.next,因?yàn)殛?duì)列中的節(jié)點(diǎn)可能因?yàn)槌瑫r(shí)或中斷而取消了,而這些取消的節(jié)點(diǎn)此時(shí)還沒(méi)被移除隊(duì)列(也許正在移除隊(duì)列的過(guò)程中),而一個(gè)node的后繼節(jié)點(diǎn)指的是一個(gè)未被取消的有效節(jié)點(diǎn),因此在下面的操作中你就會(huì)發(fā)現(xiàn),在尋找后繼節(jié)點(diǎn)時(shí),尋找的都是當(dāng)前節(jié)點(diǎn)后面第一個(gè)有效節(jié)點(diǎn),即非取消節(jié)點(diǎn)。)
“prev”(前驅(qū))連接(原始的CLH鎖是不使用前驅(qū)連接的),主要用于處理取消。如果一個(gè)節(jié)點(diǎn)被取消了,它的后驅(qū)(通常)會(huì)重連接到一個(gè)未被取消的前驅(qū)。
另外我們使用“next”連接去實(shí)現(xiàn)阻塞機(jī)制。每個(gè)節(jié)點(diǎn)的線程ID被它們自己的節(jié)點(diǎn)所持有,所以前驅(qū)節(jié)點(diǎn)通知下一個(gè)節(jié)點(diǎn)可以被喚醒,這是通過(guò)遍歷下一個(gè)鏈接(即,next字段)來(lái)確定需要喚醒的線程。后繼節(jié)點(diǎn)的決定必須同‘新入隊(duì)的節(jié)點(diǎn)在設(shè)置它的前驅(qū)節(jié)點(diǎn)的“next”屬性操作(即,新入隊(duì)節(jié)點(diǎn)為newNode,在newNode的前驅(qū)節(jié)點(diǎn)preNewNode進(jìn)行preNewNode.next = newNode操作)’產(chǎn)生競(jìng)爭(zhēng)。一個(gè)解決方法是必要的話當(dāng)一個(gè)節(jié)點(diǎn)的后繼看起來(lái)是空的時(shí)候,從原子更新“tail”向前檢測(cè)。(或者換句話說(shuō),next鏈接是一個(gè)優(yōu)化,所以我們通常不需要反向掃描。)
取消引入了對(duì)基本算法的一些保守性。當(dāng)我們必須為其他節(jié)點(diǎn)的取消輪詢時(shí),我們不需要留意一個(gè)取消的節(jié)點(diǎn)是在我們節(jié)點(diǎn)的前面還是后面。它的處理方式是總是根據(jù)取消的節(jié)點(diǎn)喚醒其后繼節(jié)點(diǎn),允許它們?nèi)ミB接到一個(gè)新的前驅(qū)節(jié)點(diǎn),除非我們能夠標(biāo)識(shí)一個(gè)未被取消的前驅(qū)節(jié)點(diǎn)來(lái)完成這個(gè)責(zé)任。
- waitStatus
volatile int waitStatus;
狀態(tài)屬性,只有如下值:
① SIGNAL:
static final int SIGNAL = -1;
這個(gè)節(jié)點(diǎn)的后繼(或者即將被阻塞)被阻塞(通過(guò)park阻塞)了,所以當(dāng)前節(jié)點(diǎn)需要喚醒它的后繼當(dāng)它被釋放或者取消時(shí)。為了避免競(jìng)爭(zhēng),獲取方法必須首先表示他們需要一個(gè)通知信號(hào),然后再原子性的嘗試獲取鎖,如果失敗,則阻塞。
也就是說(shuō),在獲取鎖的操作中,需要確保當(dāng)前node的preNode的waitStatus狀態(tài)值為’SIGNAL’,才可以被阻塞,當(dāng)獲取鎖失敗時(shí)。(『shouldParkAfterFailedAcquire』方法的用意就是這)
② CANCELLED:
static final int CANCELLED = 1;
這個(gè)節(jié)點(diǎn)由于超時(shí)或中斷被取消了。節(jié)點(diǎn)不會(huì)離開(kāi)(改變)這個(gè)狀態(tài)。尤其,一個(gè)被取消的線程不再會(huì)被阻塞了。
③ CONDITION:
static final int CONDITION = -2;
這個(gè)節(jié)點(diǎn)當(dāng)前在一個(gè)條件隊(duì)列中。它將不會(huì)被用于當(dāng)做一個(gè)同步隊(duì)列的節(jié)點(diǎn)直到它被轉(zhuǎn)移到同步隊(duì)列中,轉(zhuǎn)移的同時(shí)狀態(tài)值(waitStatus)將會(huì)被設(shè)置為0。(這里使用這個(gè)值將不會(huì)做任何事情與該字段其他值對(duì)比,只是為了簡(jiǎn)化機(jī)制)。
④ PROPAGATE:
static final int PROPAGATE = -3;
一個(gè)releaseShared操作必須被廣播給其他節(jié)點(diǎn)。(只有頭節(jié)點(diǎn)的)該值會(huì)在doReleaseShared方法中被設(shè)置去確保持續(xù)的廣播,即便其他操作的介入。
⑤ 0:不是上面的值的情況。
這個(gè)值使用數(shù)值排列以簡(jiǎn)化使用。非負(fù)的值表示該節(jié)點(diǎn)不需要信號(hào)(通知)。因此,大部分代碼不需要去檢查這個(gè)特殊的值,只是為了標(biāo)識(shí)。
對(duì)于常規(guī)的節(jié)點(diǎn)該字段會(huì)被初始化為0,競(jìng)爭(zhēng)節(jié)點(diǎn)該值為CONDITION。這個(gè)值使用CAS修改(或者可能的話,無(wú)競(jìng)爭(zhēng)的volatile寫(xiě))。
- prev
volatile Node prev
連接到前驅(qū)節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)/線程依賴與這個(gè)節(jié)點(diǎn)waitStatus的檢測(cè)。分配發(fā)生在入隊(duì)時(shí),并在出隊(duì)時(shí)清空(為了GC)。并且,一個(gè)前驅(qū)的取消,我們將短路當(dāng)發(fā)現(xiàn)一個(gè)未被取消的節(jié)點(diǎn)時(shí),未被取消的節(jié)點(diǎn)總是存在因?yàn)轭^節(jié)點(diǎn)不能被取消:只有在獲取鎖操作成功的情況下一個(gè)節(jié)點(diǎn)才會(huì)成為頭節(jié)點(diǎn)。一個(gè)被取消的線程絕不會(huì)獲取成功,一個(gè)線程只能被它自己取消,不能被其他線程取消。
- next
volatile Node next
連接到后繼的節(jié)點(diǎn),該節(jié)點(diǎn)是當(dāng)前的節(jié)點(diǎn)/線程釋放喚醒的節(jié)點(diǎn)。分配發(fā)生在入隊(duì)時(shí),在繞過(guò)取消的前驅(qū)節(jié)點(diǎn)時(shí)進(jìn)行調(diào)整,并在出隊(duì)列時(shí)清空(為了GC的緣故)。一個(gè)入隊(duì)操作(enq)不會(huì)被分配到前驅(qū)節(jié)點(diǎn)的next字段,直到tail成功指向當(dāng)前節(jié)點(diǎn)之后(通過(guò)CAS來(lái)將tail指向當(dāng)前節(jié)點(diǎn)?!篹nq』方法實(shí)現(xiàn)中,會(huì)先將node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再將oldTailNode.next = node;),所以當(dāng)看到next字段為null時(shí)并不意味著當(dāng)前節(jié)點(diǎn)是隊(duì)列的尾部了。無(wú)論如何,如果一個(gè)next字段顯示為null,我們能夠從隊(duì)列尾向前掃描進(jìn)行復(fù)核。被取消的節(jié)點(diǎn)的next字段會(huì)被設(shè)置為它自己,而不是一個(gè)null,這使得isOnSyncQueue方法更簡(jiǎn)單。
- thread
volatile Thread thread
這個(gè)節(jié)點(diǎn)的入隊(duì)線程。在構(gòu)建時(shí)初始化,在使用完后清除。
- nextWaiter
Node nextWaiter
鏈接下一個(gè)等待條件的節(jié)點(diǎn),或者一個(gè)指定的SHARED值。因?yàn)橹挥谐钟信潘i時(shí)能訪問(wèn)條件隊(duì)列,所以我們只需要一個(gè)簡(jiǎn)單的單鏈表來(lái)維持正在等待條件的節(jié)點(diǎn)。它們接下來(lái)會(huì)被轉(zhuǎn)換到隊(duì)列中以去重新獲取鎖。因?yàn)橹挥信潘i才有conditions,所以我們使用給一個(gè)特殊值保存的字段來(lái)表示共享模式。
也就是說(shuō),nextWaiter用于在排他鎖模式下表示正在等待條件的下一個(gè)節(jié)點(diǎn),因?yàn)橹挥信潘i模式有conditions;所以在共享鎖模式下,我們使用’SHARED’這個(gè)特殊值來(lái)表示該字段。
源碼分析
初始化
CountDownLatch doneSignal = new CountDownLatch(N);
CountDownLatch 使用了共享鎖模式。CountDownLatch 使用了一個(gè)內(nèi)部類(lèi) Sync來(lái)實(shí)現(xiàn)CountDownLatch的同步控制,而Sync是AQS的一個(gè)實(shí)現(xiàn)類(lèi),它使用AQS的狀態(tài)(state)來(lái)表示count。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
構(gòu)造一個(gè)CountDownLatch使用給定的count值進(jìn)行初始化。
count值最終是設(shè)置到sync(AbstractQueuedSynchronizer)里的state字段。
阻塞的流程分析
『await()』
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
導(dǎo)致當(dāng)前的線程等待直到latch被倒數(shù)到0,或者線程被中斷了。
如果當(dāng)前的count是0,那么方法會(huì)立即返回,并且返回值為true。
如果當(dāng)前的count大于0,則當(dāng)前線程因?yàn)榫€程調(diào)度而變得不可用,并且處于休眠狀態(tài),直到發(fā)生下面二件事之一:
① 由于countDown方法的調(diào)用當(dāng)前的count達(dá)到0;
如果count達(dá)到0,那么這個(gè)方法將返回true。
② 其他線程中斷了當(dāng)前的線程;
如果當(dāng)前線程在進(jìn)入這個(gè)方法時(shí)設(shè)置了中斷狀態(tài);或者當(dāng)前線程在等待時(shí)被設(shè)置了中斷狀態(tài),那么“InterruptedException”異常將會(huì)拋出,并且當(dāng)前的線程的中斷狀態(tài)會(huì)被清除。
『acquireSharedInterruptibly』
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
獲取一個(gè)共享模式鎖,如果發(fā)生中斷則異常終止。實(shí)現(xiàn)首先會(huì)檢查中斷的狀態(tài),然后執(zhí)行至少一次的tryAcquireShared,成功的話返回。否則,線程將會(huì)入隊(duì),可能會(huì)重復(fù)的阻塞和解阻塞,執(zhí)行tryAcquireShared直到成功或者線程被中斷。
① 首先判斷當(dāng)前的線程是否被標(biāo)志為了中斷,如果被標(biāo)志位了中斷,則拋出“InterruptedException”異常,并清除中斷標(biāo)志;否則到第②步;
② 執(zhí)行『tryAcquireShared』來(lái)嘗試獲取鎖,如果成功(即,返回>=0)。則返回true退出方法;否則到第③步
③ 執(zhí)行doAcquireSharedInterruptibly。
『doAcquireSharedInterruptibly』

② 獲取新創(chuàng)建好節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)。如果前驅(qū)節(jié)點(diǎn)是head節(jié)點(diǎn),則說(shuō)明當(dāng)前節(jié)點(diǎn)是隊(duì)列中第一個(gè)等待獲取鎖的節(jié)點(diǎn),那么就執(zhí)行『tryAcquireShared』方法嘗試獲取共享鎖。tryAcquireShared是由CountDownLatch重寫(xiě)的方法。具體實(shí)現(xiàn)下面會(huì)詳細(xì)說(shuō)明。這里先給出結(jié)果就是tryAcquireShared方法的返回值會(huì)小于0.也就說(shuō)獲取共享鎖失敗。進(jìn)入步驟③
③ 如果前驅(qū)節(jié)點(diǎn)不是head節(jié)點(diǎn),或者當(dāng)前節(jié)點(diǎn)獲取共享鎖失?。?,步驟②)。那么執(zhí)行『shouldParkAfterFailedAcquire』方法,該方法返回true則說(shuō)明本次獲取共享鎖失敗需要阻塞(掛起)當(dāng)前線程。接著執(zhí)行『parkAndCheckInterrupt』方法,該方法會(huì)將當(dāng)前線程掛起,直到被喚醒。
這就是阻塞情況下的一個(gè)主流程,可以知道的是,在這個(gè)邏輯過(guò)程中使用了大量的CAS來(lái)進(jìn)行原子性的修改,當(dāng)修改失敗的時(shí)候,是會(huì)通過(guò)for(;;)來(lái)重新循環(huán)的,也就是說(shuō)『doAcquireSharedInterruptibly』使用自旋鎖(自旋+CAS)來(lái)保證在多線程并發(fā)的情況下,隊(duì)列節(jié)點(diǎn)狀態(tài)也是正確的以及在等待隊(duì)列的正確性,最終使得當(dāng)前節(jié)點(diǎn)要么獲取共享鎖成功,要么被掛起等待喚醒。
下面我們來(lái)對(duì)阻塞情況下,涉及的方法進(jìn)行進(jìn)一步的展開(kāi)。
『addWaiter』
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
根據(jù)給定的模式創(chuàng)建當(dāng)前線程的節(jié)點(diǎn),并將創(chuàng)建好的節(jié)點(diǎn)入隊(duì)(加入等待隊(duì)列尾部)。
首先在隊(duì)列非空的情況下會(huì)嘗試一次快速入隊(duì),也就是通過(guò)嘗試一次CAS操作入隊(duì),如果CAS操作失敗,則調(diào)用enq方法進(jìn)行“自旋+CAS”方法將創(chuàng)建好的節(jié)點(diǎn)加入隊(duì)列尾。
在共享模式下,Node的mode(即,waitStatus)為’SHARED’。waitStatus是用于在排他鎖模式下當(dāng)節(jié)點(diǎn)處于條件隊(duì)列時(shí)表示下一個(gè)等待條件的節(jié)點(diǎn),所以在共享鎖模式下,我們使用’SHARED’這個(gè)特殊值來(lái)表示該字段。
『enq』
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
使用自旋鎖的方式(自旋+CAS)插入節(jié)點(diǎn)到等待隊(duì)列,如果等待隊(duì)列為空則初始化隊(duì)列。
初始化隊(duì)列:創(chuàng)建一個(gè)空節(jié)點(diǎn)(即,new Node()),將head和tail都指向這個(gè)節(jié)點(diǎn)。
然后才是將我們待插入的節(jié)點(diǎn)插入,即:emptyNode -> newNode. head指向emptyNode,tail指向newNode。
『tryAcquireShared』
在共享模式下嘗試獲取。這個(gè)方法需要查詢是否對(duì)象的狀態(tài)允許在共享模式下被獲取,如果允許則去獲取它。
這個(gè)方法總是被線程執(zhí)行獲取共享鎖時(shí)被調(diào)用。如果這個(gè)方法報(bào)告失敗,那么獲取方法可能會(huì)使線程排隊(duì)等待,如果它(即,線程)還沒(méi)入隊(duì)的話,直到其他的線程發(fā)出釋放的信號(hào)。
默認(rèn)實(shí)現(xiàn)拋出一個(gè)“UnsupportedOperationException”
返回:
a)< 0 : 一個(gè)負(fù)數(shù)的返回表示失敗;
b) 0 : 0表示在共享模式下獲取鎖成功,但是后續(xù)的獲取共享鎖將不會(huì)成功
c)> 0 : 大于0表示共享模式下獲取鎖成功,并且后續(xù)的獲取共享鎖可能也會(huì)成功,在這種情況下后續(xù)等待的線程必須檢查是否有效。
CountDownLatch對(duì)該方法進(jìn)行了重寫(xiě):
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果當(dāng)前的狀態(tài)值為0(即,count為0),則表示獲取成功(返回’1’);否則表示獲取失?。ǚ祷亍?1’)
『shouldParkAfterFailedAcquire』
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
檢查并修改一個(gè)節(jié)點(diǎn)的狀態(tài),當(dāng)該節(jié)點(diǎn)獲取鎖失敗時(shí)。返回true如果線程需要阻塞。這是主要的信號(hào)通知控制在所有的獲取鎖循環(huán)中。要求’pred’ == ‘node.prev’
① 如果pred.waitStatus == Node.SIGNAL。則說(shuō)明node的前驅(qū)節(jié)點(diǎn)已經(jīng)被要求去通知釋放它的后繼節(jié)點(diǎn),所以node可以安全的被掛起(park)。然后,退出方法,返回true。
② 如果pred.waitStatus > 0。則說(shuō)明node的前驅(qū)節(jié)點(diǎn)被取消了。那么跳過(guò)這個(gè)前驅(qū)節(jié)點(diǎn)并重新標(biāo)志一個(gè)有效的前驅(qū)節(jié)點(diǎn)(即,waitStatus <= 0 的節(jié)點(diǎn)可作為有效的前驅(qū)節(jié)點(diǎn)),然后,退出方法,返回false。
③ 其他情況下,即pred.waitStatus為’0’或’PROPAGATE’。表示我們需要一個(gè)通知信號(hào)(即,當(dāng)前的node需要喚醒的通知),但是當(dāng)前還不能掛起node。調(diào)用『compareAndSetWaitStatus(pred, ws, Node.SIGNAL)』方法通過(guò)CAS的方式來(lái)修改前驅(qū)節(jié)點(diǎn)的waitStatus為“SIGNAL”。退出方法,返回false。
我們需要一個(gè)通知信號(hào),主要是因?yàn)楫?dāng)前線程要被掛起了(park)。而如果waitStatus已經(jīng)是’SIGNAL’的話就無(wú)需修改,直接掛起就好,而如果waitStatus是’CANCELLED’的話,說(shuō)明prev已經(jīng)被取消了,是個(gè)無(wú)效節(jié)點(diǎn)了,那么無(wú)需修改這個(gè)無(wú)效節(jié)點(diǎn)的waitStatus,而是需要先找到一個(gè)有效的prev。因此,剩下的情況就只有當(dāng)waitStatus為’0’和’PROPAGAET’了(注意,waitStatus為’CONDITION’是節(jié)點(diǎn)不在等待隊(duì)列中,所以當(dāng)下情況waitStatus不可能為’CONDITION’),這是我們需要將prev的waitStatus使用CAS的方式修改為’SIGNAL’,而且只有修改成功的情況下,當(dāng)前的線程才能安全被掛起。
還值得注意的時(shí),因此該方法的CAS操作都是沒(méi)有自旋的,所以當(dāng)它操作完CAS后都會(huì)返回false,在外層的方法中會(huì)使用自旋,當(dāng)發(fā)現(xiàn)返回的是false時(shí),會(huì)再次調(diào)用該方法,以檢查保證有當(dāng)前node有一個(gè)有效的prev,并且其waitStatus為’SIGNAL’,在此情況下當(dāng)前的線程才會(huì)被掛起(park)。
釋放的流程分析
『countDown』
public void countDown() {
sync.releaseShared(1);
}
減小latch的count,如果count達(dá)到0則釋放所有正在等待的線程。
如果當(dāng)前的count大于0,那么減少count。如果減少后的count值為0,那么所有正在等待的線程因?yàn)榫€程調(diào)度的原因被重新啟用。
如果當(dāng)前的count值已經(jīng)是0了,那么什么都不會(huì)發(fā)生。
『releaseShared』
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
共享模式下的釋放。如果『tryReleaseShared』返回true的話,會(huì)使一個(gè)或多個(gè)線程重新啟動(dòng)。
『tryReleaseShared』
在共享模式下,嘗試去設(shè)置狀態(tài)來(lái)反映一個(gè)釋放。
這個(gè)方法總是在線程執(zhí)行釋放時(shí)被調(diào)用。
默認(rèn)實(shí)現(xiàn)拋出一個(gè)UnsupportedOperationException異常。
返回:如果當(dāng)前共享模式可能允許一個(gè)正在等待的獲取成功(正在等待的獲取可能是共享模式的,也可能是排他模式的),則返回true;否則,返回false。
CountDownLatch對(duì)該方法進(jìn)行了重寫(xiě):
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
減少count的值,如果count為0則發(fā)出釋放信號(hào)。
這里使用了"自旋+CAS”的方式來(lái)原子性的將state的值減少1,如果在此過(guò)程中state已經(jīng)為0了(在并發(fā)情況下,可能已經(jīng)被其他線程修改為了0),則返回false。否則根據(jù)修改后state的值是否等于0來(lái)返回boolean值。
『doReleaseShared』
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
共享模式下的釋放動(dòng)作 ———— 用信號(hào)通知后繼節(jié)點(diǎn)并且確保廣播。(注意:在排他鎖模式下,釋放只是相當(dāng)于調(diào)用head的unparkSuccessor方法如果它需要通知喚醒的話。)
確保一個(gè)釋放的廣播,即使有其他線程正在進(jìn)行獲取/釋放鎖。這個(gè)過(guò)程通常的方式是嘗試head的unparkSuccessor操作如果需要通知釋放的話。如果沒(méi)這么做,狀態(tài)會(huì)被設(shè)置為‘PROPAGATE’以確保在釋放,廣播繼續(xù)。此外,當(dāng)我們正在做這個(gè)操作的時(shí)候如果新的節(jié)點(diǎn)被添加的話,我們需要重新循環(huán)再進(jìn)行一次該操作。另外,不同于unparkSuccessor的其他用途,我們需要知道CAS重置狀態(tài)是否失敗,如果失敗則重新檢查。
在隊(duì)列非空的時(shí)候,該方法會(huì)釋放head的后繼節(jié)點(diǎn),如果該節(jié)點(diǎn)可以被釋放的話。『(h != null && h != tail)』表示隊(duì)列非空,即有等待獲取鎖的節(jié)點(diǎn);『(h == head)』表示,已經(jīng)操作完釋放后繼節(jié)點(diǎn),或者隊(duì)列已經(jīng)空了(即,『(h == null || h == tail)』),那么就退出循環(huán)。否則如果循環(huán)過(guò)程中(即,『h != head』),頭結(jié)點(diǎn)發(fā)生了變化,則重新循環(huán)。
如果『if (h != null && h != tail)』為true,那么:
① 如果head的waitStatus為’SIGNAL’,則說(shuō)明head的后繼節(jié)點(diǎn)可被通知釋放,那么執(zhí)行CAS操作將head.waitStatus修改為’0’,如果成功,則執(zhí)行『unparkSuccessor』對(duì)head的后繼節(jié)點(diǎn)進(jìn)行釋放操作,如果CAS操作失敗,則說(shuō)明發(fā)送了多線程競(jìng)爭(zhēng)(即,此時(shí)有其他線程也在修改head的waitStatus狀態(tài)值),那么重新循環(huán)檢查。
② 如果head的waitStatus為’0’,則使用CAS的方式將其修改為’PROPAGATE’。如果CAS操作失敗,則說(shuō)明發(fā)生了多線程競(jìng)爭(zhēng),那么重新循環(huán)檢查。
③ 如果上面的兩個(gè)操作中有一個(gè)成功了,就會(huì)走到“if (h == head)”這一步,并且此時(shí)head節(jié)點(diǎn)沒(méi)有發(fā)生變化,則退出循環(huán),操作結(jié)束。否則,說(shuō)明head節(jié)點(diǎn)發(fā)生變化了,那么重新循環(huán)檢查。
『if (h != null && h != tail)』為false,那么:
說(shuō)明隊(duì)列中沒(méi)有等到獲取鎖的節(jié)點(diǎn)。會(huì)直接到“if (h == head)”,如果此時(shí)head節(jié)點(diǎn)沒(méi)有發(fā)生變化,則直接退出循環(huán),操作結(jié)束。如果此時(shí)head節(jié)點(diǎn)發(fā)生了變化,那么重新循環(huán)檢查。
也就是說(shuō),該方法在等待隊(duì)列非空時(shí)(即,存在一個(gè)有效的等待節(jié)點(diǎn),頭結(jié)點(diǎn)不是有效節(jié)點(diǎn)),會(huì)根據(jù)head的waitStatus進(jìn)行后續(xù)的操作。
a) 如果『ws == Node.SIGNAL』,則說(shuō)明需要釋放head后繼節(jié)點(diǎn),如果此時(shí)CAS操作『compareAndSetWaitStatus(h, Node.SIGNAL, 0)』也成功的話(說(shuō)明,此時(shí)沒(méi)有其他線程在修改head的waitStatus),那么就會(huì)執(zhí)行『unparkSuccessor(h);』來(lái)釋放head的后繼節(jié)點(diǎn)。
b) 如果『ws != Node.SIGNAL』并且『ws == 0』,則通過(guò)CAS操作將head的waitStatus修改為’PROPAGATE’。
以上兩步,當(dāng)CAS失敗,也就是有其他線程也在修改head的waitStatus狀態(tài)時(shí),需要繼續(xù)循環(huán)進(jìn)行重新檢測(cè),如果head節(jié)點(diǎn)改變了也需要繼續(xù)循環(huán)重新檢測(cè)。
Q:關(guān)于node的waitStatus為’0’的情況?
A:當(dāng)節(jié)點(diǎn)不屬于任何waitStatus的話,就會(huì)是0。比如,創(chuàng)建好的節(jié)點(diǎn)。比如,原來(lái)是SIGNAL狀態(tài),在執(zhí)行完unparkSuccessor操作后(邏輯上說(shuō)是執(zhí)行完unparkSuccessor后,但實(shí)際的代碼實(shí)現(xiàn)必須先將node的waitStatus通過(guò)CAS成功從SINGAL修改為0后,才可執(zhí)行unparkSuccessor操作,以保證多線程競(jìng)爭(zhēng)情況下的正確性)。比如,將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)移到等待隊(duì)列的時(shí)候,會(huì)通過(guò)CAS將node的waitStatus從’CONDITION’修改為0。
Q:’PROPAGATE’狀態(tài)與釋放之間的關(guān)系?
A:當(dāng)head的waitStatus為’PROPAGATE’的話,在釋放操作時(shí),這個(gè)釋放會(huì)被廣播下去,也就是說(shuō),第一個(gè)線程被釋放完后,會(huì)繼續(xù)釋放第二個(gè)被阻塞的線程。。。
『unparkSuccessor』
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
喚醒后繼節(jié)點(diǎn),如果存在的話
① 如果狀態(tài)值是負(fù)數(shù),則在預(yù)期發(fā)信號(hào)通知時(shí)清除這個(gè)負(fù)數(shù)狀態(tài)值。如果狀態(tài)被等待的線程修改了或者清除負(fù)數(shù)狀態(tài)值失敗是允許。
② 后繼節(jié)點(diǎn)的線程被喚醒,后繼節(jié)點(diǎn)通常就是下一個(gè)節(jié)點(diǎn)。但是如果下一個(gè)節(jié)點(diǎn)被取消了或者下一個(gè)節(jié)點(diǎn)為null,則從隊(duì)列尾(tail)往前遍歷去找真實(shí)的未取消的后繼節(jié)點(diǎn)。
『(s == null || s.waitStatus > 0)』:說(shuō)明下一個(gè)節(jié)點(diǎn)為null或被取消了(waitStatus允許的狀態(tài)值中,只有’CANCELLED’是>0的)。那么,就從隊(duì)列尾(tail)開(kāi)始向前遍歷,獲取第一個(gè)非空且未被取消的節(jié)點(diǎn)。如果存在這樣的一個(gè)后繼節(jié)點(diǎn)的話(即,“s != null”),則執(zhí)行『LockSupport.unpark(s.thread);』操作來(lái)喚醒這個(gè)節(jié)點(diǎn)的線程。
Q:關(guān)于node的waitStatus為’CANCELLED’的情況?
A:關(guān)于node的waitStatus為’CANCELLED’的情況:比如,當(dāng)這個(gè)node被中斷了,或者設(shè)置的超時(shí)時(shí)間到了,那么說(shuō)明這個(gè)線程獲取鎖失敗,那么此時(shí)就應(yīng)該將其設(shè)置為cancelled,因?yàn)槿绻摼€程還需要獲取鎖的話,會(huì)重新調(diào)用獲取鎖的方法,而獲取鎖的方法就是創(chuàng)建一個(gè)新的node的。所以,那么線程獲取鎖失敗的時(shí)候就會(huì)將這個(gè)node的waitStatus設(shè)置為’CANCELLED’,一個(gè)被取消的線程絕不會(huì)獲取鎖成功,一個(gè)線程只能被它自己取消,不能被其他線程取消。
Q:關(guān)于node為null的情況?
A:關(guān)于node為null的情況:比如,一個(gè)入隊(duì)操作(enq)不會(huì)被分配到前驅(qū)節(jié)點(diǎn)的next字段,直到tail成功指向當(dāng)前節(jié)點(diǎn)之后(通過(guò)CAS來(lái)將tail指向當(dāng)前節(jié)點(diǎn)?!篹nq』方法實(shí)現(xiàn)中,會(huì)先將node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再將oldTailNode.next = node;),所以當(dāng)看到next字段為null時(shí)并不意味著當(dāng)前節(jié)點(diǎn)是隊(duì)列的尾部了。無(wú)論如何,如果一個(gè)next字段顯示為null,我們能夠從隊(duì)列尾向前掃描進(jìn)行復(fù)核。
當(dāng)調(diào)用了『LockSupport.unpark(s.thread);』操作后,等待隊(duì)列中第一個(gè)等待的線程就會(huì)重新啟動(dòng)。流程回到『doAcquireSharedInterruptibly』方法中,線程從阻塞中恢復(fù):

『setHeadAndPropagate』
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
設(shè)置’node’節(jié)點(diǎn)為頭結(jié)點(diǎn),并且檢查’node’節(jié)點(diǎn)的后繼是否正在等待獲取共享鎖,如果是的話,并且'propagate > 0'或者’node’的waitStatus被設(shè)置成了’PROPAGATE’,則廣播。
① 設(shè)置’node’為head節(jié)點(diǎn)
② 嘗試通知隊(duì)列中的下一個(gè)節(jié)點(diǎn),如果:
??[1]
???a) 調(diào)用者標(biāo)識(shí)了廣播(即,propagate > 0),
???b) 或者waitStatus被前面的操作重新記錄了(’h.waitStatus’可能在setHead之前或之后被重新記錄)(注意,這里使用waitStatus的符號(hào)檢查,因?yàn)镻ROPAGATE狀態(tài)可能被轉(zhuǎn)換為SIGNAL)。
??并且[2]隊(duì)列中下一個(gè)等待的節(jié)點(diǎn)是共享模式的,或者下一個(gè)節(jié)點(diǎn)為null。
這兩次檢查的保守性可能導(dǎo)致不必要的喚醒,但是只有當(dāng)多線程競(jìng)爭(zhēng)獲取/釋放鎖時(shí),所以大多數(shù)情況下現(xiàn)在或即將需要通知(signal)喚醒。(因?yàn)樵趀nq新節(jié)點(diǎn)入隊(duì)過(guò)程中,可能出現(xiàn)next為null的短暫現(xiàn)象,這是發(fā)現(xiàn)在節(jié)點(diǎn)入隊(duì)的過(guò)程中,隨后節(jié)點(diǎn)就會(huì)入隊(duì)成功,next字段就不會(huì)為null了。所以這里將next為null的情況也考慮了,在廣播釋放時(shí),會(huì)將這個(gè)正在入隊(duì)的節(jié)點(diǎn)對(duì)應(yīng)的線程也進(jìn)行釋放)。
如果符合??[1]、[2]個(gè)條件則執(zhí)行『doReleaseShared()』來(lái)釋放后繼的節(jié)點(diǎn)。
可設(shè)置超時(shí)時(shí)間的await
『await(long timeout, TimeUnit unit)』同『await()』方法大體是相同的,主要多了在獲取共享鎖時(shí)對(duì)時(shí)間的控制。
在嘗試獲取鎖時(shí)的區(qū)別:
① 如果傳入的給定的超時(shí)納秒數(shù)是否小于等于0,如果是則直接返回false,獲取共享鎖失敗。
② 如果在使用自旋的方式獲取共享鎖的過(guò)程中,發(fā)現(xiàn)已經(jīng)過(guò)了設(shè)置的超時(shí)時(shí)間,那么直接返回false,獲取共享鎖失敗。
③ 如果當(dāng)前線程無(wú)法獲取當(dāng)共享鎖,并且『shouldParkAfterFailedAcquire』方法返回true(則說(shuō)明本次獲取共享鎖失敗需要阻塞/掛起當(dāng)前線程)。但當(dāng)『nanosTimeout <= spinForTimeoutThreshold』說(shuō)明設(shè)置的超時(shí)時(shí)間 <= 自旋超時(shí)的閾值。這里spinForTimeoutThreshold的值為1000納秒,表示當(dāng)設(shè)置的超時(shí)時(shí)間小于1000納秒時(shí),使用自旋比使用線程掛起更快。粗略估算這足以去提升響應(yīng)在一個(gè)很短的超時(shí)時(shí)間內(nèi)。否則也是使用『LockSupport.parkNanos(this, nanosTimeout);』將當(dāng)前線程掛起,直到被喚醒或者超時(shí)時(shí)間到。
取消節(jié)點(diǎn)
當(dāng)嘗試獲取鎖的節(jié)點(diǎn),因?yàn)槌瑫r(shí)或中斷而結(jié)束時(shí),說(shuō)明本次獲取鎖操作失敗,因?yàn)楸敬尾僮鞯膎ode就應(yīng)該被取消。如果線程還需要獲取鎖的話,會(huì)再次嘗試獲取鎖操作,此時(shí)如果需要的話是會(huì)生成一個(gè)新的node的。
『cancelAcquire』
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
① 如果待取消節(jié)點(diǎn)(node)為null,則直接返回。
② 將node的thread置為null;
③ 將node的prev屬性指向一個(gè)在它之前的有效的節(jié)點(diǎn)(即,waitStatus <= 0的節(jié)點(diǎn)都為有效節(jié)點(diǎn))。 也就是跳過(guò)被取消的前驅(qū)節(jié)點(diǎn)。
④ 『Node predNext = pred.next;』取pred的下一個(gè)節(jié)點(diǎn)。這個(gè)predNext是pred表面上的下一個(gè)連接的節(jié)點(diǎn)(即,無(wú)需考慮該節(jié)點(diǎn)是否被取消了)。下面的CAS操作將會(huì)失?。ā篶ompareAndSetNext(pred, predNext, null);』or『compareAndSetNext(pred, predNext, next);』),如果和其他的取消或通知操作發(fā)生競(jìng)爭(zhēng)時(shí),這時(shí)不需要進(jìn)一步的操作。因?yàn)槿绻a(chǎn)生競(jìng)爭(zhēng),說(shuō)明pred的next已經(jīng)被修改了,并且是最新的值了,而我們的操作也就沒(méi)有要執(zhí)行的必要了。
⑤ 將node的waitStatus設(shè)置為’CANCELLED’。這里可以使用無(wú)條件的寫(xiě)代替CAS(注意,node的waitStatus是volatile的)。在這個(gè)原子操作之后,其他節(jié)點(diǎn)會(huì)跳過(guò)我們(即,跳過(guò)waitStatus被置位CANCELLED的節(jié)點(diǎn)),在這個(gè)原子操作之前,我們不受其他線程的干擾。也就是說(shuō),無(wú)論其他線程對(duì)node的waitStatus是否有在操作,在當(dāng)前的情況下我們都需要將這個(gè)node的waitStatus置為’CANCELLED’。
⑥ 如果待取消的node節(jié)點(diǎn)是隊(duì)列尾節(jié)點(diǎn)的話(即,『node == tail』),那么刪除node自己即可。使用CAS將tail節(jié)點(diǎn)設(shè)置成前面得到的第一個(gè)有效前驅(qū)節(jié)點(diǎn)(即,『compareAndSetTail(node, pred)』)。并且CAS操作成功的話,執(zhí)行『compareAndSetNext(pred, predNext, null);』也就是將tail的next置為null的意思。如果該CAS操作失敗的話,沒(méi)關(guān)系。說(shuō)明此時(shí)tail已經(jīng)被修改了。
⑦ 如果待取消的node節(jié)點(diǎn)不是隊(duì)尾節(jié)點(diǎn)。并且:
a)pred(即,node的有效前驅(qū)節(jié)點(diǎn))不是head節(jié)點(diǎn);并且
b)“pred.waitStatus為SIGNAL” 或者 “pred.waitStatus <= 0”時(shí)通過(guò)CAS將pred.waitStatus設(shè)置為SIGNAL”成功;并且
c) pred的thread非空
那么,當(dāng)node的next節(jié)點(diǎn)非空,且next節(jié)點(diǎn)的waitStatus<=0(說(shuō)明next節(jié)點(diǎn)未被取消)時(shí),通過(guò)CAS將pred的next執(zhí)行node的next(即,pred.next = node.next)。同時(shí),如果該CAS操作失敗是沒(méi)關(guān)系的,說(shuō)明有其他線程操作已經(jīng)修改了該pre的next值。
⑧ 如果待取消的node節(jié)點(diǎn)不是隊(duì)尾節(jié)點(diǎn),并且步驟[7]條件不成立。那么執(zhí)行『unparkSuccessor(node);』來(lái)釋放當(dāng)前這個(gè)待取消節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。(也就是說(shuō),當(dāng)prev是head節(jié)點(diǎn),或者prev也被取消的話,會(huì)執(zhí)行『unparkSuccessor(node);』來(lái)釋放node的下一個(gè)節(jié)點(diǎn),其實(shí)也就是pred的下一個(gè)節(jié)點(diǎn))
從上面的分析我們可以知道,其實(shí)CountDownLatch中線程的釋放其實(shí)是有順序的,根據(jù)節(jié)點(diǎn)入隊(duì)的順序依次被釋放,先入隊(duì)的節(jié)點(diǎn)的線程會(huì)先被釋放。
后記
如果文章有錯(cuò)不吝指教 :)