一,F(xiàn)uture模式
????????Future 的意思是未來,假設(shè)有一個(gè)方法需要花費(fèi)很長(zhǎng)的時(shí)間才能獲取運(yùn)行結(jié)果,那么與其一直等待,不如先拿到一份最終數(shù)據(jù)的模板,即 Future 角色,等過一陣子再通過 Future 角色去獲取數(shù)據(jù),如果數(shù)據(jù)已經(jīng)好了則直接返回,否則就一直等待到有數(shù)據(jù)返回。
????????這種模式可以用在不是馬上需要一個(gè)操作的返回值時(shí),這樣可以提高程序的響應(yīng)性,使得一個(gè)方法的多個(gè)步驟可以并行執(zhí)行。
????????下面的代碼示例用來演示實(shí)現(xiàn) Future 模式:
/**
* @author koma <komazhang@foxmail.com>
* @date 2018-11-04
*/
public class Main {
public static void main(String[] args) {
Host host = new Host();
Data data1 = host.request(10, 'A');
Data data2 = host.request(20, 'B');
Data data3 = host.request(30, 'C');
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("data1 = "+data1.getContent());
System.out.println("data2 = "+data2.getContent());
System.out.println("data3 = "+data3.getContent());
}
}
public interface Data {
public abstract String getContent();
}
public class Host {
//這里使用 ThreadFactory 工廠類來創(chuàng)建線程
private static final ThreadFactory threadFactory = Executors.defaultThreadFactory();
public Data request(final int count, final char c) {
System.out.println("Request BEGIN");
FutureData futureData = new FutureData();
threadFactory.newThread(new MakeRealDataTask(count, c, futureData)).start();
System.out.println("Request END");
return futureData;
}
}
public class MakeRealDataTask implements Runnable {
private final int count;
private final char c;
private final FutureData futureData;
public MakeRealDataTask(int count, char c, FutureData futureData) {
this.count = count;
this.c = c;
this.futureData = futureData;
}
@Override
public void run() {
RealData realData = new RealData(count, c);
futureData.setRealData(realData);
}
}
public class FutureData implements Data {
private RealData realData = null;
private boolean ready = false;
/**
* 這里的 synchronized 主要是為了實(shí)現(xiàn) wait/notify 語義
* 本身該類并不需要同步
*
*/
public synchronized void setRealData(RealData realData) { //產(chǎn)生實(shí)際數(shù)據(jù)
if (ready) {
return;
}
this.realData = realData;
this.ready = true;
notifyAll();
}
@Override
public synchronized String getContent() { //獲取實(shí)際數(shù)據(jù),當(dāng)數(shù)據(jù)還未產(chǎn)生時(shí),則需要持續(xù)等待
while (!ready) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData.getContent();
}
}
public class RealData implements Data {
private final String content;
public RealData(int count, char c) {
System.out.println("RealData BEGIN");
char[] buffer = new char[count];
for (int i = 0; i < count; i++) {
buffer[i] = c;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("RealData END");
this.content = new String(buffer);
}
@Override
public String getContent() {
return content;
}
}
????????Future 模式和 Thread-Per-Message 模式的區(qū)別在于 Future 模式可以從異步執(zhí)行的線程中獲取返回值。
????????Future 模式也可以有變種使用方式,如:當(dāng)前示例中我們對(duì) Future 返回值的賦值只有一次,而實(shí)際上可以賦值多次,這樣通過一個(gè)返回對(duì)象在不同階段可以獲取到不同的返回值。
????????Java JUC 包中提供了用于支持 Future 模式的接口和類。Callable 接口聲明了 call() 方法,該方法和 Runnable 的 run() 方法類似,不同之處在于,call() 方法有返回值。Future 接口充當(dāng)了 Future 角色,聲明了 get() 方法用于獲取值,設(shè)置值的方法則需要對(duì)應(yīng)的實(shí)現(xiàn)類去實(shí)現(xiàn),同時(shí)還聲明了 cancel() 方法,用于中斷程序。FutureTask 類實(shí)現(xiàn)了 Future 和 Runnable 接口。
????????下面的代碼示例,我們利用 juc 包中已有的類來改造我們的示例程序:
public class FutureData extends FutureTask<RealData> implements Data {
public FutureData(Callable<RealData> callable) {
super(callable);
}
@Override
public String getContent() {
String content = null;
try {
content = get().getContent();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return content;
}
}
public class Host {
private static final ThreadFactory threadFactory = Executors.defaultThreadFactory();
public Data request(final int count, final char c) {
System.out.println("Request BEGIN");
FutureData futureData = new FutureData(new Callable<RealData>() {
@Override
public RealData call() throws Exception {
return new RealData(count, c);
}
});
threadFactory.newThread(futureData).start();
System.out.println("Request END");
return futureData;
}
}
????????在創(chuàng)建 FutureTask 類實(shí)例時(shí),Callable 對(duì)象作為構(gòu)造函數(shù)參數(shù)傳遞進(jìn)去,之后當(dāng)調(diào)用 FutureTask 的 run() 方法時(shí),那么構(gòu)造參數(shù)中接收的 Callable 對(duì)象的 call() 方法會(huì)被執(zhí)行,call() 方法會(huì)同步的獲取 call() 方法的返回值,然后通過 FutureTask 的 set() 方法來設(shè)置該返回值,如果 call() 方法發(fā)生了異常,則調(diào)用 FutureTask 的 setException() 方法設(shè)置的異常處理函數(shù)。之后當(dāng)我們需要時(shí)調(diào)用 FutureTask 的 get() 方法就能夠獲取到 call() 方法的返回值。
????????Executors 框架中通過 submit() 方法實(shí)現(xiàn)了 Future 模式,通過框架我們可以更加簡(jiǎn)單方便的使用 Future 模式,改造后的代碼如下:
public class Main {
public static void main(String[] args) {
Host host = new Host();
Future<RealData> data1 = host.request(10, 'A');
Future<RealData> data2 = host.request(20, 'B');
Future<RealData> data3 = host.request(30, 'C');
try {
Thread.sleep(2000);
System.out.println("data1 = "+data1.get().getContent());
System.out.println("data2 = "+data2.get().getContent());
System.out.println("data3 = "+data3.get().getContent());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace(); //當(dāng)執(zhí)行 call() 方法報(bào)錯(cuò)時(shí)
}
}
}
public class Host {
public Future<RealData> request(final int count, final char c) {
System.out.println("Request BEGIN");
//使用 Executors 框架的 Future 模式
Future<RealData> future = Executors.newFixedThreadPool(1).submit(new Callable<RealData>() {
@Override
public RealData call() throws Exception {
return new RealData(count, c);
}
});
System.out.println("Request END");
return future;
}
}
//這里只需要 RealData 類就夠了,其它類不再需要
二,兩階段終止模式
????????該模式通常用于優(yōu)雅的終止線程,它的意思是先執(zhí)行完終止處理程序再終止線程。

????????我們稱線程在進(jìn)行正常處理時(shí)的狀態(tài)為操作中。在要求停止該線程時(shí),我們發(fā)出終止請(qǐng)求,這樣線程不會(huì)突然終止,而是轉(zhuǎn)為終止處理中狀態(tài),然后執(zhí)行清理工作,完成之后就會(huì)真正的終止線程。
????????從操作中變?yōu)?strong>終止處理中這是終止的第一階段,在該階段下,線程不會(huì)再進(jìn)行正常操作,而是只執(zhí)行清理程序,清理完成之后,就會(huì)真正的終止線程,終止處理中狀態(tài)結(jié)束是線程終止的第二階段。
????????該模式的要點(diǎn)如下:
- 安全的終止線程
- 必定會(huì)終止線程
- 發(fā)起終止請(qǐng)求后會(huì)盡快進(jìn)行終止處理
????????下面是一個(gè)兩階段終止的演示程序:
/**
* @author koma <komazhang@foxmail.com>
* @date 2018-11-04
*/
public class Main {
public static void main(String[] args) {
try {
CounterThread counterThread = new CounterThread();
counterThread.start();
Thread.sleep(10000);
counterThread.shutdownRequest();
counterThread.join(); //主線程等待counterThread線程終止
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CounterThread extends Thread {
private long counter = 0;
private volatile boolean shutdownRequested = false;
/**
* 該方法是線程安全的
*
* 因?yàn)?shutdownRequested 只會(huì)被設(shè)置成 true 沒有別的方法會(huì)再把它設(shè)置成 false
* 不存在數(shù)據(jù)競(jìng)爭(zhēng),因此也就允許多線程調(diào)用
*
*/
public void shutdownRequest() {
shutdownRequested = true;
//給線程自己發(fā)出中斷信號(hào),確保線程在 sleep 或 wait 中也能正常響應(yīng)終止請(qǐng)求
interrupt();
}
public boolean isShutdownRequested() {
return shutdownRequested;
}
@Override
public void run() {
try {
while (!isShutdownRequested()) {//判斷終止請(qǐng)求
doWork();
}
} finally {
doShutdown();
}
}
private void doWork() {
counter++;
System.out.println("doWorker: counter = "+counter);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
private void doShutdown() {
System.out.println("doShutdown: counter = "+counter);
}
}
????????Executor 框架中的 ExecutorService 類提供了 shutdown() 方法來優(yōu)雅的終止線程。該類還提供了 isShutdown() 和 isTerminated() 來判斷線程是否終止和是否實(shí)際停止。其區(qū)別在于當(dāng)線程處于終止處理中狀態(tài)時(shí),isShutdown() 返回 true,而 isTerminated() 返回 false。
1,中斷狀態(tài)和 InterruptedException 異常
????????當(dāng) interrupt() 方法被調(diào)用之后,線程就可以被中斷了。中斷線程這個(gè)行為會(huì)帶來以下結(jié)果之一:
????????(1) 線程變?yōu)?strong>中斷狀態(tài),反映為"狀態(tài)"
????????(2) 拋出InterruptedException異常,反映為"控制"
????????通常情況下會(huì)是結(jié)果(1),但是當(dāng)線程正則 sleep,wait,join 時(shí)則會(huì)是結(jié)果(2)(這時(shí)線程不變?yōu)?strong>中斷狀態(tài))。
2,中斷狀態(tài) 轉(zhuǎn)換為 InterruptedException 異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
????????這段代碼可以把中斷狀態(tài)轉(zhuǎn)換為異常,其中 if 條件檢查的是當(dāng)前線程的中斷狀態(tài),同時(shí)會(huì)清除當(dāng)前線程的中斷狀態(tài),若不想清除線程的中斷狀態(tài),則可以調(diào)用 Thread.currentThread().isInterrupted() 方法。
3,InterruptedException異常 轉(zhuǎn)換為 中斷狀態(tài)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); //將捕獲到的中斷異常轉(zhuǎn)換為中斷狀態(tài)
}
????????這段代碼可以把中斷異常轉(zhuǎn)換為中斷狀態(tài),如果不想當(dāng)前線程的已經(jīng)被中斷這個(gè)狀態(tài)信息丟失的話可以使用這種方式,即線程再次中斷一次自己。
4,juc 包與線程同步
????????當(dāng)我們想讓某個(gè)線程等待指定的線程終止時(shí),可以調(diào)用欲等待線程的 join() 方法,但是由于 join() 方法可以等待的只是線程終止這個(gè)一次性操作,當(dāng)我們想要實(shí)現(xiàn)"等待指定次數(shù)的某種操作發(fā)生"這類需求時(shí),則需要借助 juc 包中的 CountDownLatch 類。
????????CountDownLatch 類只能進(jìn)行倒數(shù),當(dāng)想多次重復(fù)進(jìn)行線程同步時(shí),則使用 CyclicBarrier 會(huì)更加的方便。CyclicBarrier 可以周期性的創(chuàng)建屏障,在屏障解除之前,碰到屏障的線程無法繼續(xù)前進(jìn),屏障解除的條件是到達(dá)屏障處的線程數(shù),達(dá)到了構(gòu)造函數(shù)指定的個(gè)數(shù)。也就是說,當(dāng)指定個(gè)數(shù)的線程達(dá)到屏障處后,屏障就會(huì)被解除。
????????下面的代碼實(shí)現(xiàn)了"讓三個(gè)線程處理一項(xiàng)分為0~4共5個(gè)階段的工作"的功能,我們要求,除非三個(gè)線程都完成了第N個(gè)階段,否則哪個(gè)線程都不允許進(jìn)入到第N+1個(gè)階段:
/**
* @author koma <komazhang@foxmail.com>
* @date 2018-11-04
*/
public class Main {
private static final int THREADS = 3; //工作線程數(shù)
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
//屏障解除時(shí)的操作
Runnable barrierAction = new Runnable() {
@Override
public void run() {
System.out.println("barrier action");
}
};
//用來設(shè)定在屏障處等待的線程數(shù)量
CyclicBarrier cyclicBarrier = new CyclicBarrier(THREADS, barrierAction);
//用來設(shè)定等待結(jié)束線程的數(shù)量
CountDownLatch countDownLatch = new CountDownLatch(THREADS);
try {
for (int i = 0; i < THREADS; i++) {
executorService.execute(new MyTask(cyclicBarrier, countDownLatch, i));
}
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
public class MyTask implements Runnable {
private static final int PAHSE = 5;
private final CyclicBarrier pahseBarrier;
private final CountDownLatch downLatch;
private final int context;
private static final Random random = new Random(315246);
public MyTask(CyclicBarrier pahseBarrier, CountDownLatch downLatch, int context) {
this.pahseBarrier = pahseBarrier;
this.downLatch = downLatch;
this.context = context;
}
@Override
public void run() {
try {
//任務(wù)分 PAHSE 個(gè)階段執(zhí)行,每個(gè)線程在進(jìn)入到某一階段之后就等待其它進(jìn)入該階段的線程
//當(dāng)每個(gè)階段的線程數(shù)達(dá)到設(shè)定的數(shù)量時(shí),該階段屏障解除,所有線程進(jìn)入到下一階段
//pahseBarrier.await() 可以用來循環(huán)的創(chuàng)建屏障
for (int phase = 0; phase < PAHSE; phase++) {
doPhase(phase);
pahseBarrier.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
downLatch.countDown();
}
}
protected void doPhase(int phase) {
String name = Thread.currentThread().getName();
System.out.println(name+"-MyTask BEGIN, context = "+context+", phase = "+phase);
try {
Thread.sleep(random.nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(name+"-MyTask END, context = "+context+", phase = "+phase);
}
}
}