【細(xì)談Java并發(fā)】談?wù)凜yclicBarrier

1、簡介

CyclicBarrier是一個(gè)同步工具類,它允許一組線程在到達(dá)某個(gè)柵欄點(diǎn)(common barrier point)互相等待,發(fā)生阻塞,直到最后一個(gè)線程到達(dá)柵欄點(diǎn),柵欄才會(huì)打開,處于阻塞狀態(tài)的線程恢復(fù)繼續(xù)執(zhí)行.它非常適用于一組線程之間必需經(jīng)?;ハ嗟却那闆r。CyclicBarrier字面理解是循環(huán)的柵欄,之所以稱之為循環(huán)的是因?yàn)樵诘却€程釋放后,該柵欄還可以復(fù)用。

image

建議閱讀CyclicBarrier源碼前,先深入研究一下ReentrantLock的原理,搞清楚condition里await和signal的原理,這部分可以看我之前的文章:【細(xì)談Java并發(fā)】談?wù)凙QS【細(xì)談Java并發(fā)】談?wù)凴eentrantLock

好了,我們來看看如何使用它吧。

2、使用場景

我們可以簡單使用CyclicBarrier來模擬一下對(duì)戰(zhàn)平臺(tái)中玩家需要完全準(zhǔn)備好了,才能進(jìn)入游戲的場景。

public class CyclicBarrierTest {

    private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5);
    private final static CyclicBarrier BARRIER = new CyclicBarrier(5);

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            final String name = "玩家" + i;
            EXECUTOR_SERVICE.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(2000);
                        System.out.println(name + "已準(zhǔn)備,等待其他玩家準(zhǔn)備...");
                        BARRIER.await();
                        Thread.sleep(1000);
                        System.out.println(name + "已加入游戲");
                    } catch (InterruptedException e) {
                        System.out.println(name + "離開游戲");
                    } catch (BrokenBarrierException e) {
                        System.out.println(name + "離開游戲");
                    }
                }
            });
        }
        EXECUTOR_SERVICE.shutdown();
    }
}

輸出結(jié)果

玩家1已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家0已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家2已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家3已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家4已準(zhǔn)備,等待其他玩家準(zhǔn)備...
玩家2已加入游戲
玩家3已加入游戲
玩家4已加入游戲
玩家0已加入游戲
玩家1已加入游戲

3、原理分析

3.1、屬性

首先看看它里面的所有屬性。

public class CyclicBarrier {
    private static class Generation {
        boolean broken = false;
    }
    // 鎖
    private final ReentrantLock lock = new ReentrantLock();
    // 通過lock得到的一個(gè)狀態(tài)變量,用來await和signal
    private final Condition trip = lock.newCondition();
    // 通過構(gòu)造器傳入的參數(shù),表示總的等待線程的數(shù)量
    private final int parties;
    // 當(dāng)屏障正常打開后運(yùn)行的程序,通過最后一個(gè)調(diào)用await的線程來執(zhí)行
    private final Runnable barrierCommand;
    // 當(dāng)前的Generation。每當(dāng)屏障失效或者開閘之后都會(huì)自動(dòng)替換掉。從而實(shí)現(xiàn)重置的功能
    private Generation generation = new Generation();
    // 和parties一樣,每次線程await后減1
    private int count;
    ...省略后面代碼
}

3.2、構(gòu)造方法

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}
  1. 默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier已經(jīng)到達(dá)屏障位置,線程被阻塞。
  2. 另外一個(gè)構(gòu)造方法CyclicBarrier(int parties, Runnable barrierAction),其中barrierAction任務(wù)會(huì)在所有線程到達(dá)屏障后執(zhí)行。

3.3、await()

最主要的方法就是await()方法,調(diào)用await()的線程會(huì)等待直到有足夠數(shù)量的線程調(diào)用await——也就是開閘狀態(tài)。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
        BrokenBarrierException,
        TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await()和await(long, TimeUnit)都是調(diào)用dowait方法,區(qū)別就是參數(shù)不同,我們來看看dowait方法。

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)   // 如果當(dāng)前Generation是處于打破狀態(tài)則傳播這個(gè)BrokenBarrierExcption
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            // 如果當(dāng)前線程被中斷則使得當(dāng)前generation處于打破狀態(tài),重置剩余count。
            // 并且喚醒狀態(tài)變量。這時(shí)候其他線程會(huì)傳播BrokenBarrierException。
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;    // 嘗試降低當(dāng)前count
        /**
         * 如果當(dāng)前狀態(tài)將為0,則Generation處于開閘狀態(tài)。運(yùn)行可能存在的command,
         * 設(shè)置下一個(gè)Generation。相當(dāng)于每次開閘之后都進(jìn)行了一次reset。
         */
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // 如果運(yùn)行command失敗也會(huì)導(dǎo)致當(dāng)前屏障被打破。
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed) // 阻塞在當(dāng)前的狀態(tài)變量。
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {    // 如果當(dāng)前線程被中斷了則使得屏障被打破。并拋出異常。
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 從阻塞恢復(fù)之后,需要重新判斷當(dāng)前的狀態(tài)。
            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

此外再看下兩個(gè)小過程:

這兩個(gè)小過程當(dāng)然是需要鎖的,但是由于這兩個(gè)方法只是通過其他方法調(diào)用,所以依然是在持有鎖的范圍內(nèi)運(yùn)行的。這兩個(gè)方法都是對(duì)域進(jìn)行操作。

nextGeneration實(shí)際上在屏障開閘之后重置狀態(tài)。以待下一次調(diào)用。
breakBarrier實(shí)際上是在屏障打破之后設(shè)定打破狀態(tài),以喚醒其他線程并通知。

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

3.4、reset

reset方法比較簡單。但是這里還是要注意一下要先打破當(dāng)前屏蔽,然后再重建一個(gè)新的屏蔽。否則的話可能會(huì)導(dǎo)致信號(hào)丟失。

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

4、CountDownLatch的區(qū)別

我用白話說的通俗點(diǎn)吧。

  1. CountDownLatch的使用是一次性的,而CyclicBarrier可以用reset進(jìn)行重用。
  2. CountDownLatch是一個(gè)線程等待多個(gè)線程執(zhí)行完了,再進(jìn)行執(zhí)行。而CyclicBarrier是多個(gè)線程等待所有線程都執(zhí)行完了,再進(jìn)行執(zhí)行。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容