CyclicBarrier,這時JDK1.5的java.util.concurrent并發(fā)包中提供的一個并發(fā)工具類。
從字面意思來理解的話,cyclic是循環(huán)的意思,Barrier是屏障。是的,這個工具類同樣是用于一個或者多個線程同時完成后才能繼續(xù)執(zhí)行往后的步驟。
有一個公共的屏障點的概念,只有所有的線程都到達屏障點之后才可以繼續(xù)執(zhí)行。這里我們可以對比一下,他跟CountDownLatch的用法有一些相似的地方。但是我們可以發(fā)現(xiàn)以下一些區(qū)別:
相同點:
兩個都可以同來實現(xiàn)一個或者多個線程等待其他線程全都執(zhí)行完成一定的操作之后,才開始接著往下執(zhí)行。這里我查找資料說: CountDownLatch比較關(guān)注等待其他線程完成一些事之后才能繼續(xù)執(zhí)行,而CyclicBarrier其實是比較關(guān)注于所有的線程全部完成某些任務(wù),才能一起執(zhí)行下去。
區(qū)別:
1.從字面意思來看,CyclicBarrier中的Cyclic有循環(huán)的意思,所以CyclicBarrier是可以支持循環(huán)的實現(xiàn)所有的線程要一起做任務(wù)的目標的。而不是像CountDownLatch一樣,只支持一次線程同步堵塞的意思。
2.從底層實現(xiàn)來看,CyclicBarrier底層主要是采用ReentrantLock+Condition實現(xiàn)的,而CountDownLatch內(nèi)部采用共享鎖來實現(xiàn)。CountDownLatch內(nèi)部是進行減法來判斷,而CyclicBarrier是使用加法來判斷的。
常用應(yīng)用場景:
經(jīng)常用于多線程的分組計算。
提供的方法有:
——CyclicBarrier(parties)
初始化相互等待的線程數(shù)量的構(gòu)造方法。
——CyclicBarrier(parties,Runnable barrierAction)
初始化相互等待的線程數(shù)量以及屏障線程的構(gòu)造方法。
屏障線程的運行時機:等待的線程數(shù)量=parties之后,CyclicBarrier打開屏障之前。
舉例:在分組計算中,每個線程負責(zé)一部分計算,最終這些線程計算結(jié)束之后,交由屏障線程進行匯總計算。
——getParties()
獲取CyclicBarrier打開屏障的線程數(shù)量,也成為方數(shù)。
——getNumberWaiting()
獲取正在CyclicBarrier上等待的線程數(shù)量。
——await()
在CyclicBarrier上進行阻塞等待,直到發(fā)生以下情形之一:
在CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放,繼續(xù)執(zhí)行。
當前線程被中斷,則拋出InterruptedException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他線程調(diào)用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
——await(timeout,TimeUnit)
在CyclicBarrier上進行限時的阻塞等待,直到發(fā)生以下情形之一:
在CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放,繼續(xù)執(zhí)行。
當前線程被中斷,則拋出InterruptedException異常,并停止等待,繼續(xù)執(zhí)行。
當前線程等待超時,則拋出TimeoutException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他線程調(diào)用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
——isBroken()
獲取是否破損標志位broken的值,此值有以下幾種情況:
CyclicBarrier初始化時,broken=false,表示屏障未破損。
如果正在等待的線程被中斷,則broken=true,表示屏障破損。
如果正在等待的線程超時,則broken=true,表示屏障破損。
如果有線程調(diào)用CyclicBarrier.reset()方法,則broken=false,表示屏障回到未破損狀態(tài)。
——reset()
使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事:
如果有正在等待的線程,則會拋出BrokenBarrierException異常,且這些線程停止等待,繼續(xù)執(zhí)行。將是否破損標志位broken置為false。
看下代碼的演示:
這個自己也手敲了一遍
public void test01() throws InterruptedException {
//構(gòu)造函數(shù)1:初始化-開啟屏障的方數(shù)
CyclicBarrier barrier0 = new CyclicBarrier(2);
//通過barrier.getParties()獲取開啟屏障的方數(shù)
System.out.println("barrier.getParties()獲取開啟屏障的方數(shù):" + barrier0.getParties());
System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的線程數(shù)
System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):初始----" + barrier0.getNumberWaiting());
System.out.println();
new Thread(() -> {
//添加一個等待線程
System.out.println("添加第1個等待線程----" + Thread.currentThread().getName());
try {
barrier0.await();
System.out.println(Thread.currentThread().getName() + " is running...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
//通過barrier.getNumberWaiting()獲取正在等待的線程數(shù)
System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):添加第1個等待線程---" + barrier0.getNumberWaiting());
Thread.sleep(10);
System.out.println();
new Thread(() -> {
//添加一個等待線程
System.out.println("添加第2個等待線程----" + Thread.currentThread().getName());
try {
barrier0.await();
System.out.println(Thread.currentThread().getName() + " is running...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(100);
System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的線程數(shù)
System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):打開屏障之后---" + barrier0.getNumberWaiting());
//已經(jīng)打開的屏障,再次有線程等待的話,還會重新生效--視為循環(huán)
new Thread(() -> {
System.out.println("屏障打開之后,再有線程加入等待:" + Thread.currentThread().getName());
try {
//BrokenBarrierException
barrier0.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is terminated.");
}).start();
System.out.println();
Thread.sleep(10);
System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):打開屏障之后---" + barrier0.getNumberWaiting());
Thread.sleep(10);
new Thread(() -> {
System.out.println("屏障打開之后,再有線程加入等待:" + Thread.currentThread().getName());
try {
//BrokenBarrierException
barrier0.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):打開屏障之后---" + barrier0.getNumberWaiting());
}
public static void main(String[] args) throws InterruptedException {
TestCyclicBarrier c=new TestCyclicBarrier();
c.test01();
}
結(jié)果:

然后再來寫一組模擬多線程分組計算
有一個大小為50000的隨機數(shù)組,用五個線程分別計算10000個元素的和,在將結(jié)果進行合并。
public class TestCyclicBarrierCalculate {
public static void main(String[] args) {
//數(shù)組大小
int size = 50000;
//定義數(shù)組
int[] numbers = new int[size];
Random random = new Random();
//隨機初始化數(shù)組
for (int i = 0; i < size; i++) {
numbers[i] = random.nextInt(10);
}
//單線程計算結(jié)果
System.out.println();
int sum = 0;
for (int i = 0; i < size; i++) {
sum += numbers[i];
}
System.out.println("單線程計算結(jié)果:" + sum);
ExecutorService executorService = Executors.newCachedThreadPool();
//定義五個Future去保存子數(shù)組計算結(jié)果
final int [] result=new int[5];
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
int sums = 0;
for (int i = 0; i < 5; i++) {
sums += result[i];
}
System.out.println("多線程計算結(jié)果為:"+sums);
});
int lengths=10000;
//定義五個線程去計算
for(int i=0;i<5;i++) {
int[] range = Arrays.copyOfRange(numbers, i * lengths, (i + 1) * lengths);
int finalI = i;
executorService.execute(() -> {
//臨時計算結(jié)果
int temp = 0;
for (int j = 0; j < 10000; j++) {
temp += range[j];
}
System.out.println("第N個多線程計算結(jié)果為:"+temp);
result[finalI] = temp;
try {
//暫停,等待其他線程的計算結(jié)果
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
結(jié)果:

喜歡可以關(guān)注我哦