異步、線程池與CompletableFuture異步編排

一、創(chuàng)建線程的幾種方式

1.繼承thread類

public class ThreadTest {
    public static void main(String[] args) {
        Thread01 thread = new Thread01();
        thread.start();
    }
    public static class Thread01 extends Thread{
        @Override
        public void run() {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
        }
    }
}

2.實(shí)現(xiàn)runnable接口

public class ThreadTest {
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable01());
        thread.start();
    }

    public static class Runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
        }
    }

}


3.實(shí)現(xiàn)Callable通過FutureTask創(chuàng)建線程

public class ThreadTest {
    public static void main(String[] args) {

        FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
        Thread thread = new Thread(futureTask);
        thread.start();
    }
    public static class Callable01 implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }
    }
}

4.線程池創(chuàng)建線程

七大參數(shù):
corePoolSize:固定線程數(shù)
maximumPoolSize:最大線程數(shù)
keepAliveTime:存活時(shí)間
unit:時(shí)間單位
BlockingQueue<Runnable>:阻塞隊(duì)列
ThreadFactory: 線程工廠
RejectedExecutionHandler:拒絕策略

疑問:拒絕策略丟棄的任務(wù)如何去解決

1.改寫拒絕策略,延遲任務(wù)重新投向線程池
2.打印對應(yīng)任務(wù)參數(shù),可以做塞回?cái)?shù)據(jù)庫,或者打印出來方便排查問題
問題
Q:如何打印線程參數(shù)
A:RetryPolicy#rejectedExecution里面通過判斷runnable的類型,然后進(jìn)行打印相關(guān)參數(shù)

Q:有沒有其他方案
A:有的,比如說有些就不用使用延遲隊(duì)列,比如說我們是從數(shù)據(jù)庫讀取到的任務(wù),執(zhí)行成功就修改執(zhí)行的標(biāo)識,如果不成功或者任務(wù)被拒絕了,它下次掃描還是會(huì)繼續(xù)塞回去

Q:延遲隊(duì)列如果宕機(jī)的話,任務(wù)也丟失了怎么辦
A:這里的打印日志就很重要了,可以記錄起來,或者加個(gè)hook回調(diào)鉤子,在宕機(jī)的時(shí)候?qū)⑦@些數(shù)據(jù)寫回?cái)?shù)據(jù)庫(kill -9 pid不會(huì)調(diào)用hook~)

5.CompletableFuture異步編排

創(chuàng)建異步對象:

image.png

1、runXxxx 都是沒有返回結(jié)果的,supplyXxx 都是可以獲取返回結(jié)果的
2、可以傳入自定義的線程池,否則就用默認(rèn)的線程池;
列子:

CompletableFuture<Void> futureRunnable = CompletableFuture.runAsync(()->{
            System.out.println("啟動(dòng)執(zhí)行"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
        },threadPoolExecutor);
        try {
            futureRunnable.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).whenCompleteAsync((res,e)->{
            System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
        },threadPoolExecutor);
        System.out.println("返回結(jié)果===》:"+ future.get());

計(jì)算完成時(shí)回調(diào):

image.png

whenComplete 可以處理正常和異常的計(jì)算結(jié)果,exceptionally 處理異常情況。
whenComplete 和 whenCompleteAsync 的區(qū)別:
whenComplete:是執(zhí)行當(dāng)前任務(wù)的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務(wù)。
whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個(gè)任務(wù)繼續(xù)提交給線程池
來進(jìn)行執(zhí)行。
方法不以 Async 結(jié)尾,意味著 Action 使用相同的線程執(zhí)行,而 Async 可能會(huì)使用其他線程
執(zhí)行(如果是使用相同的線程池,也可能會(huì)被同一個(gè)線程選中執(zhí)行)
列子:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).whenCompleteAsync((res,e)->{
            System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
        },threadPoolExecutor).exceptionally(throwable->{
            return 10;
        });
        System.out.println("返回結(jié)果===》:"+ future.get());

handle 方法:可改變返回值

image.png
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).handle((res,e)->{ //第一個(gè)參數(shù) 線程返回值,第二個(gè)是 異常返回
            if (res != null){
                return res*2;
            }
            if (e == null){
                return 0;
            }
            return -1;
        });

線程串行化方法

image.png

thenApply 方法:當(dāng)一個(gè)線程依賴另一個(gè)線程時(shí),獲取上一個(gè)任務(wù)返回的結(jié)果,并返回當(dāng)前
任務(wù)的返回值。
thenAccept 方法:消費(fèi)處理結(jié)果。接收任務(wù)的處理結(jié)果,并消費(fèi)處理,無返回結(jié)果。
thenRun 方法:只要上面的任務(wù)執(zhí)行完成,就開始執(zhí)行 thenRun,只是處理完任務(wù)后,執(zhí)行
thenRun 的后續(xù)操作
帶有 Async 默認(rèn)是異步執(zhí)行的。同之前。
以上都要前置任務(wù)成功完成。
Function<? super T,? extends U>
T:上一個(gè)任務(wù)返回結(jié)果的類型
U:當(dāng)前任務(wù)的返回值類型

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).thenApplyAsync(res -> {
            System.out.println("第二任務(wù)啟動(dòng)");
            return "hallo"+res;
        }, threadPoolExecutor);
==================================

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).thenAcceptAsync(res -> {
            System.out.println("開啟另一個(gè)任務(wù)");
        }, threadPoolExecutor);

兩任務(wù)組合 - 都要完成

image.png

image.png

兩個(gè)任務(wù)必須都完成,觸發(fā)該任務(wù)。
thenCombine:組合兩個(gè) future,獲取兩個(gè) future 的返回結(jié)果,并返回當(dāng)前任務(wù)的返回值
thenAcceptBoth:組合兩個(gè) future,獲取兩個(gè) future 任務(wù)的返回結(jié)果,然后處理任務(wù),沒有
返回值。
runAfterBoth:組合兩個(gè) future,不需要獲取 future 的結(jié)果,只需兩個(gè) future 處理完任務(wù)后,
處理該任務(wù)。

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束當(dāng)前線程");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<String> stringCompletableFuture = futur1.thenCombineAsync(futur2, (f1, f2) -> {
            System.out.println("全部結(jié)束:");
            return f1 + ":" + f2;
        }, threadPoolExecutor);

兩任務(wù)組合 - 一個(gè)完成

image.png

image.png

當(dāng)兩個(gè)任務(wù)中,任意一個(gè) future 任務(wù)完成的時(shí)候,執(zhí)行任務(wù)。
applyToEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值,處理任務(wù)并有新的返回值。
acceptEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值,處理任務(wù),沒有新的返回值。
runAfterEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,不需要獲取 future 的結(jié)果,處理任務(wù),也沒有返
回值。

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur1:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果futur1:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur2:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束當(dāng)前線程futur2");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> future = futur1.applyToEitherAsync(futur2, (s) -> {
            System.out.println("第三個(gè)任務(wù)future");
            return s;
        }, threadPoolExecutor);

多任務(wù)組合

image.png

allOf:等待所有任務(wù)完成
anyOf:只要有一個(gè)任務(wù)完成

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur1:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果futur1:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur2:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束當(dāng)前線程futur2");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futur1, futur2);
        System.out.println(futureAllOf.get());
        CompletableFuture<Object> future = CompletableFuture.anyOf(futur1, futur2);
        System.out.println(future.get());
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容