引言
CountDownLatch是jdk1.5開始concurrent包里提供的,并發(fā)編程工具類。
這個類能夠使一個線程等待其他線程完成各自的工作后再執(zhí)行,可用于多線程的并發(fā)執(zhí)行。
例如,應用程序的主線程希望在多個網絡請求線程并發(fā)執(zhí)行完后,刷新頁面,避免串行請求導致網絡請求耗時長。
CountDownLatch的使用
CountDownLatch的主要使用步驟是
1、初始化,指定線程個數(shù),CountDownLatch latch = new CountDownLatch(3);
參數(shù)4代表線程的總數(shù)
2、每個線程執(zhí)行后執(zhí)行latch.countDown();,代表一個線程執(zhí)行完成,待完成的線程數(shù)減1。
3、在線程添加latch.await();,阻塞該線程,等待其他子線程完成。
Demo如下
package com.example.zzh.myapplication;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// Let us create task that is going to
// wait for four threads before it starts
CountDownLatch latch = new CountDownLatch(3);
long start = System.currentTimeMillis();
// Let us create four worker
// threads and start them.
WorkerThread first = new WorkerThread(1000, latch, "worker-1");
WorkerThread second = new WorkerThread(2000, latch, "worker-2");
WorkerThread third = new WorkerThread(3000, latch, "worker-3");
first.start();
second.start();
third.start();
// The main task waits for four threads
latch.await();
// Main thread has started
System.out.println(Thread.currentThread().getName() + " has finished. Spend Time = " + (System.currentTimeMillis() - start));
}
// A class to represent threads for which
// the main thread waits.
static class WorkerThread extends Thread {
private int delay;
private CountDownLatch latch;
public WorkerThread(int delay, CountDownLatch latch, String name) {
super(name);
this.delay = delay;
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(delay);
latch.countDown();
System.out.println(Thread.currentThread().getName() + " finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
運行結果
worker-1 finished
worker-2 finished
worker-3 finished
main has finished. Spend Time = 3006
CountDownLatch的解析
1、什么是AQS(AbstractQueuedSynchronizer)
深入CountDownLatch源碼,需要了解AQS(AbstractQueuedSynchronizer),因為CountDownLatch的底層原理是通過AQS(AbstractQueuedSynchronizer)里面的共享鎖來實現(xiàn)的。
推薦閱讀:【死磕Java并發(fā)】—–J.U.C之AQS(一篇就夠了)
以下是上述文章的引用:
AQS:AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架,JUC并發(fā)包的作者(Doug Lea)期望它能夠成為實現(xiàn)大部分同步需求的基礎。它是JUC并發(fā)包中的核心基礎組件。
AQS解決了實現(xiàn)同步器時涉及當?shù)拇罅考毠?jié)問題,例如獲取同步狀態(tài)、FIFO同步隊列?;贏QS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現(xiàn)工作,而且也不必處理在多個位置上發(fā)生的競爭問題。
AQS使用一個int類型的成員變量state來表示同步狀態(tài),當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態(tài)state進行操作,當然AQS可以確保對state的操作是安全的。
AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態(tài)失敗(鎖)時,AQS則會將當前線程以及等待狀態(tài)等信息構造成一個節(jié)點(Node)并將其加入同步隊列,同時會阻塞當前線程,當同步狀態(tài)釋放時,則會把節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
AQS的使用方式是繼承,子類通過繼承同步器并實現(xiàn)它的抽象方法來管理同步狀態(tài)。AQS提供了獨占鎖和共享鎖必須實現(xiàn)的方法。
共享鎖則是一種樂觀鎖,它放寬了加鎖策略,允許多個執(zhí)行讀操作的線程同時訪問共享資源。對應的是獨占鎖,是一種悲觀鎖,它避免了讀/讀沖突,如果某個只讀線程獲取鎖,則其他讀線程都只能等待,這樣就限制了不必要的并發(fā)性,因為讀操作并不會影響數(shù)據的一致性。
在AQS中,共享鎖獲取鎖,節(jié)點模式則為Node.SHARED。獨占鎖獲取鎖時,設置節(jié)點模式為Node.EXCLUSIVE
CountDownLatch使用的是共享鎖,繼承AQS的方法有:
tryAcquireShared(int arg):共享式獲取同步狀態(tài),返回值大于等于0則表示獲取成功,否則獲取失??;tryReleaseShared(int arg):共享式釋放同步狀態(tài)。
上面Demo的隊列同步器模型如下(參考這里)

2、初始化源碼解析
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); // 獲取主存中的state值
if (c == 0)
return false; //state已經為0 直接退出
int nextc = c-1; // 減一 準備cas更新該值
if (compareAndSetState(c, nextc)) //cas更新status值為nextc
return nextc == 0; //更新成功 判斷是否為0 退出;更新失敗則繼續(xù)for循環(huán),直到線程并發(fā)更新成功
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
初始化做的工作是創(chuàng)建同步器實例,這個同步器就是上文提到的繼承AQS的類,并實現(xiàn)共享鎖方法。
3、latch.countDown()解析
public void countDown() {
sync.releaseShared(1);
}
//AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中tryReleaseShared是上文實現(xiàn)的方法,主要的工作是CAS更新state值減一,并判斷是否為0,如果為0返回true,說明所有線程都執(zhí)行完成,可以做喚醒的工作doReleaseShared。
//AbstractQueuedSynchronizer.java
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
上面的邏輯是:
如果當前節(jié)點是SIGNAL意味著,它正在等待一個信號,或者說它在等待被喚醒,因此做兩件事,一是重置waitStatus標志位,二是重置成功后,喚醒下一個節(jié)點。
如果本身頭節(jié)點的waitStatus是出于重置狀態(tài)(waitStatus==0)的,將其設置為“傳播”狀態(tài)。意味著需要將狀態(tài)向后一個節(jié)點傳播。
這個死循環(huán),退出的路只有一條,那就是h==head,即該線程是頭節(jié)點,且狀態(tài)為共享狀態(tài)。
4、latch.await()解析
await是阻塞當前線程(中斷被拋中斷異常),等待被喚醒,源碼如下
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
上面的邏輯是:
如果線程被中斷,則拋出異常。然后判斷tryAcquireShared方法的返回值是否小于0,這個方法是第2步初始化實現(xiàn)的,當(getState() == 0)時則返回1,否則返回-1,即當state還沒有減少到0時,則執(zhí)行doAcquireSharedInterruptibly(arg)
//AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);// 往同步隊列中添加節(jié)點
boolean failed = true;
try {
for (;;) { // 一個死循環(huán) 跳出循環(huán)只有下面兩個途徑
final Node p = node.predecessor(); // 當前線程的前一個節(jié)點
if (p == head) {
int r = tryAcquireShared(arg); //當getState()==0時則返回1,否則返回-1
if (r >= 0) {
setHeadAndPropagate(node, r);// 處理后續(xù)節(jié)點
p.next = null; // help GC
failed = false;
return;//當getState為0,并且為頭節(jié)點,則跳出循環(huán)
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();// 響應打斷 跳出循環(huán)
}
} finally {
if (failed)
cancelAcquire(node); //如果是打斷退出的,則移除同步隊列節(jié)點
}
}
在同步隊列中掛起的線程,它們自旋的形式查看自己是否滿足條件醒來(state==0,且為頭節(jié)點),如果成立(即被喚醒),將調用setHeadAndPropagate這個方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
這個方法是將當前節(jié)點的下一個節(jié)點設置為頭節(jié)點,且它也調用了doReleaseShared這個方法,在第3步解析latch.countDown中提到,這個方法就是將頭節(jié)點設置為共享狀態(tài)的,由此,共享狀態(tài)傳播下去。
擴展內容
1、CountDownLatch的優(yōu)缺點
優(yōu)點:
對使用者而言,你只需要傳入一個int型變量控制任務數(shù)量即可,至于同步隊列的出隊入隊維護,state變量值的維護對使用者都是透明的,使用方便。
缺點:
CountDownLatch設置了state后就不能更改,也不能循環(huán)使用。
2、CountDownLatch的超時處理
如果線程等待超過一定時間,可以取消阻塞被喚醒,那么可以通過設置await的參數(shù)
//等待超過2s,自動被喚醒
latch.await(2000, TimeUnit.MILLISECONDS);
參考
【死磕Java并發(fā)】—–J.U.C之AQS(一篇就夠了)