Tomcat線程池詳解

寫在前面的話

最近一直都在研究Java的線程池ThreadPoolExecutor,但是雖然它那么好,但是在實際的用途中怎么去用,對于我來說就不知道如何下手了,還好有開源社區(qū)我們可以了解很多項目中所運用到的線程池,比如最熟悉的就是Apache Tomcat了,相信都對它不默生,一個Apache軟件基金下的一個開源Web容器,所以今天就來聊一下Tomcat的線程池實現。

準備工作

首先去Apache Tomcat的官網下載Tomcat的源代碼,這里給出<a >Tomcat源碼鏈接</a>,下載下來之后,它是一個zip文件,需要把它進行解壓到相應的文件夾下面,以便我們能方便的查看其源代碼。分析源碼最行之有效的方法就是知道這個類有哪些方法,哪些字段,繼承了哪些類,實現了哪些接口,所以我們這里推薦一款UML工具, astah-professional,可自行下載安裝,這是一個收費軟件,但是它有50天的試用期,所以我們可以以使用的身份使用該軟件。準備工作做好之后就可以進行下一步的操作了。

初探Tomcat線程池

Tomcat的線程池的類文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面,定位到這個文件夾下面可以看到StandardThreadExecutor.java就是我們找尋的類了,用文本工具打開就可以查看其源碼了。這里源碼如下:
StandardThreadExecutor.java

public class StandardThreadExecutor extends LifecycleMBeanBase
        implements Executor, ResizableExecutor {
    //默認線程的優(yōu)先級
    protected int threadPriority = Thread.NORM_PRIORITY;
    //守護線程
    protected boolean daemon = true;
    //線程名稱的前綴
    protected String namePrefix = "tomcat-exec-";
    //最大線程數默認200個
    protected int maxThreads = 200;
    //最小空閑線程25個
    protected int minSpareThreads = 25;
    //超時時間為6000
    protected int maxIdleTime = 60000;
    //線程池容器
    protected ThreadPoolExecutor executor = null;
    //線程池的名稱
    protected String name;
     //是否提前啟動線程
    protected boolean prestartminSpareThreads = false;
    //隊列最大大小
    protected int maxQueueSize = Integer.MAX_VALUE;
    //為了避免在上下文停止之后,所有的線程在同一時間段被更新,所以進行線程的延遲操作
    protected long threadRenewalDelay = 1000L;
    //任務隊列
    private TaskQueue taskqueue = null;

    //容器啟動時進行,具體可參考org.apache.catalina.util.LifecycleBase#startInternal()
    @Override
    protected void startInternal() throws LifecycleException {
        //實例化任務隊列
        taskqueue = new TaskQueue(maxQueueSize);
        //自定義的線程工廠類,實現了JDK的ThreadFactory接口
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        //這里的ThreadPoolExecutor是tomcat自定義的,不是JDK的ThreadPoolExecutor
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        //是否提前啟動線程,如果為true,則提前初始化minSpareThreads個的線程,放入線程池內
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        //設置任務容器的父級線程池對象
        taskqueue.setParent(executor);
        //設置容器啟動狀態(tài)
        setState(LifecycleState.STARTING);
    }
    
  //容器停止時的生命周期方法,進行關閉線程池和資源清理
    @Override
    protected void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);
        if ( executor != null ) executor.shutdownNow();
        executor = null;
        taskqueue = null;
    }
    
    //這個執(zhí)行線程方法有超時的操作,參考org.apache.catalina.Executor接口
    @Override
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        if ( executor != null ) {
            executor.execute(command,timeout,unit);
        } else { 
            throw new IllegalStateException("StandardThreadExecutor not started.");
        }
    }

    //JDK默認操作線程的方法,參考java.util.concurrent.Executor接口
    @Override
    public void execute(Runnable command) {
        if ( executor != null ) {
            try {
                executor.execute(command);
            } catch (RejectedExecutionException rx) {
                //there could have been contention around the queue
                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
            }
        } else throw new IllegalStateException("StandardThreadPool not started.");
    }

    //由于繼承了org.apache.tomcat.util.threads.ResizableExecutor接口,所以可以重新定義線程池的大小
    @Override
    public boolean resizePool(int corePoolSize, int maximumPoolSize) {
        if (executor == null)
            return false;

        executor.setCorePoolSize(corePoolSize);
        executor.setMaximumPoolSize(maximumPoolSize);
        return true;
    }
}

??看完了上面的源碼之后,不知此刻的你是一面茫然還是認為小菜一碟呢,不管怎樣,我們先來看下UML類圖吧,了解一下具體的繼承關系,你就明白了,廢話不多說,能用圖片解決的東西盡量少用文字。

StandardThreadExecutor類繼承關系

??接下來,我們來看一下ResizableExecutor這個接口:

import java.util.concurrent.Executor;

public interface ResizableExecutor extends Executor {

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize();

    public int getMaxThreads();

    /**
     * Returns the approximate number of threads that are actively executing
     * tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}

??實現這個接口之后,就能動態(tài)改變線程池的大小和任務隊列的大小了,它是繼承自JDK的Executor接口的,其它的接口不再多說,可自行查看源碼。

Tomcat線程池的實現

??Tomcat的線程池的名字也叫作ThreadPoolExecutor,剛開始看源代碼的時候還以為是使用了JDK的ThreadPoolExecutor了呢,后面仔細查看才知道是Tomcat自己實現的一個ThreadPoolExecutor,不過基本上都差不多,都是在JDK之上封裝了一些自己的東西,上源碼:

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    /**
     *  已經提交但尚未完成的任務數量。
     *  這包括已經在隊列中的任務和已經交給工作線程的任務但還未開始執(zhí)行的任務
     *  這個數字總是大于getActiveCount()的
     **/
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    /**
    *  最近的時間在ms時,一個線程決定殺死自己來避免
    *  潛在的內存泄漏。 用于調節(jié)線程的更新速率。
    */
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    //延遲2個線程之間的延遲。 如果為負,不更新線程。
    private long threadRenewalDelay = 1000L;
    //4個構造方法  ... 省略

    public long getThreadRenewalDelay() {
        return threadRenewalDelay;
    }

    public void setThreadRenewalDelay(long threadRenewalDelay) {
        this.threadRenewalDelay = threadRenewalDelay;
    }
    
    /**
    *  方法在完成給定Runnable的執(zhí)行時調用。
    *  此方法由執(zhí)行任務的線程調用。 如果
    *  非null,Throwable是未捕獲的{@code RuntimeException}
    *  或{@code Error},導致執(zhí)行突然終止。...
    *  @param r 已完成的任務
    *  @param t 引起終止的異常,如果執(zhí)行正常完成則為null
    **/
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();

        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }

    //如果當前線程在上一次上下文停止之前啟動,則拋出異常,以便停止當前線程。
    protected void stopCurrentThreadIfNeeded() {
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                        System.currentTimeMillis() + 1)) {
                    // OK, it's really time to dispose of this thread

                    final String msg = sm.getString(
                                    "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                                    Thread.currentThread().getName());

                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }
    
    //當前線程是否需要被終止
    protected boolean currentThreadShouldBeStopped() {
        if (threadRenewalDelay >= 0
            && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            //線程創(chuàng)建的時間<上下文停止的時間,則可以停止該線程
            if (currentTaskThread.getCreationTime() <
                    this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }

    public int getSubmittedCount() {
        return submittedCount.get();
    }

    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    Thread.interrupted();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

    public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());
        int savedCorePoolSize = this.getCorePoolSize();
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        if (taskQueue != null) {
            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }

        // setCorePoolSize(0) wakes idle threads
        this.setCorePoolSize(0);

        if (taskQueue != null) {
            // ok, restore the state of the queue and pool
            taskQueue.setForcedRemainingCapacity(null);
        }
        this.setCorePoolSize(savedCorePoolSize);
    }
}

Tomcat的線程池根據文檔來說:和java.util.concurrent一樣,但是它實現了一個高效的方法getSubmittedCount()方法用來處理工作隊列。具體可以查看上面的注釋和源碼就知道了。把UML圖獻上。

ThreadPoolExecutor

Tomcat線程工廠

??想要自定義線程工廠類,只需要實現JDK的ThreadFactory接口就可以了,我們來看看Tomcat是如何實現的吧:

public class TaskThreadFactory implements ThreadFactory {
    //線程組
    private final ThreadGroup group;
    //線程增長因子
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    //名稱前綴
    private final String namePrefix;
    //是否是守護線程
    private final boolean daemon;
    //線程優(yōu)先級
    private final int threadPriority;
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(threadPriority);
        return t;
    }

}

Tomcat的線程工廠類和JDK實現的線程工廠類相差無幾,具體可以參考一下JDK線程工廠Executors.DefaultThreadFactory工廠類的實現。

Tomcat的線程類

??Tomcat自己定義了TaskThread用于線程的執(zhí)行,里面增加了creationTime字段用于定義線程創(chuàng)建的開始時間,以便后面線程池獲取這個時間來進行優(yōu)化。

/**
 * 一個實現創(chuàng)建時間紀錄的線程類
 */
public class TaskThread extends Thread {

    private static final Log log = LogFactory.getLog(TaskThread.class);
    private final long creationTime;

    public TaskThread(ThreadGroup group, Runnable target, String name) {
        super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();
    }

    public TaskThread(ThreadGroup group, Runnable target, String name,
            long stackSize) {
        super(group, new WrappingRunnable(target), name, stackSize);
        this.creationTime = System.currentTimeMillis();
    }

    public final long getCreationTime() {
        return creationTime;
    }

    /**
    *  封裝{@link Runnable}以接受任何{@link StopPooledThreadException},而不是讓它走,并可能在調試器中觸發(fā)中斷。
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;
        WrappingRunnable(Runnable wrappedRunnable) {
            this.wrappedRunnable = wrappedRunnable;
        }
        @Override
        public void run() {
            try {
                wrappedRunnable.run();
            } catch(StopPooledThreadException exc) {
                //expected : we just swallow the exception to avoid disturbing
                //debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }
    }
}

按照Tomcat的注解可知,它就是一個普通的線程類然后增加一個紀錄線程創(chuàng)建的時間紀錄而已,后面還使用動態(tài)內部類封裝了一個Runnable,用于調試出發(fā)中斷。

Tomcat任務隊列

??Tomcat的線程隊列由org.apache.tomcat.util.threads.TaskQueue來處理,它集成自LinkedBlockingQueue(一個阻塞的鏈表隊列),來看下源代碼吧。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    private ThreadPoolExecutor parent = null;

    // no need to be volatile, the one times when we change and read it occur in
    // a single thread (the one that did stop a context and fired listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }


    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }

    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity != null) {
            return forcedRemainingCapacity.intValue();
        }
        return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }
}

TaskQueue這個任務隊列是專門為線程池而設計的。優(yōu)化任務隊列以適當地利用線程池執(zhí)行器內的線程。
如果你使用一個普通的隊列,執(zhí)行器將產生線程,當有空閑線程,你不能強制項目到隊列本身。

總結

從0到1分析一下Apache Tomcat的線程池,感覺心好累啊,不過有收獲,至少多線程池這一塊又加強了,首先是定位到了StandardThreadExecutor這個類,然后由此展開,ResizableExecutor(動態(tài)大小的線程池接口) 、ThreadPoolExecutor (Tomcat線程池具體實現對象)、TaskThreadFactory(Tomcat線程工廠)、TaskThread(Tomcat線程類-一個紀錄創(chuàng)建時間的線程類)、TaskQueue(Tomcat的任務隊列-一個專門為線程池而設計優(yōu)化的任務隊列),喝口水,壓壓驚。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容