JDK線程池源碼分析之ThreadPoolExecutor

前言

JDK中為我們提供了一個并發(fā)線程框架,它是的我們可以在有異步任務(wù)或大量并發(fā)任務(wù)需要執(zhí)行時可以使用它提供的線程池,大大方便了我們使用線程,同時將我們從創(chuàng)建、管理線程的繁瑣任務(wù)中解放出來,能夠更加快速的實(shí)現(xiàn)業(yè)務(wù)、功能。合理的使用線程池可以為我們帶來三個好處:

  • 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程來減少線程創(chuàng)建與銷毀的開銷。
  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等待線程創(chuàng)建就直接運(yùn)行。
  • 提高線程的可管理性。線程是稀缺資源,不可能無限的創(chuàng)建,不僅會消耗大量的系統(tǒng)資源,還會影響系統(tǒng)的穩(wěn)定性,通過使用線程池可以對線程進(jìn)行分配、監(jiān)控等。
    ThreadPoolExecutor就是JDK提供的線程池的核心類,我們使用的Executors框架底層就是對ThreadPoolExecutor進(jìn)行了封裝。下面我們一起通過分析ThreadPoolExecutor的源碼來了解JDK線程池的實(shí)現(xiàn)原理。

線程池的創(chuàng)建-ThreadPoolExecutor的構(gòu)造

創(chuàng)建一個ThreadPoolExecutor需要傳入一些參數(shù),我們常用的一種ThreadPoolExecutor的構(gòu)造函數(shù)如下所示。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue);

這些參數(shù)對應(yīng)的是ThreadPoolExecutor的成員變量,我們通過這些內(nèi)部成員變量也可以先一窺ThreadPoolExecutor的特性。ThreadPoolExecutor的主要成員變量如下:

    private volatile ThreadFactory threadFactory;//創(chuàng)建線程的工廠類

    private volatile RejectedExecutionHandler handler;//當(dāng)線程池飽和或者關(guān)閉時,會執(zhí)行該句柄的鉤子(hook)

    private volatile long keepAliveTime;//空閑線程的等待時間(納秒)

    private volatile boolean allowCoreThreadTimeOut;//默認(rèn)為false,此時核心線程會保持活躍(即使處于空閑狀態(tài));如果為true,則核心線程會在空閑狀態(tài)超時等待keepAliveTime時間等待任務(wù)

    private volatile int corePoolSize;//線程池中保持的線程數(shù),即使有些線程已經(jīng)處于空閑狀態(tài),任然保持存活

    private volatile int maximumPoolSize;//線程池最大值,最大邊界是CAPACITY

   private final BlockingQueue<Runnable> workQueue;//等待執(zhí)行的任務(wù)隊(duì)列

   private final HashSet<Worker> workers = new HashSet<Worker>();//線程池中包含的所有worker線程

線程池創(chuàng)建后,來了一個新的任務(wù)需要執(zhí)行,此時我們調(diào)用

public void execute(Runnable command)

方法,線程池此時指派一個線程來執(zhí)行該任務(wù),我們通過跟蹤分析該方法的源碼,理解線程池的運(yùn)行、管理細(xì)節(jié)。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//線程池的狀態(tài)控制變量
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

ctl是線程池的狀態(tài)控制變量。該變量是一個AtomicInteger類型,它包裝了兩個域:workerCount,活躍的線程數(shù);runState,表示線程池狀態(tài),RUNNING,SHUTDOWN等。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

workerCount由29位表示,因此線程池的線程數(shù)最多有(2^29-1)。runStae用來表示線程池 中的線程在它的整個生命周期中的不同狀態(tài),現(xiàn)在線程池提供了5中狀態(tài):

     /** runState會隨著時間單調(diào)遞增,線程池的運(yùn)行狀態(tài)有以下這些轉(zhuǎn)換:
     * RUNNING -> SHOUTDOWN 線程池顯示調(diào)用shutdown()方法
     * (RUNNING or SHUTDOWN) -> STOP
     *    調(diào)用shutdownNow()
     * SHUTDOWN -> TIDYING
     *    任務(wù)隊(duì)列與線程池都為空時
     * STOP -> TIDYING
     *    線程池為空
     * TIDYING -> TERMINATED
     *    當(dāng)鉤子terminated()執(zhí)行完畢
     *
    // runState is stored in the high-order bits
    //線程池收到了一個新的任務(wù),并且執(zhí)行隊(duì)列中的任務(wù)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //此時 線程池不接受新的任務(wù),但還會執(zhí)行隊(duì)列中的任務(wù)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
   //此時線程池不接受新的任務(wù),不執(zhí)行隊(duì)列中的任務(wù),同時中斷正在 執(zhí)行的任務(wù)
    private static final int STOP       =  1 << COUNT_BITS;
   //所有任務(wù)都已經(jīng)終止,同時workerCount為0,過渡到TRYING狀態(tài) 的線程會運(yùn)行鉤子方法terminated()
    private static final int TIDYING    =  2 << COUNT_BITS;
   //terminated()方法 執(zhí)行完畢
    private static final int TERMINATED =  3 << COUNT_BITS;

在awaitTermination方法 上等待 的線程將會 在 線程的 runState變?yōu)門ERMINATED時返回。
繼續(xù)分析execute方法,接下來會有連續(xù)的三步:

  1. 如果正在運(yùn)行的線程數(shù)小于corePoolSize,會啟動一個線程來執(zhí)行該任務(wù),同時addWorker方法會原子的檢查runState狀態(tài)來保證線程 現(xiàn)在 處于 可以 運(yùn)行的狀態(tài),同時修改workerCount數(shù)量。
  2. 如果線程池中活躍線程數(shù)大于corePoolSize,且線程池處于RUNNING狀態(tài),于是會將任務(wù)加入 等待隊(duì)列。
  3. 如果任務(wù) 不能入隊(duì),我們會嘗試添加一個新的線程,如果還是失敗,我們會根據(jù)拋棄策略調(diào)用對應(yīng)拒絕方法。
    以上execute方法就包含了線程池執(zhí)行一個新的任務(wù)的全部流程,如下圖示:


    線程池處理流程

    線程池中各模塊的工作示意圖如下:


    圖中的數(shù)字是任務(wù)在線程池中的處理邏輯順序

線程池中的線程-Worker原理分析

提交到線程池的任務(wù)會被封裝成一個Worker,worker封裝了一個線程和任務(wù)。由于Worker本身繼承自AQS,是可以直接加鎖的。提交任務(wù)的具體邏輯如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            //判斷線程池狀態(tài)是否可以提交新的任務(wù)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //判斷線程池中的workerCount數(shù)目是否達(dá)到了線程池的邊界值
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS增加workerCount數(shù)目
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//workers是一個集合,包含了所有池中的worker線程
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//worker被你加入集合后,線程開始執(zhí)行任務(wù)
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我們可以發(fā)現(xiàn),在這里當(dāng)workerCount通過CAS正確加1后,后需要獲取一個全局鎖mainLock,在加鎖期間先對線程池的狀態(tài)以及線程池內(nèi)的線程數(shù)進(jìn)行再次檢查,正常后會把該新的worker線程加入workers集合,然后線程開始執(zhí)行該任務(wù)。線程是怎么開始執(zhí)行任務(wù)的呢?我們先看一下Worker的構(gòu)造:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{...}
Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//由于Worker本身就是Runnable的,所以創(chuàng)建一個新的線程的時候,就已自身作為參數(shù)了,當(dāng)線程thread調(diào)用start啟動了線程開始執(zhí)行時,就會運(yùn)行傳入的Woker的run方法。
        }

線程調(diào)用start方法啟動的時候就是Worker的run方法開始執(zhí)行。

public void run() {
            runWorker(this);
        }
/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
      worker重復(fù)的從隊(duì)列里取出任務(wù)執(zhí)行,同時處理以下一些問題
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
       只要線程池處于RUNNING狀態(tài),就不停的從任務(wù)隊(duì)列取出任務(wù)
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and
     * clearInterruptsForTaskRun called to ensure that unless pool is
     * stopping, this thread does not have its interrupt set.
       取出任務(wù)后執(zhí)行任務(wù)前,需要對Woker加鎖,防止任務(wù)執(zhí)行時發(fā)生中斷
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
       任務(wù)執(zhí)行前會有一個前置的方法,該方法可能會拋出異常從而導(dǎo)致任務(wù)還未執(zhí)行線程就退出
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to
     * afterExecute. We separately handle RuntimeException, Error
     * (both of which the specs guarantee that we trap) and arbitrary
     * Throwables.  Because we cannot rethrow Throwables within
     * Runnable.run, we wrap them within Errors on the way out (to the
     * thread's UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//循環(huán)從任務(wù)隊(duì)列里取出任務(wù)執(zhí)行
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                  
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//執(zhí)行task的前置攔截
                    Throwable thrown = null;
                    try {
                        task.run();//任務(wù)運(yùn)行
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//執(zhí)行task的后置攔截器
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//任務(wù)最后的清理工作
        }
    }

我們先看看是如何從任務(wù)隊(duì)列取出任務(wù)的:

/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 基于當(dāng)前線程池的配置線程會在該方法上阻塞或是超時等待任務(wù)。
     * 如果該worker由于以下幾種情況必須退出,該方法會返回null:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 由于調(diào)用setMaximumPoolSize使得線程池的worker數(shù)量超過了maximumPoolSize 
     * 2. The pool is stopped.
     * 線程池處于STOPPED狀態(tài)
     * 3. The pool is shutdown and the queue is empty.
     * 線程池被關(guān)閉同時隊(duì)列為空
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//addWorker方法中已經(jīng)先增加了workerCount的數(shù)目,此時既然該任務(wù)不能夠執(zhí)行,則需要通過CAS減小workerCount的數(shù)目
                return null;
            }

            boolean timed;      // Are workers subject to culling?worker是否要被踢出

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                //踢出任務(wù)隊(duì)列首元素返回
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

runWorker方法的最后清理操作是這樣的:

/**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     * 對執(zhí)行完run方法的worker進(jìn)行清理和記錄操作。該方法會從workers線程集合移除
     * 當(dāng)前worker對應(yīng)的線程。如果Worker在run方法執(zhí)行期間發(fā)生異常導(dǎo)致退出,那么completedAbruptly
     * 是會被設(shè)置為true,此時我們會添加一個新的null任務(wù)。
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

tryTerminate會在符合條件的情況下轉(zhuǎn)換線程池狀態(tài)至TERMINATED,具體如下分析:

/**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //線程池處于RUNNING狀態(tài)或者
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

線程池的銷毀-shutDown/shutDownNow

線程池使用完畢,主動關(guān)閉線程池。此時我們會調(diào)用shutDown()方法。

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * 進(jìn)行有序的任務(wù)關(guān)閉,此時線程池不接受新的任務(wù),但是前面提交的任務(wù)還是會繼續(xù)執(zhí)行完
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     * 該方法不會等待前面提交的任務(wù)完全執(zhí)行完,如需要可以使用awaitTermination 
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//線程池池的全局鎖
        try {
            checkShutdownAccess();//安全驗(yàn)證,確認(rèn)線程池有權(quán)限關(guān)閉線程
            advanceRunState(SHUTDOWN);//線程池狀態(tài)轉(zhuǎn)換為SHUTDOWN
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//通過線程池的狀況判斷是否轉(zhuǎn)移線程池的狀態(tài)至TERMINATED
    }

shutDownNow方法與shutDown有些不同,shutDown是所謂的‘Elegant’關(guān)閉模式,而shutDownNow則比較‘Rude’方式。shutDownNow會立即停止所有正在執(zhí)行的任務(wù),具體代碼如下:

/**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * 立即停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的執(zhí)行,并返回正在等待執(zhí)行的任務(wù)列表
     * 該方法返回前會被從任務(wù)列里移除
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     * 該方法只會盡最大努力去停止正在運(yùn)行的任務(wù)-通過Thread.interupt方法取消任務(wù),因此如果任何一個任務(wù)無法響應(yīng)中斷就不會執(zhí)行停止。
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);//線程池狀態(tài)轉(zhuǎn)換為STOP
            interruptWorkers();
            tasks = drainQueue();//
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

線程池的飽和策略-RejectedExecutionHandler

前面在介紹ThreadPoolExecutor的主要成員變量時,我們簡單介紹了包和策略參數(shù):

 private volatile RejectedExecutionHandler handler;//當(dāng)線程池飽和或者關(guān)閉時,會執(zhí)行該句柄的鉤子(hook)

在默認(rèn)情況下,ThreadPoolExecutor使用拋棄策略。

private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

ThreadPoolExecutor為我們提供了四種線程池飽和策略,也即對應(yīng)的四種靜態(tài)內(nèi)部類。這些策略是在線程池與任務(wù)隊(duì)列都滿了的情況下,對新提交給線程池的任務(wù)執(zhí)行的操作。也即前面我們分析過的execute方法在所有情況都無效的情況下執(zhí)行的一步,調(diào)用對應(yīng)飽和策略的鉤子:

final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

這四種策略如下:

  • CallerRunsPolicy: 在線程池沒有關(guān)閉(調(diào)用shut Down)的情況下,直接由調(diào)用線程來執(zhí)行該任務(wù)。否則直接就丟棄該任務(wù),什么也不做。
  • AbortPolicy:直接拋出異常。
  • AbortPolicy:直接丟棄該任務(wù),什么也不做。
  • DiscardOldestPolicy: 在線程池沒有關(guān)閉(調(diào)用shutDown)的情況下,丟棄線程池任務(wù)隊(duì)列中等待最久-即隊(duì)列首部的任務(wù),并嘗試直接執(zhí)行該觸發(fā)飽和策略的任務(wù)。

最后的總結(jié)

我們觀察前面分析的源碼,包括線程池處于生命周期的一些階段(如線程池提交任務(wù),還是線程池的退出,銷毀)都會發(fā)現(xiàn)一個問題,這些地方都會用到線程池的全局鎖。

private final ReentrantLock mainLock = new ReentrantLock();

全局鎖的使用在多線程調(diào)用ThreadPoolExecutor的情況下會導(dǎo)致性能問題。但是我們仔細(xì)思考一下會發(fā)現(xiàn),向線程池提交任務(wù)時獲取全局鎖是在線程池還未預(yù)熱完成(即線程池的活躍線程還小于corePoolSize)的情況發(fā)生的事情,當(dāng)線程池的活躍線程超過corePoolSize后,以后在執(zhí)行execute方法提交新的任務(wù),主要還是執(zhí)行我們前面分析execute方法時說的第二步,把任務(wù)添加到等待隊(duì)列。所以后面不會出現(xiàn)對全局鎖的爭搶場景。也就是說,對全局鎖的爭搶只會出現(xiàn)在線程池預(yù)熱的初期,但這個預(yù)熱的過程是和corePoolSize有關(guān)的,我們需要關(guān)注。

最后,我們對ThreadPoolExecutor進(jìn)行一下總結(jié):

  • 線程池有一個預(yù)熱階段,在線程池的活躍線程數(shù)未達(dá)到corePoolSize時,并發(fā)提交任務(wù)會出現(xiàn)對線程池全局鎖的爭搶。
  • 線程池中的Worker數(shù)超過corePoolSize時,后續(xù)提交的任務(wù)都會進(jìn)入任務(wù)等待隊(duì)列。
  • corePoolSize個活躍線程被線程池創(chuàng)建后,會循環(huán)從任務(wù)等待隊(duì)列獲取任務(wù)執(zhí)行。
  • 當(dāng)線程池飽和或者關(guān)閉時,會執(zhí)行對應(yīng)的飽和策略。ThreadPoolExecutor默認(rèn)使用使用拋棄策略。

以上就是ThreadPoolExecutor的源碼分析,有沒認(rèn)識到的或理解有誤的,歡迎指出、討論。

參考文獻(xiàn)

  • 《Java并發(fā)編程的藝術(shù)》
  • JDK1.7源碼
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容