簡介
jdk原文
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be re-used after the waiting threads are released.
這句話翻譯意思:CyclicBarrier是一個(gè)同步輔助類,它允許一組線程相互等待直到所有線程都到達(dá)一個(gè)公共的屏障點(diǎn)。
在程序中有固定數(shù)量的線程,這些線程有時(shí)候必須等待彼此,這種情況下,使用CyclicBarrier很有幫助。
這個(gè)屏障之所以用循環(huán)修飾,是因?yàn)樵谒械木€程釋放彼此之后,這個(gè)屏障是可以重新使用的
抓住重點(diǎn):1、允許一組線程相互等待直到達(dá)到一個(gè)公共屏障點(diǎn),2、可以重復(fù)使用
簡單舉例就是:玩王者榮耀只有所有人進(jìn)入游戲之前都必須加載到100%,所有人才能進(jìn)入游戲。
與CountDownLatch比較

源碼解析
先從構(gòu)造方法入手
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
從jdk注釋我們可以看出:
第一個(gè)構(gòu)造器:創(chuàng)建一個(gè)新的{@code CyclicBarrier},它會(huì)在
給定數(shù)量的屏障(線程)正在等待它,并且在屏障被觸發(fā)時(shí)不執(zhí)行預(yù)定義的操作。
第二個(gè)構(gòu)造器:創(chuàng)建一個(gè)新的{@code CyclicBarrier},它會(huì)在
給定數(shù)量的屏障(線程)正在等待它,以及當(dāng)屏障被觸發(fā)時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景。
await()方法
調(diào)用await方法的線程告訴CyclicBarrier自己已經(jīng)到達(dá)同步點(diǎn),然后當(dāng)前線程被阻塞。直到parties個(gè)參與線程調(diào)用了await方法,CyclicBarrier同樣提供帶超時(shí)時(shí)間的await和不帶超時(shí)時(shí)間的await方法:
await()方法里面最主要就是doawait()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 獲取獨(dú)占鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 當(dāng)前先從
final Generation g = generation;
// 如果這個(gè)線程損壞了,拋出異常
if (g.broken)
throw new BrokenBarrierException();
// 如果線程中斷了,拋出異常
if (Thread.interrupted()) {
// 將損壞狀態(tài)設(shè)置為true
// 并通知其他阻塞在此屏障上的線程
breakBarrier();
throw new InterruptedException();
}
// 獲取下標(biāo)
int index = --count;
// 如果是 0,說明最后一個(gè)線程調(diào)用了該方法
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 執(zhí)行屏障任務(wù)
if (command != null)
command.run();
ranAction = true;
// 更新一代,將count重置,將generation重置
// 喚醒之前等待的線程
nextGeneration();
return 0;
} finally {
// 如果執(zhí)行屏障任務(wù)的時(shí)候失敗了,就將損壞狀態(tài)設(shè)置為true
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 如果沒有時(shí)間限制,則直接等待,直到被喚醒
if (!timed)
trip.await();
// 如果有時(shí)間限制,則等待指定時(shí)間
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 當(dāng)前線程沒有損壞
if (g == generation && ! g.broken) {
// 讓屏障失效
breakBarrier();
throw ie;
} else {
// 上面條件不滿足,說明這個(gè)線程不是這代的
// 就不會(huì)影響當(dāng)前這代屏障的執(zhí)行,所以,就打個(gè)中斷標(biāo)記
Thread.currentThread().interrupt();
}
}
// 當(dāng)有任何一個(gè)線程中斷了,就會(huì)調(diào)用breakBarrier方法
// 就會(huì)喚醒其他的線程,其他線程醒來后,也要拋出異常
if (g.broken)
throw new BrokenBarrierException();
// g != generation表示正常換代了,返回當(dāng)前線程所在屏障的下標(biāo)
// 如果 g == generation,說明還沒有換代,那為什么會(huì)醒了?
// 因?yàn)橐粋€(gè)線程可以使用多個(gè)屏障,當(dāng)別的屏障喚醒了這個(gè)線程,就會(huì)走到這里,所以需要判斷是否是當(dāng)前代。
// 正是因?yàn)檫@個(gè)原因,才需要generation來保證正確。
if (g != generation)
return index;
// 如果有時(shí)間限制,且時(shí)間小于等于0,銷毀屏障并拋出異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 釋放獨(dú)占鎖
lock.unlock();
}
}
總結(jié)如果該線程不是最后一個(gè)調(diào)用await方法的線程,則它會(huì)一直處于等待狀態(tài),除非發(fā)生以下情況:
最后一個(gè)線程到達(dá),即index == 0
某個(gè)參與線程等待超時(shí)
某個(gè)參與線程被中斷
調(diào)用了CyclicBarrier的reset()方法。該方法會(huì)將屏障重置為初始狀態(tài)
Generation描述著CyclicBarrier的更新?lián)Q代。在CyclicBarrier中,同一批線程屬于同一代。當(dāng)有parties個(gè)線程到達(dá)barrier之后,generation就會(huì)被更新?lián)Q代。其中broken標(biāo)識該當(dāng)前CyclicBarrier是否已經(jīng)處于中斷狀態(tài)。
默認(rèn)barrier(屏障)是沒有損壞的。當(dāng)barrier(屏障)損壞了或者有一個(gè)線程中斷了,則通過breakBarrier()來終止所有的線程:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
breakBarrier()不僅會(huì)把broken設(shè)置為true,還會(huì)將所有處于等待狀態(tài)的線程全部喚醒(singalAll)方法
注意CyclicBarrier使用獨(dú)占鎖來執(zhí)行await方法,并發(fā)性可能不是很高
簡單例子加深印象
/**
* @author shuliangzhao
* @Title: CyclicBarrierTest
* @ProjectName design-parent
* @Description: TODO
* @date 2019/6/3 0:23
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
/* try {
Thread.sleep(25000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrier重用");
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}*/
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數(shù)據(jù)...");
try {
Thread.sleep(2000); //以睡眠來模擬寫入數(shù)據(jù)操作
System.out.println("線程"+Thread.currentThread().getName()+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
}
}
}
運(yùn)行結(jié)果

怎么用多線程求和
/**
* @author shuliangzhao
* @Title: CyclicBarrier
* @ProjectName design-parent
* @Description: TODO
* @date 2019/6/3 0:18
*/
public class CyclicBarrierExc {
//private static final Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierExc.class);
public static void main(String[] args) {
//數(shù)組大小
int size = 50000;
//定義數(shù)組
int[] numbers = new int[size];
//隨機(jī)初始化數(shù)組
for (int i = 0; i < size; i++) {
numbers[i] = RandomUtils.nextInt(100, 1000);
}
//多線程計(jì)算結(jié)果
//定義線程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
//定義五個(gè)Future去保存子數(shù)組計(jì)算結(jié)果
final int[] results = new int[5];
//定義一個(gè)循環(huán)屏障,在屏障線程中進(jìn)行計(jì)算結(jié)果合并
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
int sums = 0;
for (int i = 0; i < 5; i++) {
sums += results[i];
}
System.out.println("多線程計(jì)算結(jié)果:" + sums);
});
//子數(shù)組長度
int length = 10000;
//定義五個(gè)線程去計(jì)算
for (int i = 0; i < 5; i++) {
//定義子數(shù)組
int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length));
//盛放計(jì)算結(jié)果
int finalI = i;
executorService.submit(() -> {
for (int j = 0; j < subNumbers.length; j++) {
results[finalI] += subNumbers[j];
}
//等待其他線程進(jìn)行計(jì)算
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
//關(guān)閉線程池
executorService.shutdown();
}
}