tomcat的線程池擴(kuò)展了jdk的executor,而且隊列用的是自己的task queue,因此其策略與jdk的有所不同。JDK 原生線程池可以說功能比較完善,使用也比較簡單,那為何 Tomcat/Jetty 卻不選擇這個方案,反而自己去動手實現(xiàn)那?

tomcat線程池策略

可以看到,tomcat 其實是分為兩個線程池, 一個是acceptor 來接受外部的請求,一個是Executor 來執(zhí)行請求。 為什么要分兩個線程池,這個問題在前面已經(jīng)講過。 大家不理解可以看看前面的文章。
Tomcat 作為一個老牌的 servlet 容器,處理多線程肯定得心應(yīng)手,為了能保證多線程環(huán)境下的高效,必然使用了線程池。
但是,Tomcat 并沒有直接使用 j.u.c 里面的線程池,而是對線程池進(jìn)行了擴(kuò)展,首先我們回憶一下,j.u.c 中的線程池的幾個核心參數(shù)是怎么配合的:
- 如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個新的線程來執(zhí)行任務(wù)。
- 如果運(yùn)行的線程等于或多于 corePoolSize,將任務(wù)加入 BlockingQueue。
- 如果 BlockingQueue 內(nèi)的任務(wù)超過上限,則創(chuàng)建新的線程來處理任務(wù)。
- 如果創(chuàng)建的線程超出 maximumPoolSize,任務(wù)將被拒絕策略拒絕。
Tomcat 的線程池其實封裝不厚重,只是對 jdk 線程池做了簡單優(yōu)化:
- 任務(wù)執(zhí)行失敗時不會直接拋出錯誤,而是裝回隊列里再次嘗試執(zhí)行;
- 當(dāng)線程池沒有達(dá)到最大執(zhí)行線程的時候,會優(yōu)先開線程再使用任務(wù)隊列;
- 擴(kuò)展計數(shù)用于追蹤任務(wù)的執(zhí)行情況;
- 將線程池融入 Catalina 的生命周期組件中。
StandardThreadExecutor
StandardThreadExecutor 是 Catalina 結(jié)構(gòu)中的一部分,是 Tomcat 生命周期中的池化線程資源的封裝。類總覽:
public class StandardThreadExecutor extends LifecycleMBeanBase
implements Executor, ResizableExecutor {
// 統(tǒng)一的 String 的管理類,用來防止編碼等一系列的問題
protected static final StringManager sm =
StringManager.getManager(Constants.Package);
// 創(chuàng)建出來的線程的優(yōu)先級
protected int threadPriority = Thread.NORM_PRIORITY;
// 線程是否是守護(hù)線程,默認(rèn)為是
protected boolean daemon = true;
// 線程名稱
protected String namePrefix = "tomcat-exec-";
// 默認(rèn)的最大線程數(shù)量
protected int maxThreads = 200;
// 默認(rèn)的最小線程數(shù)量
protected int minSpareThreads = 25;
// 存在時間
protected int maxIdleTime = 60000;
// 真實工作的 ThreadPoolExecutor
// 本質(zhì)是 StandardThreadExecutor 只是 ThreadPoolExecutor 的裝飾器
// 此處的對象類型是 org.apache.tomcat.util.threads.ThreadPoolExecutor
protected ThreadPoolExecutor executor = null;
// 線程池名稱
protected String name;
// 是否開啟線程池最小線程數(shù)量,如果此處為 false 的話,minSpareThreads 就沒有意義
protected boolean prestartminSpareThreads = false;
// 默認(rèn)的任務(wù)隊列長度
protected int maxQueueSize = Integer.MAX_VALUE;
// 重建線程的時間間隔
protected long threadRenewalDelay =
org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY;
// 任務(wù)隊列
private TaskQueue taskqueue = null;
// 其它方法暫時忽略
}
生命周期
/**
* 初始化線程池
*/
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
}
/**
* 開始線程池
*/
@Override
protected void startInternal() throws LifecycleException {
// 任務(wù)隊列
taskqueue = new TaskQueue(maxQueueSize);
// 線程工廠
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
// 初始化線程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
// 重建線程的時間間隔
executor.setThreadRenewalDelay(threadRenewalDelay);
// 設(shè)置線程池最小線程數(shù)量
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
// 線程池任務(wù)隊列的 parent
taskqueue.setParent(executor);
// 設(shè)置組件的生命周期狀態(tài)
setState(LifecycleState.STARTING);
}
/**
* 關(guān)閉線程池
*/
@Override
protected void stopInternal() throws LifecycleException {
setState(LifecycleState.STOPPING);
if (executor != null) {
executor.shutdownNow();
}
executor = null;
taskqueue = null;
}
/**
* 清除線程池
*/
@Override
protected void destroyInternal() throws LifecycleException {
super.destroyInternal();
}
/**
* 關(guān)閉線程池
*/
public void contextStopping() {
if (executor != null) {
executor.contextStopping();
}
}
任務(wù)執(zhí)行
/**
* 加入一個帶超時的任務(wù)
**/
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
// 調(diào)用 executor 對象去執(zhí)行
// 如果 executor 對象是空的,則拋出異常
if (executor != null) {
executor.execute(command,timeout,unit);
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
/**
* 加入一個不帶超時的任務(wù)
**/
@Override
public void execute(Runnable command) {
// 邏輯基本同上
if (executor != null) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
// 此處會再嘗試將任務(wù)加入一次等待隊列中
// TaskQueue.force(...) 方法底層會調(diào)用 Queue.offer(...) 方法
// 如果仍然失敗,會拋出異常
if (!((TaskQueue) executor.getQueue()).force(command)) {
throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
}
}
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
其它方法基本都是 get / set 方法,可以忽略。
ThreadPoolExecutor
org.apache.tomcat.util.threads.ThreadPoolExecutor 是 Tomcat 中的線程池類。
類和變量總覽
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
// 統(tǒng)一的 String 的管理類,用來防止編碼等一系列的問題
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
// 任務(wù)提交數(shù)量的計數(shù)
private final AtomicInteger submittedCount = new AtomicInteger(0);
private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
// 線程自殺計數(shù)
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
// 重建線程的時間間隔
private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
// 其它方法暫時忽略
// ...
構(gòu)造器
/**
* 基本是沿用了 java.util.concurrent.ThreadPoolExecutor 的構(gòu)造方法
**/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
// 調(diào)用 juc 中的 ThreadPoolExecutor 進(jìn)行線程池的初始化
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
// 用于初始化常駐線程
prestartAllCoreThreads();
}
// 下列的構(gòu)造方法都差不多
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
prestartAllCoreThreads();
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
prestartAllCoreThreads();
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
prestartAllCoreThreads();
}
拒絕策略
// ThreadPoolExecutor.class
private static class RejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r,
java.util.concurrent.ThreadPoolExecutor executor) {
// 直接拋出錯誤
throw new RejectedExecutionException();
}
}
任務(wù)執(zhí)行
@Override
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable command, long timeout, TimeUnit unit) {
// 提交計數(shù)加一
submittedCount.incrementAndGet();
try {
// 此處調(diào)用 java.util.concurrent.ThreadPoolExecutor 中的 execute(...) 方法
super.execute(command);
} catch (RejectedExecutionException rx) {
// 如果調(diào)用父類中的方法執(zhí)行錯誤,會嘗試將任務(wù)再一次放入到等待隊列里
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 此處嘗試放入等待隊列
// 如果也失敗了,就回滾提交計數(shù),并拋出異常
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 計數(shù)加一
submittedCount.decrementAndGet();
// 如果沒有報錯,那么此處嘗試關(guān)閉多余的線程
// 拋出錯誤的方式停止線程
if (t == null) {
stopCurrentThreadIfNeeded();
}
}
/**
* 判斷是否需要關(guān)閉線程
**/
protected void stopCurrentThreadIfNeeded() {
// 如果線程存活時間超過了 delay 值,那么此處會拋出一個錯誤,使線程停止
if (currentThreadShouldBeStopped()) {
long lastTime = lastTimeThreadKilledItself.longValue();
if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
if (lastTimeThreadKilledItself.compareAndSet(lastTime,
System.currentTimeMillis() + 1)) {
final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
Thread.currentThread().getName());
throw new StopPooledThreadException(msg);
}
}
}
}
protected boolean currentThreadShouldBeStopped() {
// 如果當(dāng)前線程并非工作線程,或者不存在線程存活 delay 值,那么此處返回 false
// 如果當(dāng)前線程是工作線程,且設(shè)置了 delay 時間,且當(dāng)前線程的存活時間已經(jīng)超過了設(shè)置值,那么此處返回 true
if (threadRenewalDelay >= 0
&& Thread.currentThread() instanceof TaskThread) {
TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
if (currentTaskThread.getCreationTime() <
this.lastContextStoppedTime.longValue()) {
return true;
}
}
return false;
}
從execute 方法可以看出,當(dāng)提交線程的時候,如果被線程池拒絕了,Tomcat 的線程池,還會厚著臉皮再次嘗試,調(diào)用 force() 方法"強(qiáng)行"的嘗試向阻塞隊列中添加任務(wù)。
優(yōu)雅關(guān)閉
public void contextStopping() {
this.lastContextStoppedTime.set(System.currentTimeMillis());
// 保存 corePoolSize 的值
int savedCorePoolSize = this.getCorePoolSize();
// 獲取隊列
TaskQueue taskQueue =
getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
// 將 taskQueue 中的 forcedRemainingCapacity 置為零
// 不太清楚 forcedRemainingCapacity 有什么作用
if (taskQueue != null) {
taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
}
// corePoolSize 置為零
this.setCorePoolSize(0);
// 將 taskQueue 中的 forcedRemainingCapacity 置空
if (taskQueue != null) {
taskQueue.setForcedRemainingCapacity(null);
}
// 恢復(fù) corePoolSize
this.setCorePoolSize(savedCorePoolSize);
}
TaskQueue
TaskQueue 是 Tomcat 中對任務(wù)隊列的增強(qiáng)和封裝:
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
// 序列編碼
private static final long serialVersionUID = 1L;
// 字符串管理類
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
// 任務(wù)隊列關(guān)聯(lián)的線程池
private transient volatile ThreadPoolExecutor parent = null;
// 不太清楚是做什么用的一個容量計數(shù)
private Integer forcedRemainingCapacity = null;
// 其它方法暫時忽略
// ...
加入、獲取任務(wù)的相關(guān)方法
// 不帶超時的添加任務(wù)方法
public boolean force(Runnable o) {
// 關(guān)聯(lián)線程池不可為空
if (parent == null || parent.isShutdown())
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
// 調(diào)用 LinkedBlockingQueue 的 offer(...) 方法添加任務(wù)
return super.offer(o);
}
// 帶超時的添加任務(wù)方法
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (parent == null || parent.isShutdown())
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
return super.offer(o,timeout,unit);
}
// 本質(zhì)上是調(diào)用父類的 offer(...) 方法
// 非阻塞添加任務(wù)的方法
@Override
public boolean offer(Runnable o) {
if (parent == null)
return super.offer(o);
if (parent.getPoolSize() == parent.getMaximumPoolSize())
return super.offer(o);
if (parent.getSubmittedCount() <= (parent.getPoolSize()))
return super.offer(o);
// 這種情況下線程池可以直接消費(fèi)任務(wù),無需放入任務(wù)隊列等待
if (parent.getPoolSize() < parent.getMaximumPoolSize())
return false;
return super.offer(o);
}
// 帶超時的阻塞方式獲取任務(wù)
@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
// 獲取一個任務(wù),如果獲取到的為空,則停止當(dāng)前線程
// 能獲取到就返回任務(wù)
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {
parent.stopCurrentThreadIfNeeded();
}
return runnable;
}
// 阻塞方式獲取任務(wù)
@Override
public Runnable take() throws InterruptedException {
// 線程池存在的情況下,會使用限時的方式去獲取任務(wù)
if (parent != null
&& parent.currentThreadShouldBeStopped()) {
return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
return super.take();
}
TaskQueue 在設(shè)計的時候都考慮了關(guān)聯(lián)線程池存不存在的情況,筆者認(rèn)為這應(yīng)該是 Tomcat 的作者考慮到開發(fā)者可能會需要復(fù)用 TaskQueue 到其它的場景中。
在提交任務(wù)的時候,增加了幾個分支判斷。
首先我們看看 parent 是什么:
private transient volatile ThreadPoolExecutor parent = null;
這里需要特別注意這里的 ThreadPoolExecutor 并不是 jdk里面的 java.util.concurrent.ThreadPoolExecutor 而是 tomcat 自己實現(xiàn)的。
我們分別來看 offer 中的幾個 if 分支。
首先我們需要明確一下,當(dāng)一個線程池需要調(diào)用阻塞隊列的 offer 的時候,說明線程池的核心線程數(shù)已經(jīng)被占滿了。(記住這個前提非常重要)
要理解下面的代碼,首先需要復(fù)習(xí)一下線程池的 getPoolSize() 獲取的是什么?我們看源碼:
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
需要注意的是,workers.size() 包含了 coreSize 的核心線程和臨時創(chuàng)建的小于 maxSize 的臨時線程。
先看第一個 if
// 如果線程池的工作線程數(shù)等于 線程池的最大線程數(shù),這個時候沒有工作線程了,就嘗試加入到阻塞隊列中
if (parent.getPoolSize() == parent.getMaximumPoolSize()){
return super.offer(o);
}
經(jīng)過第一個 if 之后,線程數(shù)必然在核心線程數(shù)和最大線程數(shù)之間。
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
對于 parent.getSubiitedCount() ,我們要先搞清楚 submiitedCount 是什么
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
* This number is always greater or equal to {@link #getActiveCount()}.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
這個數(shù)是一個原子類的整數(shù),用于記錄提交到線程中,且還沒有結(jié)束的任務(wù)數(shù)。包含了在阻塞隊列中的任務(wù)數(shù)和正在被執(zhí)行的任務(wù)數(shù)兩部分之和 。
所以這行代碼的策略是,如果已提交的線程數(shù)小于等于線程池中的線程數(shù),表明這個時候還有空閑線程,直接加入阻塞隊列中。為什么會有這種情況發(fā)生?其實我的理解是,之前創(chuàng)建的臨時線程還沒有被回收,這個時候直接把線程加入到隊里里面,自然就會被空閑的臨時線程消費(fèi)掉了。
我們繼續(xù)往下看:
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
由于上一個 if 條件的存在,走到這個 if 條件的時候,提交的線程數(shù)已經(jīng)大于核心線程數(shù)了,且沒有空閑線程,所以返回一個 false 標(biāo)明,表示任務(wù)添加到阻塞隊列失敗。線程池就會認(rèn)為阻塞隊列已經(jīng)無法繼續(xù)添加任務(wù)到隊列中了,根據(jù)默認(rèn)線程池的工作邏輯,線程池就會創(chuàng)建新的線程直到最大線程數(shù)。
回憶一下 jdk 默認(rèn)線程池的實現(xiàn),如果阻塞隊列是無界的,任務(wù)會無限的添加到無界的阻塞隊列中,線程池就無法利用核心線程數(shù)和最大線程數(shù)之間的線程數(shù)了。
Tomcat 的實現(xiàn)就是為了,線程池即使核心線程數(shù)滿了以后,且使用無界隊列的時候,線程池依然有機(jī)會創(chuàng)建新的線程,直到達(dá)到線程池的最大線程數(shù)。
總結(jié)一下:
Tomcat 線程池的邏輯:
- 如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個新的線程來執(zhí)行任務(wù)。
- 如果線程數(shù)大于 corePoolSize了,Tomcat 的線程不會直接把線程加入到無界的阻塞隊列中,而是去判斷,submittedCount(已經(jīng)提交線程數(shù))是否等于 maximumPoolSize。
- 如果等于,表示線程池已經(jīng)滿負(fù)荷運(yùn)行,不能再創(chuàng)建線程了,直接把線程提交到隊列,
- 如果不等于,則需要判斷,是否有空閑線程可以消費(fèi)。
- 如果有空閑線程則加入到阻塞隊列中,等待空閑線程消費(fèi)。
- 如果沒有空閑線程,嘗試創(chuàng)建新的線程。(這一步保證了使用無界隊列,仍然可以利用線程的 maximumPoolSize)。
- 如果總線程數(shù)達(dá)到 maximumPoolSize,則繼續(xù)嘗試把線程加入 BlockingQueue 中。
- 如果 BlockingQueue 達(dá)到上限(假如設(shè)置了上限),被默認(rèn)線程池啟動拒絕策略,tomcat 線程池會 catch 住拒絕策略拋出的異常,再次把嘗試任務(wù)加入中 BlockingQueue 中。
- 再次加入失敗,啟動拒絕策略。