1.線程池(java.util.concurrent)
為什么要有線程池 ?
我們知道。使用線程來(lái)處理任務(wù),可以達(dá)到一定程度的并行計(jì)算的效果,在一些比較耗時(shí)的操作時(shí)候不用一直等待,比如以下i/o操作。那么每次需要的時(shí)候就創(chuàng)建一個(gè)線程來(lái)處理這種任務(wù)就好了,為什么要引入線程池這個(gè)概念呢?
主要存在三方面的原因:
- 線程生命周期的開(kāi)銷非常高。 創(chuàng)建線程是需要時(shí)間的,并且需要JVM和底層操作系統(tǒng)提供一些輔助的支持,無(wú)限創(chuàng)建線程,必定在創(chuàng)建線程的時(shí)候消耗很多資源。
- 資源消耗。 活躍的線程必定要占據(jù)一定的內(nèi)存,線程越多,使用的內(nèi)存越大。當(dāng)可運(yùn)行的線程多于可用的處理器數(shù)量的時(shí)候,線程就會(huì)閑置。大量的閑置線程就會(huì)占據(jù)大量?jī)?nèi)存,給垃圾回收帶來(lái)很多的壓力。而且這些線程在資源CPU競(jìng)爭(zhēng)的時(shí)候也將產(chǎn)生更大的開(kāi)銷。
- 穩(wěn)定性。 之前的JVM的OOM中有提到過(guò),過(guò)多的線程還會(huì)可能出現(xiàn)OOM異常。因?yàn)榫€程數(shù)量受制于JVM的參數(shù)配置,Thread構(gòu)造方法中的請(qǐng)求棧大小,以及底層操作系統(tǒng)對(duì)線程的閑置,一旦超出就會(huì)出現(xiàn)OOM的異常
所以,使用線程池,用它來(lái)管理線程,可以有效的減少因?yàn)榫€程創(chuàng)建和線程數(shù)量過(guò)多導(dǎo)致的問(wèn)題
1.1 Executor框架
1.1.1 框架基礎(chǔ)
先來(lái)看看住基本的框架結(jié)構(gòu)圖:

1. 主要元素:
- 頂層是一個(gè)Executor接口,主要常用的實(shí)現(xiàn)類是ThreadPoolExecutor和ScheduledThreadPoolExecutor
- BlockingQueue接口及其實(shí)現(xiàn)
- Future接口以及實(shí)現(xiàn)
- Executors 創(chuàng)建線程池的關(guān)鍵類
2. 框架執(zhí)行原理
關(guān)于執(zhí)行原理,說(shuō)到這個(gè)問(wèn)題,不得不說(shuō)jdk源碼的作者寫代碼真是習(xí)慣好,跟進(jìn)源碼,查看Executor接口,在類上面,很大段的解釋和說(shuō)明,還有示例代碼來(lái)說(shuō)明。相比周圍的我們寫的代碼,簡(jiǎn)簡(jiǎn)單單的幾行注釋,甚至有的完全寫出來(lái)就是沒(méi)有注釋,試問(wèn)這樣代碼怎么看。很多時(shí)候我覺(jué)得寫代碼好不好,代碼風(fēng)格和格式很重要。
回答我們剛才的話題,一起來(lái)看看Executor接口上面的注釋吧
2.1 Excutor接口
我們?nèi)タ丛创a就發(fā)現(xiàn),Executor接口只有個(gè)方核心方法execute,接收的參數(shù)是Runnable。Runnable在jdk里面,我們都稱之為Task也就是要執(zhí)行的任務(wù),使用Executor可是避免我們反復(fù)的使用new Thread(new(RunnableTask())).start()。當(dāng)有很多任務(wù)需要執(zhí)行的時(shí)候,可以如下的方式:
// 異步執(zhí)行任務(wù)
Executor executor = anExecutor; // 此處偽代碼,實(shí)現(xiàn)時(shí)候就是使用Executors創(chuàng)建一個(gè)子類
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
上面的代碼,會(huì)使得多個(gè)任務(wù)異步的執(zhí)行。在executor源碼注釋上有寫明,這個(gè)接口也可以不要求任務(wù)是異步執(zhí)行的,一個(gè)簡(jiǎn)單例子就是直接執(zhí)行提交的任務(wù)的run方法
// 直接同步執(zhí)行
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
但是更典型的方式使用一個(gè)線程來(lái)執(zhí)行任務(wù)而不是使用run方法,例如:
// 每個(gè)任務(wù)一個(gè)線程異步去執(zhí)行
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
而在Executor框架中,Executor的實(shí)現(xiàn)類都是解決的批量任務(wù)的執(zhí)行順序和時(shí)間的問(wèn)題。下面的例子是一個(gè)順序執(zhí)行的Executor的一個(gè)實(shí)現(xiàn)。
// 多任務(wù)順序執(zhí)行
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
上面這個(gè)例子基本能簡(jiǎn)單表現(xiàn)出執(zhí)行任務(wù)的思路,值得注意的一點(diǎn)就是,這個(gè)jdk注釋中的例子在executor中引入了一個(gè)任務(wù)隊(duì)列,再把隊(duì)列中的任務(wù)取出順序執(zhí)行。在JDK提供的Executor的實(shí)現(xiàn)類中,使用workQueue來(lái)存儲(chǔ)需要執(zhí)行的任務(wù),使用一個(gè)Worker集合works來(lái)執(zhí)行任務(wù)(不同于上例中的順序執(zhí)行,且上例中工作線程相當(dāng)于只有一個(gè))。執(zhí)行Worker啟動(dòng)后執(zhí)行完自己的runnable后還會(huì)從workQueue中繼續(xù)獲取任務(wù)執(zhí)行,直到任務(wù)隊(duì)列為空。
2.2 ExecutorService 接口
ExecutorService接口繼承自Executor 接口,主要增加了線程生命周幾管理的幾個(gè)方法以及Future 來(lái)跟蹤任務(wù)一個(gè)或多個(gè)異步任務(wù)的處理情況。
其中
- shutDown() 關(guān)閉executor,已經(jīng)提交的任務(wù)會(huì)被執(zhí)行,新的任務(wù)不會(huì)再接受
- shutDownNow() 立即關(guān)閉executor,停止執(zhí)行,并返回一個(gè)等待執(zhí)行的任務(wù)列表
- isShutDown() executor是否終止
- isTerminated() 所有任務(wù)執(zhí)行完成,只有在調(diào)用了shutDown或者shutDownNow之后,才會(huì)返回true
- submit() 幾種提交任務(wù)的方式
2.3 Executors
提供各種方法創(chuàng)建線程池,從大的方向看,線程主要分為兩類,一種就是不同的異步執(zhí)行的,一種就是實(shí)現(xiàn)了ScheduledExecutorService 接口的線程,兩類線程的區(qū)別在于在于ScheduledExecutorService是那種有計(jì)劃執(zhí)行的任務(wù),比如說(shuō)定時(shí)任務(wù)或者延時(shí)執(zhí)行的任務(wù)。
具體使用查看Executors.newXXX() 相關(guān)文檔
1.1.2. ThreadPoolExecutor & ScheduledThreadPoolExecutor
ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 都是executorService的實(shí)現(xiàn)類,他們關(guān)系從之前類圖已經(jīng)可以清楚地看出來(lái)?;臼褂貌畈欢啵瑓s別就在于定位或者延時(shí)功能。所以本文只分析ThreadPoolExecutor的源碼,來(lái)看看線程池的工作大致流程。
1.1.2.1 ThreadPoolExecutor源碼分析
在分析源碼前,我根據(jù)個(gè)人的理解,先簡(jiǎn)單說(shuō)明線程池工作的流程,在進(jìn)入代碼查看。
之前在看JDK的Executor接口的文檔的時(shí)候,在源碼上面的標(biāo)準(zhǔn)注釋里面的例子(也是生成的javadoc里面的注釋)的最后一個(gè),有提到過(guò)一個(gè)概念,任務(wù)隊(duì)列。前文還簡(jiǎn)單說(shuō)了下具體實(shí)現(xiàn)類和那個(gè)例子的不同?,F(xiàn)在來(lái)具體看看,在說(shuō)之前,先明白幾個(gè)概念。
- 工作隊(duì)列
BlockingQueue<Runnable> workQueue。存放所有的runnable任務(wù)。 - 工作線程集合
HashSet<Worker> workers。線程池中所有的工作線程集合
Runnable都清楚是什么,Woker呢,先看看worker類可能更能方便理解線程池的工作過(guò)程
// Woker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
.....
很明顯就是有個(gè)線程,一個(gè)任務(wù),和任務(wù)完成數(shù)量,核心方法是runWorker
// runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker做的事情很明確,如果Worker創(chuàng)建的時(shí)候帶了任務(wù),則執(zhí)行這個(gè)任務(wù)的run()方法,如果沒(méi)有就去執(zhí)行g(shù)etTask()在workQueue中獲得一個(gè)任務(wù)來(lái)執(zhí)行,直到?jīng)]任務(wù)可執(zhí)行為止。
在回頭看execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
這段執(zhí)行邏輯:
- 查看當(dāng)前Worker(工作線程)數(shù)量有沒(méi)有達(dá)到coreSize,沒(méi)有就創(chuàng)建一個(gè)工作線程
- 如果線程池沒(méi)有關(guān)閉,并且添加到隊(duì)列成功,再次執(zhí)行下檢測(cè),或者拒絕,或者由于工作線程沒(méi)有重新添加工作線程。這個(gè)分支需要注意的是,可能這個(gè)分支走完只添加了任務(wù),沒(méi)有添加線程。也就是重復(fù)利用線程。利用已有的工作線程自己去隊(duì)列中消費(fèi)任務(wù)。例外注意runWorker里面使用的getTask() 實(shí)際是個(gè)阻塞的,一直循環(huán)在取隊(duì)列中的任務(wù),取不到一直循環(huán),這個(gè)線程就會(huì)一直在。runWorker也是個(gè)死循環(huán)一直執(zhí)行task.run。所以線程中的線程其實(shí)一直在運(yùn)行的。但是getActiveCount 是去HashSet<Worker> workers 里面的上鎖(在執(zhí)行run的線程,而不是在getTask的)的線程數(shù)量。
- 添加任務(wù)失敗的時(shí)候,直接拒絕
這里另外說(shuō)一下,。
// addWorker 部分代碼
...
w = new Worker(firstTask);
final Thread t = w.thread;
...
if (workerAdded) {
t.start();
workerStarted = true;
}
...
addWorker最后會(huì)啟動(dòng)worker的私有屬性thread的線程,開(kāi)始執(zhí)行runWorker,同事把worker添加到HashSet<Worker>中
由于worker的構(gòu)造函數(shù)中this.thread = getThreadFactory().newThread(this); 所以woker的thread啟動(dòng)的時(shí)候,執(zhí)行的就是Wroker的run,即threadPoolExecutor的runWorker方法。整個(gè)執(zhí)行鏈如下:
ThreadPoolExecutor.execute()-->addWorkder(可能添加成功或者失敗,失敗是涉及到拒絕處理問(wèn)題)-->Workder.thread.start()-->Worker.run-->threadPoolExecutor.runWorker-->循環(huán)執(zhí)行g(shù)etTask、task.run
以上就是線程基本的執(zhí)行流程了,觀察ThreadPoolExecutor的完整參數(shù)的構(gòu)造方法發(fā)現(xiàn):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
其中ThreadFactory 是用來(lái)創(chuàng)建Worker的thread用的,管理所有的線程。
RejectedExecutionHandler handler是在addWorker的時(shí)候如果添加失敗,執(zhí)行的飽和策略。JUC(java.util.concurrent)包中有提供幾種實(shí)現(xiàn)。也可以根據(jù)需要自己實(shí)現(xiàn)自己的飽和策略。
1.1.2.2 Exexutors.newXXX的參數(shù)意義和是使用時(shí)候注意的問(wèn)題
newFixedThreadPool
創(chuàng)建一個(gè)固定長(zhǎng)度的線程池,每次提交任務(wù)就會(huì)創(chuàng)建線程,知道達(dá)到最大線程數(shù)。如果線程發(fā)生Exception死掉,會(huì)新補(bǔ)充線程進(jìn)來(lái)。默認(rèn)工作隊(duì)列最大長(zhǎng)度是Integer.MXA_VALUE。認(rèn)為是一個(gè)無(wú)界的隊(duì)列newCachedThreadPool
創(chuàng)建一個(gè)可緩存的線程池,如果線程池的當(dāng)前規(guī)模超出了處理需求,就回收空閑線程,如果需求增加就添加新的線程。線程值規(guī)模不受限制,所以在使用的時(shí)候,操作不當(dāng)可能創(chuàng)建很多線程導(dǎo)致OOM。
使用的隊(duì)列是SynchronousQueue.newScheduledThreadPool
創(chuàng)建固定長(zhǎng)度線程池,而且以延遲或定時(shí)的方式來(lái)執(zhí)行任務(wù)newSingleThreadExecutor、newSingleThreadScheduledExecutor
創(chuàng)建一個(gè)單線程的Executor,如果單個(gè)線程出現(xiàn)Exeception死掉,就是創(chuàng)建一個(gè)線程來(lái)替代。他可以確保任務(wù)隊(duì)列中的任務(wù)是順序執(zhí)行的。
1.2. 線程池任務(wù)管理 Queue & Deque
ThreadPoolExecutor提供了三中隊(duì)列方式:無(wú)界隊(duì)列、有界對(duì)列、同步移交。隊(duì)列的選擇與其他的參數(shù)有關(guān),例如:線程池的大小。
無(wú)界、有界隊(duì)列。使用無(wú)界隊(duì)列當(dāng)線程池中的線程都處于忙碌狀態(tài)的時(shí)候,工作隊(duì)列就會(huì)無(wú)限制的增長(zhǎng)。一種更加穩(wěn)妥的方式使用有界隊(duì)列,例如:ArrayBlockingQueue,有界LinkedBlockingQueue,PriorityBlockingQueue。有界隊(duì)列有助于避免資源耗盡情況的發(fā)生,但是就需要考慮隊(duì)列填滿時(shí)候的飽和策略問(wèn)題。
同步移交。對(duì)于非常大或者無(wú)界的線程池,可以使用SynchronousQueue來(lái)避免任務(wù)排隊(duì),以及直接將任務(wù)從生產(chǎn)者直接移交給工作線程,移交的時(shí)候必須要求有線程等待接受,如果沒(méi)有切線程池線程數(shù)小于最大線程,就創(chuàng)建線程接受,否則就拒絕。
執(zhí)行順序 。ArrayBlockingQueue 和 PriorityBlockingQueue是FIFO類型隊(duì)列,如果想進(jìn)一步的控制任務(wù)執(zhí)行的順序,可以使用PriorityBlockingQueue來(lái)進(jìn)行管理,任務(wù)優(yōu)先級(jí)是通過(guò)自然順序或者Comparator接口來(lái)定義的。
注意:只有當(dāng)任務(wù)相互獨(dú)立是,為線程池或者工作隊(duì)列設(shè)置界限才是合理的,如果任務(wù)之間存在依賴,那么有界的線程池或者隊(duì)列就可能導(dǎo)致“饑餓”死鎖問(wèn)題
1.3 線程池飽和策略 RejectedExecutionHandler
當(dāng)有界隊(duì)列被填滿的時(shí)候,飽和策略就開(kāi)始發(fā)揮作用了。ThreadPoolExecutor的飽和策略可以通過(guò)調(diào)用setRejectedExecutionHandler來(lái)修改。JDK提供了四種默認(rèn)的飽和策略。
AbortPolicy 默認(rèn)策略,拋出一個(gè)未經(jīng)檢測(cè)的RejectedExecutionException,調(diào)用者捕獲這個(gè)異常,根據(jù)自己的需求編寫自己的代碼。
DiscardPolicy 拋棄策略, 當(dāng)新的任務(wù)無(wú)法添加到隊(duì)列的時(shí)候,默默的拋棄該任務(wù)
DiscardOldestPolicy 拋棄最早策略,次策略會(huì)拋棄寫一個(gè)要執(zhí)行的任務(wù),然后嘗試提交任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
因此如果是個(gè)優(yōu)先隊(duì)列,則拋棄優(yōu)先級(jí)最高的策略,所有不建議這個(gè)策略和優(yōu)先隊(duì)列一起使用
CallerRunsPolicy 調(diào)用者直接執(zhí)行run策略,這種直接在調(diào)用者的線程執(zhí)行任務(wù)的run方法。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
因?yàn)橹苯酉抡{(diào)用者里面執(zhí)行的任務(wù),所有會(huì)是一個(gè)同步的效果,就會(huì)帶來(lái)響應(yīng)的延時(shí)。
以上四種是JDK提供的策略,我們還可以根據(jù)自己的需要,自己實(shí)現(xiàn)RejectedExecutionHandler,實(shí)現(xiàn)我們自己的飽和策略。
1.4 線程池如何重復(fù)利用線程的 ?
1.4.1 ThreadFactory
線程工廠是創(chuàng)建線程的地方,實(shí)際就是創(chuàng)建工作線程。
// DefaultthreadFactory
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
1.4.2 線程池如何重復(fù)利用線程?
通過(guò)前面對(duì)線程池的理解,線程池的實(shí)現(xiàn)思路基本有一定的了解,那么線程池究竟如何重復(fù)利用線程的呢?
其實(shí)這里的“重復(fù)” 并沒(méi)有放開(kāi)重新獲取,而是工作線程一直運(yùn)行。當(dāng)運(yùn)行的線程數(shù)量沒(méi)有達(dá)到coreSize的時(shí)候,不管任務(wù)多少,新來(lái)任務(wù)會(huì)重新創(chuàng)建工作線程。工作線程中執(zhí)行的是死循環(huán)一直獲取任務(wù)來(lái)執(zhí)行。通過(guò)使用工作線程來(lái)執(zhí)行任務(wù)的run方法達(dá)到避免創(chuàng)建線程的目的。前面源碼分析部分,查看execute、addWorker、runWorker、getTask 四個(gè)方法就很明了。
- execute: 添加工作線程,或者只添加任務(wù)、或者拒絕任務(wù)
- addWorker: 實(shí)際上的創(chuàng)建工作線程,并start
- runWorker: 工作線程的run方法里面執(zhí)行的代碼,循環(huán)取隊(duì)列的中的任務(wù)進(jìn)行執(zhí)行。
- getTask: 一直去任務(wù),隊(duì)列為空就一直循環(huán)直到取到值或者線程池關(guān)閉。
所以線程池的工作線程一點(diǎn)啟動(dòng),是一直在運(yùn)行的。沒(méi)有任務(wù)可執(zhí)行的時(shí)候,也是在執(zhí)行,只不過(guò)這個(gè)時(shí)候是阻塞在了getTask方法中。所以千萬(wàn)不要理解成線程池做完任務(wù)就把線程放回去,要用的時(shí)候在拿出來(lái)。