注:文中代碼的解釋基本上都以注釋的形式和代碼寫在一起
CountDownLatch是并發(fā)環(huán)境中常用的計(jì)數(shù)組件,也是基于AQS實(shí)現(xiàn)的。主要的方法有兩個(gè),countDown和await,實(shí)現(xiàn)了AQS模板方法的tryReleaseShared方法來完成countDown計(jì)數(shù)減的過程,實(shí)現(xiàn)了AQS模板方法的tryAcquireShared方法來實(shí)現(xiàn)await阻塞等待功能。
countDown方法
countDown方法源碼如下,直接調(diào)用了內(nèi)部類sync的releaseShared方法來實(shí)現(xiàn),這里的Sync和ReentrantLock的內(nèi)部類Sync一樣,是繼承了AQS的內(nèi)部類,releaseShared方法正是AQS提供的共享模式的模板方法。
public void countDown() {
//直接調(diào)用了AQS的releaseShared
sync.releaseShared(1);
}
AQS的releaseShared方法源碼如下
public final boolean releaseShared(int arg) {
//tryReleaseShared方法AQS留給子類自己實(shí)現(xiàn)
//嘗試將status減去arg,如果返回為true,執(zhí)行doRelease方法,否則返回
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared方法中調(diào)用了CountDownLatch中實(shí)現(xiàn)的tryReleaseShared方法,源碼如下
protected boolean tryReleaseShared(int releases) {
//通過源碼我們發(fā)現(xiàn)傳入的參數(shù)releases并沒有什么用,每次計(jì)數(shù)固定減一
// 無限循環(huán)直到值減一成功或者status變成0
for (;;) {
//獲取status的值,CountDownlatch中status的值代表要等待的總計(jì)數(shù)
int c = getState();
//如果已經(jīng)是0了說明已經(jīng)不能再減計(jì)數(shù)了,返回false
if (c == 0)
return false;
int nextc = c-1;
//CAS的方式將status減一
if (compareAndSetState(c, nextc))
如果當(dāng)前減完之后,status是0,也就意味著計(jì)數(shù)結(jié)束了,返回true
return nextc == 0;
}
}
tryReleaseShared方法返回為true,也就是計(jì)數(shù)結(jié)束時(shí),會(huì)接著執(zhí)行doReleaseShared方法。doReleaseShared方法在CountDownLatch中沒有重寫,直接調(diào)用的是AQS的doReleaseShared方法,源碼如下,其中unparkSuccessor源碼解析見另一篇博客《AQS源碼解析》
private void doReleaseShared() {
//無限循環(huán)
for (;;) {
//獲取頭節(jié)點(diǎn)
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果頭節(jié)點(diǎn)設(shè)置的是要喚醒下一個(gè)節(jié)點(diǎn)的等待狀態(tài)
if (ws == Node.SIGNAL) {
//將節(jié)點(diǎn)的waitStatus設(shè)置成0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//如果設(shè)置成功,喚醒后面的等待節(jié)點(diǎn)
unparkSuccessor(h);
}
//ws==0說明第一步設(shè)置成功或者原先就是0
//將其狀態(tài)設(shè)置為PROPAGATE
//失敗(PROPAGATE狀態(tài)表示同步狀態(tài)將會(huì)無條件傳播,意思就是節(jié)點(diǎn)可運(yùn)行)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果h還是頭節(jié)點(diǎn),就結(jié)束循環(huán)
if (h == head) // loop if head changed
break;
}
}
await方法
await方法直接調(diào)用了內(nèi)部類Sync的acquireSharedInterruptibly方法,阻塞線程直到count為0,當(dāng)前線程才能拿到鎖(或者拋出異常也有可能結(jié)束阻塞)。源碼如下:
public void await() throws InterruptedException {
//直接調(diào)用了內(nèi)部類Sync的方法(其實(shí)是AQS的方法)
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法,如果線程被中斷過就拋出異常結(jié)束阻塞,不然就判斷計(jì)數(shù)的值,為0就返回,等于當(dāng)前阻塞的線程獲得了鎖,如果計(jì)數(shù)不為0,進(jìn)入doAcquireSharedInterruptibly方法進(jìn)行排隊(duì)等待。源碼如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果線程被中斷過,拋出異常,線程不再等待
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared嘗試獲取共享狀態(tài)
//tryAcquireShared源碼見下方,實(shí)際就是獲取計(jì)數(shù)
//計(jì)數(shù)不為0則執(zhí)行doAcquireSharedInterruptibly方法
//計(jì)數(shù)為0則返回值大于0,方法直接返回,線程阻塞結(jié)束。等于線程獲得了鎖
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly方法只有在線程阻塞時(shí)會(huì)被調(diào)用,也就是計(jì)數(shù)不為0時(shí)被調(diào)用。方法將當(dāng)前線程構(gòu)造為一個(gè)共享模式的等待節(jié)點(diǎn)加入等待隊(duì)列中,然后開啟無限自循環(huán),直到計(jì)數(shù)等于0獲取到鎖,或者拋出異常為止。源碼如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//新建一個(gè)共享模式節(jié)點(diǎn)加入等待隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取新建節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
//如果前一個(gè)就是頭節(jié)點(diǎn)
if (p == head) {
//說明當(dāng)前線程是第一個(gè)等待的,嘗試獲取鎖
//也就是獲取計(jì)數(shù)
int r = tryAcquireShared(arg);
//r不小于0說明計(jì)數(shù)已經(jīng)是0了,等于當(dāng)前線程已經(jīng)獲得了鎖
if (r >= 0) {
//將當(dāng)前線程的節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn)
setHeadAndPropagate(node, r);
//原先的頭節(jié)點(diǎn)p從隊(duì)列中解除,便于垃圾回收
p.next = null; // help GC
failed = false;
return;
}
}
//shouldParkAfterFailedAcquire檢查如果獲取鎖失敗當(dāng)前節(jié)點(diǎn)是否需要掛起
//只有當(dāng)前一個(gè)節(jié)點(diǎn)的waitStatus是SIGNAL也就是說前一個(gè)節(jié)點(diǎn)
//獲得鎖以后會(huì)把自己喚醒,當(dāng)前節(jié)點(diǎn)才能放心掛起
//parkAndCheckInterrupt判斷節(jié)點(diǎn)是否被中斷過
//這里意思是如果當(dāng)前節(jié)點(diǎn)是在被掛起狀態(tài)而且被中斷過就拋出異常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果failed為true說明線程被中斷,沒有獲得鎖
//所以取消獲取鎖的動(dòng)作
if (failed)
cancelAcquire(node);
}
}
總結(jié)來說就是await方法會(huì)讓當(dāng)前線程阻塞,進(jìn)入方法后先判斷一次計(jì)數(shù)是否為0,如果是0則直接返回,線程獲得鎖,阻塞結(jié)束。如果不為0則進(jìn)入doAcquireSharedInterruptibly方法,將當(dāng)前線程構(gòu)造成了一個(gè)等待節(jié)點(diǎn),開啟無限循環(huán),線程被阻塞。無限循環(huán)直到當(dāng)前線程的節(jié)點(diǎn)排隊(duì)排到了頭節(jié)點(diǎn)的后面,就可以嘗試獲得鎖了,如果成功了就可以返回,阻塞結(jié)束。在排隊(duì)的過程中如果線程被中斷那么就拋出異常。簡而言之阻塞是由無限的for循環(huán)造成的,所以結(jié)束循環(huán)就是線程結(jié)束阻塞的關(guān)鍵了。
提問:await方法支持多個(gè)線程一起等待嗎
回答:支持的。從源碼角度看,await方法并沒有做任何的同步控制,多個(gè)線程等待和一個(gè)線程等待,結(jié)果沒有什么不同,所有等待的線程都會(huì)阻塞到status為0,阻塞過程中這些線程就乖乖的在隊(duì)列里等待。