Java CompleteFuture 書寫邏輯清晰的異步任務(wù)

CompleteFuture可以把一個(gè)復(fù)雜任務(wù)分解成由諸多計(jì)算節(jié)點(diǎn)組成的有向圖。

三種常用關(guān)系

串行關(guān)系
并行關(guān)系
匯總關(guān)系

CompletableFuture 基本使用

創(chuàng)建異步任務(wù)

有返回的異步任務(wù)

@Test
    public void supplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> boil = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                return "燒水";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        String s = boil.get();
        Assert.assertEquals("燒水", s);
    }

沒有返回的異步任務(wù)

@Test
    public void runAsync() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        CompletableFuture<Void> run = CompletableFuture.runAsync(() -> {

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        run.join();
        stopWatch.stop();
        double totalTimeSeconds = stopWatch.getTotalTimeSeconds();
        Assert.assertEquals(1, totalTimeSeconds, 0.1);
    }

串行組合

/**
* 正在洗菜
*切菜:豬肉
*/
@Test
    public void thenAccept() {
        CompletableFuture<String> audience = CompletableFuture.supplyAsync(() -> {
            System.out.println("正在洗菜");
            return "豬肉";
        });

        CompletableFuture<Void> end = audience.thenAccept(it -> {
            System.out.println("切菜:" + it);
        });

        end.join();
    }

并行組合 + 匯總組合

/**
*正在燒水...
*正在包餃子
*水開了
*餃子包好了
*準(zhǔn)備,下10個(gè)餃子
*/
@Test
    public void thenAcceptBoth() {
        CompletableFuture<Void> water = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("正在燒水...");
                TimeUnit.SECONDS.sleep(4);
                System.out.println("水開了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture<String> dumpling = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("正在包餃子");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("餃子包好了");
                return "10個(gè)餃子";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        });

        CompletableFuture<Void> end = water.thenAcceptBoth(dumpling, (a, b) -> {
            System.out.println("準(zhǔn)備,下" + b);
        });

        end.join();
    }

異常

/**
*正在燒水...
*充錢
*余額不足
*/
@Test
    public void exception() {
        CompletableFuture<Void> water = CompletableFuture.runAsync(() -> {
                System.out.println("正在燒水...");
                TimeUnit.SECONDS.sleep(4);
                throw new IllegalArgumentException("沒燃?xì)饬?);     
        });

        CompletableFuture<Void> end = water.exceptionally(throwable -> {
            System.out.println("充錢");
            throw new RuntimeException("余額不足");
        });
        try {
            end.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage());
            Assert.assertEquals("余額不足", e.getCause().getMessage());
        }
    }

異常的抓獲與處理


/*
* 1. handle..() 不會(huì)抓獲異常 ,所以配置多個(gè)都會(huì)被執(zhí)行;
* 2. exceptionally() 會(huì)抓獲異常所以只生效一次。
* 3. exceptionally() 必須在結(jié)束前調(diào)用,否則不生效

*我是handleAsync
*我是handleAsync2
*我是handle
*我是exceptionally1
*/
    @Test
    public void multiExceptionHandle() {
        CompletableFuture<Object> handle = CompletableFuture.runAsync(() -> {
            throw new RuntimeException("異常");
        })
                .handleAsync((aVoid, throwable) -> {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("我是handleAsync");
                    return null;
                })
                .handle((o, throwable) -> {
                    System.out.println("我是handle");
                    return null;
                })
                .handleAsync((avoid, throwable) -> {
                    System.out.println("我是handleAsync2");
                    throw new RuntimeException("處理異常中的異常");
//                  return null;
                })
                .exceptionally(throwable -> {
                    System.out.println("我是exceptionally1");
                    return null;
                })
                              .exceptionally(throwable -> {
                    System.out.println("我是exceptionally2");
                    return null;
                })
                ;

        handle.join();
    }

主動(dòng)拋異常

/**
* 如果將主動(dòng)拋異常的時(shí)機(jī)延長(zhǎng)到6s, 
* 由于該節(jié)點(diǎn)計(jì)算5s就完成, 所以不會(huì)收到異常
*
* 否則收到異常:手動(dòng)失敗
*/
@Test
    public void obtException() throws InterruptedException, ExecutionException, TimeoutException {

        CompletableFuture<String> fu = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("成功執(zhí)行了");
            } catch (InterruptedException ignored) {
            }
            return null;
        });

        CompletableFuture<String> end = fu.exceptionally(throwable -> {
            System.out.println("收到異常:" + throwable.getMessage());
            return null;
        });


        TimeUnit.SECONDS.sleep(3);
                // 如果等6s后再拋,由于結(jié)果已經(jīng)計(jì)算完成,則不會(huì)受到異常
                // TimeUnit.SECONDS.sleep(6);  
        fu.obtrudeException(new RuntimeException("手動(dòng)失敗"));
        end.join();
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容