JAVA 多線程與高并發(fā)學(xué)習(xí)筆記(十七)——異步回調(diào)

異步回調(diào)模式是高并發(fā)下的核心模式,本部分對異步回調(diào)進(jìn)行詳細(xì)介紹。

泡茶案例

本部分從一個(gè)很好理解的異步生活示例-泡茶開始。為了異步執(zhí)行泡茶流程,分別涉及三個(gè)線程:泡茶線程(主線程)、燒水線程和清洗線程。泡茶線程的工作是:啟動清洗線程、啟動燒水線程,等清洗、燒水的工作完成后,泡茶喝;清洗線程的工作是:洗茶壺、洗茶杯;燒水線程的工作是:洗好水壺、灌上涼水,放在火上,一直等水燒開。

下面分別使用阻塞模式、回調(diào)模式實(shí)現(xiàn)泡茶案例。

join:異步阻塞

join操作的原理是阻塞當(dāng)前的線程,直到待合并的目標(biāo)線程執(zhí)行完成。

線程的合并流程

Java 中線程的合并流程是,假設(shè)線程A調(diào)用線程B的join方法合并B線程,那么線程A進(jìn)入阻塞狀態(tài),直到線程B執(zhí)行完成。

調(diào)用join實(shí)現(xiàn)異步泡茶喝

調(diào)用join實(shí)現(xiàn)泡茶喝是一個(gè)異步阻塞版本,具體的代碼實(shí)現(xiàn)如下:


public class JoinDemo {
    public static final int SLEEP_GAP = 500;
    public static String getCurThreadName() {
        return Thread.currentThread().getName();
    }

    static class HotWaterThread extends Thread {
        public HotWaterThread() {
            super("**燒水-Thread");
        }
        public void run() {
            try {
                System.out.println("洗好水壺");
                System.out.println("灌上涼水");
                System.out.println("放在火上");
                Thread.sleep(SLEEP_GAP); // 燒水
                System.out.println("水開了!");
            } catch (InterruptedException e) {
                System.out.println("發(fā)生異常被中斷。");
            }
            System.out.println("運(yùn)行結(jié)束。");
        }
    }

    static class WashThread extends Thread {
        public WashThread() {
            super("$$ 清洗-Thread");
        }
        public void run() {
            try {
                System.out.println("洗茶壺");
                System.out.println("洗茶杯");
                System.out.println("拿茶葉");
                Thread.sleep(SLEEP_GAP); // 清洗中
                System.out.println("洗完了!");
            } catch(InterruptedException e) {
                System.out.println("發(fā)生異常被中斷。");
            }
            System.out.println("運(yùn)行結(jié)束。");
        }
    }

    public static void main(String[] args) {
        Thread hThread = new HotWaterThread();
        Thread wThread = new WashThread();
        hThread.start();
        wThread.start();

        //...在等待燒水和清洗時(shí),可以干點(diǎn)其它事情

        try {
            // 合并燒水-線程
            hThread.join();
            // 合并清洗-線程
            wThread.join();
            Thread.currentThread().setName("主線程");
            System.out.println("泡茶喝");
        } catch (InterruptedException e) {
            System.out.println(getCurThreadName() +"發(fā)生異常被中斷。");
        }
        System.out.println(getCurThreadName() + " 運(yùn)行結(jié)束。");
    }
}

join方法詳解

join方法要注意幾點(diǎn):

  • join是實(shí)例方法不是靜態(tài)方法,需要使用線程對象去調(diào)用。
  • 調(diào)用join時(shí),不是thread所指向的目標(biāo)線程阻塞,而是當(dāng)前線程阻塞。
  • 只有等到thread所指向的線程執(zhí)行完成或者超時(shí),當(dāng)前線程才能啟動執(zhí)行。

join有一個(gè)問題,被合并線程沒有返回值。如果要獲得異步線程的執(zhí)行結(jié)果,可以使用java的FutureTask系列類。

 public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0); // 阻塞當(dāng)前線程
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay); // 限時(shí)阻塞當(dāng)前線程
            now = System.currentTimeMillis() - base;
        }
    }
}

實(shí)現(xiàn)原理是不停地檢查join線程是否存活,如果join線程存活,wait(0)就永遠(yuǎn)等下去,直到j(luò)oin線程終止后,線程的this.nofifyAll()方法會被調(diào)用,join方法將退出循環(huán),恢復(fù)業(yè)務(wù)邏輯執(zhí)行,很明顯這種循環(huán)檢查的方式比較低微。

join方法缺少靈活性,實(shí)際項(xiàng)目中很少自己單獨(dú)創(chuàng)建線程,而是使用Executor。

FutureTask:異步調(diào)用

為了獲取異步線程的返回結(jié)果,Java在1.5版本之后提供了一種新的多線程創(chuàng)建方式——FutureTask。

通過 FutureTask 獲取異步執(zhí)行結(jié)果的步驟

通過FutureTask類和Callable接口的聯(lián)合使用可以創(chuàng)建能獲取異步執(zhí)行結(jié)果的線程。具體的步驟重復(fù)介紹如下:

  1. 創(chuàng)建一個(gè)Callable接口實(shí)現(xiàn)類,并實(shí)現(xiàn)它的call方法,編寫號異步執(zhí)行的具體邏輯,并且可以有返回值。
  2. 使用Callable實(shí)現(xiàn)類的實(shí)例構(gòu)造一個(gè)FutureTask實(shí)例。
  3. 使用FutureTask實(shí)例作為Thread構(gòu)造器的target入?yún)?,?gòu)造新的Thread新的線程實(shí)例。
  4. 調(diào)用Thread實(shí)例的start方法啟動新線程,啟動新線程的run方法并發(fā)執(zhí)行。其內(nèi)部的執(zhí)行過程為:啟動Thread實(shí)例的run方法并發(fā)執(zhí)行后,會執(zhí)行FutureTask實(shí)例的run方法,最終會并發(fā)執(zhí)行Callable實(shí)現(xiàn)類的call方法。
  5. 調(diào)用FutureTask對象的get方法阻塞性地獲得并發(fā)線程的執(zhí)行結(jié)果。

使用FutureTask實(shí)現(xiàn)異步泡茶喝

看一下 FutureTask 的版本。


public class JavaFutureDemo {

    public static final int SLEEP_GAP = 500;
    public static String getCurThreadName() {
        return Thread.currentThread().getName();
    }

    static class HotWaterJob implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            try {
                System.out.println("洗好水壺");
                System.out.println("灌上涼水");
                System.out.println("放在火上");
                Thread.sleep(SLEEP_GAP); // 燒水
                System.out.println("水開了!");
            } catch (InterruptedException e) {
                System.out.println("發(fā)生異常被中斷。");
                return false;
            }
            System.out.println("運(yùn)行結(jié)束。");
            return true;
        }
    }

    static class WashJob implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            try {
                System.out.println("洗茶壺");
                System.out.println("洗茶杯");
                System.out.println("拿茶葉");
                Thread.sleep(SLEEP_GAP); // 清洗中
                System.out.println("洗完了!");
            } catch(InterruptedException e) {
                System.out.println("發(fā)生異常被中斷。");
                return false;
            }
            System.out.println("運(yùn)行結(jié)束。");
            return true;
        }
    }

    public static void drinkTea(boolean waterOk, boolean cupOk) {
        if(waterOk && cupOk) {
            System.out.println("泡茶喝");
        } else if(!waterOk) {
            System.out.println("燒水失敗,沒有茶喝了");
        } else if(!cupOk) {
            System.out.println("杯子洗不了,沒有茶喝了");
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("主線程");
        Callable<Boolean> hJob = new HotWaterJob();
        FutureTask<Boolean> hTask = new FutureTask<>(hJob);
        Thread hThread = new Thread(hTask, "** 燒水-Thread");

        Callable<Boolean> wJob = new WashJob();
        FutureTask<Boolean> wTask = new FutureTask<>(wJob);
        Thread wThread = new Thread(wTask, "$$ 清洗-Thread");
        hThread.start();
        wThread.start();

        try {
            boolean waterOk = hTask.get();
            boolean cupOk = wTask.get();
            drinkTea(waterOk, cupOk);
        } catch (InterruptedException e) {
            System.out.println(getCurThreadName() + "發(fā)生異常被中斷。");
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println(getCurThreadName() + "運(yùn)行結(jié)束.");
    }

}

FutureTask比join線程合并操作更加高明,能取得異步線程的結(jié)果。但是通過FutureTask的get方法獲取異步結(jié)果時(shí),主線程也會被阻塞,它們都是異步阻塞模式。

異步回調(diào)與主動調(diào)用

回調(diào)是一種反向的調(diào)用模式,被調(diào)用方在執(zhí)行完成后,會反向執(zhí)行調(diào)用方所設(shè)置的鉤子方法。

實(shí)質(zhì)上,在回調(diào)模式中負(fù)責(zé)執(zhí)行回調(diào)方法而具體線程已經(jīng)不再是調(diào)用方的線程,而是變成了異步的被調(diào)用方的線程。

Java中回調(diào)模式的標(biāo)準(zhǔn)實(shí)現(xiàn)類為CompletableFuture,由于該類出現(xiàn)比較晚,因此很多中間件如Guava、Netty等都提供了自己的異步回調(diào)模式API。

Guava的異步回調(diào)模式

Guava是Google提供的Java擴(kuò)展包,它提供了一種異步回調(diào)的解決方案。

FutureCallback

為了實(shí)現(xiàn)異步回調(diào)方式獲取一步線程的結(jié)果,Guava做了以下增強(qiáng):

  • 引入了一個(gè)新的接口ListenableFuture,繼承了Java的Future接口,是的Java的Future異步任務(wù)在Guava中能被監(jiān)控和非阻塞獲取異步結(jié)果。

  • 引入了一個(gè)新的接口 FutureCallback,這是一個(gè)獨(dú)立的新接口。該接口的目的是在異步任務(wù)執(zhí)行完成后,根據(jù)異步結(jié)果完成不同的回調(diào)處理,并且可以處理異步結(jié)果。

FutureCallback 是一個(gè)新增接口,用來填寫異步任務(wù)執(zhí)行完成后的監(jiān)聽邏輯。FutureCallback有兩個(gè)回調(diào)方法:

  • onSuccess方法,在異步任務(wù)執(zhí)行成功后被回調(diào)。調(diào)用時(shí),異步任務(wù)的執(zhí)行結(jié)果作為onSuccess的參數(shù)被傳入。
  • onFailure方法,在異步任務(wù)執(zhí)行過程中拋出異常時(shí)被回調(diào)。調(diào)用時(shí),異步任務(wù)所拋出的異常作為onFailure方法的參數(shù)被傳入。

FutureCallback源碼如下:

public interface FutureCallback<V> {
    void onSuccess(@Nullable V var1);
    void onFailure(Throwable var1);
}

注意,Guava的FutureCallback與Java的Callback名字相近,實(shí)質(zhì)不同,存在本質(zhì)區(qū)別:

  • Java的Callback接口代表的是異步執(zhí)行的邏輯。
  • Guava的FutureCallback接口代表的是Callable異步邏輯執(zhí)行完成之后,根據(jù)成功或者異常兩種情形所需要執(zhí)行的善后工作。

詳解ListenableFuture

Guava的ListenableFuture接口是對Java的Future接口的擴(kuò)展,可以理解為異步任務(wù)的實(shí)例。

public interface ListenableFuture<V> extends Future<V> {
    // 此方法由Guava內(nèi)部調(diào)用
    void addListener(Runnable r, Executor e);
}

addListener方法的作用是將FutureCallback的回調(diào)邏輯封裝成一個(gè)內(nèi)部的Runnable異步回調(diào)任務(wù),它只在Guava內(nèi)部調(diào)用。

如果想要將FutureCallback回調(diào)邏輯綁定到異步ListenerFuture任務(wù),可以使用Guava的Futures工具類,它包含一個(gè)addCallback方法。

Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
    public void onSuccess(Boolean r) {
        ...
    }
    public void onFailure(Throwable t) {
        ...
    }
});

ListenableFuture 異步任務(wù)

如果要獲取Guava的ListenableFuture異步任務(wù)實(shí)例,主要通過向Guava自己的線程池中獲取。

Guava線程池是對Java線程池的一種裝飾。創(chuàng)建Guava線程池的方法如下:

// Java線程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
// Guava線程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);

獲取異步任務(wù)實(shí)例的方法是通過向線程池提交 Callable 業(yè)務(wù)邏輯來實(shí)現(xiàn),代碼如下:

// submit方法提交任務(wù),返回異步任務(wù)實(shí)例
ListenableFuture<Boolean> hFuture = gPool.suubmit(hJob);
// 綁定回調(diào)實(shí)例
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
    // 實(shí)現(xiàn)回調(diào)
    ...
});

總結(jié)一下,Guava異步回調(diào)的流程如下:

  1. 實(shí)現(xiàn)Java的Callable接口,創(chuàng)建異步執(zhí)行邏輯。如果不需要返回值,異步執(zhí)行邏輯也可以實(shí)現(xiàn)Runnable接口。

  2. 創(chuàng)建Guava線程池。

  3. 將步驟1創(chuàng)建的Callable/Runnable異步執(zhí)行邏輯的實(shí)例提交到Guava線程池,從而獲取ListenableFuture異步任務(wù)實(shí)例。

  4. 創(chuàng)建FutureCallback回調(diào)實(shí)例,通過Futures.addCallback將都回調(diào)實(shí)例綁定到ListenableFuture異步任務(wù)上。

使用Guava實(shí)現(xiàn)泡茶案例

Guava異步回調(diào)版本的泡茶代碼如下:


public class GuavaDemo {

    public static final int SLEEP_GAP = 3000;

    static class HotWaterJob implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            try {
                System.out.println("洗好水壺");
                System.out.println("灌上涼水");
                System.out.println("放在火上");
                Thread.sleep(SLEEP_GAP); // 燒水
                System.out.println("水開了!");
            } catch (InterruptedException e) {
                System.out.println("發(fā)生異常被中斷。");
                return false;
            }
            System.out.println("運(yùn)行結(jié)束。");
            return true;
        }
    }

    static class WashJob implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            try {
                System.out.println("洗茶壺");
                System.out.println("洗茶杯");
                System.out.println("拿茶葉");
                Thread.sleep(SLEEP_GAP); // 清洗中
                System.out.println("洗完了!");
            } catch(InterruptedException e) {
                System.out.println("發(fā)生異常被中斷。");
                return false;
            }
            System.out.println("運(yùn)行結(jié)束。");
            return true;
        }
    }

    static class DrinkJob {
        boolean waterOk = false;
        boolean cupOk= false;
        // 泡茶喝,回調(diào)方法
        public void drinkTea() {
            if(waterOk && cupOk) {
                System.out.println("泡茶喝,茶喝完");
                this.waterOk = false;
            }
        }

    }

    public static void main(String[] args) {
        Thread.currentThread().setName("泡茶喝線程");

        // 新奇一個(gè)線程,作為泡茶主線程
        DrinkJob drinkJob = new DrinkJob();

        Callable<Boolean> hotJob = new HotWaterJob();
        Callable<Boolean> washJob = new WashJob();

        ExecutorService jPool = Executors.newFixedThreadPool(10);
        ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);

        FutureCallback<Boolean> hotWaterHook = new FutureCallback<Boolean>() {
            public void onSuccess(Boolean r) {
                if(r) {
                    drinkJob.waterOk = true;
                    // 執(zhí)行回調(diào)方法
                    drinkJob.drinkTea();
                }
            }

            public void onFailure(Throwable t) {
                System.out.println("燒水失敗,內(nèi)有茶喝了");
            }
        };

        // 啟動燒水線程
        ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
        // 設(shè)置燒水任務(wù)的回調(diào)鉤子
        Futures.addCallback(hotFuture, hotWaterHook, gPool);

        // 啟動清洗線程
        ListenableFuture<Boolean> washFuture = gPool.submit(washJob);

        Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(@Nullable Boolean r) {
                if(r) {
                    drinkJob.cupOk = true;
                    // 執(zhí)行回調(diào)方法
                    drinkJob.drinkTea();
                }
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("杯子洗不了,沒有茶喝了");
            }
        }, jPool);

        System.out.println("干點(diǎn)其它事情...");
        try {
            Thread.sleep(1000);
        } catch(Exception e) {
            e.printStackTrace();
        }
        System.out.println("執(zhí)行完成");
    }
}

Guava 異步回調(diào)和Java異步調(diào)用的區(qū)別

二者區(qū)別如下:

  • FutureTask是主動調(diào)用的模式,調(diào)用線程主動獲得異步結(jié)果,在獲取異步結(jié)果時(shí)處于阻塞狀態(tài),并且會一直阻塞,直到拿到異步線程的結(jié)果。
  • Guava是異步回調(diào)模式,調(diào)用線程不會主動獲得異步結(jié)果,而是準(zhǔn)備好回調(diào)函數(shù)。當(dāng)回調(diào)函數(shù)被執(zhí)行時(shí),調(diào)用線程可能已經(jīng)結(jié)束很久了。

Netty的異步回調(diào)模式

Netty對Java Future異步任務(wù)的擴(kuò)展如下:

繼承Java的Future接口得到了一個(gè)新的屬于Netty自己的Future異步任務(wù)接口,該接口對原有的接口進(jìn)行了增強(qiáng),使得Netty異步任務(wù)能夠非阻塞地處理回調(diào)結(jié)果。

引入一個(gè)新接口——GenericFutureListener,用于表示異步執(zhí)行完成的監(jiān)聽器。

GenericFutureListener接口詳解

GenericFutureListener接口源碼如下:

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    // 監(jiān)聽器的回調(diào)方法
    void operationComplete(F var1) throws Exception;
}

其中擁有一個(gè)回調(diào)方法operationComplete,表示異步任務(wù)操作完成。在Future異步任務(wù)執(zhí)行完成后將回調(diào)此方法。

GenericFutureListener的父接口EventListener是一個(gè)空接口,沒有任何抽象方法,僅僅起到標(biāo)識作用。

Netty的Future接口詳解

Netty的Future接口擴(kuò)展了一系列方法,對執(zhí)行的過程進(jìn)行監(jiān)控,對異步回調(diào)完成事件進(jìn)行Listen監(jiān)聽并且回調(diào)。Netty的Future源碼如下:

public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess(); // 判斷異步執(zhí)行是否成功
    boolean isCancellable(); // 判斷異步執(zhí)行是否取消
    boolean cause; // 獲取異步任務(wù)異常的原因

    // 增加異步任務(wù)執(zhí)行完成Listener監(jiān)聽器
    Future<V> addListener(GenericFutureListener<? extends Future<? superV>> listener);

    // 移除異步任務(wù)執(zhí)行完成Listener監(jiān)聽器
    Future<V> removeListener(GenericFutureListener<? extends Future<? superV>> listener);

    ...
}

Netty的Future一般不直接使用,而是使用它的子接口,例如ChannelFuture接口。

ChannelFuture的使用

Netty網(wǎng)絡(luò)連接都是異步回調(diào)的,會返回一個(gè)ChannelFuture接口的實(shí)例。

// connect是異步的
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.sample.com", 80));

// 回調(diào)方法
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if(channelFuture.isSuccess()) {
            System.out.println("connection established");
        } else {
            System.err.println("Connection attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
});

Netty的出站和入站異步回調(diào)

下面以經(jīng)典的NIO出站操作write為例說明ChannelFuture的使用。

ChannelFuture future = ctx.channel().write(msg);

// 為異步任務(wù)加上監(jiān)聽器
future.addListener(
    new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            //write操作完成后的回調(diào)代碼
        }
    }
);

在write操作完成后立即返回,返回的是一個(gè)ChannelFuture接口的實(shí)例。通過這個(gè)實(shí)例可以綁定異步回調(diào)監(jiān)聽器,編寫異步回調(diào)的邏輯。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容