???????傳統(tǒng)的多線程是通過(guò)繼承Thread類及實(shí)現(xiàn)Runnable接口來(lái)實(shí)現(xiàn)的,每次創(chuàng)建及銷毀線程都會(huì)消耗資源、響應(yīng)速度慢,且線程缺乏統(tǒng)一管理,容易出現(xiàn)阻塞的情況,針對(duì)以上缺點(diǎn),線程池就出現(xiàn)了。
一.簡(jiǎn)介
a.定義
???????線程池是一個(gè)創(chuàng)建使用線程并能保存使用過(guò)的線程以達(dá)到復(fù)用的對(duì)象,簡(jiǎn)單的說(shuō)就是一塊緩存了一定數(shù)量線程的區(qū)域。
b.作用
???????1.復(fù)用線程:線程執(zhí)行完不會(huì)立刻退出,繼續(xù)執(zhí)行其他線程;
???????2.管理線程:統(tǒng)一分配、管理、控制最大并發(fā)數(shù);
c.優(yōu)點(diǎn)
???????1.降低因頻繁創(chuàng)建&銷毀線程帶來(lái)的性能開(kāi)銷,復(fù)用緩存在線程池中的線程;
???????2.提高線程執(zhí)行效率&響應(yīng)速度,復(fù)用線程:響應(yīng)速度;管理線程:優(yōu)化線程執(zhí)行順序,避免大量線程搶占資源導(dǎo)致阻塞現(xiàn)象;
???????3.提高對(duì)線程的管理度;
二.使用流程
???????線程池的使用也比較簡(jiǎn)單,流程如下:
a.創(chuàng)建線程池,通過(guò)配置線程池的參數(shù),從而實(shí)現(xiàn)自己所需的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
sPoolWorkQueue,
sThreadFactory);
b.向線程池提交任務(wù):execute()
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
...
//執(zhí)行任務(wù)
}
});
c.關(guān)閉線程池shutdown()
threadPoolExecutor.shutdown();
三.工作原理
???????接下來(lái)通過(guò)源碼來(lái)介紹一下ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)及工作原理。
a.參數(shù)介紹
| 參數(shù) | 含義 | 備注 |
|---|---|---|
| corePoolSize | 核心線程數(shù) | 默認(rèn)情況下,核心線程會(huì)一直存活 |
| maximumPoolSize | 線程池容納的最大線程數(shù) | 當(dāng)活動(dòng)線程數(shù)大于該值時(shí),后續(xù)的新任務(wù)會(huì)阻塞 |
| keepAliveTime | 非核心線程閑置超時(shí)時(shí)間 | 超過(guò)該時(shí)間后,非核心線程會(huì)被回收,當(dāng)設(shè)置allowCoreThreadTimeOut(true),核心線程也會(huì)被回收 |
| unit | 時(shí)間單位 | 常用TimeUnit.SECONDS |
| workQueue | 任務(wù)隊(duì)列 | 執(zhí)行execute()后,如果核心線程滿了,會(huì)將Runnable加入到該參數(shù)內(nèi) |
| threadFactory | 線程工廠 | 為線程池創(chuàng)建新線程 |
b.源碼分析
???????線程池的最終實(shí)現(xiàn)類是ThreadPoolExecutor,通過(guò)實(shí)現(xiàn)可以一步一步的看到,父接口為Executor:
public interface Executor {
void execute(Runnable command);
}
???????其他的繼承及實(shí)現(xiàn)關(guān)系就不一一列舉了,直接通過(guò)以下圖來(lái)看一下:

c.ThreadPoolExecutor內(nèi)部實(shí)現(xiàn)
???????從構(gòu)造方法開(kāi)始看:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
???????通過(guò)以上可以看到,在創(chuàng)建ThreadPoolExecutor時(shí),對(duì)傳入的參數(shù)是有要求的:corePoolSize不能小于0;maximumPoolSize需要大于0,且需要大于等于corePoolSize;keepAliveTime大于0;workQueue、threadFactory都不能為null。
???????在創(chuàng)建完后就需要執(zhí)行Runnable了,看以下execute()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
.....
int c = ctl.get();
//--------------分析1-----------------
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//--------------分析2-----------------
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//--------------分析3-----------------
else if (!addWorker(command, false))
reject(command);
}
???????在execute()內(nèi)部主要執(zhí)行的邏輯如下:
???????分析點(diǎn)1:如果當(dāng)前線程數(shù)未超過(guò)核心線程數(shù),則將runnable作為參數(shù)執(zhí)行addWorker(),true表示核心線程,false表示非核心線程;
???????分析點(diǎn)2:核心線程滿了,如果線程池處于運(yùn)行狀態(tài)則往workQueue隊(duì)列中添加任務(wù),接下來(lái)判斷是否需要拒絕或者執(zhí)行addWorker();
???????分析點(diǎn)3:以上都不滿足時(shí)[corePoolSize=0且沒(méi)有運(yùn)行的線程,或workQueue已經(jīng)滿了],執(zhí)行addWorker()添加runnable,失敗則執(zhí)行拒絕策略;
???????總結(jié)一下:線程池對(duì)線程創(chuàng)建的管理,流程圖如下:

???????以上可以看到,核心線程數(shù)量或非核心線程隊(duì)列不滿時(shí),就執(zhí)行addWorker(),否則執(zhí)行reject(),接下來(lái)看一下addWorker()執(zhí)行邏輯:
private boolean addWorker(Runnable firstTask, boolean core) {
......
//對(duì)添加時(shí)一些狀態(tài)進(jìn)行判斷,提前判斷是否成功
......
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//--------分析1----------
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//----分析2-----
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
???????在執(zhí)行addWorker時(shí),主要做了以下兩件事:
???????分析點(diǎn)1:將runnable作為參數(shù)創(chuàng)建Worker對(duì)象w,然后獲取w內(nèi)部的變量thread;
???????分析點(diǎn)2:調(diào)用start()來(lái)啟動(dòng)thread;
???????在addWorker()內(nèi)部會(huì)將runnable作為參數(shù)傳給Worker,然后從Worker內(nèi)部讀取變量thread,看一下Worker類的實(shí)現(xiàn):
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
......
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
.......
.......
}
???????Worker實(shí)現(xiàn)了Runnable接口,在Worker內(nèi)部,進(jìn)行了賦值及創(chuàng)建操作,先將execute()時(shí)傳入的runnable賦值給內(nèi)部變量firstTask,然后通過(guò)ThreadFactory.newThread(this)創(chuàng)建Thread,上面講到在addWorker內(nèi)部執(zhí)行t.start()后,會(huì)執(zhí)行到Worker內(nèi)部的run()方法,接著會(huì)執(zhí)行runWorker(this),一起看一下:
final void runWorker(Worker w) {
//-------------分析1--------------
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//-------------分析2-------------------------
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//----分析3-----
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
???????前面可以看到,runWorker是執(zhí)行在子線程內(nèi)部,主要執(zhí)行了三件事:
???????分析1:獲取當(dāng)前線程,當(dāng)執(zhí)行shutdown()時(shí)需要將線程interrupt(),接下來(lái)從Worker內(nèi)部取到firstTask,即execute傳入的runnable,接下來(lái)會(huì)執(zhí)行;
???????分析2:while循環(huán),task不空直接執(zhí)行;否則執(zhí)行g(shù)etTask()去獲取,不為空直接執(zhí)行;
???????分析3:對(duì)有效的task執(zhí)行run(),由于是在子線程中執(zhí)行,因此直接run()即可,不需要start();
???????前面看到,在while內(nèi)部有執(zhí)行g(shù)etTask(),一起看一下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//-------------------------分析1---------------------------
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//-------------------------分析2---------------------------
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
???????getTask()是從workQueue內(nèi)部獲取接下來(lái)需要執(zhí)行的runnable,內(nèi)部主要做了兩件事:
???????分析1:先獲取到當(dāng)前正在執(zhí)行工作的線程數(shù)量wc,通過(guò)判斷allowCoreThreadTimeOut[在創(chuàng)建ThreadPoolExecutor時(shí)可以進(jìn)行設(shè)置]及wc > corePoolSize來(lái)確定timed值;
???????分析2:通過(guò)timed值來(lái)決定執(zhí)行poll()或者take(),如果WorkQueue中有未執(zhí)行的線程時(shí),兩者作用是相同的,立刻返回線程;如果WorkQueue中沒(méi)有線程時(shí),poll()有超時(shí)返回,take()會(huì)一直阻塞;如果allowCoreThreadTimeOut為true,則核心線程在超時(shí)時(shí)間沒(méi)有使用的話,是需要退出的;wc > corePoolSize時(shí),非核心線程在超時(shí)時(shí)間沒(méi)有使用的話,是需要退出的;
???????allowCoreThreadTimeOut是可以通過(guò)以下方式進(jìn)行設(shè)置的:
threadPoolExecutor.allowCoreThreadTimeOut(true);
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
???????如果沒(méi)有進(jìn)行設(shè)置,那么corePoolSize數(shù)量的核心線程會(huì)一直存在。
???????總結(jié)一下:ThreadPoolExecutor內(nèi)部的核心線程如何確保一直存在,不退出?
???????上面分析已經(jīng)回答了這個(gè)問(wèn)題,每個(gè)線程在執(zhí)行時(shí)會(huì)執(zhí)行runWorker(),而在runWorker()內(nèi)部有while()循環(huán)會(huì)判斷getTask(),在getTask()內(nèi)部會(huì)對(duì)當(dāng)前執(zhí)行的線程數(shù)量及allowCoreThreadTimeOut進(jìn)行實(shí)時(shí)判斷,如果工作數(shù)量大于corePoolSize且workQueue中沒(méi)有未執(zhí)行的線程時(shí),會(huì)執(zhí)行poll()超時(shí)退出;如果工作數(shù)量不大于corePoolSize且workQueue中沒(méi)有未執(zhí)行的線程時(shí),會(huì)執(zhí)行take()進(jìn)行阻塞,確保有corePoolSize數(shù)量的線程阻塞在runWorker()內(nèi)部的while()循環(huán)不退出。
???????如果需要關(guān)閉線程池,需要如何操作呢,看一下shutdown()方法:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
???????以上可以看到,關(guān)閉線程池的原理:a. 遍歷線程池中的所有工作線程;b. 逐個(gè)調(diào)用線程的interrupt()中斷線程(注:無(wú)法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無(wú)法終止)
???????也可調(diào)用shutdownNow()來(lái)關(guān)閉線程池,二者區(qū)別:
???????shutdown():設(shè)置線程池的狀態(tài)為SHUTDOWN,然后中斷所有沒(méi)有正在執(zhí)行任務(wù)的線程;
???????shutdownNow():設(shè)置線程池的狀態(tài)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表;
???????使用建議:一般調(diào)用shutdown()關(guān)閉線程池;若任務(wù)不一定要執(zhí)行完,則調(diào)用shutdownNow();
???????總結(jié)一下:ThreadPoolExecutor在執(zhí)行execute()及shutdown()時(shí)的調(diào)用關(guān)系,流程圖如下:

???????1.客戶端在創(chuàng)建完線程池后,調(diào)用execute()來(lái)執(zhí)行一個(gè)runnable任務(wù);
???????2.在execute()內(nèi)部會(huì)執(zhí)行addWorker()來(lái)創(chuàng)建一個(gè)Worker對(duì)象,然后調(diào)用線程的start()方法,即執(zhí)行Worker內(nèi)部的run()方法;
???????3.Worker內(nèi)部的run()方法中會(huì)調(diào)用到runWorker()方法;
???????4.runWorker()方法內(nèi)部會(huì)在while()循環(huán)內(nèi)執(zhí)行g(shù)etTask()來(lái)不斷的從workQueue中獲取未執(zhí)行的runnable然后執(zhí)行;
???????5.getTask()內(nèi)部會(huì)實(shí)時(shí)判斷當(dāng)前正在執(zhí)行的Worker數(shù)量與corePoolSize進(jìn)行比較,如果數(shù)量不大于corePoolSize且workQueue為空,會(huì)執(zhí)行task()進(jìn)行阻塞,確保corePoolSize的線程不退出,即核心線程不退出;
???????6.執(zhí)行shutdown()來(lái)中斷那些沒(méi)有在執(zhí)行的線程;
四.線程池類型
???????線程池可以通過(guò)Executors來(lái)進(jìn)行不同類型的創(chuàng)建,具體分為四種不同的類型,如下:
a.newCachedThreadPool
???????可緩存線程池:不固定線程數(shù)量,且支持最大為Integer.MAX_VALUE的線程數(shù)量:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
???????1、線程數(shù)無(wú)限制
???????2、有空閑線程則復(fù)用空閑線程,若無(wú)空閑線程則新建線程
???????3、一定程度上減少頻繁創(chuàng)建/銷毀線程,減少系統(tǒng)開(kāi)銷
b.newFixedThreadPool
???????固定線程數(shù)量的線程池:定長(zhǎng)線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
???????1、可控制線程最大并發(fā)數(shù)(同時(shí)執(zhí)行的線程數(shù))
???????2、超出的線程會(huì)在隊(duì)列中等待。
c.newSingleThreadExecutor
???????單線程化的線程池:可以理解為線程數(shù)量為1的FixedThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
???????1、有且僅有一個(gè)工作線程執(zhí)行任務(wù)
???????2、所有任務(wù)按照指定順序執(zhí)行,即遵循隊(duì)列的入隊(duì)出隊(duì)規(guī)則
d.newScheduledThreadPool
???????定時(shí)以指定周期循環(huán)執(zhí)行任務(wù)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
五.隊(duì)列類型
???????一般來(lái)說(shuō),等待隊(duì)列 BlockingQueue 有: ArrayBlockingQueue 、 LinkedBlockingQueue 與 SynchronousQueue 。
???????假設(shè)向線程池提交任務(wù)時(shí),核心線程都被占用的情況下:
???????ArrayBlockingQueue :基于數(shù)組的阻塞隊(duì)列,初始化需要指定固定大小。
???????當(dāng)使用此隊(duì)列時(shí),向線程池提交任務(wù),會(huì)首先加入到等待隊(duì)列中,當(dāng)?shù)却?duì)列滿了之后,再次提交任務(wù),嘗試加入隊(duì)列就會(huì)失敗,這時(shí)就會(huì)檢查如果當(dāng)前線程池中的線程數(shù)未達(dá)到最大線程,則會(huì)新建線程執(zhí)行新提交的任務(wù)。所以最終可能出現(xiàn)后提交的任務(wù)先執(zhí)行,而先提交的任務(wù)一直在等待。
???????LinkedBlockingQueue:基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,初始化可以指定大小,也可以不指定。
???????當(dāng)指定大小后,行為就和 ArrayBlockingQueue一致。而如果未指定大小,則會(huì)使用默認(rèn)的 Integer.MAX_VALUE 作為隊(duì)列大小。這時(shí)候就會(huì)出現(xiàn)線程池的最大線程數(shù)參數(shù)無(wú)用,因?yàn)闊o(wú)論如何,向線程池提交任務(wù)加入等待隊(duì)列都會(huì)成功。最終意味著所有任務(wù)都是在核心線程執(zhí)行。如果核心線程一直被占,那就一直等待。
???????SynchronousQueue :無(wú)容量的隊(duì)列。
???????使用此隊(duì)列意味著希望獲得最大并發(fā)量。因?yàn)闊o(wú)論如何,向線程池提交任務(wù),往隊(duì)列提交任務(wù)都會(huì)失敗。而失敗后如果沒(méi)有空閑的非核心線程,就會(huì)檢查如果當(dāng)前線程池中的線程數(shù)未達(dá)到最大線程,則會(huì)新建線程執(zhí)行新提交的任務(wù)。完全沒(méi)有任何等待,唯一制約它的就是最大線程數(shù)的個(gè)數(shù)。因此一般配合Integer.MAX_VALUE就實(shí)現(xiàn)了真正的無(wú)等待。
???????但是需要注意的是,進(jìn)程的內(nèi)存是存在限制的,而每一個(gè)線程都需要分配一定的內(nèi)存。所以線程并不能無(wú)限個(gè)。