ExecutorService

ExecutorService的submit和execute

ExecuteService代表的是Executors創(chuàng)建的線程池
submit提交的是Callable方法,返回Future,說明submit是有返回值的
execute執(zhí)行的是Runnable方法,沒有返回值
所以submit和execute的區(qū)別是提交的方法和是否有返回值

ExecutorService的shutdown,shutdownNow,awaitTermination

flume中的關(guān)閉源碼

  public void stop() {
    LOGGER.info("Configuration provider stopping");

    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
        LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
        executorService.shutdownNow();
        while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
          LOGGER.debug("Waiting for file watcher to terminate");
        }
      }
    } catch (InterruptedException e) {
      LOGGER.debug("Interrupted while waiting for file watcher to terminate");
      Thread.currentThread().interrupt();
    }
    lifecycleState = LifecycleState.STOP;
    LOGGER.debug("Configuration provider stopped");
  }

shutdown方法:平滑的關(guān)閉ExecutorService,當此方法被調(diào)用時,ExecutorService停止接收新的任務(wù)并且等待已經(jīng)提交的任務(wù)(包含提交正在執(zhí)行和提交未執(zhí)行)執(zhí)行完成。當所有提交任務(wù)執(zhí)行完畢,線程池即被關(guān)閉。
awaitTermination方法:接收人timeout和TimeUnit兩個參數(shù),用于設(shè)定超時時間及單位。當?shù)却^設(shè)定時間時,會監(jiān)測ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。

  • 場景
    應(yīng)用場景為線程池的有效執(zhí)行時間為20S,20S之后不管子任務(wù)有沒有執(zhí)行完畢,都要關(guān)閉線程池。代碼如下
ExecutorService es = Executors.newFixedThreadPool(10);        
es.execute(new Thread());//執(zhí)行子線程任務(wù)     
   try {    
    es.shutdown();  
        if(!es.awaitTermination(20,TimeUnit.SECONDS)){//20S 
       System.out.println(" 到達指定時間,還有線程沒執(zhí)行完,不再等待,關(guān)閉線程池!");      
   es.shutdownNow();    
    }
    } catch (Throwable e) {     // TODO Auto-generated catch block  
    es.shutdownNow();   
    e.printStackTrace();
    }

awaitTermination方法調(diào)用會被阻塞,直到所有任務(wù)執(zhí)行完畢并且shutdown請求被調(diào)用,或者參數(shù)中定義的timeout時間到達或者當前線程被打斷,這幾種情況任意一個發(fā)生了就會導(dǎo)致該方法的執(zhí)行。
當我們調(diào)用pool.awaitTermination時,首先該方法會被阻塞,這時會執(zhí)行子線程中的任務(wù),子線程執(zhí)行完畢后該方法仍然會被阻塞,因為shutdown()方法還未被調(diào)用,而代碼中將shutdown的請求放在了awaitTermination之后,這樣就導(dǎo)致了只有awaitTermination方法執(zhí)行完畢后才會執(zhí)行shutdown請求,這樣就造成了死鎖。
shutdown的請求一定要放在awaitTermination之前

ExecuteService執(zhí)行任務(wù)的異常處理

https://www.cnblogs.com/langtianya/p/4520373.html

下面這段代碼執(zhí)行的結(jié)果是什么?
executorService.submit(() -> {
    System.out.println(1 / 0);
});
我被它坑過無數(shù)回了:它什么也不會輸出。沒有任何的java.lang.ArithmeticException: / by zero的征兆,啥也沒有。線程池會把這個異常吞掉,就像什么也沒發(fā)生過一樣。如果是你自己創(chuàng)建的java.lang.Thread還好,這樣 UncaughtExceptionHandler 還能起作用。不過如果是線程池的話你就得小心了。如果你提交的是Runnable對象的話(就像上面那個一樣,沒有返回值),你得將整個方法體用try- catch包起來,至少打印一下異常。如果你提交的是Callable<Integer>的話,得確保你在用get()方法取值的時候重新拋 出異常:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();

監(jiān)控隊列長度,確保隊列有界

不當?shù)木€程池大小會使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊列會持續(xù)變大,消耗過多內(nèi)存。而過多的線程又會 由于頻繁的上下文切換導(dǎo)致整個系統(tǒng)的速度變緩——殊途而同歸。隊列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請 求:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
上面的代碼等價于Executors.newFixedThreadPool(n),然而不同的是默認的實現(xiàn)是一個無界的 LinkedBlockingQueue。這里我們用的是一個固定100大小的ArrayBlockingQueue。也就是說如果已經(jīng)有100個任務(wù)在 隊列中了(還有N個在執(zhí)行中),新的任務(wù)就會被拒絕掉,并拋出RejectedExecutionException異常。由于這里的隊列是在外部聲明 的,我們還可以時不時地調(diào)用下它的size()方法來將隊列大小記錄在到日志/JMX/或者你所使用的監(jiān)控系統(tǒng)中。

Executors.newCacheThreadPool線程和new ThreadPoolExecutor的區(qū)別和用法

Executors.去創(chuàng)建線程

五種線程池的適應(yīng)場景

  • newCachedThreadPool:用來創(chuàng)建一個可以無限擴大的線程池,適用于服務(wù)器負載較輕,執(zhí)行很多短期異步任務(wù)。
  • newFixedThreadPool:創(chuàng)建一個固定大小的線程池,因為采用無界的阻塞隊列,所以實際線程數(shù)量永遠不會變化,適用于可以預(yù)測線程數(shù)量的業(yè)務(wù)中,或者服務(wù)器負載較重,對當前線程數(shù)量進行限制。
  • newSingleThreadExecutor:創(chuàng)建一個單線程的線程池,適用于需要保證順序執(zhí)行各個任務(wù),并且在任意時間點,不會有多個線程是活動的場景。
  • newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用于需要多個后臺線程執(zhí)行周期任務(wù)的場景。
  • newWorkStealingPool:創(chuàng)建一個擁有多個任務(wù)隊列的線程池,可以減少連接數(shù),創(chuàng)建當前可用cpu數(shù)量的線程來并行執(zhí)行,適用于大耗時的操作,可以并行來執(zhí)行
為什么Executors創(chuàng)建線程不安全

Java中的BlockingQueue主要有兩種實現(xiàn),分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個用數(shù)組實現(xiàn)的有界阻塞隊列,必須設(shè)置容量。
LinkedBlockingQueue是一個用鏈表實現(xiàn)的有界阻塞隊列,容量可以選擇進行設(shè)置,不設(shè)置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。
這里的問題就出在:不設(shè)置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設(shè)置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。
而newFixedThreadPool中創(chuàng)建LinkedBlockingQueue時,并未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對于一個無邊界隊列來說,是可以不斷的向隊列中加入任務(wù)的,這種情況下就有可能因為任務(wù)過多而導(dǎo)致內(nèi)存溢出問題。
上面提到的問題主要體現(xiàn)在newFixedThreadPool和newSingleThreadExecutor兩個工廠方法上,并不是說newCachedThreadPool和newScheduledThreadPool這兩個方法就安全了,這兩種方式創(chuàng)建的最大線程數(shù)可能是Integer.MAX_VALUE,而創(chuàng)建這么多線程,必然就有可能導(dǎo)致OOM。

自定義創(chuàng)建線程池
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));
使用guava的ThreadFactoryBuilder
public class ExecutorsDemo {
    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();
    private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            pool.execute(new SubThread());
        }
    }
}
positionWriter = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat("positionWriter").build());

other

。。。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容