引言
今天分享一個比較簡短一些的源碼,那就是concurrent包中我們經(jīng)常使用的CountDownLatch同步器,"latch"在英文中也是鎖的意思,翻譯成中文就是“倒數(shù)鎖”,當你調(diào)用了這個類型對象中的await方法后,必須要等待這個鎖倒數(shù)到0,才能繼續(xù)運行。
這個類的源碼非常短,因為其實它就是對AQS共享模式的一個簡單實現(xiàn)而已,如果你還不理解AQS的話,可以去看看我這個系列的第一篇文章和第三篇文章,他們詳細介紹了AQS的使用與原理,以及AQS經(jīng)典的兩個實現(xiàn)類ReentrantLock與Semophore。
Demo
雖然這篇文章重點講的是實現(xiàn),但是出于完整性,這里也給個demo示意其使用,如果你已經(jīng)知道這個類的使用方法的話,直接跳過這一部分即可。
假設現(xiàn)在有一個大小為15的int數(shù)組(全部為0),我想用三個線程將數(shù)組中的內(nèi)容全部置位1(每個線程只需要將5個數(shù)組元素置位1即可),使用CountDownLatch即可實現(xiàn)這個過程的同步:
package org.du.test.blogdemo;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
private static class Fill extends Thread{
private int start;
private int end;
private int[] array;
private CountDownLatch countDownLatch;
public Fill(int start, int end, int[] array, CountDownLatch countDownLatch) {
this.start = start;
this.end = end;
this.array = array;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
for ( int i = start; i < end; i++ ){
array[i] = 1;
}
//該部分任務完成,倒數(shù)一個
countDownLatch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
//任務只有3個部分,所以倒數(shù)三下即可,構(gòu)造函數(shù)中的3代表需要倒數(shù)的次數(shù)
CountDownLatch countDownLatch = new CountDownLatch(3);
int[] array = new int[15];
new Fill(0, 5, array, countDownLatch).start();
new Fill(5, 10, array, countDownLatch).start();
new Fill(10, 15, array, countDownLatch).start();
//等待倒數(shù)完成
countDownLatch.await();
System.out.printf(Arrays.toString(array));
}
}
程序最后輸出:
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
所有的元素都被置1了,說明線程同步是正確的。
需要注意的一點是CountDownLatch的計數(shù)是無法被重置的,如果你的場景總是需要重置計數(shù)的話,最好考慮使用CyclicBarrier(循環(huán)柵欄),我的下一篇文章將會詳細分析CyclicBarrier。
實現(xiàn)分析
從上面的demo可以看出,CountDownLatch的核心方法其實就只有兩個:
- countDown: 用于計數(shù)器倒數(shù)一次
- await: 等待計數(shù)器倒數(shù)完成
點開這兩個方法看一下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
發(fā)現(xiàn)有點類似與之前講過的Semaphore,因為他們都是AQS在共享模式下的實現(xiàn),套路是一樣的。
Sync內(nèi)部類實現(xiàn)了AQS接口:
private static final class Sync extends AbstractQueuedSynchronizer {
...
而當你調(diào)用CountDownLatch(int count)構(gòu)造方法時會實例化一個sync實例:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//實例化sync同步器
this.sync = new Sync(count);
}
繼續(xù)查看Sync的構(gòu)造器的代碼:
Sync(int count) {
//這里AQS的state代表計數(shù)器的值
setState(count);
}
回憶一下AQS的相關知識,AQS允許子類通過state變量進行同步,在ReentrantLock的實現(xiàn)中,state同步變量被用作記錄線程的沖入次數(shù),而在Semaphore的實現(xiàn)中,state同步變量被用作記錄剩余許可數(shù),這里可以看出:
- CountDownLatch中的AQS同步狀態(tài)被用作記錄剩余倒數(shù)次數(shù)
countDown的實現(xiàn)
從上面可以看到countDown方法本質(zhì)上調(diào)用的是AQS的releaseShared,回憶一下AQS的知識,releaseShared本質(zhì)上是調(diào)用子類實現(xiàn)的tryReleaseShared類嘗試釋放鎖的,Sync的實現(xiàn)如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//使用CAS循環(huán)更新 計數(shù)器 減一
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
可以看出就是一個CAS循環(huán),不把同步狀態(tài)state減1誓不擺休。其中compareAndSetState是由父類AQS提供的用于操作同步狀態(tài)state的方法。
await的實現(xiàn)
await調(diào)用的其實就是AQS的acquireSharedInterruptibly方法,而acquireSharedInterruptibly本質(zhì)上是調(diào)用子類實現(xiàn)的tryAcquireShared方法類不斷嘗試獲得鎖的,Sync中的實現(xiàn)如下:
protected int tryAcquireShared(int acquires) {
//返回1會不停地傳播下去
return (getState() == 0) ? 1 : -1;
}
代碼只有一行,但是要看懂的話需要了解一些AQS共享模式的知識,如果子類實現(xiàn)的tryAcquireShared返回一個大于等于0的數(shù)字的話,線程獲得鎖,除了會將自己的節(jié)點設置為頭結(jié)點外,還會繼續(xù)喚醒后繼的一個節(jié)點(代碼中稱之為“傳播”, PROPAGATE);如果返回0的話,則線程獲得鎖,然后就像獨占模式中一樣將自己的節(jié)點設置為頭節(jié)點;返回一個小于0的數(shù)字的話,則線程無法獲得鎖,等待它的就將是被阻塞。
這里畫了張表格總結(jié)了一下:
tryAcquireShared返回值 |
導致的行為 |
|---|---|
| >0 | 線程獲得鎖,將自己的節(jié)點設置為頭結(jié)點,繼續(xù)喚醒后繼的一個節(jié)點 |
| =0 | 線程獲得鎖,將自己的節(jié)點設置為頭結(jié)點 |
| <0 | 線程無法獲得鎖 |
這樣我們就能看懂這個返回值了,如果getState發(fā)現(xiàn)同步狀態(tài)已經(jīng)倒數(shù)到0了,則一直返回1,這樣就能夠從頭節(jié)點不停地傳播下去,直到喚醒所有正在await的線程。
如果發(fā)現(xiàn)還沒有倒數(shù)到0,則始終返回-1,這樣所有正在await的線程就會一直阻塞下去。
End
這一篇分享比較短,有劃水的嫌疑,其實我是在為下一篇介紹CyclicBarrier的文章作鋪墊。
