異步回調(diào)模式是高并發(fā)下的核心模式,本部分對異步回調(diào)進(jìn)行詳細(xì)介紹。
泡茶案例
本部分從一個(gè)很好理解的異步生活示例-泡茶開始。為了異步執(zhí)行泡茶流程,分別涉及三個(gè)線程:泡茶線程(主線程)、燒水線程和清洗線程。泡茶線程的工作是:啟動清洗線程、啟動燒水線程,等清洗、燒水的工作完成后,泡茶喝;清洗線程的工作是:洗茶壺、洗茶杯;燒水線程的工作是:洗好水壺、灌上涼水,放在火上,一直等水燒開。
下面分別使用阻塞模式、回調(diào)模式實(shí)現(xiàn)泡茶案例。
join:異步阻塞
join操作的原理是阻塞當(dāng)前的線程,直到待合并的目標(biāo)線程執(zhí)行完成。
線程的合并流程
Java 中線程的合并流程是,假設(shè)線程A調(diào)用線程B的join方法合并B線程,那么線程A進(jìn)入阻塞狀態(tài),直到線程B執(zhí)行完成。
調(diào)用join實(shí)現(xiàn)異步泡茶喝
調(diào)用join實(shí)現(xiàn)泡茶喝是一個(gè)異步阻塞版本,具體的代碼實(shí)現(xiàn)如下:
public class JoinDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}
static class HotWaterThread extends Thread {
public HotWaterThread() {
super("**燒水-Thread");
}
public void run() {
try {
System.out.println("洗好水壺");
System.out.println("灌上涼水");
System.out.println("放在火上");
Thread.sleep(SLEEP_GAP); // 燒水
System.out.println("水開了!");
} catch (InterruptedException e) {
System.out.println("發(fā)生異常被中斷。");
}
System.out.println("運(yùn)行結(jié)束。");
}
}
static class WashThread extends Thread {
public WashThread() {
super("$$ 清洗-Thread");
}
public void run() {
try {
System.out.println("洗茶壺");
System.out.println("洗茶杯");
System.out.println("拿茶葉");
Thread.sleep(SLEEP_GAP); // 清洗中
System.out.println("洗完了!");
} catch(InterruptedException e) {
System.out.println("發(fā)生異常被中斷。");
}
System.out.println("運(yùn)行結(jié)束。");
}
}
public static void main(String[] args) {
Thread hThread = new HotWaterThread();
Thread wThread = new WashThread();
hThread.start();
wThread.start();
//...在等待燒水和清洗時(shí),可以干點(diǎn)其它事情
try {
// 合并燒水-線程
hThread.join();
// 合并清洗-線程
wThread.join();
Thread.currentThread().setName("主線程");
System.out.println("泡茶喝");
} catch (InterruptedException e) {
System.out.println(getCurThreadName() +"發(fā)生異常被中斷。");
}
System.out.println(getCurThreadName() + " 運(yùn)行結(jié)束。");
}
}
join方法詳解
join方法要注意幾點(diǎn):
- join是實(shí)例方法不是靜態(tài)方法,需要使用線程對象去調(diào)用。
- 調(diào)用join時(shí),不是thread所指向的目標(biāo)線程阻塞,而是當(dāng)前線程阻塞。
- 只有等到thread所指向的線程執(zhí)行完成或者超時(shí),當(dāng)前線程才能啟動執(zhí)行。
join有一個(gè)問題,被合并線程沒有返回值。如果要獲得異步線程的執(zhí)行結(jié)果,可以使用java的FutureTask系列類。
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0); // 阻塞當(dāng)前線程
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay); // 限時(shí)阻塞當(dāng)前線程
now = System.currentTimeMillis() - base;
}
}
}
實(shí)現(xiàn)原理是不停地檢查join線程是否存活,如果join線程存活,wait(0)就永遠(yuǎn)等下去,直到j(luò)oin線程終止后,線程的this.nofifyAll()方法會被調(diào)用,join方法將退出循環(huán),恢復(fù)業(yè)務(wù)邏輯執(zhí)行,很明顯這種循環(huán)檢查的方式比較低微。
join方法缺少靈活性,實(shí)際項(xiàng)目中很少自己單獨(dú)創(chuàng)建線程,而是使用Executor。
FutureTask:異步調(diào)用
為了獲取異步線程的返回結(jié)果,Java在1.5版本之后提供了一種新的多線程創(chuàng)建方式——FutureTask。
通過 FutureTask 獲取異步執(zhí)行結(jié)果的步驟
通過FutureTask類和Callable接口的聯(lián)合使用可以創(chuàng)建能獲取異步執(zhí)行結(jié)果的線程。具體的步驟重復(fù)介紹如下:
- 創(chuàng)建一個(gè)Callable接口實(shí)現(xiàn)類,并實(shí)現(xiàn)它的call方法,編寫號異步執(zhí)行的具體邏輯,并且可以有返回值。
- 使用Callable實(shí)現(xiàn)類的實(shí)例構(gòu)造一個(gè)FutureTask實(shí)例。
- 使用FutureTask實(shí)例作為Thread構(gòu)造器的target入?yún)?,?gòu)造新的Thread新的線程實(shí)例。
- 調(diào)用Thread實(shí)例的start方法啟動新線程,啟動新線程的run方法并發(fā)執(zhí)行。其內(nèi)部的執(zhí)行過程為:啟動Thread實(shí)例的run方法并發(fā)執(zhí)行后,會執(zhí)行FutureTask實(shí)例的run方法,最終會并發(fā)執(zhí)行Callable實(shí)現(xiàn)類的call方法。
- 調(diào)用FutureTask對象的get方法阻塞性地獲得并發(fā)線程的執(zhí)行結(jié)果。
使用FutureTask實(shí)現(xiàn)異步泡茶喝
看一下 FutureTask 的版本。
public class JavaFutureDemo {
public static final int SLEEP_GAP = 500;
public static String getCurThreadName() {
return Thread.currentThread().getName();
}
static class HotWaterJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
System.out.println("洗好水壺");
System.out.println("灌上涼水");
System.out.println("放在火上");
Thread.sleep(SLEEP_GAP); // 燒水
System.out.println("水開了!");
} catch (InterruptedException e) {
System.out.println("發(fā)生異常被中斷。");
return false;
}
System.out.println("運(yùn)行結(jié)束。");
return true;
}
}
static class WashJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
System.out.println("洗茶壺");
System.out.println("洗茶杯");
System.out.println("拿茶葉");
Thread.sleep(SLEEP_GAP); // 清洗中
System.out.println("洗完了!");
} catch(InterruptedException e) {
System.out.println("發(fā)生異常被中斷。");
return false;
}
System.out.println("運(yùn)行結(jié)束。");
return true;
}
}
public static void drinkTea(boolean waterOk, boolean cupOk) {
if(waterOk && cupOk) {
System.out.println("泡茶喝");
} else if(!waterOk) {
System.out.println("燒水失敗,沒有茶喝了");
} else if(!cupOk) {
System.out.println("杯子洗不了,沒有茶喝了");
}
}
public static void main(String[] args) {
Thread.currentThread().setName("主線程");
Callable<Boolean> hJob = new HotWaterJob();
FutureTask<Boolean> hTask = new FutureTask<>(hJob);
Thread hThread = new Thread(hTask, "** 燒水-Thread");
Callable<Boolean> wJob = new WashJob();
FutureTask<Boolean> wTask = new FutureTask<>(wJob);
Thread wThread = new Thread(wTask, "$$ 清洗-Thread");
hThread.start();
wThread.start();
try {
boolean waterOk = hTask.get();
boolean cupOk = wTask.get();
drinkTea(waterOk, cupOk);
} catch (InterruptedException e) {
System.out.println(getCurThreadName() + "發(fā)生異常被中斷。");
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(getCurThreadName() + "運(yùn)行結(jié)束.");
}
}
FutureTask比join線程合并操作更加高明,能取得異步線程的結(jié)果。但是通過FutureTask的get方法獲取異步結(jié)果時(shí),主線程也會被阻塞,它們都是異步阻塞模式。
異步回調(diào)與主動調(diào)用
回調(diào)是一種反向的調(diào)用模式,被調(diào)用方在執(zhí)行完成后,會反向執(zhí)行調(diào)用方所設(shè)置的鉤子方法。
實(shí)質(zhì)上,在回調(diào)模式中負(fù)責(zé)執(zhí)行回調(diào)方法而具體線程已經(jīng)不再是調(diào)用方的線程,而是變成了異步的被調(diào)用方的線程。
Java中回調(diào)模式的標(biāo)準(zhǔn)實(shí)現(xiàn)類為CompletableFuture,由于該類出現(xiàn)比較晚,因此很多中間件如Guava、Netty等都提供了自己的異步回調(diào)模式API。
Guava的異步回調(diào)模式
Guava是Google提供的Java擴(kuò)展包,它提供了一種異步回調(diào)的解決方案。
FutureCallback
為了實(shí)現(xiàn)異步回調(diào)方式獲取一步線程的結(jié)果,Guava做了以下增強(qiáng):
引入了一個(gè)新的接口ListenableFuture,繼承了Java的Future接口,是的Java的Future異步任務(wù)在Guava中能被監(jiān)控和非阻塞獲取異步結(jié)果。
引入了一個(gè)新的接口 FutureCallback,這是一個(gè)獨(dú)立的新接口。該接口的目的是在異步任務(wù)執(zhí)行完成后,根據(jù)異步結(jié)果完成不同的回調(diào)處理,并且可以處理異步結(jié)果。
FutureCallback 是一個(gè)新增接口,用來填寫異步任務(wù)執(zhí)行完成后的監(jiān)聽邏輯。FutureCallback有兩個(gè)回調(diào)方法:
- onSuccess方法,在異步任務(wù)執(zhí)行成功后被回調(diào)。調(diào)用時(shí),異步任務(wù)的執(zhí)行結(jié)果作為onSuccess的參數(shù)被傳入。
- onFailure方法,在異步任務(wù)執(zhí)行過程中拋出異常時(shí)被回調(diào)。調(diào)用時(shí),異步任務(wù)所拋出的異常作為onFailure方法的參數(shù)被傳入。
FutureCallback源碼如下:
public interface FutureCallback<V> {
void onSuccess(@Nullable V var1);
void onFailure(Throwable var1);
}
注意,Guava的FutureCallback與Java的Callback名字相近,實(shí)質(zhì)不同,存在本質(zhì)區(qū)別:
- Java的Callback接口代表的是異步執(zhí)行的邏輯。
- Guava的FutureCallback接口代表的是Callable異步邏輯執(zhí)行完成之后,根據(jù)成功或者異常兩種情形所需要執(zhí)行的善后工作。
詳解ListenableFuture
Guava的ListenableFuture接口是對Java的Future接口的擴(kuò)展,可以理解為異步任務(wù)的實(shí)例。
public interface ListenableFuture<V> extends Future<V> {
// 此方法由Guava內(nèi)部調(diào)用
void addListener(Runnable r, Executor e);
}
addListener方法的作用是將FutureCallback的回調(diào)邏輯封裝成一個(gè)內(nèi)部的Runnable異步回調(diào)任務(wù),它只在Guava內(nèi)部調(diào)用。
如果想要將FutureCallback回調(diào)邏輯綁定到異步ListenerFuture任務(wù),可以使用Guava的Futures工具類,它包含一個(gè)addCallback方法。
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean r) {
...
}
public void onFailure(Throwable t) {
...
}
});
ListenableFuture 異步任務(wù)
如果要獲取Guava的ListenableFuture異步任務(wù)實(shí)例,主要通過向Guava自己的線程池中獲取。
Guava線程池是對Java線程池的一種裝飾。創(chuàng)建Guava線程池的方法如下:
// Java線程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
// Guava線程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
獲取異步任務(wù)實(shí)例的方法是通過向線程池提交 Callable 業(yè)務(wù)邏輯來實(shí)現(xiàn),代碼如下:
// submit方法提交任務(wù),返回異步任務(wù)實(shí)例
ListenableFuture<Boolean> hFuture = gPool.suubmit(hJob);
// 綁定回調(diào)實(shí)例
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
// 實(shí)現(xiàn)回調(diào)
...
});
總結(jié)一下,Guava異步回調(diào)的流程如下:
實(shí)現(xiàn)Java的Callable接口,創(chuàng)建異步執(zhí)行邏輯。如果不需要返回值,異步執(zhí)行邏輯也可以實(shí)現(xiàn)Runnable接口。
創(chuàng)建Guava線程池。
將步驟1創(chuàng)建的Callable/Runnable異步執(zhí)行邏輯的實(shí)例提交到Guava線程池,從而獲取ListenableFuture異步任務(wù)實(shí)例。
創(chuàng)建FutureCallback回調(diào)實(shí)例,通過Futures.addCallback將都回調(diào)實(shí)例綁定到ListenableFuture異步任務(wù)上。
使用Guava實(shí)現(xiàn)泡茶案例
Guava異步回調(diào)版本的泡茶代碼如下:
public class GuavaDemo {
public static final int SLEEP_GAP = 3000;
static class HotWaterJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
System.out.println("洗好水壺");
System.out.println("灌上涼水");
System.out.println("放在火上");
Thread.sleep(SLEEP_GAP); // 燒水
System.out.println("水開了!");
} catch (InterruptedException e) {
System.out.println("發(fā)生異常被中斷。");
return false;
}
System.out.println("運(yùn)行結(jié)束。");
return true;
}
}
static class WashJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
System.out.println("洗茶壺");
System.out.println("洗茶杯");
System.out.println("拿茶葉");
Thread.sleep(SLEEP_GAP); // 清洗中
System.out.println("洗完了!");
} catch(InterruptedException e) {
System.out.println("發(fā)生異常被中斷。");
return false;
}
System.out.println("運(yùn)行結(jié)束。");
return true;
}
}
static class DrinkJob {
boolean waterOk = false;
boolean cupOk= false;
// 泡茶喝,回調(diào)方法
public void drinkTea() {
if(waterOk && cupOk) {
System.out.println("泡茶喝,茶喝完");
this.waterOk = false;
}
}
}
public static void main(String[] args) {
Thread.currentThread().setName("泡茶喝線程");
// 新奇一個(gè)線程,作為泡茶主線程
DrinkJob drinkJob = new DrinkJob();
Callable<Boolean> hotJob = new HotWaterJob();
Callable<Boolean> washJob = new WashJob();
ExecutorService jPool = Executors.newFixedThreadPool(10);
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
FutureCallback<Boolean> hotWaterHook = new FutureCallback<Boolean>() {
public void onSuccess(Boolean r) {
if(r) {
drinkJob.waterOk = true;
// 執(zhí)行回調(diào)方法
drinkJob.drinkTea();
}
}
public void onFailure(Throwable t) {
System.out.println("燒水失敗,內(nèi)有茶喝了");
}
};
// 啟動燒水線程
ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
// 設(shè)置燒水任務(wù)的回調(diào)鉤子
Futures.addCallback(hotFuture, hotWaterHook, gPool);
// 啟動清洗線程
ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean r) {
if(r) {
drinkJob.cupOk = true;
// 執(zhí)行回調(diào)方法
drinkJob.drinkTea();
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("杯子洗不了,沒有茶喝了");
}
}, jPool);
System.out.println("干點(diǎn)其它事情...");
try {
Thread.sleep(1000);
} catch(Exception e) {
e.printStackTrace();
}
System.out.println("執(zhí)行完成");
}
}
Guava 異步回調(diào)和Java異步調(diào)用的區(qū)別
二者區(qū)別如下:
- FutureTask是主動調(diào)用的模式,調(diào)用線程主動獲得異步結(jié)果,在獲取異步結(jié)果時(shí)處于阻塞狀態(tài),并且會一直阻塞,直到拿到異步線程的結(jié)果。
- Guava是異步回調(diào)模式,調(diào)用線程不會主動獲得異步結(jié)果,而是準(zhǔn)備好回調(diào)函數(shù)。當(dāng)回調(diào)函數(shù)被執(zhí)行時(shí),調(diào)用線程可能已經(jīng)結(jié)束很久了。
Netty的異步回調(diào)模式
Netty對Java Future異步任務(wù)的擴(kuò)展如下:
繼承Java的Future接口得到了一個(gè)新的屬于Netty自己的Future異步任務(wù)接口,該接口對原有的接口進(jìn)行了增強(qiáng),使得Netty異步任務(wù)能夠非阻塞地處理回調(diào)結(jié)果。
引入一個(gè)新接口——GenericFutureListener,用于表示異步執(zhí)行完成的監(jiān)聽器。
GenericFutureListener接口詳解
GenericFutureListener接口源碼如下:
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
// 監(jiān)聽器的回調(diào)方法
void operationComplete(F var1) throws Exception;
}
其中擁有一個(gè)回調(diào)方法operationComplete,表示異步任務(wù)操作完成。在Future異步任務(wù)執(zhí)行完成后將回調(diào)此方法。
GenericFutureListener的父接口EventListener是一個(gè)空接口,沒有任何抽象方法,僅僅起到標(biāo)識作用。
Netty的Future接口詳解
Netty的Future接口擴(kuò)展了一系列方法,對執(zhí)行的過程進(jìn)行監(jiān)控,對異步回調(diào)完成事件進(jìn)行Listen監(jiān)聽并且回調(diào)。Netty的Future源碼如下:
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess(); // 判斷異步執(zhí)行是否成功
boolean isCancellable(); // 判斷異步執(zhí)行是否取消
boolean cause; // 獲取異步任務(wù)異常的原因
// 增加異步任務(wù)執(zhí)行完成Listener監(jiān)聽器
Future<V> addListener(GenericFutureListener<? extends Future<? superV>> listener);
// 移除異步任務(wù)執(zhí)行完成Listener監(jiān)聽器
Future<V> removeListener(GenericFutureListener<? extends Future<? superV>> listener);
...
}
Netty的Future一般不直接使用,而是使用它的子接口,例如ChannelFuture接口。
ChannelFuture的使用
Netty網(wǎng)絡(luò)連接都是異步回調(diào)的,會返回一個(gè)ChannelFuture接口的實(shí)例。
// connect是異步的
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.sample.com", 80));
// 回調(diào)方法
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()) {
System.out.println("connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
Netty的出站和入站異步回調(diào)
下面以經(jīng)典的NIO出站操作write為例說明ChannelFuture的使用。
ChannelFuture future = ctx.channel().write(msg);
// 為異步任務(wù)加上監(jiān)聽器
future.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
//write操作完成后的回調(diào)代碼
}
}
);
在write操作完成后立即返回,返回的是一個(gè)ChannelFuture接口的實(shí)例。通過這個(gè)實(shí)例可以綁定異步回調(diào)監(jiān)聽器,編寫異步回調(diào)的邏輯。