CompletableFuture原理與實(shí)踐

準(zhǔn)備

  1. 保證寫入時(shí),線程安全的List和Set結(jié)構(gòu)?
    ConcurrentHashMap是線程安全的HashMap,CopyOnWriteArrayList是線程安全的ArrayList。
    CopyOnWriteArraySet是線程安全的HashSet。

  2. 考慮多線程處理任務(wù)點(diǎn)?

任務(wù)的類型: 計(jì)算能力復(fù)雜,IO操作;

任務(wù)是否異步: 同步,異步

每個(gè)子線程是否有依賴關(guān)系:有,沒有 (例如:使用多線程處理 從1累加到1萬(wàn))

如果是多個(gè)任務(wù),每個(gè)任務(wù)使用多線程處理,主線程需要等待子線程任務(wù)執(zhí)行完畢之后在進(jìn)行執(zhí)行: 在沒有CompletableFuture 使用CountDownLatch解決
https://blog.csdn.net/qq_38599840/article/details/120708245

使用CompletableFuture處理依賴任務(wù)

  1. 線程池的方式并行獲取數(shù)據(jù)弊端?
  • CPU資源大量浪費(fèi)在阻塞等待上,導(dǎo)致CPU資源利用率低。在Java 8之前,一般會(huì)通過回調(diào)的方式來(lái)減少阻塞,但是大量使用回調(diào),又引發(fā)臭名昭著的回調(diào)地獄問題,導(dǎo)致代碼可讀性和可維護(hù)性大大降低。

  • 為了增加并發(fā)度,會(huì)引入更多額外的線程池,隨著CPU調(diào)度線程數(shù)的增加,會(huì)導(dǎo)致更嚴(yán)重的資源爭(zhēng)用,寶貴的CPU資源被損耗在上下文切換上,而且線程本身也會(huì)占用系統(tǒng)資源,且不能無(wú)限增加。
    同步模型下,會(huì)導(dǎo)致硬件資源無(wú)法充分利用,系統(tǒng)吞吐量容易達(dá)到瓶頸。

  1. 保證了多線程寫入時(shí)安全,咋樣使用什么樣的多線程呢?
    使用executor.submit(() -> {})處理RPC任務(wù);
    使用future處理每一個(gè)子線程是否 終端還是跳過 邏輯
public class Test3 {
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,

            new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
            .setNameFormat("coustomThread %d")
            .setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕獲到:" +t.getName()+"發(fā)生異常:"+e.getMessage()))
            .build());

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);//線程不安全List
        List<String> test = test(list);
        System.out.println("主線程:" + Thread.currentThread().getName() + ":"  + test);

    }

    public static List<String> test(List<Integer> list) {
        List<String> safeList = new CopyOnWriteArrayList();//線程安全的list

        list.stream().map(i -> executor.submit(() -> {
            //模擬rpc
            try {
                Thread.sleep(100);
                if(i == 3) {
                    int j = i / 0;
                }
                safeList.add(i + "A");
                System.out.println("子線程:" + Thread.currentThread().getName() + "返回參數(shù):" + i + "A");
                return i + "A";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).collect(Collectors.toList()).forEach(future -> {
            try {
                if (future != null) {
                    Object o = future.get();
//                    System.out.println("子線程:" + Thread.currentThread().getName() + "返回參數(shù):" + o);
                }
            } catch (InterruptedException e) {
                //如果不拋出異常,那么線程執(zhí)行不會(huì)終端;反之,如果拋出異常,則線程中斷
//                throw new RuntimeException(e);
            } catch (ExecutionException e) {
//                throw new RuntimeException(e);
            }
        });

        System.out.println(123);
        return safeList;

    }

``

4. 使用CountDownLatch主線程需要等待子線程任務(wù)執(zhí)行完畢之后在進(jìn)行執(zhí)行?

```java

public class Test4 {
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,

            new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
            .setNameFormat("coustomThread %d")
            .setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕獲到:" +t.getName()+"發(fā)生異常:"+e.getMessage()))
            .build());

    public static void main(String[] args) throws Exception{
        System.out.println("主線程開始:" + Thread.currentThread().getName());
        CountDownLatch downLatch = new CountDownLatch(2);

        //任務(wù)1
        AtomicReference<String> q1 = new AtomicReference<>("");//線程安全
        Future<String> future1 = executor.submit(() -> {
            try {
                Thread.sleep(1500);
                q1.set("任務(wù)1");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
//            downLatch.countDown();
            return "任務(wù)1";

        });

        //任務(wù)2
        AtomicReference<String> q2 = new AtomicReference<>("");//線程安全
        Future<String> future2 = executor.submit(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            q2.set("任務(wù)2" + q1.get());
//            downLatch.countDown();
            return "任務(wù)2" + q1.get();

        });
//        downLatch.await();

        Thread.sleep(600);

        System.out.println("獲取任務(wù)1返回:" + future1.get());

        System.out.println("獲取任務(wù)2返回:" + future2.get());

        System.out.println("主線程結(jié)束:" + Thread.currentThread().getName());



    }

    /*
    *   主線程開始:main
        獲取任務(wù)1返回:任務(wù)1
        獲取任務(wù)2返回:任務(wù)2任務(wù)1
        主線程結(jié)束:main
    * */


}

CompletableFuture

實(shí)例

 public void testCompletableInfo() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
  
          //調(diào)用用戶服務(wù)獲取用戶基本信息
          CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
                  //模擬查詢商品耗時(shí)500毫秒
          {
              try {
                  Thread.sleep(500);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return "用戶A";
          });
  
          //調(diào)用商品服務(wù)獲取商品基本信息
          CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                  //模擬查詢商品耗時(shí)500毫秒
          {
              try {
                  Thread.sleep(400);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              return "商品A";
          });
  
          System.out.println("獲取用戶信息:" + userFuture.get());
          System.out.println("獲取商品信息:" + goodsFuture.get());
  
          //模擬主程序耗時(shí)時(shí)間
          Thread.sleep(600);
          System.out.println("總共用時(shí)" + (System.currentTimeMillis() - startTime) + "ms");
    }

相關(guān)的方法使用


public class TestFuture1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        //自定義線程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        //cf1 cf2是0依賴
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "cf1", executor);
        CompletableFuture<String> cf2 = CompletableFuture.completedFuture("cf2");

        //CF3,CF5分別依賴于CF1和CF2,一元依賴
        CompletableFuture<String> cf3 = cf1.thenApply(res1 -> "cf3");
        CompletableFuture<String> cf5 = cf2.thenApply(res2 -> "cf5");

        //cf4 依賴 cf1和cf2, 二元依賴
        CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (res1, res2) -> "cf4");

        //cf6 依賴 cf3, cf4, cf5, 多元依賴
        CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);

        //最終結(jié)果
        CompletableFuture<String> result = cf6.thenApply(v -> "cf6");

//        //模擬主程序耗時(shí)時(shí)間
//        Thread.sleep(600);
//        System.out.println("總共用時(shí)" + (System.currentTimeMillis() - startTime) + "ms");
    }


}


注意事項(xiàng)

  1. fulture需要返回值,才能獲取異常信息
    Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。

小伙伴們使用的時(shí)候,注意一下哈,考慮是否加try...catch...或者使用exceptionally方法。

  1. CompletableFuture.get()方法是阻塞的(CompletableFuture.get(5, TimeUnit.SECONDS);)
    CompletableFuture的get()方法是阻塞的,如果使用它來(lái)獲取異步調(diào)用的返回值,需要添加超時(shí)時(shí)間。

  2. 不建議使用默認(rèn)的線程池(ForkJoinPool中的共用線程池CommonPool(CommonPool的大小是CPU核數(shù)-1,如果是IO密集的應(yīng)用,線程數(shù)可能成為瓶頸)。)

CompletableFuture代碼中又使用了默認(rèn)的「ForkJoin線程池」,處理的線程個(gè)數(shù)是電腦「CPU核數(shù)-1」。在大量請(qǐng)求過來(lái)的時(shí)候,處理邏輯復(fù)雜的話,響應(yīng)會(huì)很慢。一般建議使用自定義線程池,優(yōu)化線程池配置參數(shù);
前面提到,異步回調(diào)方法可以選擇是否傳遞線程池參數(shù)Executor,這里我們建議強(qiáng)制傳線程池,且根據(jù)實(shí)際情況做線程池隔離。

當(dāng)不傳遞線程池時(shí),會(huì)使用ForkJoinPool中的公共線程池CommonPool,這里所有調(diào)用將共用該線程池,核心線程數(shù)=處理器數(shù)量-1(單核核心線程數(shù)為1),所有異步回調(diào)都會(huì)共用該CommonPool,核心與非核心業(yè)務(wù)都競(jìng)爭(zhēng)同一個(gè)池中的線程,很容易成為系統(tǒng)瓶頸。手動(dòng)傳遞線程池參數(shù)可以更方便的調(diào)節(jié)參數(shù),并且可以給不同的業(yè)務(wù)分配不同的線程池,以求資源隔離,減少不同業(yè)務(wù)之間的相互干擾。

  1. 自定義線程池,注意飽和策略
    CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。

但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當(dāng)線程池飽和時(shí),會(huì)直接丟棄任務(wù),不會(huì)拋棄異常。因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時(shí)的異步線程,做好線程池隔離哈。

https://mp.weixin.qq.com/s?__biz=MzIzNjg4OTcyNA==&mid=2247484855&idx=1&sn=ad340b3fe63ab30d2e3e001c0f6edb87&scene=21&key=02eedae091b155c60a961e782b2307ab64d986af0bdcad1a803e2a12d697dd24740aaee860a4fa0fad55cef1a1e488a18bc593982f805ebb39a819488a5896ecbf4198d00f973900948cf2450fe53b502954d43ccf30279c71a744e39e7e6559f5fcc141b7f913c23e5e371fac0c1905fce64eab8c7ee55cb1f3f68436a26cee&ascene=14&uin=NzYxNzQxNTIx&devicetype=Windows+10+x64&version=6308011a&lang=zh_CN&exportkey=n_ChQIAhIQu9%2FTeSfvf6HU%2FTFANi4G7BL0AQIE97dBBAEAAAAAAOHfOF8YvT4AAAAOpnltbLcz9gKNyK89dVj0fLR5hkgYPSrWBvYpc1TcPAiVSvNkWX%2FSHdP%2FD9K%2BTEsBVfqk%2B0oZOhMs%2FN9hSBM77%2FiQZZ%2BphmkyMVh4hMNPlp8%2B%2BpmMqsXyA9jy%2BbvS9EYRHMNvj1UNHI0%2FjCL26xTGiPvunos1SXLWxiYCO47rLZOT%2FwFIMFickER5%2B9%2B15bZdDPLJrUP1vPI%2BDpwWZzckwFZiYtElitvSZd2tTe0k0Adc92mbaS34HH4clKUGLFXomPHfSV5cMf9fHayGoHu%2BTJAWjW9QNM8FoiFIVQk%3D&acctmode=0&pass_ticket=%2BfzqdTXdyhHUndM9%2F8BTisVSSsPUfiRdNFUYSuosHmHEOb2SSG2ljnJnKZfSy7vL3E1zqV87pHg%2BAagXuPt85w%3D%3D&wx_header=1&fontgear=2

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容