- Latch 模式背景釋義:
- 有A、B、C、D若干個(gè)并行任務(wù),現(xiàn)在F任務(wù)需要等ABCD全部完成之后再進(jìn)行,只要其中任一一個(gè)并發(fā)任務(wù)未執(zhí)行完F任務(wù)就阻塞或者拋出超時(shí)異常、取消任務(wù)
- 代碼翻譯:
抽象任務(wù)接口約束類(lèi)
public abstract class Latch {
protected int limit;
public Latch(int limit){
this.limit = limit;
}
/**
* 阻塞當(dāng)前調(diào)用者所在線(xiàn)程,阻塞的邏輯為,如果當(dāng)前還有任務(wù)未完成則阻塞
*
* @throws InterruptedException
*/
public abstract void await() throws InterruptedException;
public abstract void await(TimeUnit unit,long time) throws InterruptedException, TimeoutException;
/**
* 誰(shuí)執(zhí)行完任務(wù)就將任務(wù)完成標(biāo)志減1,當(dāng)任務(wù)完成標(biāo)志為0時(shí)表示所有任務(wù)均已完成
* 本方法為同步方法,任務(wù)線(xiàn)程執(zhí)行時(shí)需要先獲取到本接口的鎖,具體鎖住的對(duì)象為
* limit ,當(dāng)前任務(wù)線(xiàn)程執(zhí)行完任務(wù)之后將標(biāo)志-1,同時(shí)釋放鎖
*/
public abstract void countDown();
/**
* 獲取剩下未完成任務(wù)的個(gè)數(shù)
* @return
*/
public abstract int getUnarrived();
}
具體任務(wù)實(shí)現(xiàn)類(lèi):
public class CountDownLatch extends Latch {
public CountDownLatch(int limit) {
super(limit);
}
@Override
public void await() throws InterruptedException {
synchronized (this){
while(limit > 0){
this.wait();
}
}
}
@Override
public void await(TimeUnit unit, long time) throws InterruptedException, TimeoutException {
if(time <=0){
throw new IllegalArgumentException("the time is invalid");
}
//將時(shí)間轉(zhuǎn)換為納秒
long remainingNanos = unit.toNanos(time);
//等待任務(wù)將在endNanos 納秒后 超時(shí)
final long endNanos = System.nanoTime() + remainingNanos;
synchronized (this){
while(limit > 0){
//超時(shí) 直接拋出異常
if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0){
throw new TimeoutException("time out");
}
//等待remainingNanos 在等待的過(guò)程中可能會(huì)被中斷,需要重新計(jì)算remainingNanos時(shí)間
this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
//執(zhí)行線(xiàn)程中斷 時(shí)重新計(jì)算時(shí)間
remainingNanos = endNanos - System.nanoTime();
}
}
}
@Override
public void countDown() {
synchronized (this){
if(limit <= 0){
throw new IllegalStateException("all of task has done");
}
limit --;
notifyAll();
}
}
@Override
public int getUnarrived() {
return limit;
}
}
工作線(xiàn)程:
public class LatchTaskThread extends Thread {
private Latch latch;
private String programmer;
private String transportion;
public LatchTaskThread(Latch latch,String programmer,String transportion){
this.latch = latch;
this.programmer = programmer;
this.transportion = transportion;
}
@Override
public void run() {
super.run();
System.out.println("26--------執(zhí)行者:"+this.programmer + " start task:"+transportion);
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("26--------執(zhí)行者:"+this.programmer + " finsh task:"+transportion);
latch.countDown();
}
}
- 測(cè)試用例:
private void testLatch() throws InterruptedException, TimeoutException {
latch = new CountDownLatch(4);
new LatchTaskThread(latch,"A","Bus").start();
new LatchTaskThread(latch,"B","Stock").start();
new LatchTaskThread(latch,"C","Play Crad").start();
new LatchTaskThread(latch,"D","Work").start();
//latch.await();
latch.await(TimeUnit.SECONDS,5);
System.out.println("43-------所有任務(wù)均已經(jīng)完成");
}
- 參考書(shū)籍《Java 高并發(fā)編程詳解》