引言
合理利用線程池能帶來三個(gè)好處:
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀所造成的消耗。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。
線程池的使用
線程池的創(chuàng)建
我們可以通過ThreadPoolExecutor來創(chuàng)建一個(gè)線程池
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit,runnableTaskQueue, rejectedExecutionHandler);
創(chuàng)建線程池需要輸入以下幾個(gè)參數(shù):
corePoolSize(線程池的基本大小) : 當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來執(zhí)行任務(wù),即使其他空閑的基本新城能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建新線程。等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的
prestartAllCoreThreads(),線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。maximumPoolSize(線程池最大大小) : 線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。但是如果使用到了無界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒什么效果了。
keepAliveTime(線程活動(dòng)保持時(shí)間) : 線程池的工作線程空閑后,保持存活的時(shí)間。所以如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,可以調(diào)大這個(gè)時(shí)間提高線程的利用率。
TimeUnit(線程活動(dòng)保持時(shí)間的單位) : 可選的單位有天(Days),小時(shí)(Hours),分鐘(Minutes),毫秒(MillSeconds),微秒(MicroSeconds)和毫微秒(NanoSeconds) 。
RunnableTaskQueue(任務(wù)隊(duì)列) : 用于保持等待執(zhí)行的任務(wù)的阻塞隊(duì)列。可以選擇以下幾個(gè)阻塞隊(duì)列。
- ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按照FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
- LinkedBlockingQueue:是一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按照FIFO(先進(jìn)先出)排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executor.newFixedThreadPool()使用了這個(gè)隊(duì)列。
- SynchronousQueue:一個(gè)不存儲元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,負(fù)責(zé)插入操作一直處于阻塞狀態(tài)。吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Exexutors.newCachedThreadPool使用了這個(gè)隊(duì)列。
- PriorityBlockingQueue:一個(gè)具有優(yōu)先級的無限阻塞隊(duì)列。
- RejectedExecutionHandler(飽和策略) : 當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個(gè)策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時(shí)拋出異常。以下是JDK5提供的種策略:
1.AbortPolicy :直接拋出異常。
2.CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
3.DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
4.DiscardPolicy:不處理,丟棄掉。
5.也可以根據(jù)應(yīng)用場景來實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)。
向線程池提交任務(wù)
我們可以使用execute提交任務(wù),但是execute方法沒有返回值,所以無法判斷當(dāng)前任務(wù)是否被線程池執(zhí)行成功。通過以下代碼可知execute方法輸入的任務(wù)是一個(gè)Runnable類的實(shí)例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
我們可以使用submit方法來提交任務(wù),它會(huì)返回一個(gè)future對象,我們可以根據(jù)這個(gè)future對象來判斷任務(wù)是否執(zhí)行成功,通過future的get方法來獲取返回值,get方法會(huì)阻塞住直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會(huì)阻塞一段時(shí)間后立即返回,這個(gè)時(shí)候可能任務(wù)沒有執(zhí)行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執(zhí)行任務(wù)異常
} finally {
// 關(guān)閉線程池
executor.shutdown();
}
線程池的關(guān)閉
我們可以通過調(diào)用Shutdown或者ShutdownNow方法來關(guān)閉線程池,但是它們的實(shí)現(xiàn)原理不同,shutdown的原理是只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。shutdownNow的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。shutdownNow會(huì)首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。
只要調(diào)用了這兩個(gè)關(guān)閉方法的其中一個(gè),isShutdown方法就會(huì)返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true。至于我們應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow。
線程池的分析
從上圖可知,每當(dāng)提交一個(gè)新任務(wù)給線程池,線程池的處理流程如下:
首先線程池判斷核心線程池是否已滿?沒滿,創(chuàng)建一個(gè)工作線程來執(zhí)行任務(wù)。滿了,則進(jìn)入下個(gè)流程。
其次線程池判斷工作隊(duì)列是否已滿?沒滿,則將新提交的任務(wù)儲存在工作隊(duì)列之中。滿了,則進(jìn)入下一個(gè)流程。
最后線程池判斷整個(gè)線程池是否已滿?沒滿,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。滿了,則交給飽和策略來處理這個(gè)任務(wù)。
源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果線程數(shù)小于基本線程數(shù),則創(chuàng)建線程并執(zhí)行當(dāng)前任務(wù)
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗,則將當(dāng)前任務(wù)放到工作隊(duì)列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果線程池不處于運(yùn)行中或任務(wù)無法放入隊(duì)列,并且當(dāng)前線程數(shù)量小于最大允許的線程數(shù)量,
則創(chuàng)建一個(gè)線程執(zhí)行任務(wù)。
else if (!addIfUnderMaximumPoolSize(command))
//拋出RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}
工作線程:
線程池創(chuàng)建線程時(shí),會(huì)將線程封裝成工作線程Worker,Worker在執(zhí)行完任務(wù)后,還會(huì)無限循環(huán)獲取工作隊(duì)列里的任務(wù)來執(zhí)行。我們可以從Worker的run方法里看到這點(diǎn):
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
如何合理的配置線程池
想要合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來進(jìn)行分析。
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)
- 任務(wù)的優(yōu)先級:高,中和低。
- 任務(wù)的執(zhí)行時(shí)間:長,中和短。
- 任務(wù)的依賴性:是否依賴于其他的系統(tǒng)資源,如數(shù)據(jù)庫連接。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)配置盡可能小的線程,如配置N(CPU核心數(shù))+1個(gè)線程的線程池。IO密集型任務(wù)則由于線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*N(CPU核心數(shù))?;旌闲偷娜蝿?wù),如果可以拆分,則將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率。如果這兩個(gè)任務(wù)的執(zhí)行時(shí)間相差太大,則沒必要進(jìn)行分解。我們可以通過Runtime.getRuntime().availableProcessors()來獲取當(dāng)前設(shè)備的CPU個(gè)數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊(duì)列PriorityBlockingQueue來處理。他可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。
執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時(shí)間越長CPU空閑時(shí)間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。
建議使用有界隊(duì)列,有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn),比如幾千。有一次我們組使用的后臺任務(wù)線程池的隊(duì)列和線程池全滿了,不斷的拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因?yàn)楹笈_任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞住,任務(wù)積壓在線程池里。如果當(dāng)時(shí)我們設(shè)置成無界隊(duì)列,線程池的隊(duì)列就會(huì)越來越多,有可能會(huì)撐滿內(nèi)存,導(dǎo)致整個(gè)系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨(dú)的服務(wù)器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務(wù),但是出現(xiàn)這樣問題時(shí)也會(huì)影響到其他任務(wù)。
線程池的監(jiān)控
通過線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時(shí)候可以使用
- taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
- completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量。小于或等于taskCount。
- largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個(gè)數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
- getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會(huì)自動(dòng)銷毀,所以這個(gè)大小只增不+ getActiveCount:獲取活動(dòng)的線程數(shù)。
通過擴(kuò)展線程池進(jìn)行監(jiān)控。
通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務(wù)執(zhí)行前,執(zhí)行后和線程池關(guān)閉前干一些事情。如監(jiān)控任務(wù)的平均執(zhí)行時(shí)間,最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。這幾個(gè)方法在線程池里是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }