1. 簡(jiǎn)介
在上篇中我們介紹了SyclicBarrier類的使用,通過(guò)SyclicBarrier我們可以完成一些分批執(zhí)行匯總的任務(wù),而此次介紹的CountDownLatch則是實(shí)現(xiàn)類似“倒計(jì)時(shí)”的功能。
2. Api分析
CountDownLatch源碼很簡(jiǎn)潔,它提供兩個(gè)典型的方法來(lái)實(shí)現(xiàn)“倒計(jì)時(shí)功能”。CountDownLatch在構(gòu)造方法中指定門(mén)閂鎖latch的個(gè)數(shù),當(dāng)latch的個(gè)數(shù)為0的時(shí)候則喚醒所有等待狀態(tài)下的線程。
- countDown():通過(guò)調(diào)用countDown方法實(shí)現(xiàn)門(mén)閂鎖-1的效果。
- await():讓當(dāng)前線程進(jìn)入等待。
- getCount():獲取門(mén)閂鎖的個(gè)數(shù)。
下面我們通過(guò)一個(gè)示例代碼來(lái)看一下:
public class MyTest {
static CountDownLatch countDownLatch;
public static void main(String[]args){
countDownLatch = new CountDownLatch(3);
for(int i=0;i<3;i++){
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "--到達(dá)車門(mén)");
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "--已登車");
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "--登車完畢");
}
}
在示例代碼中,通過(guò)構(gòu)造函數(shù)創(chuàng)建一個(gè)指定latch個(gè)數(shù)為3的CountDownLatch的方法,同時(shí)創(chuàng)建了三個(gè)線程,在每個(gè)線程的內(nèi)部分別調(diào)用一次countDown()方法,最后在主線程中調(diào)用await方法進(jìn)行等待阻塞,最后完成輸出。
執(zhí)行結(jié)果:
Thread-2--到達(dá)車門(mén)
Thread-2--已登車
Thread-0--到達(dá)車門(mén)
Thread-0--已登車
Thread-1--到達(dá)車門(mén)
main--登車完畢
Thread-1--已登車
從上面可以得知CountDownLatch以下特點(diǎn):
- 線程中執(zhí)行countDown方法后能繼續(xù)執(zhí)行后續(xù)代碼,線程不會(huì)阻塞等待;
- latch個(gè)數(shù)為0的時(shí)候會(huì)立即喚醒a(bǔ)wait等待的線程;
3. CountDownLatch源碼解析
結(jié)合上面對(duì)CountDownLatch功能的描述,假如讓我們實(shí)現(xiàn)一個(gè)這樣的功能,我們首先想到的思路應(yīng)該是在工具類中維持一個(gè)Count計(jì)數(shù)變量,然后維持對(duì)該變量的判斷。那么我們來(lái)看看CountDownLatch是怎么實(shí)現(xiàn)的。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 讓當(dāng)前線程進(jìn)行等待,直到鎖個(gè)數(shù)為0或者當(dāng)前線程interrupt
* 如果當(dāng)前鎖latch個(gè)數(shù)為0,則立即返回
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 減少latch鎖,當(dāng)latch鎖個(gè)數(shù)為0的時(shí)候釋放所有等待線程。
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
首先從構(gòu)造方法來(lái)看,CountDownLatch提供了有參構(gòu)造函數(shù):
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
這里的Sync類是CountDownLatch內(nèi)部集成AQS實(shí)現(xiàn)的一個(gè)內(nèi)部類。
Sync(int count) {
setState(count);
}
在Sync的構(gòu)造函數(shù)中,調(diào)用的是AQS的setState方法修改state值,將state值修改為count值。
通過(guò)上面的介紹,對(duì)于CountDownLatch類來(lái)說(shuō),使用最多的就是countDown和await方法。
countDown()方法
public void countDown() {
sync.releaseShared(1);
}
/**AQS類中定義的該方法*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在countDown方法內(nèi)部是調(diào)用Sync的releaseShared方法,嘗試釋放公共鎖狀態(tài)。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
采用自旋的方式,獲取當(dāng)前state值,如果state值為0就返回false,即釋放失敗。反之則state-1,通過(guò)CAS操作成功,返回nextc == 0;當(dāng)最后通過(guò)咨詢的方式返回true的時(shí)候(state ==0),則執(zhí)行releaseShared方法中的doReleaseShared()方法,在doReleaseShared()方法內(nèi)部通過(guò)unparkSuccessor()方法喚醒阻塞的線程開(kāi)始執(zhí)行。
await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/***AQS中的方法*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
這里調(diào)用acquireSharedInterruptibly()方法申請(qǐng)鎖,如果鎖被占有state!=0,則通過(guò)doAcquireSharedInterruptibly()進(jìn)行鎖申請(qǐng)。
4. CountDownLatch的使用場(chǎng)景
- 確保某個(gè)計(jì)算在其需要的所有資源都被初始化之后才繼續(xù)執(zhí)行。
- 確保某個(gè)服務(wù)在其依賴的所有其他服務(wù)都已啟動(dòng)后才啟動(dòng)。
- 等待知道某個(gè)操作的所有者都就緒在繼續(xù)執(zhí)行。