線程池:業(yè)務代碼常見的問題
在程序中,我們會使用各種池優(yōu)化緩存創(chuàng)建昂貴的對象,比如線程池、連接池、內(nèi)存池。一般是預先創(chuàng)建一些對象放入池中,使用的時候直接取出使用,用完歸還以便復用,還會通過一定策略調(diào)整池中緩存的對象數(shù)量,實現(xiàn)動態(tài)伸縮。
由于線程的創(chuàng)建比較昂貴,隨意、沒有控制地創(chuàng)建大量線程會造成性能問題,因此短平快的任務一般優(yōu)先考慮使用線程池來處理,而不是直接創(chuàng)建線程
1. 線程池的聲明需要手動進行
Java 中的 Executors 類定義了一些快捷的工具方法,來幫助我們快速創(chuàng)建線程池。《阿里 巴巴 Java
開發(fā)手冊》中提到,禁止使用這些方法來創(chuàng)建線程池,而應該手動 new ThreadPoolExecutor
來創(chuàng)建線程池。這一條規(guī)則的背后,是大量血淋淋的生產(chǎn)事故,最典 型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因為資源耗盡導致 OOM 問題。
阿里巴巴文檔:
測試 OOM問題 :
- 來初始化一個單線程的 FixedThreadPool,循環(huán) 1 億次向線程池提
交任務,每個任務都會創(chuàng)建一個比較大的字符串然后休眠一小時
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(100000);
System.out.println("開始執(zhí)行");
for (int i = 0; i < 100000000; i++) {
executorService.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString();
System.out.println("等待一小時開始");
try {
TimeUnit.HOURS.sleep(1);
}catch (Exception e){
log.info(payload);
}
});
}
executorService.shutdown();
executorService.awaitTermination(1,TimeUnit.HOURS);
}
結(jié)果:java.lang.OutOfMemoryError 錯誤
首先我們看下 newFixedThreadPool 方法的源碼,發(fā)現(xiàn),線程池的工作隊列直接 new 了一個 LinkedBlockingQueue,
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
點進去查看 LinkedBlockingQueue構(gòu)造方法 是一個 Integer.MAX_VALUE長度的隊列,可以認為是無界的
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
雖然 newFixedThreadPool 可以把工作線程控制在固定的數(shù)量上,但任務隊列是無界但。如果任務較多并且執(zhí)行較慢但話,隊列可能會快速積壓,撐爆內(nèi)存導致OOM
測試newCachedThreadPool
如果我們把 newFixedThreadPool 改成 newCachedThreadPool方法來獲取線程池。程序運行不久后,同樣會看到 OOM 異常
java.lang.OutOfMemoryError: unable to create new native thread
源碼:
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
這種線程池的最大線程數(shù)是 Integer.MAX_VALUE ,認為是沒有上限的,可以認為是沒有上限的,而其工作隊列 SynchronousQueue 是一個沒有存儲空間的阻塞隊列。這意味著,只要有請求到來,就必須找到一條工作線程來處理,如果當前沒有空閑的線程就再創(chuàng)建一條新的。由于我們的任務需要一小時才能完成,大量的任務進來后會創(chuàng)建大量的線程,我們知道線程是分配一定的內(nèi)存空間做為線程棧,比如 1MB,因此無限創(chuàng)建線程必然會導致OOM
我們不建議使用 Executors 提供的兩種快捷的線程池,原因如下:
- 我們需要根據(jù)自己的場景、并發(fā)情況來評估線程池的幾個核心參數(shù),包括核心線程數(shù)、最大線程數(shù)、線程回收策略、工作隊列的類型,以及拒絕策略,確保線程池的工作行為符合要求,一般都需要設置有界的工作隊列和 可控的線程數(shù)。
- 任何時候,都于根偉自定義線程池指定有意思的名稱,以方便排查問題。當出現(xiàn)線程數(shù)量暴增、線程死鎖、線程占用大量CPU 、線程執(zhí)行出現(xiàn)異常等問題時,我們往往會抓取線程棧,此時,有意義的線程名稱,就可以方便我們定位問題。
總結(jié) 線程池工作行為:
- 如果當前線程池中的線程數(shù)目小于corePoolSize,則每來一個任務,就會創(chuàng)建一個線程去執(zhí)行這個任務;
- 如果當前線程池中的線程數(shù)目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執(zhí)行;若添加失?。ㄒ话銇碚f是任務緩存隊列已滿),則會嘗試創(chuàng)建新的線程去執(zhí)行這個任務;
- 如果隊列已經(jīng)滿了,則在總線程數(shù)不大于maximumPoolSize的前提下,則創(chuàng)建新的線程
- 如果當前線程池中的線程數(shù)目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
- 如果線程池中的線程數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
2.確認線程池是否在復用
- 在生產(chǎn)環(huán)境中,監(jiān)控一直報警當前使用線程數(shù)太多,一會又將下來,但是當前用戶訪問量也不是很大
通過代碼排查 發(fā)現(xiàn)項目中使用了 Executors.newCachedThreadPool(); 創(chuàng)建線程池使用,我們知道newCachedThreadPool 會在需要時創(chuàng)建必要多的線程,業(yè)務代碼的一次業(yè)務操作會向線程池提交多個慢任務,這樣執(zhí)行一次業(yè)務操作就會開啟多個線程,如果業(yè)務操作量較大,的確有可能一下子開啟幾千個線程
源碼發(fā)現(xiàn)
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 它的核心線程數(shù)是0,而最大線程數(shù) Integer的最大值,一般來說機器都沒那么大內(nèi)存給它不斷使用,而 keepAliveTime 是60秒,也就是在 60秒之后所有的線程都是可以回收的,采用SynchronousQueue裝等待的任務,這個阻塞隊列沒有存儲空間,這意味著只要有請求到來,就必須要找到一條工作線程處理他,如果當前沒有空閑的線程,那么就會再創(chuàng)建一條新的線程。
所以我們在使用線程池要根據(jù)任務的“輕重緩急”來指定線程池的核心參數(shù),包括線程數(shù)、回收策略和任務隊列:
: 1. 對于執(zhí)行比較慢、數(shù)量不大的 IO 任務,或許要考慮更多的線程數(shù),而不需要太大的隊
列。
: 2. 而對于吞吐量較大的計算型任務,線程數(shù)量不宜過多,可以是 CPU 核數(shù)或核數(shù) *2(理
由是,線程一定調(diào)度到某個 CPU 進行執(zhí)行,如果任務本身是 CPU 綁定的任務,那么過
多的線程只會增加線程切換的開銷,并不能提升吞吐量),但可能需要較長的隊列來做
緩沖。
個人博客地址:http://blog.yanxiaolong.cn/