并發(fā)之CyclicBarrier實現(xiàn)與探索

CyclicBarrier,這時JDK1.5的java.util.concurrent并發(fā)包中提供的一個并發(fā)工具類。
從字面意思來理解的話,cyclic是循環(huán)的意思,Barrier是屏障。是的,這個工具類同樣是用于一個或者多個線程同時完成后才能繼續(xù)執(zhí)行往后的步驟。

有一個公共的屏障點的概念,只有所有的線程都到達屏障點之后才可以繼續(xù)執(zhí)行。這里我們可以對比一下,他跟CountDownLatch的用法有一些相似的地方。但是我們可以發(fā)現(xiàn)以下一些區(qū)別:

相同點:
兩個都可以同來實現(xiàn)一個或者多個線程等待其他線程全都執(zhí)行完成一定的操作之后,才開始接著往下執(zhí)行。這里我查找資料說: CountDownLatch比較關(guān)注等待其他線程完成一些事之后才能繼續(xù)執(zhí)行,而CyclicBarrier其實是比較關(guān)注于所有的線程全部完成某些任務(wù),才能一起執(zhí)行下去。

區(qū)別:
1.從字面意思來看,CyclicBarrier中的Cyclic有循環(huán)的意思,所以CyclicBarrier是可以支持循環(huán)的實現(xiàn)所有的線程要一起做任務(wù)的目標的。而不是像CountDownLatch一樣,只支持一次線程同步堵塞的意思。
2.從底層實現(xiàn)來看,CyclicBarrier底層主要是采用ReentrantLock+Condition實現(xiàn)的,而CountDownLatch內(nèi)部采用共享鎖來實現(xiàn)。CountDownLatch內(nèi)部是進行減法來判斷,而CyclicBarrier是使用加法來判斷的。

常用應(yīng)用場景:
經(jīng)常用于多線程的分組計算。

提供的方法有:

——CyclicBarrier(parties)

初始化相互等待的線程數(shù)量的構(gòu)造方法。

——CyclicBarrier(parties,Runnable barrierAction)

初始化相互等待的線程數(shù)量以及屏障線程的構(gòu)造方法。

屏障線程的運行時機:等待的線程數(shù)量=parties之后,CyclicBarrier打開屏障之前。

舉例:在分組計算中,每個線程負責(zé)一部分計算,最終這些線程計算結(jié)束之后,交由屏障線程進行匯總計算。

——getParties()

獲取CyclicBarrier打開屏障的線程數(shù)量,也成為方數(shù)。

——getNumberWaiting()

獲取正在CyclicBarrier上等待的線程數(shù)量。
——await()

在CyclicBarrier上進行阻塞等待,直到發(fā)生以下情形之一:

在CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放,繼續(xù)執(zhí)行。
當前線程被中斷,則拋出InterruptedException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他線程調(diào)用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
——await(timeout,TimeUnit)

在CyclicBarrier上進行限時的阻塞等待,直到發(fā)生以下情形之一:

在CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放,繼續(xù)執(zhí)行。
當前線程被中斷,則拋出InterruptedException異常,并停止等待,繼續(xù)執(zhí)行。
當前線程等待超時,則拋出TimeoutException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他等待的線程超時,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
其他線程調(diào)用CyclicBarrier.reset()方法,則當前線程拋出BrokenBarrierException異常,并停止等待,繼續(xù)執(zhí)行。
——isBroken()

獲取是否破損標志位broken的值,此值有以下幾種情況:

CyclicBarrier初始化時,broken=false,表示屏障未破損。
如果正在等待的線程被中斷,則broken=true,表示屏障破損。
如果正在等待的線程超時,則broken=true,表示屏障破損。
如果有線程調(diào)用CyclicBarrier.reset()方法,則broken=false,表示屏障回到未破損狀態(tài)。
——reset()

使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事:

如果有正在等待的線程,則會拋出BrokenBarrierException異常,且這些線程停止等待,繼續(xù)執(zhí)行。將是否破損標志位broken置為false。

看下代碼的演示:

這個自己也手敲了一遍
public void test01() throws InterruptedException {

        //構(gòu)造函數(shù)1:初始化-開啟屏障的方數(shù)
        CyclicBarrier barrier0 = new CyclicBarrier(2);
        //通過barrier.getParties()獲取開啟屏障的方數(shù)
        System.out.println("barrier.getParties()獲取開啟屏障的方數(shù):" + barrier0.getParties());
        System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的線程數(shù)
        System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):初始----" + barrier0.getNumberWaiting());
        System.out.println();
        new Thread(() -> {
            //添加一個等待線程
            System.out.println("添加第1個等待線程----" + Thread.currentThread().getName());
            try {
                barrier0.await();
                System.out.println(Thread.currentThread().getName() + " is running...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " is terminated.");
        }).start();
        Thread.sleep(10);
//通過barrier.getNumberWaiting()獲取正在等待的線程數(shù)
        System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):添加第1個等待線程---" + barrier0.getNumberWaiting());
        Thread.sleep(10);
        System.out.println();
        new Thread(() -> {
            //添加一個等待線程
            System.out.println("添加第2個等待線程----" + Thread.currentThread().getName());
            try {
                barrier0.await();
                System.out.println(Thread.currentThread().getName() + " is running...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " is terminated.");
        }).start();
        Thread.sleep(100);
        System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的線程數(shù)
        System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):打開屏障之后---" + barrier0.getNumberWaiting());

//已經(jīng)打開的屏障,再次有線程等待的話,還會重新生效--視為循環(huán)
        new Thread(() -> {
            System.out.println("屏障打開之后,再有線程加入等待:" + Thread.currentThread().getName());
            try {
                //BrokenBarrierException
                barrier0.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " is terminated.");

        }).start();
        System.out.println();
        Thread.sleep(10);
        System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):打開屏障之后---" + barrier0.getNumberWaiting());
        Thread.sleep(10);
        new Thread(() -> {
            System.out.println("屏障打開之后,再有線程加入等待:" + Thread.currentThread().getName());
            try {
                //BrokenBarrierException
                barrier0.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " is terminated.");

        }).start();
        Thread.sleep(10);
        System.out.println("通過barrier.getNumberWaiting()獲取正在等待的線程數(shù):打開屏障之后---" + barrier0.getNumberWaiting());
    }

    public static void main(String[] args) throws InterruptedException {
        TestCyclicBarrier c=new TestCyclicBarrier();
        c.test01();
    }

結(jié)果:


然后再來寫一組模擬多線程分組計算
有一個大小為50000的隨機數(shù)組,用五個線程分別計算10000個元素的和,在將結(jié)果進行合并。

public class TestCyclicBarrierCalculate {

    public static void main(String[] args) {

        //數(shù)組大小
        int size = 50000;
        //定義數(shù)組
        int[] numbers = new int[size];
        Random random = new Random();
        //隨機初始化數(shù)組
        for (int i = 0; i < size; i++) {
            numbers[i] = random.nextInt(10);
        }

        //單線程計算結(jié)果
        System.out.println();
        int sum = 0;
        for (int i = 0; i < size; i++) {
            sum += numbers[i];
        }
        System.out.println("單線程計算結(jié)果:" + sum);

        ExecutorService executorService = Executors.newCachedThreadPool();
        //定義五個Future去保存子數(shù)組計算結(jié)果
        final int [] result=new int[5];

        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            int sums = 0;
            for (int i = 0; i < 5; i++) {
                sums += result[i];
            }
            System.out.println("多線程計算結(jié)果為:"+sums);
        });

        int lengths=10000;
        //定義五個線程去計算
        for(int i=0;i<5;i++) {

            int[] range = Arrays.copyOfRange(numbers, i * lengths, (i + 1) * lengths);
            int finalI = i;
            executorService.execute(() -> {
                //臨時計算結(jié)果
                int temp = 0;
                for (int j = 0; j < 10000; j++) {
                    temp += range[j];
                }
                System.out.println("第N個多線程計算結(jié)果為:"+temp);
                result[finalI] = temp;
                try {
                    //暫停,等待其他線程的計算結(jié)果
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

結(jié)果:


image.png

喜歡可以關(guān)注我哦

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容