引言
上一篇文章提到,CountDownLatch不支持重置計(jì)數(shù),如果你有反復(fù)重置計(jì)數(shù)的需求的話(huà),最好使用CyclicBarrier。
CyclicBarrier的中文名叫做"循環(huán)柵欄",能夠讓n個(gè)線(xiàn)程都到達(dá)同步點(diǎn)之后再讓他們開(kāi)始運(yùn)行,之后CyclicBarrier就會(huì)重新計(jì)數(shù),這個(gè)過(guò)程可以反復(fù)進(jìn)行,甚至還可以在到達(dá)同步點(diǎn)與重新運(yùn)行之間插入一段代碼(叫做barrierAction)。
Demo
文章重點(diǎn)講的是實(shí)現(xiàn),但是出于完整性考慮還是給個(gè)Demo,Demo摘自《實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)》。
場(chǎng)景:有10個(gè)士兵(其實(shí)就是10個(gè)線(xiàn)程),他們要先進(jìn)行集合,集合完畢后會(huì)打印"司令:[士兵10個(gè),集合完畢!]"(其實(shí)這就是一個(gè)barrierAction),然后開(kāi)始各自的工作,工作完畢后士兵們?cè)偌掀饋?lái),此時(shí)會(huì)打印"司令:[士兵10個(gè), 任務(wù)完成!]"(同理,這也是一個(gè)barrierAction)。
代碼實(shí)現(xiàn)如下:
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static class Soldier implements Runnable{
private String soldier;
private final CyclicBarrier cyclic;
public Soldier(CyclicBarrier cyclic, String soldier) {
this.soldier = soldier;
this.cyclic = cyclic;
}
@Override
public void run() {
try {
//等待士兵到齊
cyclic.await();
//士兵開(kāi)始做各自的工作
doWork();
//等待所有士兵完成任務(wù)
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
void doWork(){
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + " 任務(wù)完成");
}
}
/**
* 這個(gè)類(lèi)用于barrierAction
*/
public static class BarrierRun implements Runnable{
boolean flag;
int N;
public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}
@Override
public void run() {
if ( flag ){
System.out.println("司令:[士兵" + N + "個(gè), 任務(wù)完成!]");
} else {
System.out.println("司令:[士兵" + N + "個(gè),集合完畢!]");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
/**
* 插入了BarrierRun作為barrierAction
*/
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
for ( int i = 0; i < N; i++ ){
System.out.println("士兵 " + i + " 報(bào)道");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
allSoldier[i].start();
}
}
}
圖示如下(圖中下標(biāo)號(hào)對(duì)應(yīng)著下面文字描述的標(biāo)號(hào)):

- 士兵陸續(xù)前來(lái)集合
- 士兵集合完畢
- barrierAction1: 打印"司令:[士兵10個(gè),集合完畢!]"
- 士兵陸續(xù)完成任務(wù)
- 所有士兵的任務(wù)都執(zhí)行完畢
- barrierAction2: 打印"司令:[士兵10個(gè), 任務(wù)完成!]"
運(yùn)行打印的結(jié)果如下:
士兵 0 報(bào)道
士兵 1 報(bào)道
士兵 2 報(bào)道
士兵 3 報(bào)道
士兵 4 報(bào)道
士兵 5 報(bào)道
士兵 6 報(bào)道
士兵 7 報(bào)道
士兵 8 報(bào)道
士兵 9 報(bào)道
司令:[士兵10個(gè),集合完畢!]
士兵 7 任務(wù)完成
士兵 2 任務(wù)完成
士兵 9 任務(wù)完成
士兵 0 任務(wù)完成
士兵 3 任務(wù)完成
士兵 5 任務(wù)完成
士兵 4 任務(wù)完成
士兵 1 任務(wù)完成
士兵 6 任務(wù)完成
士兵 8 任務(wù)完成
司令:[士兵10個(gè), 任務(wù)完成!]
你可能會(huì)疑惑這個(gè)barrierAction是由哪個(gè)線(xiàn)程負(fù)責(zé)執(zhí)行的呢?從圖中可以看出barrierAction每次都是由一個(gè)線(xiàn)程執(zhí)行的,而這個(gè)線(xiàn)程一般就是最后到達(dá)的那個(gè)線(xiàn)程,之后我也會(huì)通過(guò)源碼分析得出這個(gè)結(jié)論。
從上面的demo看出CyclicBarrier的核心方法就只有一個(gè)await,它會(huì)拋出兩個(gè)異常,InterruptedException和BrokenBarrierException。InterruptedException顯然是當(dāng)前線(xiàn)程等待的過(guò)程被中斷而拋出的異常,而這些要集合的線(xiàn)程有一個(gè)線(xiàn)程被中斷就會(huì)導(dǎo)致線(xiàn)程永遠(yuǎn)都無(wú)法集齊,導(dǎo)致“柵欄損壞”,剩下的線(xiàn)程就會(huì)拋出BrokenBarrierException異常。
想要看到“柵欄損壞”的現(xiàn)象只要把main方法改成如下即可:
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
/**
* 插入了BarrierRun作為barrierAction
*/
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
for ( int i = 0; i < N; i++ ){
System.out.println("士兵 " + i + " 報(bào)道");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
allSoldier[i].start();
if ( i == 5 ){
allSoldier[0].interrupt();
}
}
}
然后你會(huì)得到1個(gè)InterruptedException和9個(gè)BrokenBarrierException。
柵欄損壞
其實(shí)除了上面說(shuō)的情況會(huì)發(fā)生“柵欄損壞”,文檔中還提到了好幾種會(huì)發(fā)生的情況,如下:
- 有一個(gè)線(xiàn)程發(fā)生中斷(就是上一小節(jié)提到的那個(gè)情況)或者超時(shí),而當(dāng)前線(xiàn)程正在等待(await),則當(dāng)前線(xiàn)程會(huì)拋出
BrokenBarrierException - 該CyclicBarrier對(duì)象被調(diào)用了reset方法
- 該CyclicBarrier對(duì)象被調(diào)用await時(shí),狀態(tài)已經(jīng)是"broken"了
- barrierAction拋出了未捕獲的異常
實(shí)現(xiàn)分析
與CountDownLatch不同,CyclicBarrier不是基于A(yíng)QS實(shí)現(xiàn),而是應(yīng)用ReentrantLock實(shí)現(xiàn)的,它的同步靠的是兩個(gè)成員變量(分別是一個(gè)ReentrantLock以及從中引申出的Condition):
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
我們先從構(gòu)造函數(shù)看起:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
從這里看出幾個(gè)關(guān)鍵成員變量的含義,parties代表循環(huán)柵欄每次要等待多少個(gè)線(xiàn)程,count則是用于倒計(jì)數(shù)用的計(jì)數(shù)器,而barrierCommand就是barrierAction了。
下面看CyclicBarrier唯一的核心方法await:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
await方法返回的數(shù)值是線(xiàn)程的達(dá)到次序,對(duì)于第一個(gè)到達(dá)的線(xiàn)程會(huì)返回(總的數(shù)值 - 1),而最后一個(gè)到達(dá)的線(xiàn)程會(huì)返回0。
發(fā)現(xiàn)它其實(shí)調(diào)用的是dowait方法。
dowait方法整個(gè)被一把ReentrantLock給鎖住了:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
...
} finally {
lock.unlock();
}
}
被鎖住的這段代碼看起來(lái)長(zhǎng),其實(shí)總結(jié)起來(lái)由以下步驟組成:

我在下面的代碼中標(biāo)出了相應(yīng)步驟(圖中的①對(duì)應(yīng)下面注釋中的"一",②對(duì)應(yīng)"二",以此類(lèi)推):
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//一:倒計(jì)數(shù)
int index = --count;
//二:所有線(xiàn)程都已就位?
//index為0說(shuō)明所有線(xiàn)程都已就位
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//三:最后一個(gè)到達(dá)的線(xiàn)程順手執(zhí)行barrierAction
if (command != null)
command.run();
ranAction = true;
//四:nextGeneration方法會(huì)喚醒所有線(xiàn)程并且更新generation進(jìn)入下一代
nextGeneration();
//最后一個(gè)到達(dá)的線(xiàn)程返回0
return 0;
} finally {
//如果barrierAction出現(xiàn)異常則將該循環(huán)柵欄損壞
if (!ranAction)
breakBarrier();
}
}
// 五:利用Condition進(jìn)行等待
for (;;) {
try {
if (!timed) //無(wú)超時(shí)等待
trip.await();
else if (nanos > 0L) //有超時(shí)等待
nanos = trip.awaitNanos(nanos);//返回剩余的時(shí)間
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
//超時(shí)則破壞循環(huán)柵欄
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
下面分別對(duì)每一塊進(jìn)行講解。
①之前
在我列出的關(guān)鍵步驟①之前還有幾行代碼,我們先來(lái)看一看:
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
CyclicBarrier對(duì)象有一個(gè)Generation內(nèi)部類(lèi),其唯一的作用是標(biāo)記這一代是否出現(xiàn)了"柵欄損壞":
private static class Generation {
//為true則表示這一代發(fā)生了柵欄損壞
boolean broken = false;
}
所有的打破柵欄的控制流都會(huì)調(diào)用breakBarrier方法:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
可以看到它把generation的broker標(biāo)志設(shè)置為了true,然后重置了count計(jì)數(shù)器,最后使用了trip這個(gè)Condition對(duì)象喚醒了所有的線(xiàn)程。generation成員變量會(huì)在每次線(xiàn)程到齊之后換一個(gè)新的(我稱(chēng)之為"換代",分析到后面的代碼時(shí)再說(shuō))。
知道了這些,剛開(kāi)始給出的那幾行代碼就很好理解了,如果線(xiàn)程剛到這里就發(fā)現(xiàn)柵欄在這一代以及損壞(if (g.broken)),則直接拋出柵欄損壞異常(throw new BrokenBarrierException();)。
如果發(fā)現(xiàn)線(xiàn)程在之前以及被中斷了(if (Thread.interrupted()) {),則立即損壞柵欄(breakBarrier();)并拋出InterruptedException。
①: 倒計(jì)數(shù)
這個(gè)很簡(jiǎn)單,就是將count成員變量減1,然后再保存到局部變量index中,等到方法返回時(shí),作為返回值。
②:所有線(xiàn)程都已就位?
這個(gè)也很簡(jiǎn)單,當(dāng)count減到0了,則說(shuō)明所有線(xiàn)程都就位了,否則還需要等。
③④:順手執(zhí)行barrierAction,喚醒其他線(xiàn)程并進(jìn)行下一代
//二:所有線(xiàn)程都已就位?
//index為0說(shuō)明所有線(xiàn)程都已就位
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//三:最后一個(gè)到達(dá)的線(xiàn)程順手執(zhí)行barrierAction
if (command != null)
command.run();
ranAction = true;
//四:nextGeneration方法會(huì)喚醒所有線(xiàn)程并且更新generation進(jìn)入下一代
nextGeneration();
//最后一個(gè)到達(dá)的線(xiàn)程返回0
return 0;
} finally {
//如果barrierAction出現(xiàn)異常則將該循環(huán)柵欄損壞
if (!ranAction)
breakBarrier();
}
}
最后一個(gè)到達(dá)的線(xiàn)程發(fā)現(xiàn)count已經(jīng)減到0了,如果發(fā)現(xiàn)設(shè)置了barrierAction的話(huà),則順手將其執(zhí)行,緊接著調(diào)用nextGeneration方法將整個(gè)CyclicBarrier對(duì)象的狀態(tài)進(jìn)行重置,準(zhǔn)備迎接下一輪循環(huán),nextGeneration方法的內(nèi)容如下:
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
trip就是從lock中衍生出來(lái)的Condition對(duì)象,這里調(diào)用signalAll方法將所有阻塞住的線(xiàn)程都喚醒了(這些線(xiàn)程都阻塞在⑤中),將count重新置位倒計(jì)數(shù)的總數(shù)parties,之后將generation成員變量換了個(gè)新的(稱(chēng)之為"換代")。
最后的finally代碼塊中,會(huì)通過(guò)ranAction標(biāo)志檢測(cè)barrierAction是否成功執(zhí)行,如果未能成功執(zhí)行,則還是會(huì)調(diào)用breakBarrier方法破壞掉柵欄。
從上面的代碼可以看出,CyclicBarrier一旦損壞掉就不會(huì)自動(dòng)回復(fù)了,需要手工調(diào)用CyclicBarrier對(duì)象的reset方法來(lái)開(kāi)啟新的一代。
⑤:利用Condition進(jìn)行等待
除了最后一個(gè)到達(dá)的線(xiàn)程,其他線(xiàn)程都會(huì)進(jìn)入這一段代碼進(jìn)行等待,核心就是使用trip這個(gè)Condition對(duì)象的await方法阻塞?。?/p>
if (!timed) //無(wú)超時(shí)等待
trip.await();
else if (nanos > 0L) //有超時(shí)等待
nanos = trip.awaitNanos(nanos);//返回剩余的時(shí)間
如果await的過(guò)程中發(fā)生了中斷,則在catch代碼塊中破壞柵欄(breakBarrier),如果發(fā)現(xiàn)已經(jīng)換代或者柵欄已經(jīng)損壞,則重置當(dāng)前線(xiàn)程的中斷標(biāo)志位(Thread.currentThread().interrupt();),供下一輪使用:
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
剩下的三個(gè)判斷就是線(xiàn)程被喚醒之后的處理了:
//喚醒之后發(fā)現(xiàn)柵欄已經(jīng)損壞,則拋出異常
if (g.broken)
throw new BrokenBarrierException();
//柵欄沒(méi)有損壞而且換代了,說(shuō)明這一代順利結(jié)束,await方法返回
if (g != generation)
return index;
//超時(shí)
if (timed && nanos <= 0L) {
//超時(shí)則破壞循環(huán)柵欄
breakBarrier();
throw new TimeoutException();
}
這里注意到一個(gè)比較有趣的地方就是,假如nanos大于0的情況下則會(huì)繼續(xù)Condition等待的循環(huán),nanos怎么會(huì)大于0呢?查閱了awaitNanos的文檔發(fā)現(xiàn)它的返回值的含義是線(xiàn)程剩余的需要等待的時(shí)間,為了讓awaitNanos能夠真正地等待你所指定的時(shí)間,推薦的寫(xiě)法是:
boolean aMethod(long timeout, TimeUnit unit) {
long nanos = unit.toNanos(timeout);
lock.lock();
try {
while (!conditionBeingWaitedFor()) {
if (nanos <= 0L)
return false;
nanos = theCondition.awaitNanos(nanos);
}
// ...
} finally {
lock.unlock();
}
}
其實(shí)CyclicBarrier中的寫(xiě)法就是這種推薦寫(xiě)法的變種。
當(dāng)時(shí)我就很疑惑,為什么這個(gè)awaitNanos方法這么不靠譜呢?我都讓它等待nanos之后再超時(shí),它卻有可能中途就原因不明地給我返回了。
回憶一下AQS中awaitNanos的實(shí)現(xiàn),它阻塞靠的是LockSupport.parkNanos方法,而LockSupport的文檔明確說(shuō)了,它的parkNanos會(huì)出現(xiàn)"偽喚醒"的問(wèn)題(也就是原因不明地返回),而且因?yàn)?code>parkNanos是個(gè)返回值為void的方法,它甚至不會(huì)告訴你剩余的時(shí)間是多少,AQS對(duì)其額外封裝了剩余的等待時(shí)間,也算是比較友好了。
總結(jié)
相比JUC中的其他類(lèi),CyclicBarrier的實(shí)現(xiàn)屬于比較接地氣的,基于ReentrantLock實(shí)現(xiàn)了自己的功能,可以學(xué)習(xí)它對(duì)于ReentrantLock的應(yīng)用
參考文獻(xiàn)
《實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)》(葛一鳴 郭超 著)