ExecutorService的submit和execute
ExecuteService代表的是Executors創(chuàng)建的線程池
submit提交的是Callable方法,返回Future,說明submit是有返回值的
execute執(zhí)行的是Runnable方法,沒有返回值
所以submit和execute的區(qū)別是提交的方法和是否有返回值
ExecutorService的shutdown,shutdownNow,awaitTermination
flume中的關(guān)閉源碼
public void stop() {
LOGGER.info("Configuration provider stopping");
executorService.shutdown();
try {
if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
executorService.shutdownNow();
while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for file watcher to terminate");
}
}
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for file watcher to terminate");
Thread.currentThread().interrupt();
}
lifecycleState = LifecycleState.STOP;
LOGGER.debug("Configuration provider stopped");
}
shutdown方法:平滑的關(guān)閉ExecutorService,當此方法被調(diào)用時,ExecutorService停止接收新的任務(wù)并且等待已經(jīng)提交的任務(wù)(包含提交正在執(zhí)行和提交未執(zhí)行)執(zhí)行完成。當所有提交任務(wù)執(zhí)行完畢,線程池即被關(guān)閉。
awaitTermination方法:接收人timeout和TimeUnit兩個參數(shù),用于設(shè)定超時時間及單位。當?shù)却^設(shè)定時間時,會監(jiān)測ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。
- 場景
應(yīng)用場景為線程池的有效執(zhí)行時間為20S,20S之后不管子任務(wù)有沒有執(zhí)行完畢,都要關(guān)閉線程池。代碼如下
ExecutorService es = Executors.newFixedThreadPool(10);
es.execute(new Thread());//執(zhí)行子線程任務(wù)
try {
es.shutdown();
if(!es.awaitTermination(20,TimeUnit.SECONDS)){//20S
System.out.println(" 到達指定時間,還有線程沒執(zhí)行完,不再等待,關(guān)閉線程池!");
es.shutdownNow();
}
} catch (Throwable e) { // TODO Auto-generated catch block
es.shutdownNow();
e.printStackTrace();
}
awaitTermination方法調(diào)用會被阻塞,直到所有任務(wù)執(zhí)行完畢并且shutdown請求被調(diào)用,或者參數(shù)中定義的timeout時間到達或者當前線程被打斷,這幾種情況任意一個發(fā)生了就會導(dǎo)致該方法的執(zhí)行。
當我們調(diào)用pool.awaitTermination時,首先該方法會被阻塞,這時會執(zhí)行子線程中的任務(wù),子線程執(zhí)行完畢后該方法仍然會被阻塞,因為shutdown()方法還未被調(diào)用,而代碼中將shutdown的請求放在了awaitTermination之后,這樣就導(dǎo)致了只有awaitTermination方法執(zhí)行完畢后才會執(zhí)行shutdown請求,這樣就造成了死鎖。
shutdown的請求一定要放在awaitTermination之前
ExecuteService執(zhí)行任務(wù)的異常處理
https://www.cnblogs.com/langtianya/p/4520373.html
下面這段代碼執(zhí)行的結(jié)果是什么?
executorService.submit(() -> {
System.out.println(1 / 0);
});
我被它坑過無數(shù)回了:它什么也不會輸出。沒有任何的java.lang.ArithmeticException: / by zero的征兆,啥也沒有。線程池會把這個異常吞掉,就像什么也沒發(fā)生過一樣。如果是你自己創(chuàng)建的java.lang.Thread還好,這樣 UncaughtExceptionHandler 還能起作用。不過如果是線程池的話你就得小心了。如果你提交的是Runnable對象的話(就像上面那個一樣,沒有返回值),你得將整個方法體用try- catch包起來,至少打印一下異常。如果你提交的是Callable<Integer>的話,得確保你在用get()方法取值的時候重新拋 出異常:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();
監(jiān)控隊列長度,確保隊列有界
不當?shù)木€程池大小會使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊列會持續(xù)變大,消耗過多內(nèi)存。而過多的線程又會 由于頻繁的上下文切換導(dǎo)致整個系統(tǒng)的速度變緩——殊途而同歸。隊列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負了它可以暫時拒絕掉新的請 求:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
上面的代碼等價于Executors.newFixedThreadPool(n),然而不同的是默認的實現(xiàn)是一個無界的 LinkedBlockingQueue。這里我們用的是一個固定100大小的ArrayBlockingQueue。也就是說如果已經(jīng)有100個任務(wù)在 隊列中了(還有N個在執(zhí)行中),新的任務(wù)就會被拒絕掉,并拋出RejectedExecutionException異常。由于這里的隊列是在外部聲明 的,我們還可以時不時地調(diào)用下它的size()方法來將隊列大小記錄在到日志/JMX/或者你所使用的監(jiān)控系統(tǒng)中。
Executors.newCacheThreadPool線程和new ThreadPoolExecutor的區(qū)別和用法
Executors.去創(chuàng)建線程
五種線程池的適應(yīng)場景
- newCachedThreadPool:用來創(chuàng)建一個可以無限擴大的線程池,適用于服務(wù)器負載較輕,執(zhí)行很多短期異步任務(wù)。
- newFixedThreadPool:創(chuàng)建一個固定大小的線程池,因為采用無界的阻塞隊列,所以實際線程數(shù)量永遠不會變化,適用于可以預(yù)測線程數(shù)量的業(yè)務(wù)中,或者服務(wù)器負載較重,對當前線程數(shù)量進行限制。
- newSingleThreadExecutor:創(chuàng)建一個單線程的線程池,適用于需要保證順序執(zhí)行各個任務(wù),并且在任意時間點,不會有多個線程是活動的場景。
- newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用于需要多個后臺線程執(zhí)行周期任務(wù)的場景。
- newWorkStealingPool:創(chuàng)建一個擁有多個任務(wù)隊列的線程池,可以減少連接數(shù),創(chuàng)建當前可用cpu數(shù)量的線程來并行執(zhí)行,適用于大耗時的操作,可以并行來執(zhí)行
為什么Executors創(chuàng)建線程不安全
Java中的BlockingQueue主要有兩種實現(xiàn),分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個用數(shù)組實現(xiàn)的有界阻塞隊列,必須設(shè)置容量。
LinkedBlockingQueue是一個用鏈表實現(xiàn)的有界阻塞隊列,容量可以選擇進行設(shè)置,不設(shè)置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。
這里的問題就出在:不設(shè)置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設(shè)置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。
而newFixedThreadPool中創(chuàng)建LinkedBlockingQueue時,并未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對于一個無邊界隊列來說,是可以不斷的向隊列中加入任務(wù)的,這種情況下就有可能因為任務(wù)過多而導(dǎo)致內(nèi)存溢出問題。
上面提到的問題主要體現(xiàn)在newFixedThreadPool和newSingleThreadExecutor兩個工廠方法上,并不是說newCachedThreadPool和newScheduledThreadPool這兩個方法就安全了,這兩種方式創(chuàng)建的最大線程數(shù)可能是Integer.MAX_VALUE,而創(chuàng)建這么多線程,必然就有可能導(dǎo)致OOM。
自定義創(chuàng)建線程池
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
使用guava的ThreadFactoryBuilder
public class ExecutorsDemo {
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
pool.execute(new SubThread());
}
}
}
positionWriter = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
other
。。。