Java 8 CompletableFuture

Java 8 CompletableFuture

Java 8 有大量的新特性和增強如 Lambda 表達(dá)式,StreamsCompletableFuture等。在本篇文章中我將詳細(xì)解釋清楚CompletableFuture以及它所有方法的使用。

什么是CompletableFuture?

在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運行的任務(wù)在一個單獨的線程,與主線程隔離,并且會通知主線程它的進(jìn)度,成功或者失敗。

在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。

使用這種并行方式,可以極大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是 Future API的擴(kuò)展。

Future 被用于作為一個異步計算結(jié)果的引用。提供一個 isDone() 方法來檢查計算任務(wù)是否完成。當(dāng)任務(wù)完成時,get() 方法用來接收計算任務(wù)的結(jié)果。

Callbale和 Future 教程可以學(xué)習(xí)更多關(guān)于 Future 知識.

Future API 是非常好的 Java 異步編程進(jìn)階,但是它缺乏一些非常重要和有用的特性。

Future 的局限性

  1. 不能手動完成 當(dāng)你寫了一個函數(shù),用于通過一個遠(yuǎn)程API獲取一個電子商務(wù)產(chǎn)品最新價格。因為這個 API 太耗時,你把它允許在一個獨立的線程中,并且從你的函數(shù)中返回一個 Future?,F(xiàn)在假設(shè)這個API服務(wù)宕機了,這時你想通過該產(chǎn)品的最新緩存價格手工完成這個Future 。你會發(fā)現(xiàn)無法這樣做。
  2. Future 的結(jié)果在非阻塞的情況下,不能執(zhí)行更進(jìn)一步的操作 Future 不會通知你它已經(jīng)完成了,它提供了一個阻塞的 get() 方法通知你結(jié)果。你無法給 Future 植入一個回調(diào)函數(shù),當(dāng) Future 結(jié)果可用的時候,用該回調(diào)函數(shù)自動的調(diào)用 Future 的結(jié)果。
  3. 多個 Future 不能串聯(lián)在一起組成鏈?zhǔn)秸{(diào)用 有時候你需要執(zhí)行一個長時間運行的計算任務(wù),并且當(dāng)計算任務(wù)完成的時候,你需要把它的計算結(jié)果發(fā)送給另外一個長時間運行的計算任務(wù)等等。你會發(fā)現(xiàn)你無法使用 Future 創(chuàng)建這樣的一個工作流。
  4. 不能組合多個 Future 的結(jié)果 假設(shè)你有10個不同的Future,你想并行的運行,然后在它們運行未完成后運行一些函數(shù)。你會發(fā)現(xiàn)你也無法使用 Future 這樣做。
  5. 沒有異常處理 Future API 沒有任務(wù)的異常處理結(jié)構(gòu)居然有如此多的限制,幸好我們有CompletableFuture,你可以使用 CompletableFuture 達(dá)到以上所有目的。

CompletableFuture 實現(xiàn)了 FutureCompletionStage接口,并且提供了許多關(guān)于創(chuàng)建,鏈?zhǔn)秸{(diào)用和組合多個 Future 的便利方法集,而且有廣泛的異常處理支持。

創(chuàng)建 CompletableFuture

1. 簡單的例子 可以使用如下無參構(gòu)造函數(shù)簡單的創(chuàng)建 CompletableFuture:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

這是一個最簡單的 CompletableFuture,想獲取CompletableFuture 的結(jié)果可以使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法會一直阻塞直到 Future 完成。因此,以上的調(diào)用將被永遠(yuǎn)阻塞,因為該Future一直不會完成。

你可以使用 CompletableFuture.complete() 手工的完成一個 Future:

completableFuture.complete("Future's Result")

所有等待這個 Future 的客戶端都將得到一個指定的結(jié)果,并且 completableFuture.complete()之后的調(diào)用將被忽略。

2. 使用 runAsync() 運行異步計算 如果你想異步的運行一個后臺任務(wù)并且不想改任務(wù)返回任務(wù)東西,這時候可以使用 CompletableFuture.runAsync()方法,它持有一個Runnable 對象,并返回 CompletableFuture<Void>。

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

你也可以以 lambda 表達(dá)式的形式傳入 Runnable 對象:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

在本文中,我使用lambda表達(dá)式會比較頻繁,如果以前你沒有使用過,建議你也多使用lambda 表達(dá)式。

3. 使用 supplyAsync() 運行一個異步任務(wù)并且返回結(jié)果 當(dāng)任務(wù)不需要返回任何東西的時候, CompletableFuture.runAsync() 非常有用。但是如果你的后臺任務(wù)需要返回一些結(jié)果應(yīng)該要怎么樣?

CompletableFuture.supplyAsync() 就是你的選擇。它持有supplier<T> 并且返回CompletableFuture<T>T 是通過調(diào)用 傳入的supplier取得的值的類型。

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier<T> 是一個簡單的函數(shù)式接口,表示supplier的結(jié)果。它有一個get()方法,該方法可以寫入你的后臺任務(wù)中,并且返回結(jié)果。

你可以使用lambda表達(dá)式使得上面的示例更加簡明:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

一個關(guān)于Executor 和Thread Pool筆記

CompletableFuture的方法如果以Async結(jié)尾,它會異步的執(zhí)行(沒有指定executor的情況下), 異步執(zhí)行通過ForkJoinPool實現(xiàn), 它使用守護(hù)線程去執(zhí)行任務(wù)。注意這是CompletableFuture的特性, 其它CompletionStage可以override這個默認(rèn)的行為。CompletableFuture API 的所有方法都有兩個變體一個接受Executor作為參數(shù),另一個不接受Executor

// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

創(chuàng)建一個線程池,并傳遞給其中一個方法:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

在 CompletableFuture 轉(zhuǎn)換和運行

CompletableFuture.get()方法是阻塞的。它會一直等到Future完成并且在完成后返回結(jié)果。 但是,這是我們想要的嗎?對于構(gòu)建異步系統(tǒng),我們應(yīng)該附上一個回調(diào)給CompletableFuture,當(dāng)Future完成的時候,自動的獲取結(jié)果。 如果我們不想等待結(jié)果返回,我們可以把需要等待Future完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。

可以使用 thenApply(), thenAccept()thenRun()方法附上一個回調(diào)給CompletableFuture。

1. thenApply() 可以使用 thenApply() 處理和改變CompletableFuture的結(jié)果。持有一個Function<R,T>作為參數(shù)。Function<R,T>是一個簡單的函數(shù)式接口,接受一個T類型的參數(shù),產(chǎn)出一個R類型的結(jié)果。

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

你也可以通過附加一系列的thenApply()在回調(diào)方法 在CompletableFuture寫一個連續(xù)的轉(zhuǎn)換。這樣的話,結(jié)果中的一個 thenApply方法就會傳遞給該系列的另外一個 thenApply方法。

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. thenAccept() 和 thenRun() 如果你不想從你的回調(diào)函數(shù)中返回任何東西,僅僅想在Future完成后運行一些代碼片段,你可以使用thenAccept()thenRun()方法,這些方法經(jīng)常在調(diào)用鏈的最末端的最后一個回調(diào)函數(shù)中使用。 CompletableFuture.thenAccept()持有一個Consumer<T>,返回一個CompletableFuture<Void>。它可以訪問CompletableFuture的結(jié)果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

雖然thenAccept()可以訪問CompletableFuture的結(jié)果,但thenRun()不能訪Future的結(jié)果,它持有一個Runnable返回CompletableFuture:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

異步回調(diào)方法的筆記 CompletableFuture提供的所有回調(diào)方法都有兩個變體: // thenApply() variants <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 這些異步回調(diào)變體通過在獨立的線程中執(zhí)行回調(diào)任務(wù)幫助你進(jìn)一步執(zhí)行并行計算。 以下示例:

CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

在以上示例中,在thenApply()中的任務(wù)和在supplyAsync()中的任務(wù)執(zhí)行在相同的線程中。任何supplyAsync()立即執(zhí)行完成,那就是執(zhí)行在主線程中(嘗試刪除sleep測試下)。 為了控制執(zhí)行回調(diào)任務(wù)的線程,你可以使用異步回調(diào)。如果你使用thenApplyAsync()回調(diào),將從ForkJoinPool.commonPool()獲取不同的線程執(zhí)行。

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

此外,如果你傳入一個ExecutorthenApplyAsync()回調(diào)中,,任務(wù)將從Executor線程池獲取一個線程執(zhí)行。

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);

組合兩個CompletableFuture

1. 使用 thenCompose()組合兩個獨立的future 假設(shè)你想從一個遠(yuǎn)程API中獲取一個用戶的詳細(xì)信息,一旦用戶信息可用,你想從另外一個服務(wù)中獲取他的貸方。 考慮下以下兩個方法getUserDetail()getCreditRating()的實現(xiàn):

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    }); 
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

現(xiàn)在讓我們弄明白當(dāng)使用了thenApply()后是否會達(dá)到我們期望的結(jié)果-

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函數(shù)傳入thenApply將返回一個簡單的值,但是在本例中,將返回一個CompletableFuture。以上示例的最終結(jié)果是一個嵌套的CompletableFuture。 如果你想獲取最終的結(jié)果給最頂層future,使用 thenCompose()方法代替-

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

因此,規(guī)則就是-如果你的回調(diào)函數(shù)返回一個CompletableFuture,但是你想從CompletableFuture鏈中獲取一個直接合并后的結(jié)果,這時候你可以使用thenCompose()。

2. 使用thenCombine()組合兩個獨立的 future 雖然thenCompose()被用于當(dāng)一個future依賴另外一個future的時候用來組合兩個future。thenCombine()被用來當(dāng)兩個獨立的Future都完成的時候,用來做一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

當(dāng)兩個Future都完成的時候,傳給``thenCombine()的回調(diào)函數(shù)將被調(diào)用。

組合多個CompletableFuture

我們使用thenCompose()thenCombine()把兩個CompletableFuture組合在一起?,F(xiàn)在如果你想組合任意數(shù)量的CompletableFuture,應(yīng)該怎么做?我們可以使用以下兩個方法組合任意數(shù)量的CompletableFuture。

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

1. CompletableFuture.allOf() CompletableFuture.allOf的使用場景是當(dāng)你一個列表的獨立future,并且你想在它們都完成后并行的做一些事情。

假設(shè)你想下載一個網(wǎng)站的100個不同的頁面。你可以串行的做這個操作,但是這非常消耗時間。因此你想寫一個函數(shù),傳入一個頁面鏈接,返回一個CompletableFuture,異步的下載頁面內(nèi)容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
} 

現(xiàn)在,當(dāng)所有的頁面已經(jīng)下載完畢,你想計算包含關(guān)鍵字CompletableFuture頁面的數(shù)量??梢允褂?code>CompletableFuture.allOf()達(dá)成目的。

List<String> webPageLinks = Arrays.asList(...)  // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的問題是它返回CompletableFuture。但是我們可以通過寫一些額外的代碼來獲取所有封裝的CompletableFuture結(jié)果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

花一些時間理解下以上代碼片段。當(dāng)所有future完成的時候,我們調(diào)用了future.join(),因此我們不會在任何地方阻塞。

join()方法和get()方法非常類似,這唯一不同的地方是如果最頂層的CompletableFuture完成的時候發(fā)生了異常,它會拋出一個未經(jīng)檢查的異常。

現(xiàn)在讓我們計算包含關(guān)鍵字頁面的數(shù)量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介紹的一樣,當(dāng)任何一個CompletableFuture完成的時候【相同的結(jié)果類型】,返回一個新的CompletableFuture。以下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,當(dāng)三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。因為future2的休眠時間最少,因此她最先完成,最終的結(jié)果將是future2的結(jié)果。

CompletableFuture.anyOf()傳入一個Future可變參數(shù),返回CompletableFuture。CompletableFuture.anyOf()的問題是如果你的CompletableFuture返回的結(jié)果是不同類型的,這時候你講會不知道你最終CompletableFuture是什么類型。

CompletableFuture 異常處理

我們探尋了怎樣創(chuàng)建CompletableFuture,轉(zhuǎn)換它們,并組合多個CompletableFuture?,F(xiàn)在讓我們弄明白當(dāng)發(fā)生錯誤的時候我們應(yīng)該怎么做。

首先讓我們明白在一個回調(diào)鏈中錯誤是怎么傳遞的。思考下以下回調(diào)鏈:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

如果在原始的supplyAsync()任務(wù)中發(fā)生一個錯誤,這時候沒有任何thenApply會被調(diào)用并且future將以一個異常結(jié)束。如果在第一個thenApply發(fā)生錯誤,這時候第二個和第三個將不會被調(diào)用,同樣的,future將以異常結(jié)束。

1. 使用 exceptionally() 回調(diào)處理異常 exceptionally()回調(diào)給你一個從原始Future中生成的錯誤恢復(fù)的機會。你可以在這里記錄這個異常并返回一個默認(rèn)值。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 

2. 使用 handle() 方法處理異常 API提供了一個更通用的方法 - handle()從異常恢復(fù),無論一個異常是否發(fā)生它都會被調(diào)用。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

如果異常發(fā)生,res參數(shù)將是 null,否則,ex將是 null。

參考文章:

https://juejin.im/post/5adbf8226fb9a07aac240a67

http://www.importnew.com/28319.html

completablefuture和parallel stream比較

如果你是I/O密集型的計算,可以使用自定義線程池,增加線程池大小,如果是CPU密集型計算,就不能增加比你的處理器個數(shù)再多的線程了,還是使用parallel stream比較好一些。

參考文章:
https://www.jdon.com/idea/java/java-8-completablefuture-vs-parallel-stream.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容