Java并發(fā)之CyclicBarrier

barrier(屏障)與互斥量、讀寫鎖、自旋鎖不同,它不是用來保護(hù)臨界區(qū)的。相反,它跟條件變量一樣,是用來協(xié)同多線程一起工作的。
??條件變量是多線程間傳遞狀態(tài)的改變來達(dá)到協(xié)同工作的效果。屏障是多線程各自做自己的工作,如果某一線程完成了工作,就等待在屏障那里,直到其他線程的工作都完成了,再一起做別的事。舉個通俗的例子:
??1.對于條件變量。在接力賽跑里,1號隊員開始跑的時候,2,3,4號隊員都站著不動,直到1號隊員跑完一圈,把接力棒給2號隊員,2號隊員收到接力棒后就可以跑了,跑完再給3號隊員。這里這個接力棒就相當(dāng)于條件變量,條件滿足后就可以由下一個隊員(線程)跑。
??2.對于屏障:在百米賽跑里,比賽沒開始之前,每個運(yùn)動員都在賽場上自由活動,有的熱身,有的喝水,有的跟教練談?wù)?。比賽快開始時,準(zhǔn)備完畢的運(yùn)動員就預(yù)備在起跑線上,如果有個運(yùn)動員還沒準(zhǔn)備完(除去特殊情況),他們就一直等,直到運(yùn)動員都在起跑線上,裁判喊口號后再開始跑。這里的起跑線就是屏障,做完準(zhǔn)備工作的運(yùn)動員都等在起跑線,直到其他運(yùn)動員也把準(zhǔn)備工作做完。

java.util.concurrent.CyclicBarrier類是一個同步機(jī)制。它可以通過一些算法來同步線程處理的過程。換言之,就是所有的線程必須等待對方,直到所有的線程到達(dá)屏障,然后繼續(xù)運(yùn)行。之所以叫做“循環(huán)屏障”,是因為這個屏障可以被重復(fù)使用。

CyclicBarrier有兩個構(gòu)造參數(shù),分別是:

CyclicBarrier(int parties)

創(chuàng)建一個新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,但它不會在啟動 barrier 時執(zhí)行預(yù)定義的操作。
??CyclicBarrier(int parties, Runnable barrierAction)

創(chuàng)建一個新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時啟動,并在啟動 barrier 時執(zhí)行給定的屏障操作,該操作由最后一個進(jìn)入 barrier 的線程執(zhí)行。

讓線程在CyclicBarrier中等待

有兩個方法可以讓線程在CyclicBarrier處等待:

barrier.await();
??barrier.await(10, TimeUnit.SECONDS);
??第二個方法指線程等待的超時時間,當(dāng)出現(xiàn)等待超時的時候,當(dāng)前線程會被釋放,但會像其他線程傳播出BrokenBarrierException異常。
??所有線程在CyclicBarrier等待,是指:

   ? 最后一個線程到達(dá)(調(diào)用await方法)

   ? 一個線程被被另外一個線程中斷(另外一個線程調(diào)用了這個現(xiàn)場的interrupt()方法)

   ? 其中一個等待的線程被中斷

   ? 其中一個等待的線程超時

   ? 一個外部的線程調(diào)用了CyclicBarrier.reset()方法。

下面以5個線程模擬5個運(yùn)動員。運(yùn)動員在賽跑的時候都會準(zhǔn)備一段時間,當(dāng)裁判發(fā)現(xiàn)所有的運(yùn)動員都準(zhǔn)備完畢的時候,就舉起發(fā)令槍,比賽開始。


package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 模擬運(yùn)動員
**/
public class MyThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    private String name;

    public MyThread(CyclicBarrier cyclicBarrier, String name) {
        super();
        this.cyclicBarrier = cyclicBarrier;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(name + "開始準(zhǔn)備");
        try {
            Thread.currentThread().sleep(5000);
            System.out.println(name + "準(zhǔn)備完畢!等待發(fā)令槍");
            try {
                cyclicBarrier.await();
            } catch (BrokenBarrierException e) {            
                e.printStackTrace();
            }
        } catch (InterruptedException e) {

            e.printStackTrace();
        }
    }
}
//測試類
public class Test {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {

            @Override
            public void run() {
                System.out.println("發(fā)令槍響了,跑!");

            }
        });
        for (int i = 0; i < 5; i++) {
            new MyThread(barrier, "運(yùn)動員" + i + "號").start();

        }

    }

}

當(dāng)執(zhí)行測試類的時候,輸出如下的結(jié)果(順序每次執(zhí)行可能會不太一樣):

運(yùn)動員1號開始準(zhǔn)備
運(yùn)動員3號開始準(zhǔn)備
運(yùn)動員2號開始準(zhǔn)備
運(yùn)動員0號開始準(zhǔn)備
運(yùn)動員4號開始準(zhǔn)備
運(yùn)動員1號準(zhǔn)備完畢!等待發(fā)令槍
運(yùn)動員4號準(zhǔn)備完畢!等待發(fā)令槍
運(yùn)動員0號準(zhǔn)備完畢!等待發(fā)令槍
運(yùn)動員3號準(zhǔn)備完畢!等待發(fā)令槍
運(yùn)動員2號準(zhǔn)備完畢!等待發(fā)令槍
發(fā)令槍響了,跑!

從輸出可以看到,當(dāng)給定數(shù)量的參與者(線程)調(diào)用了await()方法之后,屏障放開,CyclicBarrier中的屏障動作被觸發(fā)了。如果沒有達(dá)到指定的數(shù)量,就會一直被阻塞。

Barrier被破壞

BrokenBarrierException如果在參與者(線程)在等待的過程中,Barrier被破壞,就會拋出BrokenBarrierException??梢杂?strong>isBroken()方法檢測Barrier是否被破壞。
??1.如果有線程已經(jīng)處于等待狀態(tài),調(diào)用reset方法會導(dǎo)致已經(jīng)在等待的線程出現(xiàn)BrokenBarrierException異常。并且由于出現(xiàn)了BrokenBarrierException,將會導(dǎo)致始終無法等待。

比如,五個運(yùn)動員,其中一個在等待發(fā)令槍的過程中錯誤地接收到裁判傳過來的指令,導(dǎo)致這個運(yùn)動員以為今天比賽取消就離開了賽場。但是其他運(yùn)動員都領(lǐng)會的裁判正確的指令,剩余的運(yùn)動員在起跑線上無限地等待下去,并且裁判看到運(yùn)動員沒有到齊,也不會打發(fā)令槍。

package thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class MyThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    private String name;
    private int ID;

    public MyThread(CyclicBarrier cyclicBarrier, String name,int ID) {
        super();
        this.cyclicBarrier = cyclicBarrier;
        this.name = name;
        this.ID=ID;

    }
    @Override
    public void run() {
        System.out.println(name + "開始準(zhǔn)備");
        try {
            Thread.sleep(ID*1000);  //不同運(yùn)動員準(zhǔn)備時間不一樣,方便模擬不同情況
            System.out.println(name + "準(zhǔn)備完畢!在起跑線等待發(fā)令槍");
            try {
                cyclicBarrier.await();
                System.out.println(name + "跑完了路程!");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
                System.out.println(name+"看不見起跑線了");
            }
            System.out.println(name+"退場!");
        } catch (InterruptedException e) {

            e.printStackTrace();
        }

    }

}
public class Test {

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("發(fā)令槍響了,跑!");

            }
        });

        for (int i = 0; i < 5; i++) {
            new MyThread(barrier, "運(yùn)動員" + i + "號", i).start();
        }
        Thread.sleep(1000);
        barrier.reset();
    }

}

輸出結(jié)果:

運(yùn)動員0號開始準(zhǔn)備
運(yùn)動員1號開始準(zhǔn)備
運(yùn)動員2號開始準(zhǔn)備
運(yùn)動員3號開始準(zhǔn)備
運(yùn)動員4號開始準(zhǔn)備
運(yùn)動員0號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員1號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.util.concurrent.BrokenBarrierException
運(yùn)動員0號看不見起跑線了
運(yùn)動員0號退場!
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員2號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員3號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員4號準(zhǔn)備完畢!在起跑線等待發(fā)令槍

從輸出可以看到,運(yùn)動員0號在等待的過程中,主線程調(diào)用了reset方法,導(dǎo)致拋出BrokenBarrierException異常。但是其他線程并沒有受到影響,它們會一直等待下去,從而一直被阻塞。

2.如果在等待的過程中,線程被中斷,也會拋出BrokenBarrierException異常,并且這個異常會傳播到其他所有的線程。

package thread;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;

public class Test {
static   Map<Integer,Thread>   threads=new HashMap<>();
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("發(fā)令槍響了,跑!");

            }
        });

        for (int i = 0; i < 5; i++) {
        MyThread t = new MyThread(barrier, "運(yùn)動員" + i + "號", i);
            threads.put(i, t);
            t.start();
        }
        Thread.sleep(3000);
        threads.get(1).interrupt();
    }

}

輸出:

運(yùn)動員0號開始準(zhǔn)備
運(yùn)動員2號開始準(zhǔn)備
運(yùn)動員3號開始準(zhǔn)備
運(yùn)動員1號開始準(zhǔn)備
運(yùn)動員0號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員4號開始準(zhǔn)備
運(yùn)動員1號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員2號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員3號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.lang.InterruptedException
運(yùn)動員3號看不見起跑線了
運(yùn)動員3號退場!
運(yùn)動員2號看不見起跑線了
運(yùn)動員2號退場!
運(yùn)動員0號看不見起跑線了
運(yùn)動員0號退場!
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:234)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員4號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員4號看不見起跑線了
運(yùn)動員4號退場!

從輸出可以看到,其中一個線程被中斷,那么所有的運(yùn)動員都退場了。

3.如果在執(zhí)行屏障操作過程中發(fā)生異常,則該異常將傳播到當(dāng)前線程中,其他線程會拋出BrokenBarrierException,屏障被損壞。

這個就好比運(yùn)動員都沒有問題,而是裁判出問題了。裁判權(quán)力比較大,直接告訴所有的運(yùn)動員,今天不比賽了,你們都回家吧!

package thread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;

public class Test {
    static Map<Integer, Thread> threads = new HashMap<>();

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                String str = null;
                str.substring(0, 1);
                System.out.println("發(fā)令槍響了,跑!");
            
            }
        });

        for (int i = 0; i < 5; i++) {
            MyThread t = new MyThread(barrier, "運(yùn)動員" + i + "號", i);
            threads.put(i, t);
            t.start();
        }

    }

}

輸出:

運(yùn)動員0號開始準(zhǔn)備
運(yùn)動員3號開始準(zhǔn)備
運(yùn)動員2號開始準(zhǔn)備
運(yùn)動員1號開始準(zhǔn)備
運(yùn)動員4號開始準(zhǔn)備
運(yùn)動員0號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員1號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員2號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員3號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員4號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
Exception in thread "Thread-4" java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員0號看不見起跑線了
運(yùn)動員0號退場!
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員3號看不見起跑線了
運(yùn)動員3號退場!
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員1號看不見起跑線了
運(yùn)動員1號退場!
java.lang.NullPointerException
    at thread.Test$1.run(Test.java:15)
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:220)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at thread.MyThread.run(MyThread.java:27)
運(yùn)動員2號看不見起跑線了
運(yùn)動員2號退場!

可以看到,如果在執(zhí)行屏障動作的過程中出現(xiàn)異常,那么所有的線程都會拋出BrokenBarrierException異常。

4.如果超出指定的等待時間,當(dāng)前線程會拋出 TimeoutException 異常,其他線程會拋出BrokenBarrierException異常。

package thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MyThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    private String name;
    private int ID;

    public MyThread(CyclicBarrier cyclicBarrier, String name, int ID) {
        super();
        this.cyclicBarrier = cyclicBarrier;
        this.name = name;
        this.ID = ID;

    }

    @Override
    public void run() {
        System.out.println(name + "開始準(zhǔn)備");
        try {
            Thread.sleep(ID * 1000);
            System.out.println(name + "準(zhǔn)備完畢!在起跑線等待發(fā)令槍");
            try {
                try {
                    cyclicBarrier.await(ID * 1000, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println(name + "跑完了路程!");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
                System.out.println(name + "看不見起跑線了");
            }
            System.out.println(name + "退場!");
        } catch (InterruptedException e) {

            e.printStackTrace();
        }

    }

}

輸出:

運(yùn)動員0號開始準(zhǔn)備
運(yùn)動員2號開始準(zhǔn)備
運(yùn)動員3號開始準(zhǔn)備
運(yùn)動員1號開始準(zhǔn)備
運(yùn)動員0號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
運(yùn)動員4號開始準(zhǔn)備
java.util.concurrent.TimeoutException運(yùn)動員0號跑完了路程!
運(yùn)動員0號退場!

    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at thread.MyThread.run(MyThread.java:29)
運(yùn)動員1號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at thread.MyThread.run(MyThread.java:29)
運(yùn)動員1號看不見起跑線了
運(yùn)動員1號退場!
運(yùn)動員2號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.util.concurrent.BrokenBarrierException
運(yùn)動員2號看不見起跑線了
運(yùn)動員2號退場!
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at thread.MyThread.run(MyThread.java:29)
運(yùn)動員3號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at thread.MyThread.run(MyThread.java:29)
運(yùn)動員3號看不見起跑線了
運(yùn)動員3號退場!
運(yùn)動員4號準(zhǔn)備完畢!在起跑線等待發(fā)令槍
java.util.concurrent.BrokenBarrierException
運(yùn)動員4號看不見起跑線了
運(yùn)動員4號退場!
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at thread.MyThread.run(MyThread.java:29)

從輸出可以看到,如果其中一個參與者拋出TimeoutException,其他參與者會拋出BrokenBarrierException。

與CountDownLatch不同的是,CyclicBarrier是可以重復(fù)使用的。并且CyclicBarrier可以傳遞一個action方法,是的所有線程到達(dá)后執(zhí)行某個任務(wù),這個是CountDownLatch不具備的。
新書Java并發(fā)編程系統(tǒng)與模型出版啦!比較通俗易懂,非常適合初學(xué)者,百度閱讀可以下載電子書。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容