組合式異步編程

背景

如果你想要在同一個CPU上執(zhí)行幾個松耦合的任務(wù),同時防止因某個任務(wù)等待過長而阻塞線程的執(zhí)行,那么你需要做的是充分利用CPU的核,讓其足夠忙碌,最大化程序的吞吐量從而實(shí)現(xiàn)并發(fā)。

并行與并發(fā)的區(qū)別:

并行:在同一個核上同時執(zhí)行多個任務(wù),任務(wù)不互相阻塞

并發(fā):多個任務(wù)分發(fā)給多個核去執(zhí)行

在java5中,已經(jīng)引入了Future接口方便開發(fā)人員進(jìn)行異步編程。由于其使用繁瑣,代碼復(fù)雜,不足以讓我們編寫簡介并發(fā)代碼,因此java8引入了CompletableFuture接口。

使用CompletableFuture構(gòu)建異步應(yīng)用

查詢商品價格的例子,假設(shè)獲取價格是一個遠(yuǎn)程服務(wù),我們使用sleep 1秒來模擬此行為。

public class Shop {
    private String name;
    public Double getPrice(String product){
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("MyFavoriteShop2"),
                new Shop("MyFavoriteShop3"),
                new Shop("MyFavoriteShop4"),
                new Shop("MyFavoriteShop5"),
                new Shop("MyFavoriteShop6"),
                new Shop("MyFavoriteShop7"),
                new Shop("BuyItAll"));
  • 情景一

試想此場景,我們需要根據(jù)某個商品名稱去查詢商品的價格,發(fā)貨地等等一系列操作,我們可能會寫出如下偽代碼:

操作A
shop.getPrice(product)
操作B
操作C
...

這些操作直接沒有什么關(guān)聯(lián)性,上述代碼中靠后的操作需要等待前面的操作執(zhí)行完之后才能執(zhí)行,造成了阻塞。

那么我們其實(shí)可以使用CompletableFuture來實(shí)現(xiàn)異步執(zhí)行,下面的代碼中每個操作都不需要等待前面的操作便能執(zhí)行。

操作A
CompletableFuture<Double> completableFuture 
                            = CompletableFuture.supplyAsync(() -> shop.getPrice(product));
操作B
操作C
...
Double d = completableFuture.get();
  • 情景二

    如果給定一個product和一個List<Shop>,想要獲取所有shop中對此product的定價,該如何實(shí)現(xiàn)?

    我們已經(jīng)知道流的使用,按照常規(guī)思路,寫出下列代碼應(yīng)該不難

    List<Double> list = 
          shops.stream()
          .map(
              (Shop s) -> s.getPrice(product)
          )
          .collect(Collectors.toList());
    

    但是我們可不可以把map中獲取價格的代碼實(shí)現(xiàn)異步執(zhí)行呢?答案當(dāng)然是可以的。

    其中一種操作是將流轉(zhuǎn)為并行流,這里我們使用另一種方式:

    List<CompletableFuture<Double>> list 
              = shops.stream()
                              .map(
                              (Shop s) -> CompletableFuture.supplyAsync(
                                                  () -> s.getPrice(product)
                                                  )
                              )
                              .collect(Collectors.toList());
    List<Double> list2    
          =list.stream()
                      .map(CompletableFuture::join).collect(Collectors.toList());
    
  • 情景三

    如果你試過情景三種的實(shí)現(xiàn)方式后,你會發(fā)現(xiàn)其執(zhí)行速度并沒有多少提升。那么有沒有方法能夠讓他更快點(diǎn)呢?我們可以通過調(diào)整線程池的大小,確保整體的計算不會因?yàn)榫€程都在等待I/O而發(fā)生阻塞。

    List中有10個shop,我們可以調(diào)整線程池大小為10個。

    final Executor executor =
                    Executors.newFixedThreadPool(11,
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    Thread t = new Thread(r);
                                    t.setDaemon(true);
                                    return t;
                                }
                            });
    // CompletableFuture.supplyAsync()方法可以設(shè)置第二個參數(shù)            
    CompletableFuture.supplyAsync(
                                                  () -> s.getPrice(product),executor
                                                  )             
    
    • 情景四

      對兩個異步操作進(jìn)行流水線,第一個操作完成時,將其 結(jié)果作為參數(shù)傳遞給第二個操作。使用thenCompose連接。

      List<CompletableFuture<String>> list 
              = shops.stream()
              .map(
                          (Shop s) -> CompletableFuture.supplyAsync(
                                                                          () -> s.getPrice(product), executor
                                                                          )
              )
              .map(c -> c.thenCompose(
                      (Double d) -> CompletableFuture.supplyAsync(
                                                      () ->d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)))
              ))
              .collect(Collectors.toList());
      
    • 情景五

      將兩個完全不相干的CompletableFuture對象的結(jié)果整合起來,而且你也不希望等到第一個任務(wù)完全結(jié)束才開始第二項任務(wù) 。使用thenCombine連接

      List<CompletableFuture<String>> list = shops.stream()
              .map(s ->
                      CompletableFuture.supplyAsync(() -> s.getPrice(product), executor)
                              .thenCombine(
                                      CompletableFuture.supplyAsync(
                                              () -> new Random().nextInt(10)
                                      ), (d, c) -> d + "-----" + c
      
                              ))
              .collect(Collectors.toList());
      
  • 情景六

    響應(yīng)CompletableFuture的completion事件

    一旦CompletableFuture計算得到結(jié)果,就得到一個相應(yīng)。那么可以使用thenAccept

    Stream<CompletableFuture<String>> list = shops.stream()
                    .map((Shop s) -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor))
                    .map(c -> c.thenCompose(
                            (Double d) -> CompletableFuture.supplyAsync(() -> d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)),executor)
                    ));
    
            list.map(c->c.thenAccept(System.out::println));
    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容