1、簡介
CyclicBarrier是一個(gè)同步工具類,它允許一組線程在到達(dá)某個(gè)柵欄點(diǎn)(common barrier point)互相等待,發(fā)生阻塞,直到最后一個(gè)線程到達(dá)柵欄點(diǎn),柵欄才會(huì)打開,處于阻塞狀態(tài)的線程恢復(fù)繼續(xù)執(zhí)行.它非常適用于一組線程之間必需經(jīng)?;ハ嗟却那闆r。CyclicBarrier字面理解是循環(huán)的柵欄,之所以稱之為循環(huán)的是因?yàn)樵诘却€程釋放后,該柵欄還可以復(fù)用。

建議閱讀CyclicBarrier源碼前,先深入研究一下ReentrantLock的原理,搞清楚condition里await和signal的原理,這部分可以看我之前的文章:【細(xì)談Java并發(fā)】談?wù)凙QS、 【細(xì)談Java并發(fā)】談?wù)凴eentrantLock
好了,我們來看看如何使用它吧。
2、使用場景
我們可以簡單使用CyclicBarrier來模擬一下對(duì)戰(zhàn)平臺(tái)中玩家需要完全準(zhǔn)備好了,才能進(jìn)入游戲的場景。
public class CyclicBarrierTest {
private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5);
private final static CyclicBarrier BARRIER = new CyclicBarrier(5);
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
final String name = "玩家" + i;
EXECUTOR_SERVICE.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println(name + "已準(zhǔn)備,等待其他玩家準(zhǔn)備...");
BARRIER.await();
Thread.sleep(1000);
System.out.println(name + "已加入游戲");
} catch (InterruptedException e) {
System.out.println(name + "離開游戲");
} catch (BrokenBarrierException e) {
System.out.println(name + "離開游戲");
}
}
});
}
EXECUTOR_SERVICE.shutdown();
}
}
輸出結(jié)果
玩家1已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家0已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家2已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家3已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家4已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家2已加入游戲
玩家3已加入游戲
玩家4已加入游戲
玩家0已加入游戲
玩家1已加入游戲
3、原理分析
3.1、屬性
首先看看它里面的所有屬性。
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
// 鎖
private final ReentrantLock lock = new ReentrantLock();
// 通過lock得到的一個(gè)狀態(tài)變量,用來await和signal
private final Condition trip = lock.newCondition();
// 通過構(gòu)造器傳入的參數(shù),表示總的等待線程的數(shù)量
private final int parties;
// 當(dāng)屏障正常打開后運(yùn)行的程序,通過最后一個(gè)調(diào)用await的線程來執(zhí)行
private final Runnable barrierCommand;
// 當(dāng)前的Generation。每當(dāng)屏障失效或者開閘之后都會(huì)自動(dòng)替換掉。從而實(shí)現(xiàn)重置的功能
private Generation generation = new Generation();
// 和parties一樣,每次線程await后減1
private int count;
...省略后面代碼
}
3.2、構(gòu)造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
- 默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier已經(jīng)到達(dá)屏障位置,線程被阻塞。
- 另外一個(gè)構(gòu)造方法CyclicBarrier(int parties, Runnable barrierAction),其中barrierAction任務(wù)會(huì)在所有線程到達(dá)屏障后執(zhí)行。
3.3、await()
最主要的方法就是await()方法,調(diào)用await()的線程會(huì)等待直到有足夠數(shù)量的線程調(diào)用await——也就是開閘狀態(tài)。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
await()和await(long, TimeUnit)都是調(diào)用dowait方法,區(qū)別就是參數(shù)不同,我們來看看dowait方法。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken) // 如果當(dāng)前Generation是處于打破狀態(tài)則傳播這個(gè)BrokenBarrierExcption
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// 如果當(dāng)前線程被中斷則使得當(dāng)前generation處于打破狀態(tài),重置剩余count。
// 并且喚醒狀態(tài)變量。這時(shí)候其他線程會(huì)傳播BrokenBarrierException。
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 嘗試降低當(dāng)前count
/**
* 如果當(dāng)前狀態(tài)將為0,則Generation處于開閘狀態(tài)。運(yùn)行可能存在的command,
* 設(shè)置下一個(gè)Generation。相當(dāng)于每次開閘之后都進(jìn)行了一次reset。
*/
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction) // 如果運(yùn)行command失敗也會(huì)導(dǎo)致當(dāng)前屏障被打破。
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed) // 阻塞在當(dāng)前的狀態(tài)變量。
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { // 如果當(dāng)前線程被中斷了則使得屏障被打破。并拋出異常。
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 從阻塞恢復(fù)之后,需要重新判斷當(dāng)前的狀態(tài)。
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
此外再看下兩個(gè)小過程:
這兩個(gè)小過程當(dāng)然是需要鎖的,但是由于這兩個(gè)方法只是通過其他方法調(diào)用,所以依然是在持有鎖的范圍內(nèi)運(yùn)行的。這兩個(gè)方法都是對(duì)域進(jìn)行操作。
nextGeneration實(shí)際上在屏障開閘之后重置狀態(tài)。以待下一次調(diào)用。
breakBarrier實(shí)際上是在屏障打破之后設(shè)定打破狀態(tài),以喚醒其他線程并通知。
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
3.4、reset
reset方法比較簡單。但是這里還是要注意一下要先打破當(dāng)前屏蔽,然后再重建一個(gè)新的屏蔽。否則的話可能會(huì)導(dǎo)致信號(hào)丟失。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
4、CountDownLatch的區(qū)別
我用白話說的通俗點(diǎn)吧。
- CountDownLatch的使用是一次性的,而CyclicBarrier可以用reset進(jìn)行重用。
- CountDownLatch是一個(gè)線程等待多個(gè)線程執(zhí)行完了,再進(jìn)行執(zhí)行。而CyclicBarrier是多個(gè)線程等待所有線程都執(zhí)行完了,再進(jìn)行執(zhí)行。