前言
Java中的線程池用過吧?來說說你是怎么使用線程池的???
這句話在面試過程中遇到過好幾次了。我甚至這次標(biāo)題都想寫成【Java八股文之線程池】,但是有點(diǎn)太俗套了。雖然,線程池是一個已經(jīng)被說爛的知識點(diǎn)了,但是還是要寫這一篇用來加深自己的印象,但是想使用一個特殊的方式寫出來。
01 線程池
1.1 使用線程池的目的
先說一下我們?yōu)槭裁匆褂镁€程池?
- 線程是稀缺資源,不能頻繁的創(chuàng)建。而且創(chuàng)建和銷毀線程也是比較占用系統(tǒng)開銷的。
- 為了做到解耦,線程的創(chuàng)建與執(zhí)行任務(wù)分開,方便對線程進(jìn)行維護(hù)。
- 為了復(fù)用,前面也說了創(chuàng)建和銷毀線程比較耗系統(tǒng)開銷,那么創(chuàng)建出來線程放到一個池子里,可以給其他任務(wù)進(jìn)行復(fù)用。
1.2 線程池是如何一步一步創(chuàng)建的
- 第一版
正常的我們在創(chuàng)建一個線程去執(zhí)行任務(wù)的時候是這樣的:
new Thread(r).start();
但是這是最基本的方式,我們的項(xiàng)目中有可能很多地方都需要創(chuàng)建一個新的線程。這個使用為了減少重復(fù)代碼,我們會把這段創(chuàng)建線程的代碼<typo id="typo-452" data-origin="放的" ignoretag="true">放的</typo>一個工具類里面,然后對外提供工具方法,使用的時候直接調(diào)用此方法即可。
- 第二版
/**
* 先定義接口(任務(wù)執(zhí)行器)
*/
public interface Executor {
/**
* 執(zhí)行任務(wù)
* @param runnable 線程任務(wù)
*/
void execute(Runnable runnable);
}
/**
* 實(shí)現(xiàn):直接創(chuàng)建線程。
*/
class ExecutorImpl implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
這種方式實(shí)現(xiàn)了創(chuàng)建線程的代碼的復(fù)用,但是并沒有實(shí)現(xiàn)線程資源的復(fù)用,當(dāng)有1000個地方需要線程的時候,會創(chuàng)建1000個線程。
- 第三版
為了實(shí)現(xiàn)資源也復(fù)用,增加一個阻塞隊(duì)列,當(dāng)來了創(chuàng)建線程的任務(wù)的時候,先放到隊(duì)列里,然后再用一個線程(Worker),來處理任務(wù)。這樣就完成了線程資源的復(fù)用了,全程只有一個線程在來回的復(fù)用,一直在處理隊(duì)列中的任務(wù)。

通過上面的方式,實(shí)現(xiàn)了線程資源的復(fù)用,并且也起到提交任務(wù)和處理任務(wù)之間的解耦。但是只有一個線程處理任務(wù),會有瓶頸的,所以具體需要多少線程來處理任務(wù)最好是根據(jù)具體的業(yè)務(wù)場景來確定,這樣我們把這個值,設(shè)置成一個參數(shù),當(dāng)創(chuàng)建線程池的時候傳入,就叫corePoolSize吧。
而且任務(wù)隊(duì)列最好也要有容量,但也應(yīng)該是根據(jù)業(yè)務(wù)場景來配置容量,而且任務(wù)隊(duì)列還可以定制一些規(guī)則,例如:按照一定的規(guī)則出隊(duì)。所以我們把任務(wù)隊(duì)列也配置成參數(shù),在創(chuàng)建線程池的時候傳入。參數(shù)名稱就叫:workQueue吧。
當(dāng)隊(duì)列中任務(wù)滿了之后,任務(wù)就會被拋棄,但是如果是重要業(yè)務(wù)任務(wù),還不能拋棄,所以,當(dāng)隊(duì)列中任務(wù)滿了之后,在線程池沒有資源處理任務(wù)的時候,拒絕策略,我們也根據(jù)業(yè)務(wù)場景來確定,這樣也在創(chuàng)建的時候傳入一種拒絕策略,參數(shù)名稱就叫:rejectedExecutionHandler。
- 繼續(xù)優(yōu)化
雖然多了上面的三個參數(shù)后效果優(yōu)化了不少,但是還可以繼續(xù)優(yōu)化:
并不用上來就創(chuàng)建corePoolSize數(shù)量的線程,我們可以增加了一個變量workCount,來記錄已經(jīng)創(chuàng)建出來了工作線程,這樣在初始化的時候只有workCount<corePoolSize的時候,我們才創(chuàng)建線程來執(zhí)行任務(wù),當(dāng)workCount>CorePoolSize的時候,再來了任務(wù),就去進(jìn)隊(duì)列。
在增加拒絕策略的時候,我定義一個接口:RejectedExecutionHandler,然后使用者可以自己去實(shí)現(xiàn)這個接口,來完成自己的拒絕策略。
增加一個線程工廠的<typo id="typo-1623" data-origin="入?yún)? ignoretag="true">入?yún)?lt;/typo>:ThreadFactory,這樣保證每次創(chuàng)建線程的時候不用手動去創(chuàng)建線程了,而是通過ThreadFactory來獲取線程,并且也可以增加一些線程的標(biāo)識。
第四版
雖然說第三版的線程池已經(jīng)可以應(yīng)對日常工作中的情況了,但是還是不夠有彈性,所謂的彈性就是指,在任務(wù)提交頻繁時應(yīng)該處理能力提高,任務(wù)提交不頻繁時處理能力應(yīng)該降低。
上面這版線程池就不夠彈性。
如果某個時間段,任務(wù)提交量劇增,這個時候,corePoolSize和隊(duì)列都滿了,再來提交任務(wù)就只能走拒絕策略了。
你或許會想到,那我可以增大corePoolSize的值,這樣就會創(chuàng)建出來更多的線程來處理任務(wù),但是這個任務(wù)提交量劇增,只是某個時間段,過了這個時間段之后,創(chuàng)建出來這么多的線程,可以大部分都會是空閑的狀態(tài)。這樣也是浪費(fèi)資源了。
這樣就導(dǎo)致了一個兩難的情況,corePoolSize的值設(shè)置太大了也不好,設(shè)置太小了也不好。
這個時候,為讓線程池做到彈性伸縮,我們可以為他再添加一個參數(shù):maximumPoolSize,這個參數(shù)代表的意思是最大線程數(shù)。
當(dāng)corePoolSize和workQueue都滿了的時候,新提交的任務(wù)仍然可以創(chuàng)建新線程來進(jìn)行處理,這些超過corePoolSize創(chuàng)建出來的線程,被稱為非核心線程。當(dāng)corePoolSize與非核心線程數(shù)量的和等于maximumPoolSize再執(zhí)行拒絕策略。

通過這樣的方式,corePoolSize,負(fù)責(zé)平時情況的線程使用量,maximumPoolSize負(fù)責(zé)提交任務(wù)高峰時的,臨時擴(kuò)充容量。
但是目前這樣的方式只是考慮到了提交任務(wù)量高峰時期的擴(kuò)充,但這個高峰期只是暫時的,過了這個高峰期,非核心線程一直放著也是浪費(fèi)資源,所以我們再設(shè)定一個非核心線程的空閑活躍時間的參數(shù):keepAliveTime,這樣當(dāng)非核心線程數(shù),空閑時間超過這個值就銷毀線程,釋放資源。

這一版的線程池,做到了在提交任務(wù)高峰時可臨時擴(kuò)容,低谷時又可及時回收非核心線程,從而節(jié)省資源。真正的做到了收放自如。
通過上面幾版線程池的改進(jìn),最終改進(jìn)成了和Java中的線程池原理基本相似了。這樣也能更透徹的理解創(chuàng)建線程池時要傳入的這幾個關(guān)鍵參數(shù)的意義了。
02 下面說幾個線程池常見的考察點(diǎn)
2.1 Java中的線程池的阻塞隊(duì)列都有哪幾種?
- ArrayBlockingQueue : 有界隊(duì)列,按照阻塞的先后順序訪問隊(duì)列,默認(rèn)情況下不保證線程公平的訪問隊(duì)列~如果要保證公平性,會降低一定的吞吐量。底層是靠ReentrantLock來實(shí)現(xiàn)的,每一個方法中,都是靠ReentrantLock加鎖來完成阻塞。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
- LinkedBlockingQueue:基于鏈表的阻塞隊(duì)列,按照先進(jìn)先出的順序排列,在不設(shè)置隊(duì)列長度的時候默認(rèn)Integer.MAX_VALUE。所以認(rèn)為當(dāng)不設(shè)置隊(duì)列長度時,LinkedBlockingQueue為無界隊(duì)列。當(dāng)指定了隊(duì)列長度后變?yōu)橛薪珀?duì)列,通常LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue;
- SynchronousQueue:一個不存儲元素的阻塞隊(duì)列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。在不允許任務(wù)在隊(duì)列中等待的時候可以使用此隊(duì)列。
- DelayQueue:延遲獲取元素隊(duì)列,按照指定時間后獲取,為無界阻塞隊(duì)列。
- PriorityBlockingQueue:優(yōu)先級排序隊(duì)列,按照一定的優(yōu)先級對任務(wù)進(jìn)行排序,默認(rèn)是小頂堆。
- LinkedBlockingDeque:基于鏈表的雙端阻塞隊(duì)列。
2.2 Java提供了哪幾個默認(rèn)的線程池,為什么實(shí)際開發(fā)中不建議直接使用?
- Executors.newCachedThreadPool();:阻塞隊(duì)列采用的SynchronousQueue,所以是不存儲等待任務(wù)的,并且最大線程數(shù)的值是Integer.MAX_VALUE。所以當(dāng)任務(wù)提交量高峰時,相當(dāng)于無限制的創(chuàng)建線程。并且空閑時間是60秒,QPS高峰期最終會將服務(wù)器資源耗盡,所以真正實(shí)際應(yīng)用中不建議使用。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- Executors.newFixedThreadPool(int nThreads);:可重用固定線程數(shù)的線程池,源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心線程數(shù)和最大線程數(shù)相等的線程池,并且阻塞任務(wù)隊(duì)列還是一個無界隊(duì)列,這樣,當(dāng)處理任務(wù)的線程數(shù)量達(dá)到核心線程數(shù)時,再提交的任務(wù)都會進(jìn)行到阻塞隊(duì)列里,但是阻塞隊(duì)列是無界的,這樣就提交任務(wù)高峰期有可能會造成任務(wù)一直堆積在隊(duì)列里,超出內(nèi)存容量最終導(dǎo)致內(nèi)存溢出。
- Executors.newScheduledThreadPool(int corePoolSize);:一個定長線程池,支持定時及周期性任務(wù)執(zhí)行,這個線程池的最大線程數(shù)也是Integer.MAX_VALUE,可以理解為會無限創(chuàng)建線程。存在將資源耗盡的風(fēng)險,所以一般場景下不建議使用。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- Executors.newSingleThreadExecutor();
這種線程池,會創(chuàng)建一個線程數(shù)固定是1的線程池,并且任務(wù)隊(duì)列是無界的LinkedBlockingQueue,存在任務(wù)隊(duì)列無限添加造成OOM的風(fēng)險。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- Executors.newWorkStealingPool();:一個具有搶占式操作的線程池。
參數(shù)中傳入的是一個線程并發(fā)的數(shù)量,這里和之前就有很明顯的區(qū)別,前面4種線程池都有核心線程數(shù)、最大線程數(shù)等等,而這就使用了一個并發(fā)線程數(shù)解決問題。這個線程池不會保證任務(wù)的順序執(zhí)行,也就是 WorkStealing 的意思,搶占式的工作,哪個線程搶到任務(wù)就執(zhí)行。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Java中的線程池提供了哪幾種拒絕策略
- AbortPolicy:該策略默認(rèn)是飽和策略。當(dāng)不能在處理提交的任務(wù)時,直接拋出RejectedExecutionException,使用者可以自行捕獲此異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- CallerRunsPolicy:該策略是在線程池處理不了任務(wù)時,交給提交任務(wù)的主線程去處理任務(wù),主線程在處理任務(wù)的時候,不能在提交任務(wù)了,這樣線程池就可以有時間去處理堆積的任務(wù)了。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- DiscardOldestPolicy:該策略是,拋棄最老的任務(wù),然后再嘗試提交任務(wù),若阻塞隊(duì)列使用PriorityBlockingQueue優(yōu)先級隊(duì)列,將會導(dǎo)致優(yōu)先級最高的任務(wù)被拋棄,所以在阻塞隊(duì)列為PriorityBlockingQueue時,不建議使用此策略。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
- DiscardPolicy:這是一個比較任性的策略,當(dāng)線程池處理不了任務(wù)時,直接拋棄,再來了新任務(wù)也直接拋棄。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- RejectHandler:
直接拋拒絕異常。
public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
throw new RejectedExecutionException();
}
2.4 Java中線程池核心線程數(shù)與最大線程數(shù)該如何配置?
可以根據(jù)提交的任務(wù)不同,將線程池分開。
- 處理CPU密集型任務(wù),線程數(shù)量應(yīng)該較少,可為N(CPU核數(shù))+1或 N(CPU核數(shù)) * 2,因?yàn)榇藭r線程一定調(diào)度到某個CPU執(zhí)行,若任務(wù)本身是CPU綁定的任務(wù),那么過多的線程只會增加線程切換的開銷,而不能提升吞吐量,但可能需要較長隊(duì)列做緩沖。
- I/O密集型任務(wù),執(zhí)行較慢、數(shù)量不大的IO任務(wù),要考慮更多線程數(shù),而無需太大隊(duì)列。相比計算型任務(wù),需多一些線程,要結(jié)合具體的 I/O 阻塞時長考慮。
但是實(shí)際情況下,有些任務(wù)是既耗CPU資源,又占用I/O資源的。所以這個時候可以采用類似美團(tuán)技術(shù)提出方案,實(shí)時的監(jiān)控線程池狀態(tài)信息,然后對線程池的數(shù)據(jù)進(jìn)行調(diào)整。
在監(jiān)控線程池的時候可以使用如下幾個線程池屬性:
- getTaskCount():線程池需要執(zhí)行的任務(wù)數(shù)量。
- completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量,小于或等于taskCount。
- largestPoolSize:線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過,如該數(shù)值等于線程池的最大線程數(shù)量,則表示線程池曾經(jīng)滿過。
- getPoolSize():線程池的線程數(shù)量,如果線程池不銷毀的話,線程池里的線程不會自動銷毀,所以這個大小只增不減。
- getActiveCount():獲取活動的線程數(shù)。
作者:紀(jì)莫
原文鏈接:https://juejin.cn/post/6960222745653149732