Java8新的異步編程方式 CompletableFuture

CompletableFuture特別是對微服務架構而言,會有很大的作為。舉一個具體的場景,電商的商品頁面可能會涉及到商品詳情服務、商品評論服務、相關商品推薦服務等等。獲取商品的信息時(/productdetails?productid=xxx),需要調用多個服務來處理這一個請求并返回結果。這里可能會涉及到并發(fā)編程,我們完全可以使用Java 8的CompletableFuture或者RxJava來實現(xiàn)。

使用demo


    public List<String> findPriceExecutorsCompletableFuture(String product){
        Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
        List<CompletableFuture<String>> priceFuture = shops.stream()
                .map(shop -> CompletableFuture
                        .supplyAsync(()-> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor))
                .collect(Collectors.toList());
        return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

https://my.oschina.net/u/3703858/blog/1799785

建議如下:

  • 如果你進行的是計算密集型的操作,并且沒有I/O,那么推薦使用Stream接口,因為實現(xiàn)簡單,同時效率也可能是最高的
  • 反之,如果你并行的工作單元還涉及等待I/O的操作(包括網絡連接等待).那么使用CompletableFuture是靈活性更好,你可以像前面討論的那樣,依據等待/計算,或者W/C的比率設定需要使用的線程數

Future Callable例子:

public void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
//創(chuàng)建Callable,它代表了下載所有的圖片
final Callable<List<ImageData>> task = () ->
  info.stream()
        .map(ImageInfo::downloadImage)
        .collect(Collectors.toList());
    // 將下載任務提交到executor
    Future<List<ImageData>> images = executor.submit(task);
    // renderText(source);
try {
   // 獲得所有下載的圖片(在所有圖片可用之前會一直阻塞)
   final List<ImageData> imageDatas = images.get();
   // 渲染圖片
   imageDatas.forEach(this::renderImage);
} catch (InterruptedException e) {
   // 重新維護線程的中斷狀態(tài)
   Thread.currentThread().interrupt();
   // 我們不需要結果,所以取消任務
   images.cancel(true);
} catch (ExecutionException e) {
  throw launderThrowable(e.getCause()); }
}

CompletableFuture

CompletableFuture類實現(xiàn)了CompletionStage和Future接口。Future是Java 5添加的類,用來描述一個異步計算的結果,但是獲取一個結果時方法較少,要么通過輪詢isDone,確認完成后,調用get()獲取值,要么調用get()設置一個超時時間。但是這個get()方法會阻塞住調用線程,這種阻塞的方式顯然和我們的異步編程的初衷相違背。
為了解決這個問題,JDK吸收了guava的設計思想,加入了Future的諸多擴展功能形成了CompletableFuture。
CompletionStage是一個接口,從命名上看得知是一個完成的階段,它里面的方法也標明是在某個運行階段得到了結果之后要做的事情。

1、進行變換

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

首先說明一下已Async結尾的方法都是可以異步執(zhí)行的,如果指定了線程池,會在指定的線程池中執(zhí)行,如果沒有指定,默認會在ForkJoinPool.commonPool()中執(zhí)行,下文中將會有好多類似的,都不詳細解釋了。關鍵的入參只有一個Function,它是函數式接口,所以使用Lambda表示起來會更加優(yōu)雅。它的入參是上一個階段計算后的結果,返回值是經過轉化后結果。
例如:

@Test
    public void thenApply() {
        String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
        System.out.println(result);
    }

結果為:

hello world

2、進行消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

thenAccept是針對結果進行消耗,因為他的入參是Consumer,有入參無返回值。
例如:

@Test
public void thenAccept(){    
       CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}

結果為:hello world
3、對上一步的計算結果不關心,執(zhí)行下一個操作

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenRun它的入參是一個Runnable的實例,表示當得到上一步的結果時的操作。
例如:

  @Test
    public void thenRun(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenRun(() -> System.out.println("hello world"));
        while (true){}
    }

4、結合兩個CompletionStage的結果,進行轉化后返回

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

它需要原來的處理返回值,并且other代表的CompletionStage也要返回值之后,利用這兩個返回值,進行轉換后返回指定類型的值。
例如:

 @Test
    public void thenCombine() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> s1 + " " + s2).join();
        System.out.println(result);
    }

5、結合兩個CompletionStage的結果,進行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

它需要原來的處理返回值,并且other代表的CompletionStage也要返回值之后,利用這兩個返回值,進行消耗。
例如:

  @Test
    public void thenAcceptBoth() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        }), (s1, s2) -> System.out.println(s1 + " " + s2));
        while (true){}
    }

6、在兩個CompletionStage都運行完執(zhí)行

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

不關心這兩個CompletionStage的結果,只關心這兩個CompletionStage執(zhí)行完畢,之后在進行操作(Runnable)。

例如:

 @Test
    public void runAfterBoth(){
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true){}
    }

7、兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的轉化操作

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

我們現(xiàn)實開發(fā)場景中,總會碰到有兩種渠道完成同一個事情,所以就可以調用這個方法,找一個最快的結果進行處理。
例如:

@Test
    public void applyToEither() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), s -> s).join();
        System.out.println(result);
    }

8、兩個CompletionStage,誰計算的快,我就用那個CompletionStage的結果進行下一步的消耗操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

例如:

@Test
    public void acceptEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }), System.out::println);
        while (true){}
    }

9、兩個CompletionStage,任何一個完成了都會執(zhí)行下一步的操作(Runnable)


public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

例如:

 @Test
    public void runAfterEither() {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s2";
        }), () -> System.out.println("hello world"));
        while (true) {
        }
    }

10、當運行時出現(xiàn)了異常,可以通過exceptionally進行補償。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

例如:

 @Test
    public void exceptionally() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

11、當運行完成時,對結果的記錄。這里的完成時有兩種情況,一種是正常執(zhí)行,返回值。另外一種是遇到異常拋出造成程序的中斷。這里為什么要說成記錄,因為這幾個方法都會返回CompletableFuture,當Action執(zhí)行完畢后它的結果返回原始的CompletableFuture的計算結果或者返回異常。所以不會對結果產生任何的作用。

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

例如:

 @Test
    public void whenComplete() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).whenComplete((s, t) -> {
            System.out.println(s);
            System.out.println(t.getMessage());
        }).exceptionally(e -> {
            System.out.println(e.getMessage());
            return "hello world";
        }).join();
        System.out.println(result);
    }

結果:

null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world

12、運行完成時,對結果的處理。這里的完成時有兩種情況,一種是正常執(zhí)行,返回值。另外一種是遇到異常拋出造成程序的中斷。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

例如:
出現(xiàn)異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //出現(xiàn)異常
            if (1 == 1) {
                throw new RuntimeException("測試一下異常情況");
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結果:hello world
未出現(xiàn)異常時

@Test
    public void handle() {
        String result = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "s1";
        }).handle((s, t) -> {
            if (t != null) {
                return "hello world";
            }
            return s;
        }).join();
        System.out.println(result);
    }

結果為:s1

上面就是CompletionStage接口中方法的使用實例,CompletableFuture同樣也同樣實現(xiàn)了Future,所以也同樣可以使用get進行阻塞獲取值,總的來說,CompletableFuture使用起來還是比較爽的,看起來也比較優(yōu)雅一點。

處理自定義異常

1、創(chuàng)建原子對象保存異常對象

final AtomicReference<BizException> foundException = new AtomicReference<>();
...

final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    //todo 業(yè)務邏輯
                } catch (BizException e) {
                    foundException.set(e);
                }
            }
            return "OK";
        });

...

if(foundException.get() != null){
    throw foundException.get();
}

2、使用CompletionException

List<CompletableFuture<Object>> futures =
    tasks.stream()
        .map(task -> CompletableFuture.supplyAsync(() -> businessLogic(task)))
        .collect(Collectors.toList());
try {
    List<Object> results = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
} catch (CompletionException e) {
    throw e.getCause() instanceof BusinessException?
        new BadRequestException("at least one async task had an exception"): e;
}

摘自: http://www.itdecent.cn/p/6f3ee90ab7d3
https://leokongwq.github.io/2017/01/17/java8-CompletableFuture.html
https://www.jdon.com/idea/java/java-8-completablefuture-vs-parallel-stream.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容