Introduction
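This comes from a problem encountered in a real-world case:
“On several nodes of one application, jstack shows a large number of TIMED_WAITING worker threads blocked in poll() on the task queue. That means no tasks are being assigned to these workers, so why aren't they released? Is something wrong with the Spring Boot configuration? minSpareThreads defaults to 10 (corePoolSize is the same number), so when the whole system is at its idlest there should be only 10 worker threads in the WAITING state. So where do these '121 worker threads in TIMED_WAITING during off-peak hours' come from?
Current settings:
acceptCount=200, maxThreads=800; connectionTimeout and keepAliveTimeout are both left at the default of 60 s.
In other words, it is still not clear how the Tomcat thread pool decides that a thread is idle (the criterion for treating a pool thread as idle), or how it shuts down idle threads that have timed out.”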
Analysis
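To clear this up, we need to dig into several pieces:
- Tomcat's thread pool, its task queue and its task wrapping.
- The underlying JDK thread pool implementation.
- How the two pools differ and relate.

If thread pools are entirely new to you, start with the article "Build a Simple Thread Pool by Hand to Understand How It Works", which gives the most basic intuitive picture. With that in mind, let's begin.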
Broadly, Tomcat's worker thread pool is built on the JDK's ThreadPoolExecutor. Its blocking task queue is TaskQueue, which extends LinkedBlockingQueue with some optimizations.
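The following sketch shows why a plain LinkedBlockingQueue is not enough. It uses only java.util.concurrent, and PlainQueueDemo is a made-up name, not Tomcat code.

The demo builds a JDK ThreadPoolExecutor with core 1, max 4 and an unbounded LinkedBlockingQueue, then submits five tasks that block. Because offer() on that queue always succeeds, every task after the first is queued. The pool therefore never grows past corePoolSize, even though maximumPoolSize is 4.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative demo (hypothetical class name) on a stock JDK executor, not Tomcat code.
class PlainQueueDemo {
    /** Submits `tasks` blocking tasks and returns {poolSize, queueSize} while they are still blocked. */
    static int[] run(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until we have measured
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // execute() registers workers and enqueues tasks synchronously, and the only worker
        // is blocked in its first task, so these values are stable here
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run(1, 4, 5);
        System.out.println("poolSize=" + r[0] + ", queued=" + r[1]); // poolSize=1, queued=4
    }
}
```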
Creating the Tomcat thread pool
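```java
public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    // core = minSpareThreads, max = maxThreads, keepAliveTime hard-coded to 60 seconds
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, taskqueue, tf);
    // TaskQueue needs the executor to decide between queueing a task and adding a thread
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}
```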
The task queue: TaskQueue
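According to its Javadoc, TaskQueue is a task queue designed specifically to run with a thread pool executor, and it is optimized to make proper use of that executor's threads. With a normal queue, the executor will spawn threads when there are idle threads, and you cannot force items onto the queue itself.

The mechanism behind this is simple. The JDK ThreadPoolExecutor only adds a thread beyond corePoolSize when workQueue.offer() returns false. A plain unbounded LinkedBlockingQueue always accepts the task, so the pool would never grow past corePoolSize. TaskQueue.offer() instead returns false on purpose whenever it wants the executor to start another thread: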
```java
@Override
public boolean offer(Runnable o) {
    // we can't do any checks: no parent executor has been set yet
    if (parent==null) return super.offer(o);
    // we are maxed out on threads, simply queue the object
    // (no more threads can be created, so the task waits its turn in the queue)
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    // we have idle threads, just add it to the queue
    // (submitted tasks, i.e. queued + running, <= pool size, so a free thread picks it up right away)
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    // if we have less threads than maximum force creation of a new thread
    // (returning false makes the JDK executor add a worker for this task)
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    // if we reached here, we need to add it to the queue
    return super.offer(o);
}
```
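The effect of this offer() logic can be reproduced outside Tomcat. The sketch below is an imitation under stated assumptions, not Tomcat's code:
- SketchTaskQueue and SketchExecutor are hypothetical names.
- The queue copies the decision order shown above.
- The executor tracks a submitted-task count, incremented in execute() and decremented in afterExecute().
- If the executor loses a race for the last thread slot, it falls back to queueing.

With core 1, max 4 and five blocking tasks, the pool grows to its maximum before anything is queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical classes imitating the idea of Tomcat's TaskQueue + ThreadPoolExecutor; not Tomcat code.
class SketchTaskQueue extends LinkedBlockingQueue<Runnable> {
    private volatile SketchExecutor parent;

    void setParent(SketchExecutor parent) { this.parent = parent; }

    /** Enqueue unconditionally, bypassing the "prefer a new thread" logic below. */
    boolean force(Runnable r) { return super.offer(r); }

    @Override
    public boolean offer(Runnable r) {
        SketchExecutor p = parent;
        if (p == null) return super.offer(r);                                 // no executor yet
        if (p.getPoolSize() == p.getMaximumPoolSize()) return super.offer(r); // maxed out: queue it
        if (p.getSubmittedCount() <= p.getPoolSize()) return super.offer(r);  // an idle thread will take it
        if (p.getPoolSize() < p.getMaximumPoolSize()) return false;           // make the JDK add a thread
        return super.offer(r);
    }
}

class SketchExecutor extends ThreadPoolExecutor {
    private final AtomicInteger submittedCount = new AtomicInteger(); // queued + running tasks

    SketchExecutor(int core, int max, long keepAlive, TimeUnit unit, SketchTaskQueue queue) {
        super(core, max, keepAlive, unit, queue);
        queue.setParent(this);
    }

    int getSubmittedCount() { return submittedCount.get(); }

    @Override
    public void execute(Runnable command) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // Lost a race for the last thread slot: queue the task anyway, unless shut down.
            if (isShutdown() || !((SketchTaskQueue) getQueue()).force(command)) {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet(); // task finished, normally or not
    }

    /** Submits `tasks` blocking tasks and returns {poolSize, queueSize} while they block. */
    static int[] demo(int core, int max, int tasks) {
        SketchExecutor pool = new SketchExecutor(core, max, 60, TimeUnit.SECONDS, new SketchTaskQueue());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = demo(1, 4, 5);
        System.out.println("poolSize=" + r[0] + ", queued=" + r[1]); // poolSize=4, queued=1
    }
}
```

The second to fourth tasks each see more submitted tasks than threads, so offer() returns false and the executor adds a worker. Only the fifth task, arriving when the pool is at its maximum, is queued.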
The thread pool constructor
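Tomcat's thread pool, org.apache.tomcat.util.threads.ThreadPoolExecutor, is built on the JDK's ThreadPoolExecutor. Its constructor passes the arguments straight to the JDK superclass: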
```java
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    public ThreadPoolExecutor(int corePoolSize,                  // core threads kept even when idle
                              int maximumPoolSize,               // maximum number of threads allowed
                              long keepAliveTime,                // max idle time for threads above corePoolSize before they are terminated
                              TimeUnit unit,                     // time unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue, // TaskQueue: a LinkedBlockingQueue optimized for this executor
                              ThreadFactory threadFactory        // factory for the worker threads
                              ) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        // start all corePoolSize threads now instead of on demand
        prestartAllCoreThreads();
    }

    // ... other members omitted
}
```
The JDK Javadoc describes these parameters as follows:
corePoolSize the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize the maximum number of threads to allow in the pool
keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit the time unit for the keepAliveTime argument
workQueue the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
threadFactory the factory to use when the executor creates a new thread
handler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
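The effect of prestartAllCoreThreads() can be seen on a plain JDK executor. This sketch uses the numbers from the case: minSpareThreads 10 as corePoolSize, maxThreads 800, and a 60 s keep-alive. PrestartDemo is a hypothetical name.

All core threads exist before any task is submitted. With nothing to do, they block in the queue's take(), which is why an idle pool at core size shows corePoolSize WAITING workers.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative demo (hypothetical class name) on a stock JDK executor.
class PrestartDemo {
    /** Returns {threadsStartedByPrestart, poolSizeBeforeAnyTask}. */
    static int[] run(int core, int max) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        int started = pool.prestartAllCoreThreads(); // the call Tomcat's constructor makes
        int[] result = {started, pool.getPoolSize()};
        pool.shutdown();                              // lets the idle core threads exit
        return result;
    }

    public static void main(String[] args) {
        int[] r = run(10, 800);
        System.out.println("prestarted=" + r[0] + ", poolSize=" + r[1]); // prestarted=10, poolSize=10
    }
}
```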
How request tasks are submitted to the Tomcat thread pool
It is the Poller that hands the SocketProcessor to executor.execute(). The snippet below comes from AbstractEndpoint.processSocket():
```java
Executor executor = getExecutor();
if (dispatch && executor != null) {
    executor.execute(sc);
} else {
    sc.run();
}
```
Here sc is a SocketProcessorBase<S>, which implements Runnable.
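So once the pool has been built, running a Runnable starts at execute(). Tomcat's ThreadPoolExecutor overrides execute() to do two things:
- It increments its submittedCount, the value TaskQueue.offer() reads, and then delegates to the JDK implementation below.
- If the JDK rejects the task, it tries to force the task onto the TaskQueue before giving up.

The JDK Javadoc of execute() reads:
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread.
If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.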
```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // step 1: below corePoolSize, start a new thread with this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // step 2: try to queue the task, then re-check the pool state
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // step 3: queue refused the task, so try a non-core thread, else reject
    else if (!addWorker(command, false))
        reject(command);
}
```
execute() works in three steps, as described in the JDK source comments:
1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
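The three steps can be observed with a plain JDK executor and a bounded ArrayBlockingQueue. ExecuteStepsDemo is a hypothetical name, and this shows only the stock JDK behaviour, since Tomcat's TaskQueue changes the outcome of step 2.

With core 1, max 2 and queue capacity 1, four tasks that block exercise each step in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative demo (hypothetical class name): core thread -> queue -> extra thread -> reject.
class ExecuteStepsDemo {
    /** Submits 4 blocking tasks to core=1, max=2, queue capacity 1 and logs the pool after each. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task" + i + ": pool=" + pool.getPoolSize() + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) { // default AbortPolicy
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // task1: pool=1 queued=0   (step 1: new core thread)
        // task2: pool=1 queued=1   (step 2: queued)
        // task3: pool=2 queued=1   (step 3: queue full, extra thread)
        // task4: rejected          (step 3: at maximumPoolSize)
    }
}
```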
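addWorker() starts a worker, which then runs runWorker(). This method is the core loop of every pool thread: it takes firstTask, or otherwise a task from getTask(), and calls task.run(), over and over.

When the condition task != null || (task = getTask()) != null no longer holds, the loop exits and processWorkerExit(w, completedAbruptly) is called. That method removes the worker from the pool and calls tryTerminate() to check whether the pool as a whole can now terminate. After that, run() returns and the thread ends. The code: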
```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // main loop: run firstTask, then whatever getTask() returns, until it returns null
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // remove this worker from the pool; run() then returns and the thread ends
        processWorkerExit(w, completedAbruptly);
    }
}
```
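Stripped of locking, interrupt handling and the before/after hooks, the loop has a simple shape. The sketch below keeps only that shape: run firstTask, then keep pulling tasks until getTask() returns null. MiniWorker is a hypothetical class, not JDK code.

As simplifications, its getTask() always uses a timed poll() (as a worker above corePoolSize would) and treats an interrupt as a reason to exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified imitation of the runWorker()/getTask() loop; not JDK code.
class MiniWorker implements Runnable {
    private final BlockingQueue<Runnable> queue;
    private final long keepAliveMillis;
    private Runnable firstTask;
    int completedTasks; // like Worker.completedTasks
    boolean exited;     // set where the JDK would call processWorkerExit()

    MiniWorker(Runnable firstTask, BlockingQueue<Runnable> queue, long keepAliveMillis) {
        this.firstTask = firstTask;
        this.queue = queue;
        this.keepAliveMillis = keepAliveMillis;
    }

    /** Next task, or null after keepAliveMillis with nothing to do (interrupt also ends the loop here). */
    private Runnable getTask() {
        try {
            return queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null;
        try {
            // same loop condition as runWorker()
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } finally {
                    task = null;
                    completedTasks++;
                }
            }
        } finally {
            exited = true; // loop ended: the thread would now leave the pool
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) queue.add(() -> { });
        MiniWorker w = new MiniWorker(() -> { }, queue, 50);
        w.run(); // on the current thread; returns once the queue stays empty for 50 ms
        System.out.println(w.completedTasks + " tasks, exited=" + w.exited); // 4 tasks, exited=true
    }
}
```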
getTask(): the worker's loop only continues while this method returns non-null, and it is the method that takes tasks from the blocking task queue:
```java
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // true when the pool is above corePoolSize, or when core threads may time out too
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // above core: wait at most keepAliveTime
                workQueue.take();                                     // at or below core: block indefinitely
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
```
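The two branches of getTask() also explain the jstack output in the case. A thread blocked in take() is reported as WAITING, while a thread in poll(keepAliveTime, ...) is reported as TIMED_WAITING.

Note that timed is computed from the current worker count, not from which thread is "core". While the pool is above corePoolSize, idle workers that call getTask() use poll() and show TIMED_WAITING, whether or not they were among the first corePoolSize threads.

The check below uses two plain threads, each on its own queue; WaitStateDemo is a hypothetical name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative demo (hypothetical class name): the thread state jstack reports for each getTask() branch.
class WaitStateDemo {
    /** Returns {state while blocked in take(), state while blocked in poll(timeout)}. */
    static Thread.State[] run() {
        // Separate queues so the two threads never contend for the same lock.
        LinkedBlockingQueue<Runnable> q1 = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<Runnable> q2 = new LinkedBlockingQueue<>();
        Thread untimed = new Thread(() -> {
            try { q1.take(); } catch (InterruptedException ignored) { }                     // timed == false
        });
        Thread timed = new Thread(() -> {
            try { q2.poll(60, TimeUnit.SECONDS); } catch (InterruptedException ignored) { } // timed == true
        });
        untimed.setDaemon(true);
        timed.setDaemon(true);
        untimed.start();
        timed.start();
        try {
            Thread.State[] states = {awaitParked(untimed), awaitParked(timed)};
            untimed.interrupt();
            timed.interrupt();
            untimed.join();
            timed.join();
            return states;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Re-reads the thread's state until it parks (WAITING or TIMED_WAITING), for at most 5 seconds. */
    private static Thread.State awaitParked(Thread t) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Thread.State s = t.getState();
        while (s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
            s = t.getState();
        }
        return s;
    }

    public static void main(String[] args) {
        Thread.State[] s = run();
        System.out.println("take(): " + s[0] + ", poll(60s): " + s[1]); // take(): WAITING, poll(60s): TIMED_WAITING
    }
}
```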
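This is where the answer to the case lies. When timed is true, i.e. the worker count exceeds corePoolSize, a thread that has finished its previous task waits in poll() on the queue for at most keepAliveTime. If it still gets no task, timedOut is set to true, and the next iteration of the for loop reaches this block: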
```java
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}
```
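Here (timed && timedOut) is true and the worker count is greater than 1, so workQueue.isEmpty() no longer matters and the whole condition is true. The rest follows in order:
1. compareAndDecrementWorkerCount(c) runs. If the CAS loses a race, the loop simply re-evaluates.
2. getTask() returns null.
3. The loop in runWorker() exits, and the thread terminates.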
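The condition is easier to explore as a pure function. CullCheck.shouldExit is a hypothetical restatement of the test in getTask() above, leaving out the CAS on the worker count. It returns true when the calling worker would leave the pool.

```java
// Hypothetical restatement of getTask()'s culling condition as a pure function; not JDK code.
class CullCheck {
    /** True when getTask() would decrement the worker count and return null for the calling worker. */
    static boolean shouldExit(int wc, int corePoolSize, int maximumPoolSize,
                              boolean allowCoreThreadTimeOut, boolean timedOut, boolean queueEmpty) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        return (wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || queueEmpty);
    }

    public static void main(String[] args) {
        // The case: core 10, max 800, 121 workers.
        System.out.println(shouldExit(121, 10, 800, false, true, true));  // true: its poll() timed out, so it leaves
        System.out.println(shouldExit(121, 10, 800, false, false, true)); // false: it got a task in time
        System.out.println(shouldExit(10, 10, 800, false, true, true));   // false: others already brought the count down to core
    }
}
```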
The answer
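Now to the question in the case. Suppose there are more threads than corePoolSize and jstack shows them all in TIMED_WAITING inside workQueue.poll(). By the analysis above, each of these threads must have received a task from the queue within the last 60 seconds (the keepAliveTime), otherwise it would already have been terminated. In other words, Tomcat is still receiving some traffic: within a minute, each of the 121 threads executed at least one task.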
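The following deliberately simplified model gives a feel for how little traffic is needed to keep 121 threads alive. KeepAliveModel is a made-up name, and its assumptions are listed in the code.

Under these assumptions, a thread survives as long as the pool receives at least one request every keepAliveTime / threadCount seconds. For 121 threads and a 60 s keepAliveTime, that is roughly one request every 0.5 s (60 / 121 ≈ 0.496), or about two per second. This is an illustration, not a measurement of Tomcat.

```java
import java.util.ArrayDeque;

// A simplified model, NOT Tomcat or JDK code (hypothetical class name). Assumptions:
//  - tasks take ~0 s, so a thread is idle again right after it is handed a request;
//  - a request goes to the idle thread that has waited longest (assumed to be roughly what
//    the condition queue inside LinkedBlockingQueue gives under light load);
//  - a thread leaves once it has waited keepAlive seconds while the pool is above corePoolSize.
class KeepAliveModel {
    /** Threads remaining after requests arrive every intervalSec for durationSec. */
    static int survivors(int threads, int corePoolSize, double keepAliveSec,
                         double intervalSec, double durationSec) {
        ArrayDeque<Double> idleSince = new ArrayDeque<>(); // head = longest-waiting thread
        for (int i = 0; i < threads; i++) idleSince.addLast(0.0);
        for (long k = 1; k * intervalSec <= durationSec; k++) {
            double now = k * intervalSec;
            // Threads whose poll(keepAlive) expired leave, oldest first, never below core.
            while (idleSince.size() > corePoolSize && now - idleSince.peekFirst() >= keepAliveSec) {
                idleSince.pollFirst();
            }
            // The longest waiter takes the request, finishes instantly and waits again.
            idleSince.pollFirst();
            idleSince.addLast(now);
        }
        return idleSince.size();
    }

    public static void main(String[] args) {
        // 121 threads, core 10, keepAlive 60 s, observed for 10 minutes
        System.out.println(survivors(121, 10, 60, 0.4, 600)); // 121: a request every 0.4 s keeps all alive
        System.out.println(survivors(121, 10, 60, 0.7, 600)); // 85: roughly 60 / 0.7 threads remain
        System.out.println(survivors(121, 10, 60, 100, 600)); // 10: almost no traffic, back to core
    }
}
```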
Do we need to do anything?
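It depends on Tomcat's actual request volume.

If every thread is continuously taking tasks from the queue and running them, and the workers merely looked idle at the moment jstack was taken, you can leave things as they are. When traffic drops, the thread count will drop with it.

If each thread is fairly idle, say it really only runs one or two tasks a minute, there are two options:
- Lower maximumPoolSize, e.g. to the default of 200.
- Shorten the pool's keepAliveTime, e.g. to 30 s, so that idle threads are released sooner.

Note that the connector's internal executor hard-codes 60 s in createExecutor(). Changing the idle timeout therefore means defining a shared Tomcat Executor, whose maxIdleTime attribute defaults to 60000 ms, and pointing the Connector at it.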
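The effect of a shorter keep-alive can be tried on a plain JDK executor. ShrinkDemo is a hypothetical name and the numbers are arbitrary.

A burst of blocking tasks grows the pool to its maximum. Once the burst ends, the extra threads time out in getTask() after about keepAliveTime and the pool falls back to corePoolSize.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative demo (hypothetical class name) on a stock JDK executor: a short keepAlive
// releases the threads above corePoolSize soon after a burst ends.
class ShrinkDemo {
    /** Returns {poolSizeDuringBurst, poolSizeAfterIdle}. */
    static int[] run(int core, int max, long keepAliveMillis, long idleMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < max; i++) {
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }
        int during = pool.getPoolSize(); // no thread was waiting on the SynchronousQueue, so each task got a thread
        release.countDown();             // burst over: every worker goes back to getTask()
        try {
            Thread.sleep(idleMillis);    // should be well above keepAliveMillis
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {during, after};
    }

    public static void main(String[] args) {
        int[] r = run(2, 6, 100, 1000);
        System.out.println("during burst: " + r[0] + ", after idle: " + r[1]);
    }
}
```

With a 100 ms keep-alive and a 1 s idle wait, this is expected to report 6 threads during the burst and 2 afterwards. The worker-count CAS in getTask() stops the pool at exactly corePoolSize.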