為了避免系統(tǒng)頻繁的創(chuàng)建和銷毀線程,可以使用線程池來管理線程,以實(shí)現(xiàn)線程的復(fù)用。同時,線程池還可以幫助管理系統(tǒng)中的線程數(shù)量,防止過多的并發(fā)線程耗盡系統(tǒng)的資源。
Executor框架

ThreadPoolExecutor表示一個線程池,而Executors可以當(dāng)作是一個線程池工廠類,用于方便的生產(chǎn)線程池,可以通過Executors的靜態(tài)方法獲得幾種預(yù)先設(shè)定的線程池。從上圖可知,ThreadPoolExecutor類實(shí)現(xiàn)了Executor接口,通過這個接口,線程池ThreadPoolExecutor可以接受并運(yùn)行任何實(shí)現(xiàn)Runnable接口的對象。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
······
1.幾種經(jīng)典線程池
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
通過Executors.FixedThreadPool(n)方法生成的固定線程數(shù)量為n的線程池,該線程池中的線程數(shù)量始終不變,當(dāng)有一個新的任務(wù)提交時,如果線程中有空閑的線程,則立即執(zhí)行該任務(wù);如果沒有空閑線程,則新的任務(wù)會緩存在一個任務(wù)隊列中,等到線程池中有空閑線程了再執(zhí)行。線程池使用的是無界隊列LinkedBlockingQueue,任務(wù)可以無限的添加進(jìn)隊列中,但是也有耗盡系統(tǒng)資源的風(fēng)險。
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通過Executors.newCachedThreadPool()方法生成的可根據(jù)實(shí)際情況調(diào)整線程數(shù)量的線程池。線程中的數(shù)量不固定,如果有空閑線程則優(yōu)先使用空閑線程,如果所有線程都忙碌則會創(chuàng)建一個新的線程處理任務(wù)。線程組將corePoolSize設(shè)為0,而maximumPoolSize設(shè)為無窮大,也就是說平時線程組中沒有線程,如果有任務(wù)提交而所有線程繁忙時,線程組會無限制的創(chuàng)建線程,同樣可能會耗盡系統(tǒng)資源。線程組使用的SynchronousQueue容量為0,無法存儲任務(wù),只是提交任務(wù)的中介,因此所有任務(wù)都會立刻提交成功,不會被拒絕。
- SingleThreadExecutor
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通過Executors.newSingleThreadExecutor()方法生成線程數(shù)量永遠(yuǎn)為1的線程池。
- SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
通過Executors.newSingleThreadScheduledExecutor()方法生成的線程數(shù)量永遠(yuǎn)為1的線程池。該線程池可以控制線程的啟動時間,達(dá)到延時啟動的效果。
- ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
通過Executors.newThreadScheduledExecutor(n)方法生成的固定線程數(shù)量為n的線程池。該線程池也可以控制線程的啟動時間。
2.ScheduledThreadPool的使用
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
-
schedule()方法會在給定時間后調(diào)用一次指定方法。 -
scheduleAtFixedRate()會周期性的調(diào)用指定方法,在initialDelay之后執(zhí)行第一次方法,在上一次方法開始執(zhí)行后period時間后再執(zhí)行下一次方法。上一次任務(wù)的執(zhí)行時長不會影響下一次任務(wù)開始執(zhí)行的時間。 -
scheduleWithFixedDelay()會周期性的調(diào)用指定方法,在initialDelay之后執(zhí)行第一次方法,在上一次方法結(jié)束執(zhí)行后delay時間后再執(zhí)行下一次方法。上一次任務(wù)的執(zhí)行時長會影響下一次任務(wù)開始執(zhí)行的時間。
3.創(chuàng)建自定義的ThreadPoolExecutor線程池
ThreadPoolExecutor的構(gòu)造方法如下,
public ThreadPoolExecutor(
// 線程池中穩(wěn)定線程數(shù)量
int corePoolSize,
// 線程池中最大線程數(shù)量
int maximumPoolSize,
// 線程超過穩(wěn)定數(shù)量時,多余的線程的存活時間,
// 超時的線程將被回收
long keepAliveTime,
// 超時時間的單位
TimeUnit unit,
// 任務(wù)隊列,被提交但尚未被執(zhí)行的任務(wù)存放的地方
BlockingQueue<Runnable> workQueue,
// 線程工廠,用于創(chuàng)建線程
ThreadFactory threadFactory,
// 拒絕策略,當(dāng)任務(wù)太多來不及處理時,如何拒絕任務(wù)
RejectedExecutionHandler handler)
任務(wù)隊列(workQueue)
-
SynchronousQueue直接提交的隊列:SynchronousQueue沒有容量,每一個插入操作都要等待一個刪除操作,也就是說SynchronousQueue只是任務(wù)提交的中介,提交的任務(wù)不會被真實(shí)的保存,而是直接提交給線程組執(zhí)行,如果沒有空閑線程則創(chuàng)建新線程,如果進(jìn)程數(shù)量達(dá)到最大值則拒絕任務(wù)。因此使用SynchronousQueue總要配合很大的maximumPoolSize值以防止任務(wù)被拒絕。 -
ArrayBlockingQueue有界的任務(wù)隊列:使用ArrayBlockingQueue時,如果線程池實(shí)際線程數(shù) n 小于穩(wěn)定數(shù)量corePoolSize,則會優(yōu)先創(chuàng)建新的線程;如果 n 大于corePoolSize,則將任務(wù)存入隊列中等待;如果隊列存滿了,而 n 小于最大線程數(shù)maximumPoolSize,就創(chuàng)建新線程處理任務(wù);如果隊列已滿,而線程數(shù)也達(dá)到最大值,則拒絕任務(wù)。也就是說,除非系統(tǒng)非常繁忙,否則ArrayBlockingQueue會使線程數(shù)穩(wěn)定在corePoolSize。 -
LinkedBlockingQueue無界的任務(wù)隊列:除非系統(tǒng)資源耗盡,否則任務(wù)可以一直加入隊列中。當(dāng)有新任務(wù)到來時,如果線程池實(shí)際線程數(shù) n 小于穩(wěn)定數(shù)量corePoolSize,則會創(chuàng)建新的線程;而如果 n 大于corePoolSize,則將任務(wù)存入隊列中等待。也就是說,線程中的數(shù)量永遠(yuǎn)不會超過corePoolSize,而線程組的拒絕策略也永遠(yuǎn)不會觸發(fā)。而如果隊列中的任務(wù)數(shù)量持續(xù)增長,將會耗盡系統(tǒng)資源。 -
PriorityBlockingQueue優(yōu)先任務(wù)隊列:優(yōu)先隊列可以控制任務(wù)的執(zhí)行順序,按照任務(wù)的優(yōu)先級執(zhí)行任務(wù)。
4.線程組執(zhí)行邏輯
ThreadPoolExecutor類的核心方法execute()的執(zhí)行邏輯如下,源碼的英文注解也描述了線程組是如何執(zhí)行任務(wù)的:
- 如果線程組線程數(shù)少于穩(wěn)定數(shù)
corePoolSize,方法會直接嘗試調(diào)用addWorker()方法創(chuàng)建一個新的線程來執(zhí)行任務(wù),此時會原子性的檢查runState和workerCount的值,如果不滿足則會返回false來阻止線程組創(chuàng)建新線程。 - 如果線程數(shù)高于
corePoolSize或創(chuàng)建線程失敗,則方法會嘗試將任務(wù)加入等待隊列中,如果加入成功,則任務(wù)在隊列中等待執(zhí)行。 - 如果加入隊列失敗,則方法會嘗試再次調(diào)用
addWorker()方法提交線程,這次如果線程數(shù)小于maximumPoolSize則會為任務(wù)創(chuàng)建線程執(zhí)行。而如果這次提交失敗,則方法會根據(jù)拒絕策略拒絕任務(wù)。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
5.拒絕策略
JDK內(nèi)置四種拒絕策略:
- AbortPolicy:該策略會直接拋出異常,阻止系統(tǒng)正常工作。
- CallerRunsPolicy:只要線程池未關(guān)閉,該策略會直接在調(diào)用者線程中運(yùn)行被拒絕的任務(wù)。
- DiscardOldestPolicy:該策略將丟棄最老的一個任務(wù),也就是即將被執(zhí)行的那一個任務(wù),然后嘗試再次提交當(dāng)前任務(wù)。
- DiscardOldestPolicy:該策略會直接丟棄當(dāng)前這個被拒絕的任務(wù)。
6.線程工廠(ThreadFactory)
ThreadFactory是一個接口,用于描述如何生成線程。Executor中的線程池構(gòu)造方法都有額外接受一個ThreadFactory參數(shù)的版本。建議使用自定義的ThreadFactory方法來為創(chuàng)建的線程起個名字,這是ThreadFactory最簡單也是最有用的功能,免得在排查線程問題時面對報錯信息無從下手。
7.最優(yōu)的線程池線程數(shù)
線程池的大小對系統(tǒng)性能的影響并不大,因此大小并不需要過分精確,只需要避免極大和極小的極端情況。
線程池線程數(shù) = CPU數(shù)量 * CPU使用率 * (1 + 等待時間/計算時間)