簡介
1. 什么是java線程池
一個(gè)管理線程的池子,它幫我們我們管理線程,避免增加創(chuàng)建線程和銷毀線程的資源損耗
2. 線程池的優(yōu)點(diǎn)
重用線程池中的線程:避免因?yàn)榫€程的創(chuàng)建和銷毀所帶來的性能開銷
提高相應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行
提高線程的可管理性:線程是稀缺資源,如果無限制的創(chuàng)建不僅會(huì)消耗系統(tǒng)的資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控
相關(guān)類及架構(gòu)圖

- Executor:任務(wù)執(zhí)行者,線程池中幾乎所有的類都直接或者間接的實(shí)現(xiàn)了Executor,它是線程池框架的基礎(chǔ),它提供了一種將“任務(wù)提交”與“任務(wù)執(zhí)行”分離開來的機(jī)制
- ExecutorServices:它繼承自Executor,它是“執(zhí)行者服務(wù)接口”,添加了一些用來管理執(zhí)行器生命周期和任務(wù)生命周期的方法
- AbstractExecutorService:是一個(gè)抽象類,實(shí)現(xiàn)了ExecutorService 接口,為ExecutorService中的函數(shù)提供了默認(rèn)實(shí)現(xiàn)
- ThreadPoolExecutor:線程池的核心類,用來處理被提交的任務(wù)
- ScheduledExecutorService:一個(gè)接口,它相當(dāng)于提供了“延時(shí)”和“周期執(zhí)行”功能的ExecutorService
- ScheduledThreadPoolExecutor:一個(gè)實(shí)現(xiàn)類,可以在給定的延遲后執(zhí)行任務(wù),或者定期執(zhí)行命令,比Timer靈活強(qiáng)大
- Executors:它通過靜態(tài)工廠方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等類的對(duì)象。
核心類ThreadPoolExecutor
1. 構(gòu)造方法及參數(shù)含義
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
corePoolSize:核心線程數(shù),默認(rèn)情況下會(huì)一直存活,設(shè)置了allowCoreThreadTimeOut屬性為true時(shí),當(dāng)?shù)却龝r(shí)間超過 keepAliveTime時(shí),核心線程數(shù)會(huì)被終止
maximumPoolSize:線程池中最大的線程數(shù),活動(dòng)線程數(shù)達(dá)到這個(gè)數(shù)值后,后續(xù)的新任務(wù)會(huì)被阻塞
keepAliveTime:非核心線程的閑置時(shí)的超時(shí)時(shí)長,超過這個(gè)時(shí)長,非核心線程就會(huì)被回收
unit:超時(shí)時(shí)長的時(shí)間單位
workQueue:任務(wù)隊(duì)列
threadFactory:線程工廠,為線程池提供創(chuàng)建新線程的功能
RejectedExecutionHandler:拒絕策略
2. 關(guān)鍵參數(shù)
轉(zhuǎn)自:https://juejin.im/entry/58fada5d570c350058d3aaad
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
ctl是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常量表示workerCount的上限值,大約是5億。
下面再介紹下線程池的運(yùn)行狀態(tài). 線程池一共有五種狀態(tài), 分別是:
RUNNING:能接受新提交的任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù);
SHUTDOWN:關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)。在線程池處于 RUNNING 狀態(tài)時(shí),調(diào)用 shutdown()方法會(huì)使線程池進(jìn)入到該狀態(tài)。(finalize() 方法在執(zhí)行過程中也會(huì)調(diào)用shutdown()方法進(jìn)入該狀態(tài));
STOP:不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài);
TIDYING:如果所有的任務(wù)都已終止了,workerCount (有效線程數(shù)) 為0,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)用 terminated() 方法進(jìn)入TERMINATED 狀態(tài)。
TERMINATED:在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài),默認(rèn)terminated()方法中什么也沒有做。
進(jìn)入TERMINATED的條件如下:
線程池不是RUNNING狀態(tài);
線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);
如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;
workerCount為0;
設(shè)置TIDYING狀態(tài)成功。

ctl相關(guān)方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- runStateOf:獲取運(yùn)行狀態(tài);
- workerCountOf:獲取活動(dòng)線程數(shù);
- ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。
3.線程池初始化執(zhí)行過程
- 未達(dá)到核心線程數(shù)時(shí),會(huì)直接啟動(dòng)一個(gè)核心線程執(zhí)行任務(wù)
- 線程池中的線程數(shù)已達(dá)到或超過核心線程數(shù),任務(wù)會(huì)被插入到任務(wù)隊(duì)列中排隊(duì)等待執(zhí)行
- 如果任務(wù)隊(duì)列已滿,且此時(shí)未達(dá)到線程池規(guī)定的最大值,那此時(shí)立即啟動(dòng)一個(gè)非核心線程來執(zhí)行任務(wù)
- 如果線程數(shù)量已達(dá)到線程池規(guī)定的最大值,就拒絕執(zhí)行此任務(wù),執(zhí)行拒絕策略
代碼:
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* clt記錄著runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值,表示當(dāng)前活動(dòng)的線程數(shù);
* 如果當(dāng)前活動(dòng)線程數(shù)小于corePoolSize,則新建一個(gè)線程放入線程池中;
* 并把任務(wù)添加到該線程中。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二個(gè)參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷;
* 如果為true,根據(jù)corePoolSize來判斷;
* 如果為false,則根據(jù)maximumPoolSize來判斷
*/
if (addWorker(command, true))
return;
/*
* 如果添加失敗,則重新獲取ctl值
*/
c = ctl.get();
}
/*
* 如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊(duì)列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新獲取ctl值
int recheck = ctl.get();
// 再次判斷線程池的運(yùn)行狀態(tài),如果不是運(yùn)行狀態(tài),由于之前已經(jīng)把command添加到workQueue中了,
// 這時(shí)需要移除該command
// 執(zhí)行過后通過handler使用拒絕策略對(duì)該任務(wù)進(jìn)行處理,整個(gè)方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法
* 這里傳入的參數(shù)表示:
* 1. 第一個(gè)參數(shù)為null,表示在線程池中創(chuàng)建一個(gè)線程,但不去啟動(dòng);
* 2. 第二個(gè)參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize,添加線程時(shí)根據(jù)maximumPoolSize來判斷;
* 如果判斷workerCount大于0,則直接返回,在workQueue中新增的command會(huì)在將來的某個(gè)時(shí)刻被執(zhí)行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果執(zhí)行到這里,有兩種情況:
* 1. 線程池已經(jīng)不是RUNNING狀態(tài);
* 2. 線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且workQueue已滿。
* 這時(shí),再次調(diào)用addWorker方法,但第二個(gè)參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
* 如果失敗則拒絕該任務(wù)
*/
else if (!addWorker(command, false))
reject(command);
}
addWorker方法
addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行,firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù),core參數(shù)為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize,代碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取運(yùn)行狀態(tài)
int rs = runStateOf(c);
/*
* 這個(gè)if判斷
* 如果rs >= SHUTDOWN,則表示此時(shí)不再接收新任務(wù);
* 接著判斷以下3個(gè)條件,只要有1個(gè)不滿足,則返回false:
* 1. rs == SHUTDOWN,這時(shí)表示關(guān)閉狀態(tài),不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)
* 2. firsTask為空
* 3. 阻塞隊(duì)列不為空
*
* 首先考慮rs == SHUTDOWN的情況
* 這種情況下不會(huì)接受新提交的任務(wù),所以在firstTask不為空的時(shí)候會(huì)返回false;
* 然后,如果firstTask為空,并且workQueue也為空,則返回false,
* 因?yàn)殛?duì)列中已經(jīng)沒有任務(wù)了,不需要再添加線程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取線程數(shù)
int wc = workerCountOf(c);
// 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進(jìn)制是29個(gè)1),返回false;
// 這里的core是addWorker方法的第二個(gè)參數(shù),如果為true表示根據(jù)corePoolSize來比較,
// 如果為false則根據(jù)maximumPoolSize來比較。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試增加workerCount,如果成功,則跳出第一個(gè)for循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失敗,則重新獲取ctl的值
c = ctl.get(); // Re-read ctl
// 如果當(dāng)前的運(yùn)行狀態(tài)不等于rs,說明狀態(tài)已被改變,返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據(jù)firstTask來創(chuàng)建Worker對(duì)象
w = new Worker(firstTask);
// 每一個(gè)Worker對(duì)象都會(huì)創(chuàng)建一個(gè)線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING狀態(tài);
// 如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且firstTask為null,向線程池中添加線程。
// 因?yàn)樵赟HUTDOWN時(shí)不會(huì)在添加新的任務(wù),但還是會(huì)執(zhí)行workQueue中的任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個(gè)HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng)線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
注意一下這里的t.start()這個(gè)語句,啟動(dòng)時(shí)會(huì)調(diào)用Worker類中的run方法,Worker本身實(shí)現(xiàn)了Runnable接口,所以一個(gè)Worker類型的對(duì)象也是一個(gè)線程。
Worker類
線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,ThreadPool維護(hù)的其實(shí)就是一組Worker對(duì)象,看一下Worker的定義:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker類繼承了AQS,并實(shí)現(xiàn)了Runnable接口,注意其中的firstTask和thread屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。
在調(diào)用構(gòu)造方法時(shí),需要把任務(wù)傳入,這里通過getThreadFactory().newThread(this);來新建一個(gè)線程,newThread方法傳入的參數(shù)是this,因?yàn)閃orker本身繼承了Runnable接口,也就是一個(gè)線程,所以一個(gè)Worker對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用ReentrantLock來實(shí)現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的
runWorker方法
在Worker類中的run方法調(diào)用了runWorker方法來執(zhí)行任務(wù),runWorker方法的代碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 獲取第一個(gè)任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
// 允許中斷
w.unlock(); // allow interrupts
// 是否因?yàn)楫惓M顺鲅h(huán)
boolean completedAbruptly = true;
try {
// 如果task為空,則通過getTask來獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里說明一下第一個(gè)if判斷,目的是:
如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài);
如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài);
這里要考慮在執(zhí)行該if語句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會(huì)把狀態(tài)設(shè)置為STOP,回顧一下STOP狀態(tài):
不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入到該狀態(tài)。
STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態(tài)時(shí)線程是非中斷狀態(tài)的,因?yàn)門hread.interrupted()方法會(huì)復(fù)位中斷的狀態(tài)。
總結(jié)一下runWorker方法的執(zhí)行過程:
while循環(huán)不斷地通過getTask()方法獲取任務(wù);
getTask()方法從阻塞隊(duì)列中取任務(wù);
如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是中斷狀態(tài);
調(diào)用task.run()執(zhí)行任務(wù);
如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;
runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。
這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給子類來實(shí)現(xiàn)。
completedAbruptly變量來表示在執(zhí)行任務(wù)過程中是否出現(xiàn)了異常,在processWorkerExit方法中會(huì)對(duì)該變量的值進(jìn)行判斷。
getTask方法
getTask方法用來從阻塞隊(duì)列中取任務(wù),代碼如下:
private Runnable getTask() {
// timeOut變量的值表示上次從阻塞隊(duì)列中取任務(wù)時(shí)是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果線程池狀態(tài)rs >= SHUTDOWN,也就是非RUNNING狀態(tài),再進(jìn)行以下判斷:
* 1. rs >= STOP,線程池是否正在stop;
* 2. 阻塞隊(duì)列是否為空。
* 如果以上條件滿足,則將workerCount減1并返回null。
* 因?yàn)槿绻?dāng)前線程池狀態(tài)的值是SHUTDOWN或以上時(shí),不允許再向阻塞隊(duì)列中添加任務(wù)。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed變量用于判斷是否需要進(jìn)行超時(shí)控制。
// allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進(jìn)行超時(shí);
// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
// 對(duì)于超過核心線程數(shù)量的這些線程,需要進(jìn)行超時(shí)控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情況是因?yàn)榭赡茉诖朔椒▓?zhí)行階段同時(shí)執(zhí)行了setMaximumPoolSize方法;
* timed && timedOut 如果為true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制,并且上次從阻塞隊(duì)列中獲取任務(wù)發(fā)生了超時(shí)
* 接下來判斷,如果有效線程數(shù)量大于1,或者阻塞隊(duì)列是空的,那么嘗試將workerCount減1;
* 如果減1失敗,則返回重試。
* 如果wc == 1時(shí),也就說明當(dāng)前線程是線程池中唯一的一個(gè)線程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根據(jù)timed來判斷,如果為true,則通過阻塞隊(duì)列的poll方法進(jìn)行超時(shí)控制,如果在keepAliveTime時(shí)間內(nèi)沒有獲取到任務(wù),則返回null;
* 否則通過take方法,如果這時(shí)隊(duì)列為空,則take方法會(huì)阻塞直到隊(duì)列不為空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,說明已經(jīng)超時(shí),timedOut設(shè)置為true
timedOut = true;
} catch (InterruptedException retry) {
// 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回循環(huán)重試
timedOut = false;
}
}
}
這里重要的地方是第二個(gè)if判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時(shí),如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize且小于maximumPoolSize,并且workQueue已滿時(shí),則可以增加工作線程,但這時(shí)如果超時(shí)沒有獲取到任務(wù),也就是timedOut為true的情況,說明workQueue已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。
什么時(shí)候會(huì)銷毀?當(dāng)然是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動(dòng)回收。
getTask方法返回null時(shí),在runWorker方法中會(huì)跳出while循環(huán),然后會(huì)執(zhí)行processWorkerExit方法。
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值為true,則說明線程執(zhí)行時(shí)出現(xiàn)了異常,需要將workerCount減1;
// 如果線程執(zhí)行時(shí)沒有出現(xiàn)異常,說明在getTask()方法中已經(jīng)已經(jīng)對(duì)workerCount進(jìn)行了減1操作,這里就不必再減了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//統(tǒng)計(jì)完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
// 從workers中移除,也就表示著從線程池中移除了一個(gè)工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
tryTerminate();
int c = ctl.get();
/*
* 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時(shí),如果worker是異常結(jié)束,那么會(huì)直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待隊(duì)列有任務(wù),至少保留一個(gè)worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個(gè)工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程,runWorker通過getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進(jìn)入processWorkerExit方法,整個(gè)線程結(jié)束,如圖所示:

【2,3部分轉(zhuǎn)自:https://juejin.im/entry/58fada5d570c350058d3aaad
】
4 線程池如何實(shí)現(xiàn)復(fù)用的
線程重用的核心是,我們知道,Thread.start()只能調(diào)用一次,一旦這個(gè)調(diào)用結(jié)束,則該線程就到了stop狀態(tài),不能再次調(diào)用start。
則要達(dá)到復(fù)用的目的,則必須從Runnable接口的run()方法上入手,可以這樣設(shè)計(jì)這個(gè)Runnable.run()方法(就叫外面的run()方法):
它本質(zhì)上是個(gè)無限循環(huán),跑的過程中不斷檢查我們是否有新加入的子Runnable對(duì)象(就叫內(nèi)部的runnable:run()吧,它就是用來實(shí)現(xiàn)我們自己的任務(wù)),有就調(diào)一下我們的run(),其實(shí)就一個(gè)大run()把其它小run()#1,run()#2,...給串聯(lián)起來了,基本原理就這么簡單
詳細(xì)請(qǐng)看:https://www.cnblogs.com/myseries/p/10895078.html
5 java 中自帶的幾個(gè)線程池
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定線程數(shù)的線程池:最大線程數(shù)和核心線程數(shù)相同,在默認(rèn)設(shè)置時(shí),線程不受keepAliveTime影響;使用的無界隊(duì)列,則表示運(yùn)行中不會(huì)拒絕任務(wù),由于newFixedThreadPool只有核心線程,并且這些線程都不會(huì)被回收,也就是它能夠更快速的響應(yīng)外界請(qǐng)求
- SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
單一任務(wù)線程池:保證任務(wù)按順序執(zhí)行,其他的參數(shù)和Fix 無異.這一個(gè)任務(wù)處于活動(dòng)狀態(tài)時(shí),其他任務(wù)都會(huì)在任務(wù)隊(duì)列中排隊(duì)等候依次執(zhí)行,所以在這個(gè)任務(wù)執(zhí)行之間我們不需要處理線程同步的問題。
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
緩沖線程池:是一個(gè)根據(jù)需求創(chuàng)建新線程的線程池,max 是無界的,提供了一個(gè)沒有容量的隊(duì)列,如果主線程提供任務(wù)的速度大于線程處理的速度,則會(huì)不斷的創(chuàng)建線程,極端情況會(huì)耗盡cpu和內(nèi)存資源,所以建議執(zhí)行好事少的任務(wù)
- SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
- ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
定期執(zhí)行任務(wù)的線程池:可以根據(jù)給定的時(shí)間定期的執(zhí)行任務(wù)
使用方法
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.schedule(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+"延遲三秒執(zhí)行");
}
}, 3, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName()+"延遲三秒后每隔2秒執(zhí)行");
}
}, 3, 2, TimeUnit.SECONDS);
schedule(Runnable command, long delay, TimeUnit unit):延遲一定時(shí)間后執(zhí)行Runnable任務(wù);
schedule(Callable callable, long delay, TimeUnit unit):延遲一定時(shí)間后執(zhí)行Callable任務(wù);
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):延遲一定時(shí)間后,以間隔period時(shí)間的頻率周期性地執(zhí)行任務(wù);
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit):與scheduleAtFixedRate()方法很類似,但是不同的是scheduleWithFixedDelay()方法的周期時(shí)間間隔是以上一個(gè)任務(wù)執(zhí)行結(jié)束到下一個(gè)任務(wù)開始執(zhí)行的間隔,而scheduleAtFixedRate()方法的周期時(shí)間間隔是以上一個(gè)任務(wù)開始執(zhí)行到下一個(gè)任務(wù)開始執(zhí)行的間隔,也就是這一些任務(wù)系列的觸發(fā)時(shí)間都是可預(yù)知的。
線程池的使用技巧
需要針對(duì)具體情況而具體處理,不同的任務(wù)類別應(yīng)采用不同規(guī)模的線程池,任務(wù)類別可劃分為CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。(N代表CPU個(gè)數(shù))
CPU密集型任務(wù):線程池中線程個(gè)數(shù)應(yīng)盡量少,如配置N+1個(gè)線程的線程池。
IO密集型任務(wù):由于IO操作速度遠(yuǎn)低于CPU速度,那么在運(yùn)行這類任務(wù)時(shí),CPU絕大多數(shù)時(shí)間處于空閑狀態(tài),那么線程池可以配置盡量多些的線程,以提高CPU利用率,如2*N。
混合型任務(wù):可以拆分為CPU密集型任務(wù)和IO密集型任務(wù),當(dāng)這兩類任務(wù)執(zhí)行時(shí)間相差無幾時(shí),通過拆分再執(zhí)行的吞吐率高于串行執(zhí)行的吞吐率,但若這兩類任務(wù)執(zhí)行時(shí)間有數(shù)據(jù)級(jí)的差距,那么沒有拆分的意義。