上篇 CyclicBarrier多任務(wù)協(xié)同的利器 我們借助部門TB團(tuán)建的例子,一步步分析了 CyclicBarrier 多線程協(xié)調(diào)的功能。
并在文章末尾,留出思考:
實際部門TB活動中,可能有人白天有事,不能參加公園的活動、但晚上會來聚餐;有人白天能參加,晚上不能參加;
并且公園的門票,聚餐費用,因參與人數(shù)不同,又有不同,需要統(tǒng)計各階段的參與人數(shù),以此計算經(jīng)費。
需求升級后,如何實現(xiàn)呢?CyclicBarrier 能完成嗎?
其實在上篇文章中,我們分析了初版TB需求的任務(wù)特點,其中之一就是參與者的數(shù)量,是確定的。
但當(dāng)前需求,多個參與階段的參與者數(shù)量,各不相同,基本確定 CyclicBarrier 完成不了。
———— 別慌,針對多個階段,靈活設(shè)置參與者數(shù)量的場景,JDK提供了工具類 Phaser。
照舊,先看看 Phaser 的源碼注釋:
A reusable synchronization barrier, similar in functionality to
* {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
* {@link java.util.concurrent.CountDownLatch CountDownLatch}
* but supporting more flexible usage.
Phaser 是一個可重用的同步屏障,功能上跟 CyclicBarrier 和 CountDownLatch 相似,但支持更多靈活的用法。
看過 CountDownLatch的兩種常用場景 和 CyclicBarrier多任務(wù)協(xié)同的利器 的朋友,一定了解:CountDownLatch 能夠?qū)崿F(xiàn)一個或多個線程阻塞等待,直到其他線程完成后再執(zhí)行;
而 CyclicBarrier 允許多個線程相互等待,直到所有參與者到達(dá)屏障同步點后,再往下執(zhí)行。
可以說,Phaser 是二者功能的增強(qiáng)和結(jié)合。
Phaser 階段協(xié)同器
Java 7 中增加的一個用于多階段同步控制的工具類,他包含了 CycIicBarrier 和 CountDownLatch 的相關(guān)功能,讓它們更強(qiáng)大靈活。
下面通過部門TB,多階段不同參與者的例子,具體探究 Phaser 的原理。
部門團(tuán)建,需求升級
公司組織周末郊游,大家各自從公司出發(fā)到公園集合,大家都到了之后,出發(fā)到公園各自游玩,然后在公園門口集合,再去餐廳就餐,大家都到了就可以用餐,有的員工白天有事,選擇
參加晚上的聚餐,有的員工則晚上有事,只參加白天的活動。
任務(wù)特點分析:
- 多階段協(xié)同,但階段的參與數(shù)是可變的,用 CyclicBarrier 好像不好實現(xiàn)。
- 假定 第一階段:到公司集合5人(任務(wù)數(shù)5),去公園游玩。
- 第二階段:到公園門口集合,有2人因晚上有事,自行回家了;則3人去餐廳,這是減少參與數(shù)(任務(wù)數(shù)變?yōu)?)
- 第三階段:餐廳集合,有另4人參與聚餐,這是增加參與數(shù)(任務(wù)數(shù)變?yōu)?)
實際上,當(dāng)前任務(wù)最大的特點是:多階段等待一起出發(fā)、每階段的任務(wù)數(shù)可靈活調(diào)整。
多個線程協(xié)作執(zhí)行的任務(wù),分為多個階段,每個階段都可以有任意個參與者線程,可以隨時注冊并參與到某個階段;
當(dāng)一個階段中所有任務(wù)都完成之后,Phaser 的 onAdvance() 被調(diào)用(可以通過覆蓋添加自定文處理邏輯(類似CyclicBarrier循環(huán)屏障使用的Runnable接口)),然后Phaser釋放等待線程,自動進(jìn)入下個階段,如此循環(huán),直到Phaser不再包含任何參與者。
由于 Phaser 比較復(fù)雜,API也較為繁多,下面將 Phaser 提供的API分為多組。
構(gòu)造方法
- newPhaser() 不指定數(shù)量,參與任務(wù)數(shù)為0。
- new Phaser(int parties) 指定初始參與任務(wù)數(shù)
- new Phaser(Phaser phaser) 指定父階段器,子對象整體作為一個參與者加入到父對象,當(dāng)子對象中沒有參與者時,自動從父對象解除注冊
- new Phaser(Phaser phaser,int parties)
增減參與任務(wù)數(shù)方法
- int register() 增加一個數(shù),返回當(dāng)前階段號。
- int bulkRegister(int parties) 增加指定個數(shù),返回當(dāng)前階段號。
- int arriveAndDeregister() 減少一個任務(wù)數(shù),返回當(dāng)前階段號。
到達(dá)、等待方法
- int arrive() 到達(dá)(任務(wù)完成),返回當(dāng)前階段號。
- int arriveAndAwaitAdvance() 到達(dá)后等待其他任務(wù)到達(dá),返回到達(dá)階段號。
- int awaitAdvance(int phase) 在指定階段等待(必須是當(dāng)前階段才有效)
- int awaitAdvanceInterruptibly(int phase) 階段到達(dá)觸發(fā)動作
- int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
- protected boolean onAdvance(int phase,int registeredParties)類似CyclicBarrier的觸發(fā)命令,通過重寫該方法來增加階段到達(dá)動作,該方法返回true將終結(jié)Phaser對象。
Phaser其他API:
- void forceTermination() 強(qiáng)制結(jié)束
- int getPhase() 獲取當(dāng)前階段號
- boolean isTerminated() 判斷是否結(jié)束
注意事項:
單個 Phaser 實例允許的注冊任務(wù)數(shù)的.上限是65535,如果參與任務(wù)數(shù)超過,可以用父子Phaser樹的方式,通過父子關(guān)聯(lián)來增加參與者上限。
為什么是65535,這和 Phaser 的實現(xiàn)有關(guān):
Phaser中的state狀態(tài),64位的屬性state不同位被用來存放不同的值,低16位存放unarrived,低32位中的高16位存放parties,高32位的低31位存放phase,最高位存放terminated,即Phaser是否關(guān)閉;
2^16=65536
Phaser 實現(xiàn)多任務(wù)協(xié)同
下面來看,如何使用 Phaser 完成多階段任務(wù)協(xié)同。
我們首先將團(tuán)建的不同階段任務(wù),定義在 StaffTask :
static final Random random = new Random();
static class StaffTask {
public void step1Task() throws InterruptedException {
// 第一階段:來公司集合
String staff = "員工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "從家出發(fā)了……");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到達(dá)公司");
}
public void step2Task() throws InterruptedException {
// 第二階段:出發(fā)去公園
String staff = "員工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出發(fā)去公園玩");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到達(dá)公園門口集合");
}
public void step3Task() throws InterruptedException {
// 第三階段:去餐廳
String staff = "員工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出發(fā)去餐廳");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到達(dá)餐廳");
}
public void step4Task() throws InterruptedException {
// 第四階段:就餐
String staff = "員工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "開始用餐");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "用餐結(jié)束,回家");
}
}
還是用隨機(jī)數(shù),模擬不同參與者的耗時。
重點是下面的 main 方法:
public static void main(String[] args) {
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// 參與者數(shù)量,去除主線程
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出發(fā)去公園,人數(shù):" + staffs);
break;
case 1:
System.out.println("大家都到公司門口了,出發(fā)去餐廳,人數(shù):" + staffs);
break;
case 2:
System.out.println("大家都到餐廳了,開始用餐,人數(shù):" + staffs);
break;
}
// 判斷是否只剩下主線程(一個參與者),如果是,則返回true,代表終止
return registeredParties == 1;
}
};
// 注冊主線程 ———— 讓主線程全程參與
phaser.register();
final StaffTask staffTask = new StaffTask();
// 3個全程參與TB的員工
for (int i = 0; i < 3; i++) {
// 添加任務(wù)數(shù)
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
phaser.arriveAndAwaitAdvance();
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注銷離開
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 兩個不聚餐的員工加入
for (int i = 0; i < 2; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
System.out.println("員工【" + Thread.currentThread().getName() + "】回家了");
// 完成了,注銷離開
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
while (!phaser.isTerminated()) {
int phase = phaser.arriveAndAwaitAdvance();
if (phase == 2) {
// 到了去餐廳的階段,又新增4人,參加晚上的聚餐
for (int i = 0; i < 4; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注銷離開
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
}
先給出運行結(jié)果,直觀感受下:
員工【Thread-0】從家出發(fā)了……
員工【Thread-2】從家出發(fā)了……
員工【Thread-1】從家出發(fā)了……
員工【Thread-3】從家出發(fā)了……
員工【Thread-4】從家出發(fā)了……
員工【Thread-4】到達(dá)公司
員工【Thread-0】到達(dá)公司
員工【Thread-1】到達(dá)公司
員工【Thread-3】到達(dá)公司
員工【Thread-2】到達(dá)公司
大家都到公司了,出發(fā)去公園,人數(shù):5
員工【Thread-2】出發(fā)去公園玩
員工【Thread-1】出發(fā)去公園玩
員工【Thread-4】出發(fā)去公園玩
員工【Thread-3】出發(fā)去公園玩
員工【Thread-0】出發(fā)去公園玩
員工【Thread-1】到達(dá)公園門口集合
員工【Thread-2】到達(dá)公園門口集合
員工【Thread-0】到達(dá)公園門口集合
員工【Thread-3】到達(dá)公園門口集合
員工【Thread-3】回家了
員工【Thread-4】到達(dá)公園門口集合
員工【Thread-4】回家了
大家都到公司門口了,出發(fā)去餐廳,人數(shù):3
員工【Thread-2】出發(fā)去餐廳
員工【Thread-0】出發(fā)去餐廳
員工【Thread-1】出發(fā)去餐廳
員工【Thread-5】出發(fā)去餐廳
員工【Thread-6】出發(fā)去餐廳
員工【Thread-7】出發(fā)去餐廳
員工【Thread-8】出發(fā)去餐廳
員工【Thread-8】到達(dá)餐廳
員工【Thread-7】到達(dá)餐廳
員工【Thread-1】到達(dá)餐廳
員工【Thread-5】到達(dá)餐廳
員工【Thread-2】到達(dá)餐廳
員工【Thread-6】到達(dá)餐廳
員工【Thread-0】到達(dá)餐廳
大家都到餐廳了,開始用餐,人數(shù):7
員工【Thread-0】開始用餐
員工【Thread-8】開始用餐
員工【Thread-7】開始用餐
員工【Thread-1】開始用餐
員工【Thread-5】開始用餐
員工【Thread-2】開始用餐
員工【Thread-6】開始用餐
員工【Thread-5】用餐結(jié)束,回家
員工【Thread-2】用餐結(jié)束,回家
員工【Thread-7】用餐結(jié)束,回家
員工【Thread-1】用餐結(jié)束,回家
員工【Thread-6】用餐結(jié)束,回家
員工【Thread-8】用餐結(jié)束,回家
員工【Thread-0】用餐結(jié)束,回家
怎么樣,各個階段有各的任務(wù),并且各個階段參與者數(shù)量也不同。
代碼分析
1、Phaser 的創(chuàng)建
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// 參與者數(shù)量,去除主線程
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出發(fā)去公園,人數(shù):" + staffs);
break;
case 1:
System.out.println("大家都到公司門口了,出發(fā)去餐廳,人數(shù):" + staffs);
break;
case 2:
System.out.println("大家都到餐廳了,開始用餐,人數(shù):" + staffs);
break;
}
// 判斷是否只剩下主線程(一個參與者),如果是,則返回true,代表終止
return registeredParties == 1;
}
};
創(chuàng)建 Phaser 時,重寫了 onAdvance() 方法。這個方法類似于 CyclicBarrier多任務(wù)協(xié)同的利器 文中所講的CyclicBarrier的回調(diào)函數(shù),在每個階段結(jié)束后,處理一些收尾工作。
不同的是,onAdvance() 方法更高級,方法入?yún)⒅苯痈嬖V我們了當(dāng)前階段,和該階段結(jié)束時的參與者數(shù)量;onAdvance() 方法簽名如下:
protected boolean onAdvance(int phase, int registeredParties)
因此,重寫 onAdvance() 方法后,我們可以直接使用 phase 拿到當(dāng)前階段,registeredParties 為該階段結(jié)束時的參與者數(shù)量。
為了不讓主進(jìn)程結(jié)束,在創(chuàng)建完 phaser 對象后,立即注冊了參與者,該參與者是主線程,也就是讓主線程全程參與。
// 注冊主線程 ———— 讓主線程全程參與
phaser.register();
2、多階段任務(wù)協(xié)同
隨后,我們創(chuàng)建了3個線程,代表3個全程參與團(tuán)建的員工;
for (int i = 0; i < 3; i++) {
// 添加任務(wù)數(shù)
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
phaser.arriveAndAwaitAdvance();
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注銷離開
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
在每次創(chuàng)建線程前,使用 phaser.register(); 添加參與者數(shù)量;
在參與者完成每個階段時,調(diào)用 phaser.arriveAndAwaitAdvance(); 進(jìn)行協(xié)同等待,等所有參與者 都到達(dá)同步點后,再進(jìn)入下一階段。
arriveAndAwaitAdvance() 從方法名也能看出,就是報告自己到達(dá)了同步點,并且協(xié)同、等待 onAdvance() 方法的執(zhí)行。
在最后一個階段任務(wù)完成時,調(diào)用 phaser.arriveAndDeregister(); 代表:等這次協(xié)作完成后,我就離開。
接著,創(chuàng)建了2個線程,代表不聚餐的員工,線程的工作內(nèi)容僅僅是前兩個階段的任務(wù)。
3、在第二階段,加入新的參與者
最后,用了一個 while 判斷,檢查 phaser 的任務(wù)階段,在第二階段,新增了四個參與者,繼續(xù)參加后續(xù)任務(wù)的協(xié)作。
while (!phaser.isTerminated()) {
int phase = phaser.arriveAndAwaitAdvance();
if (phase == 2) {
// 到了去餐廳的階段,又新增4人,參加晚上的聚餐
for (int i = 0; i < 4; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注銷離開
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
Phaser 核心方法
從上面示例的代碼,可以看到頻繁使用的的也就幾個方法:
- arriveAndAwaitAdvance():類似于CyclicBarrier的await()方法,等待其它線程都到屏障點之后,再繼續(xù)執(zhí)行。
- arriveAndDeregister():把執(zhí)行到此的線程從Phaser中注銷掉。
- isTerminated():判斷 Phaser 是否終止。
- register():將一個新的參與者注冊到 Phaser 中,這個新的參與者會被當(dāng)成
沒有執(zhí)行本階段的線程。 - forceTermination():強(qiáng)制 Phaser 進(jìn)入終止態(tài)。
多階段協(xié)同,示意圖如下:

Phaser 的父子層級
Phaser 支持層級,根root Phaser、父Phaser把每個子的 Phaser 當(dāng)作父Phaser的一個parties,相當(dāng)于把子 Phaser 內(nèi)的一組參與者當(dāng)初父Phaser的成員;這個子 Phaser 的內(nèi)部有多少個parties線程,有多少階段,均可自定義。
父Phaser等待所有的parties都到達(dá)父的階段屏障,
即子Phaser的所有階段都執(zhí)行完,也就是子Phaser都到達(dá)父的階段屏障,父Phaser才會進(jìn)入下一階段:喚醒所有的子Phaser的parties線程繼續(xù)執(zhí)行下一階段。
CyclicBarrier 也可以作為階段屏障使用,每個線程重復(fù)做為CyclicBarrier的parties,但是沒辦法像Phaser那樣支持層級。
例如比賽,一個比賽分為3個階段(phase): 初賽、復(fù)賽和決賽,規(guī)定所有運動員都完成上一個階段的比賽才可以進(jìn)行下一階段的比賽,并且比賽的過程中存在晉級、允許退賽(deregister),晉級成功且未退賽的才能進(jìn)入下一階段,這個場景就很適合Phaser。
總結(jié)
JUC包下的CyclicBarrier、CountDownLatch、Phaser 三個都是線程同步輔助工具類,同步輔助三劍客。
CountDownLatch不能重用,CyclicBarrier、Phaser都可以重用,并且Phaser
更加靈活可以在運行期間隨時加入(register)新的parties,也可以在運行期間隨時退出(deregister)。
關(guān)于 CyclicBarrier、CountDownLatch 可閱讀 CountDownLatch的兩種常用場景 、CyclicBarrier多任務(wù)協(xié)同的利器。
閱讀原文:https://mp.weixin.qq.com/s/e_3SDMW5pAG48oXQBaS1Zg
推薦閱讀
-
JUC源碼
本文首發(fā)于 公眾號 架構(gòu)道與術(shù)(ToBeArchitecturer),歡迎關(guān)注、學(xué)習(xí)更多干貨~
推薦閱讀
-
并發(fā)設(shè)計模式
并發(fā)設(shè)計模式 | 生產(chǎn)者-消費者模式,流水線思想提高效率
并發(fā)設(shè)計模式 | 兩階段終止模式:如何優(yōu)雅地終止線程?
并發(fā)設(shè)計模式 | Worker Thread模式:如何避免重復(fù)創(chuàng)建線程?
并發(fā)設(shè)計模式 | Thread-Per-Message每請求每線程
并發(fā)設(shè)計模式 | Balking模式:"你不需要,就算了"
并發(fā)設(shè)計模式 | Guarded Suspension模式:等待喚醒機(jī)制的規(guī)范實現(xiàn)
-
并發(fā)工具
26 | Fork/Join:單機(jī)版的MapReduce
25 | CompletionService:批量執(zhí)行異步任務(wù)
24 | CompletableFuture:Java異步編程
23 | Future:獲取線程的執(zhí)行結(jié)果
22 | Executor與線程池:如何創(chuàng)建正確的線程池?
19 | CountDownLatch和CyclicBarrier讓多線程步調(diào)一致
17 | ReadWriteLock:如何快速實現(xiàn)一個完備的緩存?
-
并發(fā)基礎(chǔ)
12 | 并發(fā)編程:如何用面向?qū)ο笏枷雽懞貌l(fā)程序?
10 | Java線程(中):創(chuàng)建多少線程才是合適的?
06 | 用“等待-通知”機(jī)制優(yōu)化循環(huán)等待