J.U.C之AQS CyclicBarrier

CyclicBarrier概述

  • 允許多個線程相互等待,直到到達某個公共屏障點(Common Barrier Point),只有當每個線程都準備就緒后,才能各自往下執(zhí)行后面的操作;
  • 其和CountDownLatch相似的地方是都是通過計數器實現;
  • 當某個現場調用await()后,該線程就進入等待狀態(tài),而且計數器執(zhí)行加1操作,當計數器達到了設置的初始值,調用await()方法進行等待的線程都會被喚醒,繼續(xù)await()之后的操作;
  • 由于CyclicBarrier在釋放等待線程后可以重用,所以稱其為循環(huán)屏障;

應用場景

  • 多線程計算數據,最后合并計算結果;
  • 如果發(fā)生錯誤,可以重置計數器,讓線程重新執(zhí)行1次;

CyclicBarrier vs CountDownLatch

  • CountDownLatch的計數器只能使用1次;CyclicBarrier的計數器可以使用reset方法重置;
  • CountDownLatch主要實現1個或多個線程,需要等待其他線程完成某項操作后,才能繼續(xù)往下執(zhí)行;CyclicBarrier主要實現了多個線程之間相互等待,直到所有的線程都滿足了條件之后,才能繼續(xù)執(zhí)行后續(xù)操作;
  • CyclicBarrier的getNumberWaiting()方法,獲得線程的阻塞數量;
  • CyclicBarrier的isBroken知道阻塞的線程是否已經被中斷了;

CyclicBarrier 示例(一)

  • 初始化的時候給定一個值,表示有多少個線程要相互等待;
  • 每個線程在到達公共屏障點的時候要調用await()方法;
  • 當所有的線程都到達了公共屏障點后,所有線程之后的代碼就可以執(zhí)行了;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

CyclicBarrier 示例(二) 等不及先走了

  • barrier.await(2000, TimeUnit.MILLISECONDS)在等待的時候可以設置一個最大等待實現,如果在在這個時間內,所有的線程都到達了,那么一起往下走;
  • 如果在這個時間還有線程沒有到,就不等了,自己往前走;
  • 但是注意,時間到還沒來全時,會拋出一個異常,如果在異常拋出后后續(xù)代碼還想執(zhí)行,那么就要catch住這個異常;
  • 如果在這個時間內,都到齊了,那么一起走,沒問題;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("{} occur BarrierException", threadNum);
        }
        log.info("{} continue", threadNum);
    }
}

CyclicBarrier 示例(三) CyclicBarrier可以設置“回調函數”

  • CyclicBarrier在初始化的時候,可以設置一個Runnable作為回調函數,其意義是:當都到達了屏障后,先于各個線程的后續(xù)方法執(zhí)行回調函數;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

輸出:

19:58:59.623 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 0 is ready
19:59:00.620 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 1 is ready
19:59:01.621 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 2 is ready
19:59:02.620 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 3 is ready
19:59:03.621 [pool-1-thread-5] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 4 is ready
19:59:03.621 [pool-1-thread-5] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
19:59:03.621 [pool-1-thread-5] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 4 continue
19:59:03.621 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 0 continue
19:59:03.621 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 1 continue
19:59:03.621 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 2 continue
19:59:03.621 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 3 continue
19:59:04.621 [pool-1-thread-6] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 5 is ready
19:59:05.622 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 6 is ready
19:59:06.623 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 7 is ready
19:59:07.624 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 8 is ready
19:59:08.624 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 9 is ready
19:59:08.624 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
19:59:08.624 [pool-1-thread-2] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 9 continue
19:59:08.624 [pool-1-thread-6] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 5 continue
19:59:08.624 [pool-1-thread-4] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 6 continue
19:59:08.624 [pool-1-thread-3] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 7 continue
19:59:08.624 [pool-1-thread-1] INFO com.example.concurrency.example.aqs.CyclicBarrierExample3 - 8 continue

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容