Phaser并發(fā)階段任務的運行

Phaser是一個更強大的、更復雜的同步輔助類,可以代替CyclicBarrier CountDownLatch的功能,但是比他們更強大。
Phaser類機制是在每一步結束的位置對線程進行同步,當所有的線程都完成了這一步,才能進行下一步。
當我們有并發(fā)任務并且需要分解成幾步執(zhí)行的時候,這種機制就非常適合。
CyclicBarrier CountDownLatch 只能在構造時指定參與量,而phaser可以動態(tài)的增減參與量。
phaser 使用說明:
使用phaser.arriveAndAwaitAdvance(); //等待參與者達到指定數(shù)量,才開始運行下面的代碼
使用phaser.arriveAndDeregister(); //注銷當前線程,該線程就不會進入休眠狀態(tài),也會從phaser的數(shù)量中減少
模擬代替CountDownLatch功能,只需要當前線程arriveAndAwaitAdvance()之后運行需要的代碼之后,就arriveAndDeregister()取消當前線程的注冊。
phaser有一個重大特性,就是不必對它的方法進行異常處理。置于休眠的線程不會響應中斷事件,不會拋出interruptedException異常, 只有一個方法會響應:AwaitAdvanceInterruptibly(int phaser).
其他api

arrive():這個方法通知phase對象一個參與者已經(jīng)完成了當前階段,但是它不應該等待其他參與者都完成當前階段,必須小心使用這個方法,因為它不會與其他線程同步。
awaitAdvance(int phase):如果傳入的階段參數(shù)與當前階段一致,這個方法會將當前線程至于休眠,直到這個階段的所有參與者都運行完成。如果傳入的階段參數(shù)與當前階段不一致,這個方法立即返回。
awaitAdvanceInterruptibly(int phaser):這個方法跟awaitAdvance(int phase)一樣,不同處是:該訪問將會響應線程中斷。會拋出interruptedException異常
將參與者注冊到phaser中:
register():將一個新的參與者注冊到phase中,這個新的參與者將被當成沒有執(zhí)完本階段的線程。
bulkRegister(int parties):將指定數(shù)目的參與者注冊到phaser中,所有這些新的參與者都將被當成沒有執(zhí)行完本階段的線程。
減少參與者
只提供了一個方法來減少參與者:arriveAndDeregister():告知phaser對應的線程已經(jīng)完成了當前階段,并它不會參與到下一階段的操作中。
強制終止
當一個phaser么有參與者的時候,它就處于終止狀態(tài),使用forceTermination()方法來強制phaser進入終止狀態(tài),不管是否存在未注冊的參與線程,當一個線程出現(xiàn)錯誤時,強制終止phaser是很有意義的。
當phaser處于終止狀態(tài)的時候,arriveAndAwaitAdvance() 和 awaitAdvance() 立即返回一個負數(shù),而不再是一個正值了,如果知道phaser可能會被終止,就需要驗證這些方法的值,以確定phaser是不是被終止了。
被終止的phaser不會保證參與者的同步。

example

public class PhaserExample1 {

    private static Random random = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        final Phaser phaser = new Phaser();
        IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);

        phaser.register();
        phaser.arriveAndAwaitAdvance();
        System.out.println("all thread finish work.");
    }

    static class Task extends Thread{

        private final Phaser phaser;

        Task(Phaser phaser) {
            this.phaser = phaser;
            phaser.register();
            start();
        }

        @Override
        public void run() {
            try {
                System.out.println("The worker ["+getName()+"] start");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println("The worker ["+getName()+"] end.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            phaser.arriveAndAwaitAdvance();
        }
    }
}

運行結果:
The worker [Thread-0] start
The worker [Thread-2] start
The worker [Thread-1] start
The worker [Thread-4] start
The worker [Thread-3] start
The worker [Thread-3] end.
The worker [Thread-0] end.
The worker [Thread-1] end.
The worker [Thread-4] end.
The worker [Thread-2] end.
all thread finish work.

public class PhaserExample2 {

    private final static Random random = new Random(System.currentTimeMillis());

    /**
     * running
     *
     * bicycle
     *
     * long jump
     * @param args
     */

    public static void main(String[] args) {
        final Phaser phaser = new Phaser(5);
        for (int i = 0; i < 6; i++) {
            new Athletes(i,phaser).start();
        }
    }

    static class Athletes extends Thread{
        private final int no;
        private final Phaser phaser;

        Athletes(int no, Phaser phaser) {
            this.no = no;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println(no + " running start.");
            sleep(random.nextInt(5));
            System.out.println(no + " running end.");
            phaser.arriveAndAwaitAdvance();

            System.out.println(no + " bicycle start.");
            sleep(random.nextInt(5));
            System.out.println(no + " bicycle end.");
            phaser.arriveAndAwaitAdvance();

            System.out.println(no + " long jump start.");
            sleep(random.nextInt(5));
            System.out.println(no + " long jump end.");
            phaser.arriveAndAwaitAdvance();
        }

        private void sleep(int time){
            try {
                TimeUnit.SECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 原文出處http://cmsblogs.com/ 『chenssy』 作為Executor框架中最核心的類,Thr...
    踩在浪花上看浪閱讀 1,319評論 0 4
  • 一、基礎知識:1、JVM、JRE和JDK的區(qū)別:JVM(Java Virtual Machine):java虛擬機...
    殺小賊閱讀 2,565評論 0 4
  • 今天真的好難得!我們閨蜜群每年一次新年聚餐在今天又歡聚一起了!與往年不同的是,今天沒有在外面酒店吃飯,因其中一...
    俏之花語閱讀 295評論 0 1
  • 感覺今天智商情商嚴重掉線,雖然平時也基本沒有。忙碌并充實,復雜也新鮮,挺好的工作生活,加快跟上節(jié)奏,做好應當做好的...
    一空萬有閱讀 190評論 0 0
  • 001 節(jié)奏 學任何樂器,都需要學節(jié)奏。節(jié)拍器是不可少的設備。野路子玩音樂的和專業(yè)音樂訓練過的,在節(jié)奏在這個坎兒上...
    優(yōu)爸_IT_???/span>閱讀 238評論 0 0

友情鏈接更多精彩內容