前言
JDK中為我們提供了一個并發(fā)線程框架,它是的我們可以在有異步任務(wù)或大量并發(fā)任務(wù)需要執(zhí)行時可以使用它提供的線程池,大大方便了我們使用線程,同時將我們從創(chuàng)建、管理線程的繁瑣任務(wù)中解放出來,能夠更加快速的實(shí)現(xiàn)業(yè)務(wù)、功能。合理的使用線程池可以為我們帶來三個好處:
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程來減少線程創(chuàng)建與銷毀的開銷。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等待線程創(chuàng)建就直接運(yùn)行。
- 提高線程的可管理性。線程是稀缺資源,不可能無限的創(chuàng)建,不僅會消耗大量的系統(tǒng)資源,還會影響系統(tǒng)的穩(wěn)定性,通過使用線程池可以對線程進(jìn)行分配、監(jiān)控等。
ThreadPoolExecutor就是JDK提供的線程池的核心類,我們使用的Executors框架底層就是對ThreadPoolExecutor進(jìn)行了封裝。下面我們一起通過分析ThreadPoolExecutor的源碼來了解JDK線程池的實(shí)現(xiàn)原理。
線程池的創(chuàng)建-ThreadPoolExecutor的構(gòu)造
創(chuàng)建一個ThreadPoolExecutor需要傳入一些參數(shù),我們常用的一種ThreadPoolExecutor的構(gòu)造函數(shù)如下所示。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
這些參數(shù)對應(yīng)的是ThreadPoolExecutor的成員變量,我們通過這些內(nèi)部成員變量也可以先一窺ThreadPoolExecutor的特性。ThreadPoolExecutor的主要成員變量如下:
private volatile ThreadFactory threadFactory;//創(chuàng)建線程的工廠類
private volatile RejectedExecutionHandler handler;//當(dāng)線程池飽和或者關(guān)閉時,會執(zhí)行該句柄的鉤子(hook)
private volatile long keepAliveTime;//空閑線程的等待時間(納秒)
private volatile boolean allowCoreThreadTimeOut;//默認(rèn)為false,此時核心線程會保持活躍(即使處于空閑狀態(tài));如果為true,則核心線程會在空閑狀態(tài)超時等待keepAliveTime時間等待任務(wù)
private volatile int corePoolSize;//線程池中保持的線程數(shù),即使有些線程已經(jīng)處于空閑狀態(tài),任然保持存活
private volatile int maximumPoolSize;//線程池最大值,最大邊界是CAPACITY
private final BlockingQueue<Runnable> workQueue;//等待執(zhí)行的任務(wù)隊(duì)列
private final HashSet<Worker> workers = new HashSet<Worker>();//線程池中包含的所有worker線程
線程池創(chuàng)建后,來了一個新的任務(wù)需要執(zhí)行,此時我們調(diào)用
public void execute(Runnable command)
方法,線程池此時指派一個線程來執(zhí)行該任務(wù),我們通過跟蹤分析該方法的源碼,理解線程池的運(yùn)行、管理細(xì)節(jié)。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//線程池的狀態(tài)控制變量
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
ctl是線程池的狀態(tài)控制變量。該變量是一個AtomicInteger類型,它包裝了兩個域:workerCount,活躍的線程數(shù);runState,表示線程池狀態(tài),RUNNING,SHUTDOWN等。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
workerCount由29位表示,因此線程池的線程數(shù)最多有(2^29-1)。runStae用來表示線程池 中的線程在它的整個生命周期中的不同狀態(tài),現(xiàn)在線程池提供了5中狀態(tài):
/** runState會隨著時間單調(diào)遞增,線程池的運(yùn)行狀態(tài)有以下這些轉(zhuǎn)換:
* RUNNING -> SHOUTDOWN 線程池顯示調(diào)用shutdown()方法
* (RUNNING or SHUTDOWN) -> STOP
* 調(diào)用shutdownNow()
* SHUTDOWN -> TIDYING
* 任務(wù)隊(duì)列與線程池都為空時
* STOP -> TIDYING
* 線程池為空
* TIDYING -> TERMINATED
* 當(dāng)鉤子terminated()執(zhí)行完畢
*
// runState is stored in the high-order bits
//線程池收到了一個新的任務(wù),并且執(zhí)行隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//此時 線程池不接受新的任務(wù),但還會執(zhí)行隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//此時線程池不接受新的任務(wù),不執(zhí)行隊(duì)列中的任務(wù),同時中斷正在 執(zhí)行的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
//所有任務(wù)都已經(jīng)終止,同時workerCount為0,過渡到TRYING狀態(tài) 的線程會運(yùn)行鉤子方法terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//terminated()方法 執(zhí)行完畢
private static final int TERMINATED = 3 << COUNT_BITS;
在awaitTermination方法 上等待 的線程將會 在 線程的 runState變?yōu)門ERMINATED時返回。
繼續(xù)分析execute方法,接下來會有連續(xù)的三步:
- 如果正在運(yùn)行的線程數(shù)小于corePoolSize,會啟動一個線程來執(zhí)行該任務(wù),同時addWorker方法會原子的檢查runState狀態(tài)來保證線程 現(xiàn)在 處于 可以 運(yùn)行的狀態(tài),同時修改workerCount數(shù)量。
- 如果線程池中活躍線程數(shù)大于corePoolSize,且線程池處于RUNNING狀態(tài),于是會將任務(wù)加入 等待隊(duì)列。
-
如果任務(wù) 不能入隊(duì),我們會嘗試添加一個新的線程,如果還是失敗,我們會根據(jù)拋棄策略調(diào)用對應(yīng)拒絕方法。
以上execute方法就包含了線程池執(zhí)行一個新的任務(wù)的全部流程,如下圖示:
線程池處理流程
線程池中各模塊的工作示意圖如下:
圖中的數(shù)字是任務(wù)在線程池中的處理邏輯順序
線程池中的線程-Worker原理分析
提交到線程池的任務(wù)會被封裝成一個Worker,worker封裝了一個線程和任務(wù)。由于Worker本身繼承自AQS,是可以直接加鎖的。提交任務(wù)的具體邏輯如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//判斷線程池狀態(tài)是否可以提交新的任務(wù)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判斷線程池中的workerCount數(shù)目是否達(dá)到了線程池的邊界值
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加workerCount數(shù)目
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//workers是一個集合,包含了所有池中的worker線程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//worker被你加入集合后,線程開始執(zhí)行任務(wù)
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
我們可以發(fā)現(xiàn),在這里當(dāng)workerCount通過CAS正確加1后,后需要獲取一個全局鎖mainLock,在加鎖期間先對線程池的狀態(tài)以及線程池內(nèi)的線程數(shù)進(jìn)行再次檢查,正常后會把該新的worker線程加入workers集合,然后線程開始執(zhí)行該任務(wù)。線程是怎么開始執(zhí)行任務(wù)的呢?我們先看一下Worker的構(gòu)造:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{...}
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//由于Worker本身就是Runnable的,所以創(chuàng)建一個新的線程的時候,就已自身作為參數(shù)了,當(dāng)線程thread調(diào)用start啟動了線程開始執(zhí)行時,就會運(yùn)行傳入的Woker的run方法。
}
線程調(diào)用start方法啟動的時候就是Worker的run方法開始執(zhí)行。
public void run() {
runWorker(this);
}
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
worker重復(fù)的從隊(duì)列里取出任務(wù)執(zhí)行,同時處理以下一些問題
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
只要線程池處于RUNNING狀態(tài),就不停的從任務(wù)隊(duì)列取出任務(wù)
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and
* clearInterruptsForTaskRun called to ensure that unless pool is
* stopping, this thread does not have its interrupt set.
取出任務(wù)后執(zhí)行任務(wù)前,需要對Woker加鎖,防止任務(wù)執(zhí)行時發(fā)生中斷
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
任務(wù)執(zhí)行前會有一個前置的方法,該方法可能會拋出異常從而導(dǎo)致任務(wù)還未執(zhí)行線程就退出
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to
* afterExecute. We separately handle RuntimeException, Error
* (both of which the specs guarantee that we trap) and arbitrary
* Throwables. Because we cannot rethrow Throwables within
* Runnable.run, we wrap them within Errors on the way out (to the
* thread's UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//循環(huán)從任務(wù)隊(duì)列里取出任務(wù)執(zhí)行
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//執(zhí)行task的前置攔截
Throwable thrown = null;
try {
task.run();//任務(wù)運(yùn)行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//執(zhí)行task的后置攔截器
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//任務(wù)最后的清理工作
}
}
我們先看看是如何從任務(wù)隊(duì)列取出任務(wù)的:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 基于當(dāng)前線程池的配置線程會在該方法上阻塞或是超時等待任務(wù)。
* 如果該worker由于以下幾種情況必須退出,該方法會返回null:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 由于調(diào)用setMaximumPoolSize使得線程池的worker數(shù)量超過了maximumPoolSize
* 2. The pool is stopped.
* 線程池處于STOPPED狀態(tài)
* 3. The pool is shutdown and the queue is empty.
* 線程池被關(guān)閉同時隊(duì)列為空
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//addWorker方法中已經(jīng)先增加了workerCount的數(shù)目,此時既然該任務(wù)不能夠執(zhí)行,則需要通過CAS減小workerCount的數(shù)目
return null;
}
boolean timed; // Are workers subject to culling?worker是否要被踢出
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//踢出任務(wù)隊(duì)列首元素返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker方法的最后清理操作是這樣的:
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
* 對執(zhí)行完run方法的worker進(jìn)行清理和記錄操作。該方法會從workers線程集合移除
* 當(dāng)前worker對應(yīng)的線程。如果Worker在run方法執(zhí)行期間發(fā)生異常導(dǎo)致退出,那么completedAbruptly
* 是會被設(shè)置為true,此時我們會添加一個新的null任務(wù)。
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate會在符合條件的情況下轉(zhuǎn)換線程池狀態(tài)至TERMINATED,具體如下分析:
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//線程池處于RUNNING狀態(tài)或者
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
線程池的銷毀-shutDown/shutDownNow
線程池使用完畢,主動關(guān)閉線程池。此時我們會調(diào)用shutDown()方法。
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 進(jìn)行有序的任務(wù)關(guān)閉,此時線程池不接受新的任務(wù),但是前面提交的任務(wù)還是會繼續(xù)執(zhí)行完
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* 該方法不會等待前面提交的任務(wù)完全執(zhí)行完,如需要可以使用awaitTermination
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//線程池池的全局鎖
try {
checkShutdownAccess();//安全驗(yàn)證,確認(rèn)線程池有權(quán)限關(guān)閉線程
advanceRunState(SHUTDOWN);//線程池狀態(tài)轉(zhuǎn)換為SHUTDOWN
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();//通過線程池的狀況判斷是否轉(zhuǎn)移線程池的狀態(tài)至TERMINATED
}
shutDownNow方法與shutDown有些不同,shutDown是所謂的‘Elegant’關(guān)閉模式,而shutDownNow則比較‘Rude’方式。shutDownNow會立即停止所有正在執(zhí)行的任務(wù),具體代碼如下:
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
* 立即停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的執(zhí)行,并返回正在等待執(zhí)行的任務(wù)列表
* 該方法返回前會被從任務(wù)列里移除
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
* 該方法只會盡最大努力去停止正在運(yùn)行的任務(wù)-通過Thread.interupt方法取消任務(wù),因此如果任何一個任務(wù)無法響應(yīng)中斷就不會執(zhí)行停止。
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);//線程池狀態(tài)轉(zhuǎn)換為STOP
interruptWorkers();
tasks = drainQueue();//
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
線程池的飽和策略-RejectedExecutionHandler
前面在介紹ThreadPoolExecutor的主要成員變量時,我們簡單介紹了包和策略參數(shù):
private volatile RejectedExecutionHandler handler;//當(dāng)線程池飽和或者關(guān)閉時,會執(zhí)行該句柄的鉤子(hook)
在默認(rèn)情況下,ThreadPoolExecutor使用拋棄策略。
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
ThreadPoolExecutor為我們提供了四種線程池飽和策略,也即對應(yīng)的四種靜態(tài)內(nèi)部類。這些策略是在線程池與任務(wù)隊(duì)列都滿了的情況下,對新提交給線程池的任務(wù)執(zhí)行的操作。也即前面我們分析過的execute方法在所有情況都無效的情況下執(zhí)行的一步,調(diào)用對應(yīng)飽和策略的鉤子:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
這四種策略如下:
- CallerRunsPolicy: 在線程池沒有關(guān)閉(調(diào)用shut Down)的情況下,直接由調(diào)用線程來執(zhí)行該任務(wù)。否則直接就丟棄該任務(wù),什么也不做。
- AbortPolicy:直接拋出異常。
- AbortPolicy:直接丟棄該任務(wù),什么也不做。
- DiscardOldestPolicy: 在線程池沒有關(guān)閉(調(diào)用shutDown)的情況下,丟棄線程池任務(wù)隊(duì)列中等待最久-即隊(duì)列首部的任務(wù),并嘗試直接執(zhí)行該觸發(fā)飽和策略的任務(wù)。
最后的總結(jié)
我們觀察前面分析的源碼,包括線程池處于生命周期的一些階段(如線程池提交任務(wù),還是線程池的退出,銷毀)都會發(fā)現(xiàn)一個問題,這些地方都會用到線程池的全局鎖。
private final ReentrantLock mainLock = new ReentrantLock();
全局鎖的使用在多線程調(diào)用ThreadPoolExecutor的情況下會導(dǎo)致性能問題。但是我們仔細(xì)思考一下會發(fā)現(xiàn),向線程池提交任務(wù)時獲取全局鎖是在線程池還未預(yù)熱完成(即線程池的活躍線程還小于corePoolSize)的情況發(fā)生的事情,當(dāng)線程池的活躍線程超過corePoolSize后,以后在執(zhí)行execute方法提交新的任務(wù),主要還是執(zhí)行我們前面分析execute方法時說的第二步,把任務(wù)添加到等待隊(duì)列。所以后面不會出現(xiàn)對全局鎖的爭搶場景。也就是說,對全局鎖的爭搶只會出現(xiàn)在線程池預(yù)熱的初期,但這個預(yù)熱的過程是和corePoolSize有關(guān)的,我們需要關(guān)注。
最后,我們對ThreadPoolExecutor進(jìn)行一下總結(jié):
- 線程池有一個預(yù)熱階段,在線程池的活躍線程數(shù)未達(dá)到corePoolSize時,并發(fā)提交任務(wù)會出現(xiàn)對線程池全局鎖的爭搶。
- 線程池中的Worker數(shù)超過corePoolSize時,后續(xù)提交的任務(wù)都會進(jìn)入任務(wù)等待隊(duì)列。
- corePoolSize個活躍線程被線程池創(chuàng)建后,會循環(huán)從任務(wù)等待隊(duì)列獲取任務(wù)執(zhí)行。
- 當(dāng)線程池飽和或者關(guān)閉時,會執(zhí)行對應(yīng)的飽和策略。ThreadPoolExecutor默認(rèn)使用使用拋棄策略。
以上就是ThreadPoolExecutor的源碼分析,有沒認(rèn)識到的或理解有誤的,歡迎指出、討論。
參考文獻(xiàn)
- 《Java并發(fā)編程的藝術(shù)》
- JDK1.7源碼

