CompletableFuture 使用示例

背景

Java異步編程離不開Future接口,但是Future接口提供的方法使用起來不夠靈活。為了判斷一個Future是否已經(jīng)完成,我們可以:

  • 通過調(diào)用get方法,以阻塞的形式獲取執(zhí)行結(jié)果。
  • 調(diào)用有最大等待時間的get方法,以阻塞的形式獲取執(zhí)行結(jié)果。
  • 反復(fù)輪詢isDone方法,直到任務(wù)完成,獲取結(jié)果。

以上3種方法都不夠靈活,會造成線程阻塞或耗費CPU資源。需要用戶過多的參與異步編程邏輯,對業(yè)務(wù)代碼的侵入性較強。

另外用戶如果需要異步執(zhí)行多個任務(wù),并且這些任務(wù)具有先后依賴關(guān)系?;趥鹘y(tǒng)的方式我們需要大量使用鎖,CountDownLatchCyclicBarrier和阻塞隊列等,編程十分復(fù)雜。

CompletableFutureFuture的增強版,提供了一系列的同步或異步任務(wù)執(zhí)行操作。除此之外還能夠?qū)Ξ惒饺蝿?wù)多個階段的前后依賴關(guān)系進行控制。使用起來十分方便。

注意事項

所有的以async結(jié)尾的方法都為異步執(zhí)行。它們都可以傳入一個Executor線程池,用于異步執(zhí)行。如果不指定線程池,默認使用ForkJoinPool.commonPool()

下面以所有的async異步方法為例,說明下CompletableFuture的使用方法。

靜態(tài)方法

supplyAsync

提供初始數(shù)據(jù)。接收一個Supplier類型參數(shù)。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 1);
completableFuture.thenApplyAsync((i) -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return i + 1;
    // 這里打印出2
}).thenAcceptAsync((i) -> System.out.println(i));

runAsync

異步執(zhí)行任務(wù),接收Runnable類型參數(shù)。

CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("runAsync");
});

completedFuture

直接返回一個已經(jīng)完成的CompletableFuture。接收一個任意類型數(shù)據(jù)為參數(shù)。

CompletableFuture<Integer> completableFuture = CompletableFuture.completedFuture(500);
// 此處可以立刻返回500
System.out.println(completableFuture.get());

allOf

參數(shù)接收任意多個CompletableFuture。該方法返回一個新的CompletableFuture,只有在參數(shù)所有的CompletableFuture完成的時候它才會完成。

long start = System.currentTimeMillis();

CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("runAsync1");
});

CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("runAsync2");
});

// 等待兩個CompletableFuture都完成的時候,future才會完成
CompletableFuture<Void> future = CompletableFuture.allOf(completableFuture1, completableFuture2);

// 阻塞到CompletableFuture1完成
future.get();
// 耗時大于3000毫秒
System.out.println(System.currentTimeMillis() - start);

anyOf

和allOf一樣,返回一個新的CompletableFuture,參數(shù)中任意一個CompletableFuture完成時它就能完成。

long start = System.currentTimeMillis();

CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("runAsync1");
});

CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("runAsync2");
});

// 兩個CompletableFuture有任何一個完成,future就可以完成
CompletableFuture<Object> future = CompletableFuture.anyOf(completableFuture1, completableFuture2);

// 阻塞到CompletableFuture2完成
future.get();
// 耗時略大于1000毫秒
System.out.println(System.currentTimeMillis() - start);

普通方法

thenApplyAsync

進行數(shù)據(jù)處理,接收前一步驟傳遞的數(shù)據(jù),處理加工后返回。返回數(shù)據(jù)類型可以和前一步驟返回的數(shù)據(jù)類型不同。
接收參數(shù)為Function類型。

CompletableFuture.supplyAsync(() -> 1).thenApplyAsync((i) -> "Hi: " + i).whenCompleteAsync(((s, throwable) -> {
    // 返回 Hi: 1
    System.out.println(s);
}));

thenAcceptAsync

接收上游傳遞過來的數(shù)據(jù)并消費。接收一個Consumer類型參數(shù)。

CompletableFuture.supplyAsync(() -> 1).thenAcceptAsync(System.out::println);

acceptEitherAsync

接收另一個CompletableFuture和一個Consumer。含義為2個CompletableFuture哪個先運行完成,就采用誰的執(zhí)行結(jié)果。

CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).acceptEitherAsync(completableFuture1, integer -> {
    // completableFuture1最早返回,所以integer的值為1
    System.out.println(integer);
});

這個例子中第一個CompletableFuture比第二個先執(zhí)行完畢,因此acceptEitherAsync輸出第一個CompletableFuture的結(jié)果。

applyToEitherAsync

接收一個CompletableFuture和一個Function。和acceptEitherAsync類似,只不過第二個參數(shù)從Consumer變成Function。

CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).applyToEitherAsync(completableFuture1, (integer -> {
    // 這里CompletableFuture1首先完成,所以integer為1
    return integer;
})).whenCompleteAsync((integer, throwable) -> {
    System.out.println(integer);
});

thenAcceptBothAsync

接收另一個CompletableFuture和BiConsumer,用于在兩個CompletableFuture都執(zhí)行完的時候,獲取他們的執(zhí)行結(jié)果并處理。

CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).thenAcceptBothAsync(completableFuture1, (integer, integer2) -> {
    // 等到兩個CompletableFuture都完成之后回調(diào)
    // 在這個例子中需要等待5秒鐘
    回調(diào)的兩個參數(shù)分別對應(yīng)兩個CompletableFuture的執(zhí)行結(jié)果
    System.out.println(integer + integer2);
});

runAfterEitherAsync

接收另一個CompletableFuture和Runnable類型參數(shù)。兩個CompletableFuture有任意一個完成的時候,執(zhí)行Runnable。

long start = System.currentTimeMillis();

CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).runAfterEitherAsync(completableFuture1, () -> {
    // 任何一個CompletableFuture完成的時候都會執(zhí)行Runnable
    // 這個例子等待1秒后會執(zhí)行Runnable,此處打印的值略大于1000
    System.out.println(System.currentTimeMillis() - start);
});

runAfterBothAsync

接收另一個CompletableFuture和Runnable類型參數(shù)。兩個CompletableFuture都執(zhí)行完成的時候,執(zhí)行Runnable。

long start = System.currentTimeMillis();

CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
}).runAfterBothAsync(completableFuture1, () -> {
    // 和runAfterEitherAsync不同的是,這里必須等到兩個CompletableFuture都完成的時候才會執(zhí)行Runnable
    // 這里需要等待5秒鐘后才會執(zhí)行,打印的值略大于5000
    System.out.println(System.currentTimeMillis() - start);
});

complete

接收任意類型數(shù)據(jù)作為參數(shù)。如果CompletableFuture沒有完成的時候調(diào)用complete()方法,后續(xù)再調(diào)用這個future的get()方法時返回這個值。

// 睡眠2秒再返回結(jié)果
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});
// 注意,此處如果睡眠3秒,調(diào)用complete時completableFuture已經(jīng)執(zhí)行完畢返回1,complete方法不會修改返回值。此時調(diào)用get方法返回1
// 如果此處沒有睡眠3秒,調(diào)用complete時completableFuture尚未執(zhí)行完畢,下面調(diào)用get的時候方法返回1000
Thread.sleep(3000);
completableFuture.complete(1000);

Integer integer = completableFuture.get();

System.out.println(integer);

completeExceptionally

接收一個Throwable類型參數(shù)。如果CompletableFuture沒有完成的時候調(diào)用completeExceptionally()方法,后續(xù)再調(diào)用這個future的get()方法時會拋出這個異常。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
});

// 如果調(diào)用completeExceptionally的時候completableFuture沒有執(zhí)行完成,那么下面調(diào)用get的時候會拋出IllegalArgumentException
Thread.sleep(3000);
completableFuture.completeExceptionally(new IllegalArgumentException());

Integer integer = completableFuture.get();

System.out.println(integer);

cancel

相當于completeExceptionally(new CancellationException()),不再贅述。接收的參數(shù)對行為沒有影響。

如果CompletableFuture沒有執(zhí)行完成的時候調(diào)用了cancel,cancel方法返回true。

如果CompletableFuture執(zhí)行完成的時候調(diào)用了cancel,cancel方法返回false。

exceptionally

設(shè)置當CompletableFuture執(zhí)行拋出異常時候的返回值。用于處理異常情況。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {

    // 拋出一個異常
    if (true) {
        throw new RuntimeException("Error Message");
    }

    return 1;
});

// 如果遇到異常,返回500
completableFuture.exceptionally((t) -> 500)
        .whenCompleteAsync(((integer, throwable) -> {
            // 打印出500
            System.out.println(integer);
            // 異常已被處理,返回null
            System.out.println(throwable.getMessage());
        }));

handleAsync

相比exceptionally更為復(fù)雜的處理exception方式。接收一個BiFunction類型的參數(shù)。

handleAsync也可以視為回調(diào)方法具有返回值的whenCompleteAsync。

注意和exceptionally的區(qū)別,handleAsync沒有自動發(fā)現(xiàn)是否拋出exception的能力,需要手工編寫相關(guān)邏輯。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {

    if (true) {
        throw new RuntimeException("Error Message");
    }

    return 1;
});


completableFuture
        .handleAsync((integer, throwable) -> {
            System.out.println(integer);
            System.out.println(throwable);
            return null == throwable ? 1 : 500;
        })
        .whenCompleteAsync(((integer, throwable) -> {
            System.out.println(integer);
            System.out.println(throwable.getMessage());
        }));

whenCompleteAsync

完成時異步回調(diào),接收一個BiConsumer<? super T, ? super Throwable>類型參數(shù)。

例子在前面的代碼中已有體現(xiàn),不再贅述。

thenCombineAsync

含義相當于"thenApplyBothAsync",但是JDK不叫這個名字。

接收另一個CompletableFuture和BiFunction作為參數(shù)。目的為等待兩個CompletableFuture都完成的時候,執(zhí)行BiFunction對兩個CompletableFuture的返回值進行處理。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 1;
});

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
});

completableFuture.thenCombineAsync(completableFuture2, (integer, integer2) -> {
    // 等兩個CompletableFuture都執(zhí)行完畢后,返回他們兩個返回值的和
    return integer + integer2;
}).whenCompleteAsync(((integer, throwable) -> {
    System.out.println(integer);
}));

thenComposeAsync

將兩個CompletableFuture組合起來。接收的參數(shù)類型為Function。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 1;
});

completableFuture.thenComposeAsync(integer -> {
    // 這里返回另一個CompletableFuture,該CompletableFuture可以使用前面CompletableFuture的返回值進行計算
    return CompletableFuture.supplyAsync(() -> {
        return 10 + integer;
    });
}).whenCompleteAsync(((integer, throwable) -> {
    // 返回11
    System.out.println(integer);
}));
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容