Tomcat 線程池

tomcat的線程池擴(kuò)展了jdk的executor,而且隊列用的是自己的task queue,因此其策略與jdk的有所不同。JDK 原生線程池可以說功能比較完善,使用也比較簡單,那為何 Tomcat/Jetty 卻不選擇這個方案,反而自己去動手實現(xiàn)那?

image.png

tomcat線程池策略

image.png

可以看到,tomcat 其實是分為兩個線程池, 一個是acceptor 來接受外部的請求,一個是Executor 來執(zhí)行請求。 為什么要分兩個線程池,這個問題在前面已經(jīng)講過。 大家不理解可以看看前面的文章。

Tomcat 作為一個老牌的 servlet 容器,處理多線程肯定得心應(yīng)手,為了能保證多線程環(huán)境下的高效,必然使用了線程池。

但是,Tomcat 并沒有直接使用 j.u.c 里面的線程池,而是對線程池進(jìn)行了擴(kuò)展,首先我們回憶一下,j.u.c 中的線程池的幾個核心參數(shù)是怎么配合的:

  • 如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個新的線程來執(zhí)行任務(wù)。
  • 如果運(yùn)行的線程等于或多于 corePoolSize,將任務(wù)加入 BlockingQueue。
  • 如果 BlockingQueue 內(nèi)的任務(wù)超過上限,則創(chuàng)建新的線程來處理任務(wù)。
  • 如果創(chuàng)建的線程超出 maximumPoolSize,任務(wù)將被拒絕策略拒絕。

Tomcat 的線程池其實封裝不厚重,只是對 jdk 線程池做了簡單優(yōu)化:

  • 任務(wù)執(zhí)行失敗時不會直接拋出錯誤,而是裝回隊列里再次嘗試執(zhí)行;
  • 當(dāng)線程池沒有達(dá)到最大執(zhí)行線程的時候,會優(yōu)先開線程再使用任務(wù)隊列;
  • 擴(kuò)展計數(shù)用于追蹤任務(wù)的執(zhí)行情況;
  • 將線程池融入 Catalina 的生命周期組件中。

StandardThreadExecutor

StandardThreadExecutor 是 Catalina 結(jié)構(gòu)中的一部分,是 Tomcat 生命周期中的池化線程資源的封裝。類總覽:

public class StandardThreadExecutor extends LifecycleMBeanBase
        implements Executor, ResizableExecutor {

    // 統(tǒng)一的 String 的管理類,用來防止編碼等一系列的問題
    protected static final StringManager sm =
            StringManager.getManager(Constants.Package);

    // 創(chuàng)建出來的線程的優(yōu)先級
    protected int threadPriority = Thread.NORM_PRIORITY;

    // 線程是否是守護(hù)線程,默認(rèn)為是
    protected boolean daemon = true;

    // 線程名稱
    protected String namePrefix = "tomcat-exec-";

    // 默認(rèn)的最大線程數(shù)量
    protected int maxThreads = 200;

    // 默認(rèn)的最小線程數(shù)量
    protected int minSpareThreads = 25;

    // 存在時間
    protected int maxIdleTime = 60000;

    // 真實工作的 ThreadPoolExecutor
    // 本質(zhì)是 StandardThreadExecutor 只是 ThreadPoolExecutor 的裝飾器
    // 此處的對象類型是 org.apache.tomcat.util.threads.ThreadPoolExecutor
    protected ThreadPoolExecutor executor = null;

    // 線程池名稱
    protected String name;

    // 是否開啟線程池最小線程數(shù)量,如果此處為 false 的話,minSpareThreads 就沒有意義
    protected boolean prestartminSpareThreads = false;

    // 默認(rèn)的任務(wù)隊列長度
    protected int maxQueueSize = Integer.MAX_VALUE;

     // 重建線程的時間間隔
    protected long threadRenewalDelay =
        org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY;

    // 任務(wù)隊列
    private TaskQueue taskqueue = null;
    
    // 其它方法暫時忽略
}

生命周期

/**
 * 初始化線程池
 */
@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
}


/**
 * 開始線程池
 */
@Override
protected void startInternal() throws LifecycleException {

    // 任務(wù)隊列
    taskqueue = new TaskQueue(maxQueueSize);
    
    // 線程工廠
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
    
    // 初始化線程池
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
    
    // 重建線程的時間間隔
    executor.setThreadRenewalDelay(threadRenewalDelay);
    
    // 設(shè)置線程池最小線程數(shù)量
    if (prestartminSpareThreads) {
        executor.prestartAllCoreThreads();
    }
    
    // 線程池任務(wù)隊列的 parent
    taskqueue.setParent(executor);

    // 設(shè)置組件的生命周期狀態(tài)
    setState(LifecycleState.STARTING);
}


/**
 * 關(guān)閉線程池
 */
@Override
protected void stopInternal() throws LifecycleException {

    setState(LifecycleState.STOPPING);
    if (executor != null) {
        executor.shutdownNow();
    }
    executor = null;
    taskqueue = null;
}

/**
 * 清除線程池
 */
@Override
protected void destroyInternal() throws LifecycleException {
    super.destroyInternal();
}

/**
 * 關(guān)閉線程池
 */
public void contextStopping() {
    if (executor != null) {
        executor.contextStopping();
    }
}


任務(wù)執(zhí)行

/**
 * 加入一個帶超時的任務(wù)
 **/
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {

    // 調(diào)用 executor 對象去執(zhí)行
    // 如果 executor 對象是空的,則拋出異常
    if (executor != null) {
        executor.execute(command,timeout,unit);
    } else {
        throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}

/**
 * 加入一個不帶超時的任務(wù)
 **/
@Override
public void execute(Runnable command) {
    
    // 邏輯基本同上
    if (executor != null) {
        try {
            executor.execute(command);
        } catch (RejectedExecutionException rx) {
            
            // 此處會再嘗試將任務(wù)加入一次等待隊列中
            // TaskQueue.force(...) 方法底層會調(diào)用 Queue.offer(...) 方法
            // 如果仍然失敗,會拋出異常
            if (!((TaskQueue) executor.getQueue()).force(command)) {
                throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
            }
        }
    } else {
        throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}

其它方法基本都是 get / set 方法,可以忽略。

ThreadPoolExecutor

org.apache.tomcat.util.threads.ThreadPoolExecutor 是 Tomcat 中的線程池類。
類和變量總覽

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    
    // 統(tǒng)一的 String 的管理類,用來防止編碼等一系列的問題
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");

    // 任務(wù)提交數(shù)量的計數(shù)
    private final AtomicInteger submittedCount = new AtomicInteger(0);


    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);

     // 線程自殺計數(shù)
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);

     // 重建線程的時間間隔
    private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;

    // 其它方法暫時忽略
    // ...

構(gòu)造器

/**
 * 基本是沿用了 java.util.concurrent.ThreadPoolExecutor 的構(gòu)造方法
 **/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    // 調(diào)用 juc 中的 ThreadPoolExecutor 進(jìn)行線程池的初始化
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    // 用于初始化常駐線程
    prestartAllCoreThreads();
}

// 下列的構(gòu)造方法都差不多

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    prestartAllCoreThreads();
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
    prestartAllCoreThreads();
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
    prestartAllCoreThreads();
}

拒絕策略

// ThreadPoolExecutor.class
private static class RejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r,
    java.util.concurrent.ThreadPoolExecutor executor) {
        // 直接拋出錯誤
        throw new RejectedExecutionException();
    }

}

任務(wù)執(zhí)行

@Override
public void execute(Runnable command) {
    execute(command,0,TimeUnit.MILLISECONDS);
}

public void execute(Runnable command, long timeout, TimeUnit unit) {
    // 提交計數(shù)加一
    submittedCount.incrementAndGet();
    try {
        // 此處調(diào)用 java.util.concurrent.ThreadPoolExecutor 中的 execute(...) 方法
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        // 如果調(diào)用父類中的方法執(zhí)行錯誤,會嘗試將任務(wù)再一次放入到等待隊列里
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try {
                // 此處嘗試放入等待隊列
                // 如果也失敗了,就回滾提交計數(shù),并拋出異常
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}


@Override
protected void afterExecute(Runnable r, Throwable t) {
    // 計數(shù)加一
    submittedCount.decrementAndGet();

    // 如果沒有報錯,那么此處嘗試關(guān)閉多余的線程
    // 拋出錯誤的方式停止線程
    if (t == null) {
        stopCurrentThreadIfNeeded();
    }
}

/**
 * 判斷是否需要關(guān)閉線程
 **/
protected void stopCurrentThreadIfNeeded() {
    // 如果線程存活時間超過了 delay 值,那么此處會拋出一個錯誤,使線程停止
    if (currentThreadShouldBeStopped()) {
        long lastTime = lastTimeThreadKilledItself.longValue();
        if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
            if (lastTimeThreadKilledItself.compareAndSet(lastTime,
            System.currentTimeMillis() + 1)) {

                final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                Thread.currentThread().getName());

                throw new StopPooledThreadException(msg);
            }
        }
    }
}

protected boolean currentThreadShouldBeStopped() {
    // 如果當(dāng)前線程并非工作線程,或者不存在線程存活 delay 值,那么此處返回 false
    // 如果當(dāng)前線程是工作線程,且設(shè)置了 delay 時間,且當(dāng)前線程的存活時間已經(jīng)超過了設(shè)置值,那么此處返回 true
    if (threadRenewalDelay >= 0 
        && Thread.currentThread() instanceof TaskThread) {
        TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
        if (currentTaskThread.getCreationTime() <
            this.lastContextStoppedTime.longValue()) {
            return true;
        }
    }
    return false;
}

從execute 方法可以看出,當(dāng)提交線程的時候,如果被線程池拒絕了,Tomcat 的線程池,還會厚著臉皮再次嘗試,調(diào)用 force() 方法"強(qiáng)行"的嘗試向阻塞隊列中添加任務(wù)。

優(yōu)雅關(guān)閉


public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());

        // 保存 corePoolSize 的值
        int savedCorePoolSize = this.getCorePoolSize();
    
        // 獲取隊列
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        // 將 taskQueue 中的 forcedRemainingCapacity 置為零
        // 不太清楚 forcedRemainingCapacity 有什么作用
        if (taskQueue != null) {
            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }

        // corePoolSize 置為零
        this.setCorePoolSize(0);

        // 將 taskQueue 中的 forcedRemainingCapacity 置空
        if (taskQueue != null) {
            taskQueue.setForcedRemainingCapacity(null);
        }
    
        // 恢復(fù) corePoolSize
        this.setCorePoolSize(savedCorePoolSize);
    }

TaskQueue

TaskQueue 是 Tomcat 中對任務(wù)隊列的增強(qiáng)和封裝:

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    // 序列編碼
    private static final long serialVersionUID = 1L;
    
    // 字符串管理類
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
            
    // 任務(wù)隊列關(guān)聯(lián)的線程池
    private transient volatile ThreadPoolExecutor parent = null;

    // 不太清楚是做什么用的一個容量計數(shù)
    private Integer forcedRemainingCapacity = null;

    // 其它方法暫時忽略
    // ...

加入、獲取任務(wù)的相關(guān)方法

// 不帶超時的添加任務(wù)方法
public boolean force(Runnable o) {
    // 關(guān)聯(lián)線程池不可為空
    if (parent == null || parent.isShutdown())
        throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
    // 調(diào)用 LinkedBlockingQueue 的 offer(...) 方法添加任務(wù)
    return super.offer(o);
}

// 帶超時的添加任務(wù)方法
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if (parent == null || parent.isShutdown())
        throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
    return super.offer(o,timeout,unit);
}

// 本質(zhì)上是調(diào)用父類的 offer(...) 方法
// 非阻塞添加任務(wù)的方法
@Override
public boolean offer(Runnable o) {

    if (parent == null) 
        return super.offer(o);
    
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) 
        return super.offer(o);
    
    if (parent.getSubmittedCount() <= (parent.getPoolSize())) 
        return super.offer(o);
    
    // 這種情況下線程池可以直接消費(fèi)任務(wù),無需放入任務(wù)隊列等待
    if (parent.getPoolSize() < parent.getMaximumPoolSize()) 
        return false;
    
    return super.offer(o);
}

// 帶超時的阻塞方式獲取任務(wù)
@Override
public Runnable poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    
    // 獲取一個任務(wù),如果獲取到的為空,則停止當(dāng)前線程
    // 能獲取到就返回任務(wù)
    Runnable runnable = super.poll(timeout, unit);
    if (runnable == null && parent != null) {
        parent.stopCurrentThreadIfNeeded();
    }
    return runnable;
}

// 阻塞方式獲取任務(wù)
@Override
public Runnable take() throws InterruptedException {
    // 線程池存在的情況下,會使用限時的方式去獲取任務(wù)
    if (parent != null 
        && parent.currentThreadShouldBeStopped()) {
        return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
    }
    return super.take();
}

TaskQueue 在設(shè)計的時候都考慮了關(guān)聯(lián)線程池存不存在的情況,筆者認(rèn)為這應(yīng)該是 Tomcat 的作者考慮到開發(fā)者可能會需要復(fù)用 TaskQueue 到其它的場景中。
在提交任務(wù)的時候,增加了幾個分支判斷。

首先我們看看 parent 是什么:

private transient volatile ThreadPoolExecutor parent = null;
這里需要特別注意這里的 ThreadPoolExecutor 并不是 jdk里面的 java.util.concurrent.ThreadPoolExecutor 而是 tomcat 自己實現(xiàn)的。

我們分別來看 offer 中的幾個 if 分支。

首先我們需要明確一下,當(dāng)一個線程池需要調(diào)用阻塞隊列的 offer 的時候,說明線程池的核心線程數(shù)已經(jīng)被占滿了。(記住這個前提非常重要)

要理解下面的代碼,首先需要復(fù)習(xí)一下線程池的 getPoolSize() 獲取的是什么?我們看源碼:

/**
 * Returns the current number of threads in the pool.
 *
 * @return the number of threads
 */
public int getPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove rare and surprising possibility of
        // isTerminated() && getPoolSize() > 0
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
            : workers.size();
    } finally {
        mainLock.unlock();
    }
}

需要注意的是,workers.size() 包含了 coreSize 的核心線程和臨時創(chuàng)建的小于 maxSize 的臨時線程。

先看第一個 if

// 如果線程池的工作線程數(shù)等于 線程池的最大線程數(shù),這個時候沒有工作線程了,就嘗試加入到阻塞隊列中
    if (parent.getPoolSize() == parent.getMaximumPoolSize()){
        return super.offer(o);
    }

經(jīng)過第一個 if 之后,線程數(shù)必然在核心線程數(shù)和最大線程數(shù)之間。

if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
    return super.offer(o);
}

對于 parent.getSubiitedCount() ,我們要先搞清楚 submiitedCount 是什么

/**
 * The number of tasks submitted but not yet finished. This includes tasks
 * in the queue and tasks that have been handed to a worker thread but the
 * latter did not start executing the task yet.
 * This number is always greater or equal to {@link #getActiveCount()}.
 */
private final AtomicInteger submittedCount = new AtomicInteger(0);

這個數(shù)是一個原子類的整數(shù),用于記錄提交到線程中,且還沒有結(jié)束的任務(wù)數(shù)。包含了在阻塞隊列中的任務(wù)數(shù)和正在被執(zhí)行的任務(wù)數(shù)兩部分之和 。

所以這行代碼的策略是,如果已提交的線程數(shù)小于等于線程池中的線程數(shù),表明這個時候還有空閑線程,直接加入阻塞隊列中。為什么會有這種情況發(fā)生?其實我的理解是,之前創(chuàng)建的臨時線程還沒有被回收,這個時候直接把線程加入到隊里里面,自然就會被空閑的臨時線程消費(fèi)掉了。

我們繼續(xù)往下看:

//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
    return false;
}

由于上一個 if 條件的存在,走到這個 if 條件的時候,提交的線程數(shù)已經(jīng)大于核心線程數(shù)了,且沒有空閑線程,所以返回一個 false 標(biāo)明,表示任務(wù)添加到阻塞隊列失敗。線程池就會認(rèn)為阻塞隊列已經(jīng)無法繼續(xù)添加任務(wù)到隊列中了,根據(jù)默認(rèn)線程池的工作邏輯,線程池就會創(chuàng)建新的線程直到最大線程數(shù)。

回憶一下 jdk 默認(rèn)線程池的實現(xiàn),如果阻塞隊列是無界的,任務(wù)會無限的添加到無界的阻塞隊列中,線程池就無法利用核心線程數(shù)和最大線程數(shù)之間的線程數(shù)了。

Tomcat 的實現(xiàn)就是為了,線程池即使核心線程數(shù)滿了以后,且使用無界隊列的時候,線程池依然有機(jī)會創(chuàng)建新的線程,直到達(dá)到線程池的最大線程數(shù)。

總結(jié)一下:

Tomcat 線程池的邏輯:

  • 如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個新的線程來執(zhí)行任務(wù)。
  • 如果線程數(shù)大于 corePoolSize了,Tomcat 的線程不會直接把線程加入到無界的阻塞隊列中,而是去判斷,submittedCount(已經(jīng)提交線程數(shù))是否等于 maximumPoolSize。
  • 如果等于,表示線程池已經(jīng)滿負(fù)荷運(yùn)行,不能再創(chuàng)建線程了,直接把線程提交到隊列,
  • 如果不等于,則需要判斷,是否有空閑線程可以消費(fèi)。
  • 如果有空閑線程則加入到阻塞隊列中,等待空閑線程消費(fèi)。
  • 如果沒有空閑線程,嘗試創(chuàng)建新的線程。(這一步保證了使用無界隊列,仍然可以利用線程的 maximumPoolSize)。
  • 如果總線程數(shù)達(dá)到 maximumPoolSize,則繼續(xù)嘗試把線程加入 BlockingQueue 中。
  • 如果 BlockingQueue 達(dá)到上限(假如設(shè)置了上限),被默認(rèn)線程池啟動拒絕策略,tomcat 線程池會 catch 住拒絕策略拋出的異常,再次把嘗試任務(wù)加入中 BlockingQueue 中。
  • 再次加入失敗,啟動拒絕策略。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容