Java并發(fā)編程系列之CyclicBarrier詳解

簡介

jdk原文

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. 
CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 
The barrier is called cyclic because it can be re-used after the waiting threads are released.

這句話翻譯意思:CyclicBarrier是一個(gè)同步輔助類,它允許一組線程相互等待直到所有線程都到達(dá)一個(gè)公共的屏障點(diǎn)。
在程序中有固定數(shù)量的線程,這些線程有時(shí)候必須等待彼此,這種情況下,使用CyclicBarrier很有幫助。
這個(gè)屏障之所以用循環(huán)修飾,是因?yàn)樵谒械木€程釋放彼此之后,這個(gè)屏障是可以重新使用的

抓住重點(diǎn):1、允許一組線程相互等待直到達(dá)到一個(gè)公共屏障點(diǎn),2、可以重復(fù)使用

簡單舉例就是:玩王者榮耀只有所有人進(jìn)入游戲之前都必須加載到100%,所有人才能進(jìn)入游戲。
與CountDownLatch比較


image.png

源碼解析

先從構(gòu)造方法入手

  /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
 public CyclicBarrier(int parties) {
        this(parties, null);
    }
   /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

從jdk注釋我們可以看出:
第一個(gè)構(gòu)造器:創(chuàng)建一個(gè)新的{@code CyclicBarrier},它會(huì)在
給定數(shù)量的屏障(線程)正在等待它,并且在屏障被觸發(fā)時(shí)不執(zhí)行預(yù)定義的操作。
第二個(gè)構(gòu)造器:創(chuàng)建一個(gè)新的{@code CyclicBarrier},它會(huì)在
給定數(shù)量的屏障(線程)正在等待它,以及當(dāng)屏障被觸發(fā)時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景。


await()方法
調(diào)用await方法的線程告訴CyclicBarrier自己已經(jīng)到達(dá)同步點(diǎn),然后當(dāng)前線程被阻塞。直到parties個(gè)參與線程調(diào)用了await方法,CyclicBarrier同樣提供帶超時(shí)時(shí)間的await和不帶超時(shí)時(shí)間的await方法:
await()方法里面最主要就是doawait()

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 獲取獨(dú)占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 當(dāng)前先從
        final Generation g = generation;
        // 如果這個(gè)線程損壞了,拋出異常
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 如果線程中斷了,拋出異常
        if (Thread.interrupted()) {
            // 將損壞狀態(tài)設(shè)置為true
            // 并通知其他阻塞在此屏障上的線程
            breakBarrier();
            throw new InterruptedException();
        }
 
        // 獲取下標(biāo)
        int index = --count;
        // 如果是 0,說明最后一個(gè)線程調(diào)用了該方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 執(zhí)行屏障任務(wù)
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,將count重置,將generation重置
                // 喚醒之前等待的線程
                nextGeneration();
                return 0;
            } finally {
                // 如果執(zhí)行屏障任務(wù)的時(shí)候失敗了,就將損壞狀態(tài)設(shè)置為true
                if (!ranAction)
                    breakBarrier();
            }
        }
 
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                 // 如果沒有時(shí)間限制,則直接等待,直到被喚醒
                if (!timed)
                    trip.await();
                // 如果有時(shí)間限制,則等待指定時(shí)間
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 當(dāng)前線程沒有損壞
                if (g == generation && ! g.broken) {
                    // 讓屏障失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面條件不滿足,說明這個(gè)線程不是這代的
                    // 就不會(huì)影響當(dāng)前這代屏障的執(zhí)行,所以,就打個(gè)中斷標(biāo)記
                    Thread.currentThread().interrupt();
                }
            }
 
            // 當(dāng)有任何一個(gè)線程中斷了,就會(huì)調(diào)用breakBarrier方法
            // 就會(huì)喚醒其他的線程,其他線程醒來后,也要拋出異常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // g != generation表示正常換代了,返回當(dāng)前線程所在屏障的下標(biāo)
            // 如果 g == generation,說明還沒有換代,那為什么會(huì)醒了?
            // 因?yàn)橐粋€(gè)線程可以使用多個(gè)屏障,當(dāng)別的屏障喚醒了這個(gè)線程,就會(huì)走到這里,所以需要判斷是否是當(dāng)前代。
            // 正是因?yàn)檫@個(gè)原因,才需要generation來保證正確。
            if (g != generation)
                return index;
            
            // 如果有時(shí)間限制,且時(shí)間小于等于0,銷毀屏障并拋出異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放獨(dú)占鎖
        lock.unlock();
    }
   }

總結(jié)如果該線程不是最后一個(gè)調(diào)用await方法的線程,則它會(huì)一直處于等待狀態(tài),除非發(fā)生以下情況:
最后一個(gè)線程到達(dá),即index == 0
某個(gè)參與線程等待超時(shí)
某個(gè)參與線程被中斷
調(diào)用了CyclicBarrier的reset()方法。該方法會(huì)將屏障重置為初始狀態(tài)

Generation描述著CyclicBarrier的更新?lián)Q代。在CyclicBarrier中,同一批線程屬于同一代。當(dāng)有parties個(gè)線程到達(dá)barrier之后,generation就會(huì)被更新?lián)Q代。其中broken標(biāo)識該當(dāng)前CyclicBarrier是否已經(jīng)處于中斷狀態(tài)。

默認(rèn)barrier(屏障)是沒有損壞的。當(dāng)barrier(屏障)損壞了或者有一個(gè)線程中斷了,則通過breakBarrier()來終止所有的線程:

private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

breakBarrier()不僅會(huì)把broken設(shè)置為true,還會(huì)將所有處于等待狀態(tài)的線程全部喚醒(singalAll)方法

注意CyclicBarrier使用獨(dú)占鎖來執(zhí)行await方法,并發(fā)性可能不是很高

簡單例子加深印象

/**
 * @author shuliangzhao
 * @Title: CyclicBarrierTest
 * @ProjectName design-parent
 * @Description: TODO
 * @date 2019/6/3 0:23
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);

        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }

       
       /* try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("CyclicBarrier重用");

        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }*/
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數(shù)據(jù)...");
            try {
                Thread.sleep(2000);      //以睡眠來模擬寫入數(shù)據(jù)操作
                System.out.println("線程"+Thread.currentThread().getName()+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");

                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
        }
    }
}

運(yùn)行結(jié)果


image.png

怎么用多線程求和


/**
 * @author shuliangzhao
 * @Title: CyclicBarrier
 * @ProjectName design-parent
 * @Description: TODO
 * @date 2019/6/3 0:18
 */
public class CyclicBarrierExc {

    //private static final Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierExc.class);

    public static void main(String[] args) {
        //數(shù)組大小
        int size = 50000;
        //定義數(shù)組
        int[] numbers = new int[size];

        //隨機(jī)初始化數(shù)組
        for (int i = 0; i < size; i++) {
            numbers[i] = RandomUtils.nextInt(100, 1000);
        }

        //多線程計(jì)算結(jié)果
        //定義線程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        //定義五個(gè)Future去保存子數(shù)組計(jì)算結(jié)果
        final int[] results = new int[5];

        //定義一個(gè)循環(huán)屏障,在屏障線程中進(jìn)行計(jì)算結(jié)果合并
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            int sums = 0;
            for (int i = 0; i < 5; i++) {
                sums += results[i];
            }
            System.out.println("多線程計(jì)算結(jié)果:" + sums);
        });

        //子數(shù)組長度
        int length = 10000;
        //定義五個(gè)線程去計(jì)算
        for (int i = 0; i < 5; i++) {
            //定義子數(shù)組
            int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length));
            //盛放計(jì)算結(jié)果
            int finalI = i;
            executorService.submit(() -> {
                for (int j = 0; j < subNumbers.length; j++) {
                    results[finalI] += subNumbers[j];
                }
                //等待其他線程進(jìn)行計(jì)算
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }

        //關(guān)閉線程池
        executorService.shutdown();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容