Phaser多任務(wù)協(xié)同的神器

上篇 CyclicBarrier多任務(wù)協(xié)同的利器 我們借助部門TB團(tuán)建的例子,一步步分析了 CyclicBarrier 多線程協(xié)調(diào)的功能。
并在文章末尾,留出思考
實際部門TB活動中,可能有人白天有事,不能參加公園的活動、但晚上會來聚餐;有人白天能參加,晚上不能參加;
并且公園的門票,聚餐費用,因參與人數(shù)不同,又有不同,需要統(tǒng)計各階段的參與人數(shù),以此計算經(jīng)費。
需求升級后,如何實現(xiàn)呢?CyclicBarrier 能完成嗎?

其實在上篇文章中,我們分析了初版TB需求的任務(wù)特點,其中之一就是參與者的數(shù)量,是確定的。
但當(dāng)前需求,多個參與階段的參與者數(shù)量,各不相同,基本確定 CyclicBarrier 完成不了。
———— 別慌,針對多個階段,靈活設(shè)置參與者數(shù)量的場景,JDK提供了工具類 Phaser。

照舊,先看看 Phaser 的源碼注釋:

A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.

Phaser 是一個可重用的同步屏障,功能上跟 CyclicBarrier 和 CountDownLatch 相似,但支持更多靈活的用法。

看過 CountDownLatch的兩種常用場景CyclicBarrier多任務(wù)協(xié)同的利器 的朋友,一定了解:CountDownLatch 能夠?qū)崿F(xiàn)一個或多個線程阻塞等待,直到其他線程完成后再執(zhí)行;
而 CyclicBarrier 允許多個線程相互等待,直到所有參與者到達(dá)屏障同步點后,再往下執(zhí)行。
可以說,Phaser 是二者功能的增強(qiáng)和結(jié)合。

Phaser 階段協(xié)同器

Java 7 中增加的一個用于多階段同步控制的工具類,他包含了 CycIicBarrier 和 CountDownLatch 的相關(guān)功能,讓它們更強(qiáng)大靈活。
下面通過部門TB,多階段不同參與者的例子,具體探究 Phaser 的原理。

部門團(tuán)建,需求升級

公司組織周末郊游,大家各自從公司出發(fā)到公園集合,大家都到了之后,出發(fā)到公園各自游玩,然后在公園門口集合,再去餐廳就餐,大家都到了就可以用餐,有的員工白天有事,選擇
參加晚上的聚餐,有的員工則晚上有事,只參加白天的活動。

任務(wù)特點分析:

  • 多階段協(xié)同,但階段的參與數(shù)是可變的,用 CyclicBarrier 好像不好實現(xiàn)。
  • 假定 第一階段:到公司集合5人(任務(wù)數(shù)5),去公園游玩。
  • 第二階段:到公園門口集合,有2人因晚上有事,自行回家了;則3人去餐廳,這是減少參與數(shù)(任務(wù)數(shù)變?yōu)?)
  • 第三階段:餐廳集合,有另4人參與聚餐,這是增加參與數(shù)(任務(wù)數(shù)變?yōu)?)

實際上,當(dāng)前任務(wù)最大的特點是:多階段等待一起出發(fā)、每階段的任務(wù)數(shù)可靈活調(diào)整。

多個線程協(xié)作執(zhí)行的任務(wù),分為多個階段,每個階段都可以有任意個參與者線程,可以隨時注冊并參與到某個階段;
當(dāng)一個階段中所有任務(wù)都完成之后,Phaser 的 onAdvance() 被調(diào)用(可以通過覆蓋添加自定文處理邏輯(類似CyclicBarrier循環(huán)屏障使用的Runnable接口)),然后Phaser釋放等待線程,自動進(jìn)入下個階段,如此循環(huán),直到Phaser不再包含任何參與者。

由于 Phaser 比較復(fù)雜,API也較為繁多,下面將 Phaser 提供的API分為多組。
構(gòu)造方法

  • newPhaser() 不指定數(shù)量,參與任務(wù)數(shù)為0。
  • new Phaser(int parties) 指定初始參與任務(wù)數(shù)
  • new Phaser(Phaser phaser) 指定父階段器,子對象整體作為一個參與者加入到父對象,當(dāng)子對象中沒有參與者時,自動從父對象解除注冊
  • new Phaser(Phaser phaser,int parties)

增減參與任務(wù)數(shù)方法

  • int register() 增加一個數(shù),返回當(dāng)前階段號。
  • int bulkRegister(int parties) 增加指定個數(shù),返回當(dāng)前階段號。
  • int arriveAndDeregister() 減少一個任務(wù)數(shù),返回當(dāng)前階段號。

到達(dá)、等待方法

  • int arrive() 到達(dá)(任務(wù)完成),返回當(dāng)前階段號。
  • int arriveAndAwaitAdvance() 到達(dá)后等待其他任務(wù)到達(dá),返回到達(dá)階段號。
  • int awaitAdvance(int phase) 在指定階段等待(必須是當(dāng)前階段才有效)
  • int awaitAdvanceInterruptibly(int phase) 階段到達(dá)觸發(fā)動作
  • int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
  • protected boolean onAdvance(int phase,int registeredParties)類似CyclicBarrier的觸發(fā)命令,通過重寫該方法來增加階段到達(dá)動作,該方法返回true將終結(jié)Phaser對象。

Phaser其他API:

  • void forceTermination() 強(qiáng)制結(jié)束
  • int getPhase() 獲取當(dāng)前階段號
  • boolean isTerminated() 判斷是否結(jié)束

注意事項:
單個 Phaser 實例允許的注冊任務(wù)數(shù)的.上限是65535,如果參與任務(wù)數(shù)超過,可以用父子Phaser樹的方式,通過父子關(guān)聯(lián)來增加參與者上限。

為什么是65535,這和 Phaser 的實現(xiàn)有關(guān):
Phaser中的state狀態(tài),64位的屬性state不同位被用來存放不同的值,低16位存放unarrived,低32位中的高16位存放parties,高32位的低31位存放phase,最高位存放terminated,即Phaser是否關(guān)閉;
2^16=65536

Phaser 實現(xiàn)多任務(wù)協(xié)同

下面來看,如何使用 Phaser 完成多階段任務(wù)協(xié)同。
我們首先將團(tuán)建的不同階段任務(wù),定義在 StaffTask :

static final Random random = new Random();

static class StaffTask {
    public void step1Task() throws InterruptedException {
        // 第一階段:來公司集合
        String staff = "員工【" + Thread.currentThread().getName() + "】";
        System.out.println(staff + "從家出發(fā)了……");
        Thread.sleep(random.nextInt(5000));
        System.out.println(staff + "到達(dá)公司");
    }
    
    public void step2Task() throws InterruptedException {
        // 第二階段:出發(fā)去公園
        String staff = "員工【" + Thread.currentThread().getName() + "】";
        System.out.println(staff + "出發(fā)去公園玩");
        Thread.sleep(random.nextInt(5000));
        System.out.println(staff + "到達(dá)公園門口集合");
    
    }
    
    public void step3Task() throws InterruptedException {
        // 第三階段:去餐廳
        String staff = "員工【" + Thread.currentThread().getName() + "】";
        System.out.println(staff + "出發(fā)去餐廳");
        Thread.sleep(random.nextInt(5000));
        System.out.println(staff + "到達(dá)餐廳");
    
    }
    
    public void step4Task() throws InterruptedException {
        // 第四階段:就餐
        String staff = "員工【" + Thread.currentThread().getName() + "】";
        System.out.println(staff + "開始用餐");
        Thread.sleep(random.nextInt(5000));
        System.out.println(staff + "用餐結(jié)束,回家");
    }
}

還是用隨機(jī)數(shù),模擬不同參與者的耗時。

重點是下面的 main 方法:

public static void main(String[] args) {

    final Phaser phaser = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            // 參與者數(shù)量,去除主線程
            int staffs = registeredParties - 1;
            switch (phase) {
                case 0:
                    System.out.println("大家都到公司了,出發(fā)去公園,人數(shù):" + staffs);
                    break;
                case 1:
                    System.out.println("大家都到公司門口了,出發(fā)去餐廳,人數(shù):" + staffs);
                    break;
                case 2:
                    System.out.println("大家都到餐廳了,開始用餐,人數(shù):" + staffs);
                    break;

            }

            // 判斷是否只剩下主線程(一個參與者),如果是,則返回true,代表終止
            return registeredParties == 1;
        }
    };

    // 注冊主線程 ———— 讓主線程全程參與
    phaser.register();
    final StaffTask staffTask = new StaffTask();

    // 3個全程參與TB的員工
    for (int i = 0; i < 3; i++) {
        // 添加任務(wù)數(shù)
        phaser.register();
        new Thread(() -> {
            try {
                staffTask.step1Task();
                phaser.arriveAndAwaitAdvance();

                staffTask.step2Task();
                phaser.arriveAndAwaitAdvance();

                staffTask.step3Task();
                phaser.arriveAndAwaitAdvance();

                staffTask.step4Task();
                // 完成了,注銷離開
                phaser.arriveAndDeregister();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    // 兩個不聚餐的員工加入
    for (int i = 0; i < 2; i++) {
        phaser.register();
        new Thread(() -> {
            try {
                staffTask.step1Task();
                phaser.arriveAndAwaitAdvance();

                staffTask.step2Task();
                System.out.println("員工【" + Thread.currentThread().getName() + "】回家了");
                // 完成了,注銷離開
                phaser.arriveAndDeregister();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    while (!phaser.isTerminated()) {
        int phase = phaser.arriveAndAwaitAdvance();
        if (phase == 2) {
            // 到了去餐廳的階段,又新增4人,參加晚上的聚餐
            for (int i = 0; i < 4; i++) {
                phaser.register();
                new Thread(() -> {
                    try {
                        staffTask.step3Task();
                        phaser.arriveAndAwaitAdvance();

                        staffTask.step4Task();
                        // 完成了,注銷離開
                        phaser.arriveAndDeregister();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
}

先給出運行結(jié)果,直觀感受下:

員工【Thread-0】從家出發(fā)了……
員工【Thread-2】從家出發(fā)了……
員工【Thread-1】從家出發(fā)了……
員工【Thread-3】從家出發(fā)了……
員工【Thread-4】從家出發(fā)了……
員工【Thread-4】到達(dá)公司
員工【Thread-0】到達(dá)公司
員工【Thread-1】到達(dá)公司
員工【Thread-3】到達(dá)公司
員工【Thread-2】到達(dá)公司
大家都到公司了,出發(fā)去公園,人數(shù):5
員工【Thread-2】出發(fā)去公園玩
員工【Thread-1】出發(fā)去公園玩
員工【Thread-4】出發(fā)去公園玩
員工【Thread-3】出發(fā)去公園玩
員工【Thread-0】出發(fā)去公園玩
員工【Thread-1】到達(dá)公園門口集合
員工【Thread-2】到達(dá)公園門口集合
員工【Thread-0】到達(dá)公園門口集合
員工【Thread-3】到達(dá)公園門口集合
員工【Thread-3】回家了
員工【Thread-4】到達(dá)公園門口集合
員工【Thread-4】回家了
大家都到公司門口了,出發(fā)去餐廳,人數(shù):3
員工【Thread-2】出發(fā)去餐廳
員工【Thread-0】出發(fā)去餐廳
員工【Thread-1】出發(fā)去餐廳
員工【Thread-5】出發(fā)去餐廳
員工【Thread-6】出發(fā)去餐廳
員工【Thread-7】出發(fā)去餐廳
員工【Thread-8】出發(fā)去餐廳
員工【Thread-8】到達(dá)餐廳
員工【Thread-7】到達(dá)餐廳
員工【Thread-1】到達(dá)餐廳
員工【Thread-5】到達(dá)餐廳
員工【Thread-2】到達(dá)餐廳
員工【Thread-6】到達(dá)餐廳
員工【Thread-0】到達(dá)餐廳
大家都到餐廳了,開始用餐,人數(shù):7
員工【Thread-0】開始用餐
員工【Thread-8】開始用餐
員工【Thread-7】開始用餐
員工【Thread-1】開始用餐
員工【Thread-5】開始用餐
員工【Thread-2】開始用餐
員工【Thread-6】開始用餐
員工【Thread-5】用餐結(jié)束,回家
員工【Thread-2】用餐結(jié)束,回家
員工【Thread-7】用餐結(jié)束,回家
員工【Thread-1】用餐結(jié)束,回家
員工【Thread-6】用餐結(jié)束,回家
員工【Thread-8】用餐結(jié)束,回家
員工【Thread-0】用餐結(jié)束,回家

怎么樣,各個階段有各的任務(wù),并且各個階段參與者數(shù)量也不同。

代碼分析

1、Phaser 的創(chuàng)建

final Phaser phaser = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        // 參與者數(shù)量,去除主線程
        int staffs = registeredParties - 1;
        switch (phase) {
            case 0:
                System.out.println("大家都到公司了,出發(fā)去公園,人數(shù):" + staffs);
                break;
            case 1:
                System.out.println("大家都到公司門口了,出發(fā)去餐廳,人數(shù):" + staffs);
                break;
            case 2:
                System.out.println("大家都到餐廳了,開始用餐,人數(shù):" + staffs);
                break;

        }

        // 判斷是否只剩下主線程(一個參與者),如果是,則返回true,代表終止
        return registeredParties == 1;
    }
};

創(chuàng)建 Phaser 時,重寫了 onAdvance() 方法。這個方法類似于 CyclicBarrier多任務(wù)協(xié)同的利器 文中所講的CyclicBarrier的回調(diào)函數(shù),在每個階段結(jié)束后,處理一些收尾工作。
不同的是,onAdvance() 方法更高級,方法入?yún)⒅苯痈嬖V我們了當(dāng)前階段,和該階段結(jié)束時的參與者數(shù)量;onAdvance() 方法簽名如下:

protected boolean onAdvance(int phase, int registeredParties) 

因此,重寫 onAdvance() 方法后,我們可以直接使用 phase 拿到當(dāng)前階段,registeredParties 為該階段結(jié)束時的參與者數(shù)量。

為了不讓主進(jìn)程結(jié)束,在創(chuàng)建完 phaser 對象后,立即注冊了參與者,該參與者是主線程,也就是讓主線程全程參與。

// 注冊主線程 ———— 讓主線程全程參與
phaser.register();

2、多階段任務(wù)協(xié)同
隨后,我們創(chuàng)建了3個線程,代表3個全程參與團(tuán)建的員工;

for (int i = 0; i < 3; i++) {
    // 添加任務(wù)數(shù)
    phaser.register();
    new Thread(() -> {
        try {
            staffTask.step1Task();
            phaser.arriveAndAwaitAdvance();

            staffTask.step2Task();
            phaser.arriveAndAwaitAdvance();

            staffTask.step3Task();
            phaser.arriveAndAwaitAdvance();

            staffTask.step4Task();
            // 完成了,注銷離開
            phaser.arriveAndDeregister();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

在每次創(chuàng)建線程前,使用 phaser.register(); 添加參與者數(shù)量;

在參與者完成每個階段時,調(diào)用 phaser.arriveAndAwaitAdvance(); 進(jìn)行協(xié)同等待,等所有參與者 都到達(dá)同步點后,再進(jìn)入下一階段。
arriveAndAwaitAdvance() 從方法名也能看出,就是報告自己到達(dá)了同步點,并且協(xié)同、等待 onAdvance() 方法的執(zhí)行。

在最后一個階段任務(wù)完成時,調(diào)用 phaser.arriveAndDeregister(); 代表:等這次協(xié)作完成后,我就離開。

接著,創(chuàng)建了2個線程,代表不聚餐的員工,線程的工作內(nèi)容僅僅是前兩個階段的任務(wù)。

3、在第二階段,加入新的參與者
最后,用了一個 while 判斷,檢查 phaser 的任務(wù)階段,在第二階段,新增了四個參與者,繼續(xù)參加后續(xù)任務(wù)的協(xié)作。

while (!phaser.isTerminated()) {
    int phase = phaser.arriveAndAwaitAdvance();
    if (phase == 2) {
        // 到了去餐廳的階段,又新增4人,參加晚上的聚餐
        for (int i = 0; i < 4; i++) {
            phaser.register();
            new Thread(() -> {
                try {
                    staffTask.step3Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step4Task();
                    // 完成了,注銷離開
                    phaser.arriveAndDeregister();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

完整源碼
https://github.com/ljheee/JavaConcurrencyInPractice/blob/master/src/main/java/com/ljheee/juc/PhaserUsage.java

Phaser 核心方法

從上面示例的代碼,可以看到頻繁使用的的也就幾個方法:

  • arriveAndAwaitAdvance():類似于CyclicBarrier的await()方法,等待其它線程都到屏障點之后,再繼續(xù)執(zhí)行。
  • arriveAndDeregister():把執(zhí)行到此的線程從Phaser中注銷掉。
  • isTerminated():判斷 Phaser 是否終止。
  • register():將一個新的參與者注冊到 Phaser 中,這個新的參與者會被當(dāng)成沒有執(zhí)行本階段的線程。
  • forceTermination():強(qiáng)制 Phaser 進(jìn)入終止態(tài)。

多階段協(xié)同,示意圖如下:

Phaser 的父子層級

Phaser 支持層級,根root Phaser、父Phaser把每個子的 Phaser 當(dāng)作父Phaser的一個parties,相當(dāng)于把子 Phaser 內(nèi)的一組參與者當(dāng)初父Phaser的成員;這個子 Phaser 的內(nèi)部有多少個parties線程,有多少階段,均可自定義。

父Phaser等待所有的parties都到達(dá)父的階段屏障,
即子Phaser的所有階段都執(zhí)行完,也就是子Phaser都到達(dá)父的階段屏障,父Phaser才會進(jìn)入下一階段:喚醒所有的子Phaser的parties線程繼續(xù)執(zhí)行下一階段。

CyclicBarrier 也可以作為階段屏障使用,每個線程重復(fù)做為CyclicBarrier的parties,但是沒辦法像Phaser那樣支持層級。
例如比賽,一個比賽分為3個階段(phase): 初賽、復(fù)賽和決賽,規(guī)定所有運動員都完成上一個階段的比賽才可以進(jìn)行下一階段的比賽,并且比賽的過程中存在晉級、允許退賽(deregister),晉級成功且未退賽的才能進(jìn)入下一階段,這個場景就很適合Phaser。

總結(jié)

JUC包下的CyclicBarrier、CountDownLatch、Phaser 三個都是線程同步輔助工具類,同步輔助三劍客。
CountDownLatch不能重用,CyclicBarrier、Phaser都可以重用,并且Phaser
更加靈活可以在運行期間隨時加入(register)新的parties,也可以在運行期間隨時退出(deregister)。

關(guān)于 CyclicBarrier、CountDownLatch 可閱讀 CountDownLatch的兩種常用場景 、CyclicBarrier多任務(wù)協(xié)同的利器

閱讀原文:https://mp.weixin.qq.com/s/e_3SDMW5pAG48oXQBaS1Zg


推薦閱讀

本文首發(fā)于 公眾號 架構(gòu)道與術(shù)(ToBeArchitecturer),歡迎關(guān)注、學(xué)習(xí)更多干貨~

推薦閱讀
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容