Phaser是一個更強大的、更復雜的同步輔助類,可以代替CyclicBarrier CountDownLatch的功能,但是比他們更強大。
Phaser類機制是在每一步結束的位置對線程進行同步,當所有的線程都完成了這一步,才能進行下一步。
當我們有并發(fā)任務并且需要分解成幾步執(zhí)行的時候,這種機制就非常適合。
CyclicBarrier CountDownLatch 只能在構造時指定參與量,而phaser可以動態(tài)的增減參與量。
phaser 使用說明:
使用phaser.arriveAndAwaitAdvance(); //等待參與者達到指定數(shù)量,才開始運行下面的代碼
使用phaser.arriveAndDeregister(); //注銷當前線程,該線程就不會進入休眠狀態(tài),也會從phaser的數(shù)量中減少
模擬代替CountDownLatch功能,只需要當前線程arriveAndAwaitAdvance()之后運行需要的代碼之后,就arriveAndDeregister()取消當前線程的注冊。
phaser有一個重大特性,就是不必對它的方法進行異常處理。置于休眠的線程不會響應中斷事件,不會拋出interruptedException異常, 只有一個方法會響應:AwaitAdvanceInterruptibly(int phaser).
其他api
arrive():這個方法通知phase對象一個參與者已經(jīng)完成了當前階段,但是它不應該等待其他參與者都完成當前階段,必須小心使用這個方法,因為它不會與其他線程同步。
awaitAdvance(int phase):如果傳入的階段參數(shù)與當前階段一致,這個方法會將當前線程至于休眠,直到這個階段的所有參與者都運行完成。如果傳入的階段參數(shù)與當前階段不一致,這個方法立即返回。
awaitAdvanceInterruptibly(int phaser):這個方法跟awaitAdvance(int phase)一樣,不同處是:該訪問將會響應線程中斷。會拋出interruptedException異常
將參與者注冊到phaser中:
register():將一個新的參與者注冊到phase中,這個新的參與者將被當成沒有執(zhí)完本階段的線程。
bulkRegister(int parties):將指定數(shù)目的參與者注冊到phaser中,所有這些新的參與者都將被當成沒有執(zhí)行完本階段的線程。
減少參與者
只提供了一個方法來減少參與者:arriveAndDeregister():告知phaser對應的線程已經(jīng)完成了當前階段,并它不會參與到下一階段的操作中。
強制終止
當一個phaser么有參與者的時候,它就處于終止狀態(tài),使用forceTermination()方法來強制phaser進入終止狀態(tài),不管是否存在未注冊的參與線程,當一個線程出現(xiàn)錯誤時,強制終止phaser是很有意義的。
當phaser處于終止狀態(tài)的時候,arriveAndAwaitAdvance() 和 awaitAdvance() 立即返回一個負數(shù),而不再是一個正值了,如果知道phaser可能會被終止,就需要驗證這些方法的值,以確定phaser是不是被終止了。
被終止的phaser不會保證參與者的同步。
example
public class PhaserExample1 {
private static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
final Phaser phaser = new Phaser();
IntStream.rangeClosed(1,5).boxed().map(i->phaser).forEach(Task::new);
phaser.register();
phaser.arriveAndAwaitAdvance();
System.out.println("all thread finish work.");
}
static class Task extends Thread{
private final Phaser phaser;
Task(Phaser phaser) {
this.phaser = phaser;
phaser.register();
start();
}
@Override
public void run() {
try {
System.out.println("The worker ["+getName()+"] start");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("The worker ["+getName()+"] end.");
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance();
}
}
}
運行結果:
The worker [Thread-0] start
The worker [Thread-2] start
The worker [Thread-1] start
The worker [Thread-4] start
The worker [Thread-3] start
The worker [Thread-3] end.
The worker [Thread-0] end.
The worker [Thread-1] end.
The worker [Thread-4] end.
The worker [Thread-2] end.
all thread finish work.
public class PhaserExample2 {
private final static Random random = new Random(System.currentTimeMillis());
/**
* running
*
* bicycle
*
* long jump
* @param args
*/
public static void main(String[] args) {
final Phaser phaser = new Phaser(5);
for (int i = 0; i < 6; i++) {
new Athletes(i,phaser).start();
}
}
static class Athletes extends Thread{
private final int no;
private final Phaser phaser;
Athletes(int no, Phaser phaser) {
this.no = no;
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(no + " running start.");
sleep(random.nextInt(5));
System.out.println(no + " running end.");
phaser.arriveAndAwaitAdvance();
System.out.println(no + " bicycle start.");
sleep(random.nextInt(5));
System.out.println(no + " bicycle end.");
phaser.arriveAndAwaitAdvance();
System.out.println(no + " long jump start.");
sleep(random.nextInt(5));
System.out.println(no + " long jump end.");
phaser.arriveAndAwaitAdvance();
}
private void sleep(int time){
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}