jdk8特性-CompletableFuture

  • CompletionStage接口翻譯成中文是“完工階段“,是java8新增的一個(gè)工具。定義的一系列方法,接收的參數(shù)有有三類CompletionStage,Runnable,Consumer和Function方法。面向函數(shù)式編程,因?yàn)槿雲(yún)⒊橄蟮氖欠椒ǎ⑶覍⒔Y(jié)果傳入下一個(gè)被調(diào)用的方法。該接口表示異步計(jì)算的某種狀態(tài)。到了這個(gè)狀態(tài)會(huì)出發(fā)對(duì)應(yīng)的方法。

  • CompletableFuture分別實(shí)現(xiàn)了Fulture和CompletionStage接口。也就是說擁有異步計(jì)算的能力,通過回調(diào)來處理計(jì)算的結(jié)果。當(dāng)然包括了多個(gè)CompletableFuture之間的組合。增加了多線程之間的協(xié)調(diào)性??梢允峭瓿闪艘部梢允俏赐瓿傻哪硞€(gè)階段。這樣的組合提供了很多編程的可能性。

  • 話不多說,我們直接上代碼,用法都總結(jié)和歸類到寫在注釋上

package com.example.demo;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * Project <demo-project>
 * Created by jorgezhong on 2018/9/8 11:45.
 */
public class CompletableFutureDemo {


    /**
     * 創(chuàng)建CompletableFuture
     * - runAsync
     * - supplyAsync
     * - completedFuture
     * <p>
     * 異步計(jì)算啟用的線程池是守護(hù)線程
     */
    @Test
    public void test1() {

        //1、異步計(jì)算:無返回值

        //默認(rèn)線程池為:ForkJoinPool.commonPool()
        CompletableFuture.runAsync(() -> {
            // TODO: 2018/9/8 無返回異步計(jì)算
            System.out.println(Thread.currentThread().isDaemon());
        });

        //指定線程池,(到了jdk9CompletableFuture還拓展了延遲的線程池)
        CompletableFuture.runAsync(() -> {
            // TODO: 2018/9/8 無返回異步計(jì)算
        }, Executors.newFixedThreadPool(2));


        //2、異步計(jì)算:有返回值

        // 使用默認(rèn)線程池
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "result1");
        //getNow指定異步計(jì)算拋出異常或結(jié)果返回null時(shí)替代的的值
        String result1 = future1.getNow(null);


        //  指定線程池
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "result2", Executors.newFixedThreadPool(2));
        //getNow指定異步計(jì)算拋出異?;蚪Y(jié)果返回null時(shí)替代的的值
        String result2 = future2.getNow(null);


        //3、初始化一個(gè)有結(jié)果無計(jì)算的CompletableFuture
        CompletableFuture<String> future = CompletableFuture.completedFuture("result");
        String now = future.getNow(null);
        System.out.println("now = " + now);


    }


    /**
     * 計(jì)算完成時(shí)需要對(duì)異常進(jìn)行處理或者對(duì)結(jié)果進(jìn)行處理
     * - whenComplete:同步處理包括異常
     * - thenApply:同步處理正常結(jié)果(前提是沒有異常)
     * <p>
     * - whenCompleteAsync:異步處理包括異常
     * - thenApplyAsync:異步處理正常結(jié)果(前提是沒有異常)
     * <p>
     * - exceptionally : 處理異常
     */
    @Test
    public void test2() {


        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");

        //whenComplete方法收future的結(jié)果和異常,可靈活進(jìn)行處理
        //1、同步處理

        //  無返回值:可處理異常
        future.whenComplete((result, throwable) -> System.out.println("result = " + result));

        //  有返回值:沒有異常處理(前提)
        CompletableFuture<String> resultFuture1 = future.thenApply(result -> "result");
        String result1 = resultFuture1.getNow(null);


        //2、異步處理:

        //  無返回值: 默認(rèn)線程池
        future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result));
        //  無返回值:指定線程池
        future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result), Executors.newFixedThreadPool(2));

        //  有返回值:默認(rèn)線程池
        CompletableFuture<String> resultFuture2 = future.thenApplyAsync(result -> "result");
        String result2 = resultFuture2.getNow(null);

        //  有返回值:指定線程池
        CompletableFuture<String> resultFuture3 = future.thenApplyAsync(result -> "result", Executors.newFixedThreadPool(2));
        String result3 = resultFuture3.getNow(null);


        //3、處理異常,處理完之后返回一個(gè)結(jié)果
        CompletableFuture<String> exceptionallyFuture = future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + 1 / 0))
                .exceptionally(throwable -> "發(fā)生異常了:" + throwable.getMessage());
        System.out.println(exceptionallyFuture.getNow(null));


    }


    /**
     * 異常處理還可以使用以下兩個(gè)方法
     * - handle
     * - handleAsync
     * <p>
     * 備注:exceptionally同步和異步計(jì)算一起用如果出現(xiàn)異常會(huì)把異常拋出。用以上的方法可以攔截處理
     */
    @Test
    public void test3() {


        CompletableFuture<String> exceptionoHandle = CompletableFuture.completedFuture("produce msg")
                .thenApplyAsync(s -> "result" + 1 / 0);

        String handleResult1 = exceptionoHandle.handle((s, throwable) -> {
            if (throwable != null) {
                return throwable.getMessage();
            }
            return s;
        }).getNow(null);

        //指定線程池
        String handleResult2 = exceptionoHandle.handleAsync((s, throwable) -> {
            if (throwable != null) {
                return throwable.getMessage();
            }
            return s;
        }, Executors.newFixedThreadPool(2)).getNow(null);

    }

    /**
     * 生產(chǎn)--消費(fèi)
     * - thenAccept:同步的
     * - thenAcceptAsync:異步的
     * <p>
     * 接受上一個(gè)處理結(jié)果,并實(shí)現(xiàn)一個(gè)Consumer,消費(fèi)結(jié)果
     */
    @Test
    public void test4() {

        //同步的
        CompletableFuture.completedFuture("produce msg")
                .thenAccept(s -> System.out.println("sync consumed msg : " + s));

        //異步的
        //默認(rèn)線程池
        CompletableFuture.completedFuture("produce msg")
                .thenAcceptAsync(s -> System.out.println("async consumed msg : " + s));
        //指定線程池
        CompletableFuture.completedFuture("produce msg")
                .thenAcceptAsync(s -> System.out.println("async consumed msg : " + s), Executors.newFixedThreadPool(2));
    }


    /**
     * 取消任務(wù)
     * - cancel
     */
    @Test
    public void test5() throws InterruptedException {

        CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {

            try {
                Thread.sleep(800);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "result";

        });
        String now = message.getNow(null);
        System.out.println("now = " + now);

        //取消
        boolean cancel = message.cancel(true);
        System.out.println("cancel = " + cancel);


        //如果這里再去獲取,會(huì)拋出異常,說明已經(jīng)取消了
        //String now1 = message.getNow(null);

        Thread.sleep(1000);

    }


    /**
     * 兩個(gè)異步計(jì)算
     * - applyToEither:有返回值,同步
     * - acceptEither:無返回值,同步
     * - applyToEitherAsync:有返回值,異步
     * -
     */
    @Test
    public void test6() {


        CompletableFuture<String> task1 = CompletableFuture.completedFuture("task1")
                .thenApply(s -> "task1的計(jì)算結(jié)果:s1 = " + s);

        //同步,有返回值
        //applyToEither第二個(gè)參數(shù)接收的值是task1計(jì)算的返回值
        CompletableFuture<String> result1 = task1.applyToEither(CompletableFuture.completedFuture("task2")
                .thenApply(s -> "task2的計(jì)算結(jié)果:s2 = " + s), s -> s);
        System.out.println("task2:" + result1.getNow(null));


        //同步,無返回值
        task1.acceptEither(CompletableFuture.completedFuture("task3")
                .thenApply(s -> "task3的計(jì)算結(jié)果:s3 = " + s), s -> System.out.println("task3:" + s));


        //異步有返回值,默認(rèn)線程池,也可以指定
        CompletableFuture<String> result2 = task1.applyToEitherAsync(CompletableFuture.completedFuture("task4")
                .thenApply(s -> "task4的計(jì)算結(jié)果:s4 = " + s), s -> s);
        //由于是異步的,主線程跑的快一點(diǎn),因此join()之后才能看到跑完的結(jié)果
        System.out.println("task4:" + result2.join());


        //異步無返回值,指定線程池,也可以使用默認(rèn)線程池
        CompletableFuture<Void> task5 = task1.acceptEitherAsync(CompletableFuture.completedFuture("task5")
                .thenApply(s -> "task5的計(jì)算結(jié)果:s5 = " + s), s -> System.out.println("task5:" + s), Executors.newFixedThreadPool(2));
        task5.join();


    }

    /**
     * 組合計(jì)算結(jié)果
     * - runAfterBoth:都計(jì)算完之后執(zhí)行一段代碼
     * - thenAcceptBoth:都計(jì)算完之后把結(jié)果傳入,并執(zhí)行一段代碼
     * <p>
     * - thenCombine:組合兩個(gè)結(jié)果
     * - thenCompose:組合兩個(gè)結(jié)果
     */
    @Test
    public void test7() {

        //runAfterBoth方式
        StringBuilder msg = new StringBuilder("jorgeZhong");
        CompletableFuture.completedFuture(msg)
                .thenApply(s -> s.append(" task1,"))
                .runAfterBoth(CompletableFuture.completedFuture(msg)
                        .thenApply(s -> s.append(" task2")), () -> System.out.println(msg));


        //thenAcceptBoth方式
        CompletableFuture.completedFuture("jorgeZhong")
                .thenApplyAsync(String::toLowerCase)
                .thenAcceptBoth(CompletableFuture.completedFuture("jorgeZhong")
                        .thenApplyAsync(String::toUpperCase), (s, s2) -> System.out
                        .println("s1:" + s + ", s2:" + s2));


        //thenCombine方式
        CompletableFuture<String> result1 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenCombine(CompletableFuture.completedFuture("jorgeZhong")
                        .thenApply(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2);

        System.out.println("result1:" + result1.getNow(null));

        //異步
        CompletableFuture<String> result11 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenCombineAsync(CompletableFuture.completedFuture("jorgeZhong")
                        .thenApplyAsync(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2);
        System.out.println("result11:" + result11.join());


        //thenCompose方式
        CompletableFuture<String> result2 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenCompose(s -> CompletableFuture.completedFuture("jorgeZhong")
                        .thenApply(String::toUpperCase)
                        .thenApply(s1 -> "s:" + s + ", s1:" + s1));
        System.out.println("result2:" + result2.getNow(null));

        //異步
        CompletableFuture<String> result22 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenComposeAsync(s -> CompletableFuture.completedFuture("jorgeZhong")
                        .thenApplyAsync(String::toUpperCase)
                        .thenApplyAsync(s1 -> "s:" + s + ", s1:" + s1));

        System.out.println("result22:" + result22.join());
    }


    /**
     * 多個(gè)CompletableFuture策略
     * - anyOf:接受一個(gè)CompletableFuture數(shù)組,任意一個(gè)任務(wù)執(zhí)行完返回。都會(huì)觸發(fā)該CompletableFuture
     * - whenComplete:計(jì)算執(zhí)行完之后執(zhí)行實(shí)現(xiàn)的一段代碼,將上一個(gè)結(jié)果和異常作為參數(shù)傳入
     */
    @Test
    public void test8() throws InterruptedException {

        List<String> messages = Arrays.asList("a", "b", "c");
        CompletableFuture.anyOf(messages.stream()
                .map(o -> CompletableFuture.completedFuture(o).thenApplyAsync(s -> {

                    try {
                        Thread.sleep(new Random().ints(99, 300).findFirst().getAsInt());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return s.toUpperCase();
                }))
                .toArray(CompletableFuture[]::new))
                .whenComplete((res, throwable) -> {
                    if (throwable == null) {
                        System.out.println(res.toString());
                    }
                });


        Thread.sleep(1000);


    }


    /**
     * 多個(gè)CompletableFuture策略
     * - allOf:接受一個(gè)CompletableFuture數(shù)組,所有任務(wù)返回后,創(chuàng)建一個(gè)CompletableFuture
     */
    @Test
    public void test9() {

        List<String> messages = Arrays.asList("a", "b", "c");
        CompletableFuture[] cfs = messages.stream()
                .map(s -> CompletableFuture.completedFuture(s).thenApplyAsync(String::toUpperCase))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(cfs)
                .whenCompleteAsync((aVoid, throwable) -> Arrays.stream(cfs).forEach(completableFuture -> System.out
                        .println(completableFuture.getNow(null))));


    }


}


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容