CyclicBarrier概述
- 允許多個線程相互等待,直到到達某個公共屏障點(Common Barrier Point),只有當每個線程都準備就緒后,才能各自往下執(zhí)行后面的操作;
- 其和CountDownLatch相似的地方是都是通過計數器實現;
- 當某個現場調用await()后,該線程就進入等待狀態(tài),而且計數器執(zhí)行加1操作,當計數器達到了設置的初始值,調用await()方法進行等待的線程都會被喚醒,繼續(xù)await()之后的操作;
- 由于CyclicBarrier在釋放等待線程后可以重用,所以稱其為循環(huán)屏障;
應用場景
- 多線程計算數據,最后合并計算結果;
- 如果發(fā)生錯誤,可以重置計數器,讓線程重新執(zhí)行1次;
CyclicBarrier vs CountDownLatch
- CountDownLatch的計數器只能使用1次;CyclicBarrier的計數器可以使用reset方法重置;
- CountDownLatch主要實現1個或多個線程,需要等待其他線程完成某項操作后,才能繼續(xù)往下執(zhí)行;CyclicBarrier主要實現了多個線程之間相互等待,直到所有的線程都滿足了條件之后,才能繼續(xù)執(zhí)行后續(xù)操作;
- CyclicBarrier的getNumberWaiting()方法,獲得線程的阻塞數量;
- CyclicBarrier的isBroken知道阻塞的線程是否已經被中斷了;
CyclicBarrier 示例(一)
- 初始化的時候給定一個值,表示有多少個線程要相互等待;
- 每個線程在到達公共屏障點的時候要調用await()方法;
- 當所有的線程都到達了公共屏障點后,所有線程之后的代碼就可以執(zhí)行了;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
CyclicBarrier 示例(二) 等不及先走了
- barrier.await(2000, TimeUnit.MILLISECONDS)在等待的時候可以設置一個最大等待實現,如果在在這個時間內,所有的線程都到達了,那么一起往下走;
- 如果在這個時間還有線程沒有到,就不等了,自己往前走;
- 但是注意,時間到還沒來全時,會拋出一個異常,如果在異常拋出后后續(xù)代碼還想執(zhí)行,那么就要catch住這個異常;
- 如果在這個時間內,都到齊了,那么一起走,沒問題;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CyclicBarrierExample2 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("{} occur BarrierException", threadNum);
}
log.info("{} continue", threadNum);
}
}
CyclicBarrier 示例(三) CyclicBarrier可以設置“回調函數”
- CyclicBarrier在初始化的時候,可以設置一個Runnable作為回調函數,其意義是:當都到達了屏障后,先于各個線程的后續(xù)方法執(zhí)行回調函數;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
輸出:
19:58:59.623 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 0 is ready
19:59:00.620 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 1 is ready
19:59:01.621 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 2 is ready
19:59:02.620 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 3 is ready
19:59:03.621 [pool-1-thread-5] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 4 is ready
19:59:03.621 [pool-1-thread-5] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
19:59:03.621 [pool-1-thread-5] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 4 continue
19:59:03.621 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 0 continue
19:59:03.621 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 1 continue
19:59:03.621 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 2 continue
19:59:03.621 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 3 continue
19:59:04.621 [pool-1-thread-6] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 5 is ready
19:59:05.622 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 6 is ready
19:59:06.623 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 7 is ready
19:59:07.624 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 8 is ready
19:59:08.624 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 9 is ready
19:59:08.624 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
19:59:08.624 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 9 continue
19:59:08.624 [pool-1-thread-6] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 5 continue
19:59:08.624 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 6 continue
19:59:08.624 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 7 continue
19:59:08.624 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 8 continue