圖解java.util.concurrent源碼 (六)CyclicBarrier (循環(huán)柵欄)

引言


上一篇文章提到,CountDownLatch不支持重置計(jì)數(shù),如果你有反復(fù)重置計(jì)數(shù)的需求的話(huà),最好使用CyclicBarrier。

CyclicBarrier的中文名叫做"循環(huán)柵欄",能夠讓n個(gè)線(xiàn)程都到達(dá)同步點(diǎn)之后再讓他們開(kāi)始運(yùn)行,之后CyclicBarrier就會(huì)重新計(jì)數(shù),這個(gè)過(guò)程可以反復(fù)進(jìn)行,甚至還可以在到達(dá)同步點(diǎn)與重新運(yùn)行之間插入一段代碼(叫做barrierAction)。

Demo


文章重點(diǎn)講的是實(shí)現(xiàn),但是出于完整性考慮還是給個(gè)Demo,Demo摘自《實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)》。

場(chǎng)景:有10個(gè)士兵(其實(shí)就是10個(gè)線(xiàn)程),他們要先進(jìn)行集合,集合完畢后會(huì)打印"司令:[士兵10個(gè),集合完畢!]"(其實(shí)這就是一個(gè)barrierAction),然后開(kāi)始各自的工作,工作完畢后士兵們?cè)偌掀饋?lái),此時(shí)會(huì)打印"司令:[士兵10個(gè), 任務(wù)完成!]"(同理,這也是一個(gè)barrierAction)。

代碼實(shí)現(xiàn)如下:

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

    public static class Soldier implements Runnable{

        private String soldier;

        private final CyclicBarrier cyclic;

        public Soldier(CyclicBarrier cyclic, String soldier) {
            this.soldier = soldier;
            this.cyclic = cyclic;
        }

        @Override
        public void run() {
            try {
                //等待士兵到齊
                cyclic.await();
                //士兵開(kāi)始做各自的工作
                doWork();
                //等待所有士兵完成任務(wù)
                cyclic.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork(){
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + " 任務(wù)完成");
        }
    }

    /**
     * 這個(gè)類(lèi)用于barrierAction
     */
    public static class BarrierRun implements Runnable{

        boolean flag;

        int N;

        public BarrierRun(boolean flag, int n) {
            this.flag = flag;
            N = n;
        }

        @Override
        public void run() {
            if ( flag ){
                System.out.println("司令:[士兵" + N + "個(gè), 任務(wù)完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "個(gè),集合完畢!]");
                flag = true;
            }
        }
    }

    public static void main(String[] args) {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        /**
         * 插入了BarrierRun作為barrierAction
         */
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));

        for ( int i = 0; i < N; i++ ){
            System.out.println("士兵 " + i + " 報(bào)道");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
        }
    }

}

圖示如下(圖中下標(biāo)號(hào)對(duì)應(yīng)著下面文字描述的標(biāo)號(hào)):

CyclicBarrier運(yùn)作過(guò)程
  1. 士兵陸續(xù)前來(lái)集合
  2. 士兵集合完畢
  3. barrierAction1: 打印"司令:[士兵10個(gè),集合完畢!]"
  4. 士兵陸續(xù)完成任務(wù)
  5. 所有士兵的任務(wù)都執(zhí)行完畢
  6. barrierAction2: 打印"司令:[士兵10個(gè), 任務(wù)完成!]"

運(yùn)行打印的結(jié)果如下:

你可能會(huì)疑惑這個(gè)barrierAction是由哪個(gè)線(xiàn)程負(fù)責(zé)執(zhí)行的呢?從圖中可以看出barrierAction每次都是由一個(gè)線(xiàn)程執(zhí)行的,而這個(gè)線(xiàn)程一般就是最后到達(dá)的那個(gè)線(xiàn)程,之后我也會(huì)通過(guò)源碼分析得出這個(gè)結(jié)論。

從上面的demo看出CyclicBarrier的核心方法就只有一個(gè)await,它會(huì)拋出兩個(gè)異常,InterruptedExceptionBrokenBarrierException。InterruptedException顯然是當(dāng)前線(xiàn)程等待的過(guò)程被中斷而拋出的異常,而這些要集合的線(xiàn)程有一個(gè)線(xiàn)程被中斷就會(huì)導(dǎo)致線(xiàn)程永遠(yuǎn)都無(wú)法集齊,導(dǎo)致“柵欄損壞”,剩下的線(xiàn)程就會(huì)拋出BrokenBarrierException異常。

想要看到“柵欄損壞”的現(xiàn)象只要把main方法改成如下即可:

    public static void main(String[] args) {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        /**
         * 插入了BarrierRun作為barrierAction
         */
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));

        for ( int i = 0; i < N; i++ ){
            System.out.println("士兵 " + i + " 報(bào)道");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
            allSoldier[i].start();
            if ( i == 5 ){
                allSoldier[0].interrupt();
            }
        }
    }

然后你會(huì)得到1個(gè)InterruptedException和9個(gè)BrokenBarrierException。

柵欄損壞


其實(shí)除了上面說(shuō)的情況會(huì)發(fā)生“柵欄損壞”,文檔中還提到了好幾種會(huì)發(fā)生的情況,如下:

  • 有一個(gè)線(xiàn)程發(fā)生中斷(就是上一小節(jié)提到的那個(gè)情況)或者超時(shí),而當(dāng)前線(xiàn)程正在等待(await),則當(dāng)前線(xiàn)程會(huì)拋出BrokenBarrierException
  • 該CyclicBarrier對(duì)象被調(diào)用了reset方法
  • 該CyclicBarrier對(duì)象被調(diào)用await時(shí),狀態(tài)已經(jīng)是"broken"了
  • barrierAction拋出了未捕獲的異常

實(shí)現(xiàn)分析


與CountDownLatch不同,CyclicBarrier不是基于A(yíng)QS實(shí)現(xiàn),而是應(yīng)用ReentrantLock實(shí)現(xiàn)的,它的同步靠的是兩個(gè)成員變量(分別是一個(gè)ReentrantLock以及從中引申出的Condition):

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();

我們先從構(gòu)造函數(shù)看起:

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

從這里看出幾個(gè)關(guān)鍵成員變量的含義,parties代表循環(huán)柵欄每次要等待多少個(gè)線(xiàn)程,count則是用于倒計(jì)數(shù)用的計(jì)數(shù)器,而barrierCommand就是barrierAction了。

下面看CyclicBarrier唯一的核心方法await:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

await方法返回的數(shù)值是線(xiàn)程的達(dá)到次序,對(duì)于第一個(gè)到達(dá)的線(xiàn)程會(huì)返回(總的數(shù)值 - 1),而最后一個(gè)到達(dá)的線(xiàn)程會(huì)返回0。

發(fā)現(xiàn)它其實(shí)調(diào)用的是dowait方法。

dowait方法整個(gè)被一把ReentrantLock給鎖住了:

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            ...
        } finally {
            lock.unlock();
        }
    }

被鎖住的這段代碼看起來(lái)長(zhǎng),其實(shí)總結(jié)起來(lái)由以下步驟組成:

dowait流程

我在下面的代碼中標(biāo)出了相應(yīng)步驟(圖中的①對(duì)應(yīng)下面注釋中的"一",②對(duì)應(yīng)"二",以此類(lèi)推):

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

           //一:倒計(jì)數(shù)
           int index = --count;
           //二:所有線(xiàn)程都已就位?
           //index為0說(shuō)明所有線(xiàn)程都已就位
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   //三:最后一個(gè)到達(dá)的線(xiàn)程順手執(zhí)行barrierAction
                   if (command != null)
                       command.run();
                   ranAction = true;
                   //四:nextGeneration方法會(huì)喚醒所有線(xiàn)程并且更新generation進(jìn)入下一代
                   nextGeneration();
                   //最后一個(gè)到達(dá)的線(xiàn)程返回0
                   return 0;
               } finally {
                   //如果barrierAction出現(xiàn)異常則將該循環(huán)柵欄損壞
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // 五:利用Condition進(jìn)行等待
            for (;;) {
                try {
                    if (!timed)   //無(wú)超時(shí)等待
                        trip.await();
                    else if (nanos > 0L)   //有超時(shí)等待
                        nanos = trip.awaitNanos(nanos);//返回剩余的時(shí)間
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    //超時(shí)則破壞循環(huán)柵欄
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

下面分別對(duì)每一塊進(jìn)行講解。

①之前

在我列出的關(guān)鍵步驟①之前還有幾行代碼,我們先來(lái)看一看:

            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

CyclicBarrier對(duì)象有一個(gè)Generation內(nèi)部類(lèi),其唯一的作用是標(biāo)記這一代是否出現(xiàn)了"柵欄損壞":

    private static class Generation {
        //為true則表示這一代發(fā)生了柵欄損壞
        boolean broken = false;
    }

所有的打破柵欄的控制流都會(huì)調(diào)用breakBarrier方法:

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

可以看到它把generation的broker標(biāo)志設(shè)置為了true,然后重置了count計(jì)數(shù)器,最后使用了trip這個(gè)Condition對(duì)象喚醒了所有的線(xiàn)程。generation成員變量會(huì)在每次線(xiàn)程到齊之后換一個(gè)新的(我稱(chēng)之為"換代",分析到后面的代碼時(shí)再說(shuō))。

知道了這些,剛開(kāi)始給出的那幾行代碼就很好理解了,如果線(xiàn)程剛到這里就發(fā)現(xiàn)柵欄在這一代以及損壞(if (g.broken)),則直接拋出柵欄損壞異常(throw new BrokenBarrierException();)。

如果發(fā)現(xiàn)線(xiàn)程在之前以及被中斷了(if (Thread.interrupted()) {),則立即損壞柵欄(breakBarrier();)并拋出InterruptedException。

①: 倒計(jì)數(shù)

這個(gè)很簡(jiǎn)單,就是將count成員變量減1,然后再保存到局部變量index中,等到方法返回時(shí),作為返回值。

②:所有線(xiàn)程都已就位?

這個(gè)也很簡(jiǎn)單,當(dāng)count減到0了,則說(shuō)明所有線(xiàn)程都就位了,否則還需要等。

③④:順手執(zhí)行barrierAction,喚醒其他線(xiàn)程并進(jìn)行下一代

           //二:所有線(xiàn)程都已就位?
           //index為0說(shuō)明所有線(xiàn)程都已就位
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   //三:最后一個(gè)到達(dá)的線(xiàn)程順手執(zhí)行barrierAction
                   if (command != null)
                       command.run();
                   ranAction = true;
                   //四:nextGeneration方法會(huì)喚醒所有線(xiàn)程并且更新generation進(jìn)入下一代
                   nextGeneration();
                   //最后一個(gè)到達(dá)的線(xiàn)程返回0
                   return 0;
               } finally {
                   //如果barrierAction出現(xiàn)異常則將該循環(huán)柵欄損壞
                   if (!ranAction)
                       breakBarrier();
               }
           }

最后一個(gè)到達(dá)的線(xiàn)程發(fā)現(xiàn)count已經(jīng)減到0了,如果發(fā)現(xiàn)設(shè)置了barrierAction的話(huà),則順手將其執(zhí)行,緊接著調(diào)用nextGeneration方法將整個(gè)CyclicBarrier對(duì)象的狀態(tài)進(jìn)行重置,準(zhǔn)備迎接下一輪循環(huán),nextGeneration方法的內(nèi)容如下:

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

trip就是從lock中衍生出來(lái)的Condition對(duì)象,這里調(diào)用signalAll方法將所有阻塞住的線(xiàn)程都喚醒了(這些線(xiàn)程都阻塞在⑤中),將count重新置位倒計(jì)數(shù)的總數(shù)parties,之后將generation成員變量換了個(gè)新的(稱(chēng)之為"換代")。

最后的finally代碼塊中,會(huì)通過(guò)ranAction標(biāo)志檢測(cè)barrierAction是否成功執(zhí)行,如果未能成功執(zhí)行,則還是會(huì)調(diào)用breakBarrier方法破壞掉柵欄。

從上面的代碼可以看出,CyclicBarrier一旦損壞掉就不會(huì)自動(dòng)回復(fù)了,需要手工調(diào)用CyclicBarrier對(duì)象的reset方法來(lái)開(kāi)啟新的一代。

⑤:利用Condition進(jìn)行等待

除了最后一個(gè)到達(dá)的線(xiàn)程,其他線(xiàn)程都會(huì)進(jìn)入這一段代碼進(jìn)行等待,核心就是使用trip這個(gè)Condition對(duì)象的await方法阻塞?。?/p>

                    if (!timed)   //無(wú)超時(shí)等待
                        trip.await();
                    else if (nanos > 0L)   //有超時(shí)等待
                        nanos = trip.awaitNanos(nanos);//返回剩余的時(shí)間

如果await的過(guò)程中發(fā)生了中斷,則在catch代碼塊中破壞柵欄(breakBarrier),如果發(fā)現(xiàn)已經(jīng)換代或者柵欄已經(jīng)損壞,則重置當(dāng)前線(xiàn)程的中斷標(biāo)志位(Thread.currentThread().interrupt();),供下一輪使用:

                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

剩下的三個(gè)判斷就是線(xiàn)程被喚醒之后的處理了:

                //喚醒之后發(fā)現(xiàn)柵欄已經(jīng)損壞,則拋出異常
                if (g.broken)
                    throw new BrokenBarrierException();

                //柵欄沒(méi)有損壞而且換代了,說(shuō)明這一代順利結(jié)束,await方法返回
                if (g != generation)
                    return index;

                //超時(shí)
                if (timed && nanos <= 0L) {
                    //超時(shí)則破壞循環(huán)柵欄
                    breakBarrier();
                    throw new TimeoutException();
                }

這里注意到一個(gè)比較有趣的地方就是,假如nanos大于0的情況下則會(huì)繼續(xù)Condition等待的循環(huán),nanos怎么會(huì)大于0呢?查閱了awaitNanos的文檔發(fā)現(xiàn)它的返回值的含義是線(xiàn)程剩余的需要等待的時(shí)間,為了讓awaitNanos能夠真正地等待你所指定的時(shí)間,推薦的寫(xiě)法是:

 boolean aMethod(long timeout, TimeUnit unit) {
   long nanos = unit.toNanos(timeout);
   lock.lock();
   try {
     while (!conditionBeingWaitedFor()) {
       if (nanos <= 0L)
         return false;
       nanos = theCondition.awaitNanos(nanos);
     }
     // ...
   } finally {
     lock.unlock();
   }
 }

其實(shí)CyclicBarrier中的寫(xiě)法就是這種推薦寫(xiě)法的變種。

當(dāng)時(shí)我就很疑惑,為什么這個(gè)awaitNanos方法這么不靠譜呢?我都讓它等待nanos之后再超時(shí),它卻有可能中途就原因不明地給我返回了。

回憶一下AQS中awaitNanos的實(shí)現(xiàn),它阻塞靠的是LockSupport.parkNanos方法,而LockSupport的文檔明確說(shuō)了,它的parkNanos會(huì)出現(xiàn)"偽喚醒"的問(wèn)題(也就是原因不明地返回),而且因?yàn)?code>parkNanos是個(gè)返回值為void的方法,它甚至不會(huì)告訴你剩余的時(shí)間是多少,AQS對(duì)其額外封裝了剩余的等待時(shí)間,也算是比較友好了。

總結(jié)


相比JUC中的其他類(lèi),CyclicBarrier的實(shí)現(xiàn)屬于比較接地氣的,基于ReentrantLock實(shí)現(xiàn)了自己的功能,可以學(xué)習(xí)它對(duì)于ReentrantLock的應(yīng)用

參考文獻(xiàn)


《實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)》(葛一鳴 郭超 著)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容