concurrency-threadpoolexecutor

concurrency-threadpoolexecutor

Java 中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過程中,合理地使用線程池能夠帶來 3 個好處。

  1. 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
  2. 提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
  3. 提高線程的可管理性。線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

ThreadPoolExecutor 執(zhí)行流程

ThreadPoolExecutor 執(zhí)行 execute 方法分下面 4 種情況。

  1. 如果當(dāng)前運行的線程少于 corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
  2. 如果運行的線程等于或多于 corePoolSize,則將任務(wù)加入 BlockingQueue。
  3. 如果無法將任務(wù)加入 BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
  4. 如果創(chuàng)建新線程將使當(dāng)前運行的線程超出 maximumPoolSize,任務(wù)將被拒絕,并調(diào)用 RejectedExecutionHandler.rejectedExecution() 方法。

ThreadPoolExecutor 采取上述步驟的總體設(shè)計思路,是為了在執(zhí)行 execute() 方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在 ThreadPoolExecutor 完成預(yù)熱之后(當(dāng)前運行的線程數(shù)大于等于 corePoolSize),幾乎所有的 execute() 方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。

ThreadPoolExecutor 源碼分析

線程池生命周期和線程狀態(tài)標(biāo)識ctl

線程池用 ctl 的低 29 位表示線程池中的線程數(shù),高 3 位表示當(dāng)前線程狀態(tài)。

高3位表示狀態(tài)

  1. RUNNING:運行狀態(tài),高3位為111;
  2. SHUTDOWN:關(guān)閉狀態(tài),高3位為000,在此狀態(tài)下,線程池不再接受新任務(wù),但是仍然處理阻塞隊列中的任務(wù);
  3. STOP:停止?fàn)顟B(tài),高3位為001,在此狀態(tài)下,線程池不再接受新任務(wù),也不會處理阻塞隊列中的任務(wù),正在運行的任務(wù)也會停止;
  4. TIDYING:高3位為010;
  5. TERMINATED:終止?fàn)顟B(tài),高3位為011。

線程狀態(tài)標(biāo)識ctl

// ctl 高3位表示線程池狀態(tài),低29位表示當(dāng)前工作線程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;         // 低29位表示工作線程數(shù)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // 最大線程數(shù) 0x1fffffff

// 獲取線程池狀態(tài)、線程總數(shù)、構(gòu)造 ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

全局鎖

// 全局鎖,創(chuàng)建工作線程等操作時需要獲取全局鎖
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

工作線程

// 工作線程
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;
private volatile int corePoolSize;

任務(wù)提交execute

// ThreadPoolExecutor 的任務(wù)提交過程
    // java.util.concurrent.ThreadPoolExecutor#execute
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        // ctl 是一個重要的控制全局狀態(tài)的數(shù)據(jù)結(jié)構(gòu),定義為一個線程安全的 AtomicInteger
        // ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        int c = ctl.get();
        /**
        * workerCountOf方法取出低29位的值,表示當(dāng)前活動的線程數(shù);
        * 如果當(dāng)前活動的線程數(shù)小于corePoolSize,則新建一個線程放入線程池中,并把該任務(wù)放到線程中
        */
        if (workerCountOf(c) < corePoolSize) {
            /**
            * addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量 是根據(jù)據(jù)corePoolSize 來判斷還是maximumPoolSize來判斷;
            * 如果是ture,根據(jù)corePoolSize判斷
            * 如果是false,根據(jù)maximumPoolSize判斷
            */
            if (addWorker(command, true))
                return;
            /**
            * 如果添加失敗,則重新獲取ctl值
            */
            c = ctl.get();
        }
        /**
        * 如果線程池是Running狀態(tài),并且任務(wù)添加到隊列中
        */
        if (isRunning(c) && workQueue.offer(command)) {
            //double-check,重新獲取ctl的值
            int recheck = ctl.get();
            /**
            * 再次判斷線程池的狀態(tài),如果不是運行狀態(tài),由于之前已經(jīng)把command添加到阻塞隊列中,這時候需要從隊列中移除command;
            * 通過handler使用拒絕策略對該任務(wù)進行處理,整個方法返回
            */
            if (!isRunning(recheck) && remove(command))
                reject(command);
            /**
            * 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法;
            * 第一個參數(shù)為null,表示在線程池中創(chuàng)建一個線程,但不去啟動
            * 第二個參數(shù)為false,將線程池的線程數(shù)量的上限設(shè)置為maximumPoolSize,添加線程時根據(jù)maximumPoolSize來判斷
            */
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
            /**
            * 執(zhí)行到這里,有兩種情況:
            * 1、線程池的狀態(tài)不是RUNNING;
            * 2、線程池狀態(tài)是RUNNING,但是workerCount >= corePoolSize, workerQueue已滿
            * 這個時候,再次調(diào)用addWorker方法,第二個參數(shù)傳false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
            * 如果失敗則執(zhí)行拒絕策略;
            */
        } else if (!addWorker(command, false))
            reject(command);
    }

execute總結(jié)

通過上面這一小段代碼,我們就已經(jīng)完整地看到了。通過一個 ctl 變量進行全局狀態(tài)控制,從而保證了線程安全性。整個框架并沒有使用鎖,但是卻是線程安全的。

整段代碼剛好完整描述了線程池的執(zhí)行流程:

  1. 如果workerCount < corePoolSize,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù);
  2. 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務(wù)添加到該阻塞隊列中;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù);
  4. 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認的處理方式是直接拋異常。

這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中獲取任務(wù)。所以,在workerCountOf(recheck) == 0時執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。

工作線程worker

線程池中的每一個對象被封裝成一個Worker對象,ThreadPool維護的就是一組Worker對象。Worker類繼承了AQS,并實現(xiàn)了Runnable接口,其中包含了兩個重要屬性:firstTask用來保存?zhèn)魅氲娜蝿?wù),thread是在調(diào)用構(gòu)造方法是通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。

// Worker 是對線程 Thread 的包裝,實現(xiàn)了 AbstractQueuedSynchronizer
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
 
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
       /**
        *  把state設(shè)置為-1,,阻止中斷直到調(diào)用runWorker方法;
        *  因為AQS默認state是0,如果剛創(chuàng)建一個Worker對象,還沒有執(zhí)行任務(wù)時,這時候不應(yīng)該被中斷
        */
        setState(-1);
        this.firstTask = firstTask;
        /**
         * 創(chuàng)建一個線程,newThread方法傳入的參數(shù)是this,因為Worker本身繼承了Runnable接口,也就是一個線程;
         * 所以一個Worker對象在啟動的時候會調(diào)用Worker類中run方法
         */
        this.thread = getThreadFactory().newThread(this);
    }
} 

Worker 為什么要繼承 AbstractQueuedSynchronizer 實現(xiàn)自己的鎖,而不使用 ReentrantLock 呢?

  1. lock方法一旦獲取獨占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中;
  2. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;
  3. 如果該線程現(xiàn)在不是獨占鎖的狀態(tài),也就是空閑狀態(tài),說明它沒有處理任務(wù),這時可以對該線程進行中斷;
  4. 線程池中執(zhí)行shutdown方法或tryTerminate方法時會調(diào)用interruptIdleWorkers方法來中斷空閑線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài);
  5. 之所以設(shè)置為不可重入的,是因為在任務(wù)調(diào)用setCorePoolSize這類線程池控制的方法時,不會中斷正在運行的線程所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否處于被中斷。

創(chuàng)建工作線程 addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    /**
     * 由于線程執(zhí)行過程中,各種情況都有可能處于,通過自旋的方式來保證worker的增加;
     */
    for (; ; ) {
        int c = ctl.get();
        //獲取線程池運行狀態(tài)
        int rs = runStateOf(c);

        /**
         *
         * 如果rs >= SHUTDOWN, 則表示此時不再接收新任務(wù);
         * 接下來是三個條件 通過 && 連接,只要有一個任務(wù)不滿足,就返回false;
         * 1.rs == SHUTDOWN,表示關(guān)閉狀態(tài),不再接收提交的任務(wù),但卻可以繼續(xù)處理阻塞隊列中已經(jīng)保存的任務(wù);
         * 2.fisrtTask為空
         * 3.Check if queue empty only if necessary.
         */
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;

        for (; ; ) {
            //獲取線程池的線程數(shù)
            int wc = workerCountOf(c);
            /**
             * 如果線程數(shù) >= CAPACITY, 也就是ctl的低29位的最大值,則返回false;
             * 這里的core用來判斷 限制線程數(shù)量的上限是corePoolSize還是maximumPoolSize;
             * 如果core是ture表示根據(jù)corePoolSize來比較;
             * 如果core是false表示根據(jù)maximumPoolSize來比較;
             */
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /**
             * 通過CAS原子的方式來增加線程數(shù)量;
             * 如果成功,則跳出第一個for循環(huán);
             */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //如果當(dāng)前運行的狀態(tài)不等于rs,說明線程池的狀態(tài)已經(jīng)改變了,則返回第一個for循環(huán)繼續(xù)執(zhí)行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //根據(jù)firstTask來創(chuàng)建Worker對象
        w = new Worker(firstTask);
        //每一個Worker對象都會創(chuàng)建一個線程
        final Thread t = w.thread;
        if (t != null) {
            //創(chuàng)建可重入鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 獲取線程池的狀態(tài)
                int rs = runStateOf(ctl.get());

                /**
                 * 線程池的狀態(tài)小于SHUTDOWN,表示線程池處于RUNNING狀態(tài);
                 * 如果rs是RUNNING狀態(tài)或rs是SHUTDOWN狀態(tài)并且firstTask為null,向線程池中添加線程;
                 * 因為在SHUTDOWN狀態(tài)時不會再添加新的任務(wù),但還是處理workQueue中的任務(wù);
                 */
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一個hashSet
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize記錄線程池中出現(xiàn)的最大的線程數(shù)量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //啟動線程,Worker實現(xiàn)了Running方法,此時會調(diào)用Worker的run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
  • addWorker 前半部分主要是判斷能否新建工作線程,如果允許則執(zhí)行 compareAndIncrementWorkerCount(c),利用 CAS 原則,將線程數(shù)量+1。
  • addWorker 后半部分則是真正創(chuàng)建工作線程并啟動,這個過程需要獲取全局鎖。創(chuàng)建失敗則需要回滾 addWorkerFailed。

addWorker 的 4 種調(diào)用方式:

  1. addWorker(command, true) 線程數(shù) < coreSize 時,則創(chuàng)建新線程
  2. addWorker(command, false) 當(dāng)①阻塞隊列已滿,②線程數(shù) < maximumPoolSize 時,則創(chuàng)建新線程
  3. addWorker(null, true) 同 1。只是線程初始化任務(wù)為 null,相當(dāng)于創(chuàng)建一個新的線程。實際的使用是在 prestartCoreThread() 等方法。
  4. addWorker(null, false) 同 2。只是線程初始化任務(wù)為 null,相當(dāng)于創(chuàng)建一個新的線程,沒立馬分配任務(wù);

線程執(zhí)行 runWorker

worker類中的runworker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //獲取第一個任務(wù)
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允許中斷
    w.unlock();
    //是否因異常退出循環(huán)
    boolean completedAbruptly = true;
    try {
        //如果task為空,則通過getTask來獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /**
             * 如果線程池正在停止,那么要保證當(dāng)前線程時中斷狀態(tài);
             * 如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài)
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                //beforeExecute和afterExecute是留給子類來實現(xiàn)的
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //通過任務(wù)方式執(zhí)行,不是線程方式
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //processWorkerExit會對completedAbruptly進行判斷,表示在執(zhí)行過程中是否出現(xiàn)異常
        processWorkerExit(w, completedAbruptly);
    }
}

總結(jié) runworker

  1. while循環(huán)不斷地通過getTask方法來獲取任務(wù);

  2. getTask方法從阻塞隊列中獲取任務(wù);

  3. 如果線程池正在停止,那么要保證當(dāng)前線程處于中斷狀態(tài), 否則要保證當(dāng)前線程不是中斷狀態(tài);

  4. 調(diào)用task.run()執(zhí)行任務(wù);

  5. 如果task為null則會跳出循環(huán),執(zhí)行processWorkerExit方法;

  6. runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。

  7. 線程啟動后,釋放鎖,設(shè) AQS 狀態(tài)為 0,釋放鎖。此時其它線程才可以獲取鎖,中斷線程 interrupt;

  8. 獲取 firstTask 任務(wù)并執(zhí)行,執(zhí)行任務(wù)前后可定制 beforeExecute 和 afterExecute;

  9. 如果 getTask 從阻塞隊列獲取等待任務(wù)執(zhí)行,如果獲取的任務(wù)為 null,while 則退出循環(huán),線程關(guān)閉。

  10. 如果線程已經(jīng)STOP,則一定要將線程 interrupt。如果線程處于運行狀態(tài)(包括SHUTDOWN),則一定不能 interrupt。但實際上 interrupt() 方法并不一定能中斷正在運行的線程,它只能喚醒 wait 阻塞的線程或給線程設(shè)置一個標(biāo)記位。業(yè)務(wù)線程必須對 interrupt 做出響應(yīng)才能中斷線程,否則會一直等線程執(zhí)行結(jié)束才會銷毀。

獲取任務(wù) getTask

getTask方法用于從阻塞隊列中獲取任務(wù)

private Runnable getTask() {
    //timeout變量的值表示上次從阻塞隊列中獲取任務(wù)是否超時
    boolean timedOut = false;
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 如果rs >= SHUTDOWN,表示線程池非RUNNING狀態(tài),需要再次判斷:
         * 1、rs >= STOP ,線程池是否正在STOP
         * 2、阻塞隊列是否為空
         * 滿足上述條件之一,則將workCount減一,并返回null;
         * 因為如果當(dāng)前線程池的狀態(tài)處于STOP及以上或隊列為空,不能從阻塞隊列中獲取任務(wù);
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        /**
         * timed變量用于判斷是否需要進行超時控制;
         * allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
         * wc > corePoolSize,表示當(dāng)前線程數(shù)大于核心線程數(shù)量;
         * 對于超過核心線程數(shù)量的這些線程,需要進行超時控制;
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /**
         * wc > maximumPoolSize的情況是因為可能在此方法執(zhí)行階段同時執(zhí)行了 setMaximumPoolSize方法;
         * timed && timedOut 如果為true,表示當(dāng)前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務(wù)發(fā)生了超時;
         * 接下來判斷,如果有效咸亨數(shù)量大于1,或者workQueue為空,那么將嘗試workCount減1;
         * 如果減1失敗,則返回重試;
         * 如果wc==1時,也就說明當(dāng)前線程是線程池中的唯一線程了;
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        /**
         * timed為trure,則通過workQueue的poll方法進行超時控制,如果在keepAliveTime時間內(nèi)沒有獲取任務(wù),則返回null;
         * 否則通過take方法,如果隊列為空,則take方法會阻塞直到隊列中不為空;
         */
        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            //如果r==null,說明已經(jīng)超時了,timedOut = true;
            timedOut = true;
        } catch (InterruptedException retry) {
            //如果獲取任務(wù)時當(dāng)前線程發(fā)生了中斷,則將timedOut = false;
            timedOut = false;
        }
    }
}

getTask 總結(jié)

第二個if判斷,目的是為了控制線程池的有效線程數(shù)量。有上文分析得到,在execute方法時,如果當(dāng)前線程池的線程數(shù)量超過coolPoolSize且小于maxmumPoolSize,并且阻塞隊列已滿時,則可以通過增加工作線程。但是如果工作線程在超時時間內(nèi)沒有獲取到任務(wù),timeOut=true,說明workQueue為空,也就說當(dāng)前線程池不需要那么多線程來執(zhí)行任務(wù)了,可以把多于的corePoolSize數(shù)量的線程銷毀掉,保證線程數(shù)量在corePoolSize即可。

  1. getTask 時,worker 已經(jīng)釋放了鎖,也就是說其它線程可以調(diào)用 wt.interrupt() 喚醒等待的線程。
  2. 如果當(dāng)前線程數(shù)大于最大線程數(shù),或允許核心線程銷毀時,如果獲取任務(wù)超時則返回 null,即銷毀線程。

processWorkerExit方法

processWorkerExit執(zhí)行完之后,工作線程被銷毀。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 如果completedAbruptly為true,則說明線程執(zhí)行時出現(xiàn)異常,需要將workerCount數(shù)量減一
     * 如果completedAbruptly為false,說明在getTask方法中已經(jīng)對workerCount進行減一,這里不用再減
     */
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統(tǒng)計完成的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        //從workers中移除,也就表示從線程池中移除一個工作線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //鉤子函數(shù),根據(jù)線程池的狀態(tài)來判斷是否結(jié)束線程池
    tryTerminate();

    int c = ctl.get();
    /**
     * 當(dāng)前線程是RUNNING或SHUTDOWN時,如果worker是異常結(jié)束,那么會直接addWorker;
     * 如果allowCoreThreadTimeOut=true,那么等待隊列有任務(wù),至少保留一個worker;
     * 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

線程關(guān)閉

shutdown

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        // 為保證線程安全,使用 mainLock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // SecurityManager 檢查
            checkShutdownAccess();
            // 設(shè)置狀態(tài)為 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中斷空閑的 Worker, 即相當(dāng)于依次關(guān)閉每個空閑線程
            interruptIdleWorkers();
            // 關(guān)閉鉤子,默認實現(xiàn)為空操作,為方便子類實現(xiàn)自定義清理功能
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 再
        tryTerminate();
    }
    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            // 自身CAS更新成功或者被其他線程更新成功
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    // 關(guān)閉空閑線程(非 running 狀態(tài))
    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        // 上文已介紹, 此處 ONLY_ONE 為 false, 即是最大可能地中斷所有 Worker
        interruptIdleWorkers(false);
    }

shutdownNow

與 shutdown 對應(yīng)的,有一個 shutdownNow, 其語義是 立即停止所有任務(wù)。

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 與 shutdown 的差別,設(shè)置的狀態(tài)不一樣
            advanceRunState(STOP);
            // 強行中斷線程
            interruptWorkers();
            // 將未完成的任務(wù)返回
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                // 調(diào)用 worker 的提供的中斷方法
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
        // ThreadPoolExecutor.Worker#interruptIfStarted
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 直接調(diào)用任務(wù)的 interrupt
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

shutdown 和 shutdownNow 區(qū)別:

  • shutdown 會執(zhí)行完成已提交的任務(wù)后關(guān)閉線程池,而 shutdownNow 則會踢除已提交的任務(wù)。
  • shutdown 調(diào)用 interruptIdleWorkers 關(guān)閉空閑的線程,而 shutdownNow 調(diào)用 interruptWorkers 強行中斷所有的線程。

interruptIdleWorkers 和 interruptWorkers

  • interruptIdleWorkers 只會嘗試獲取鎖,因此只會中斷空閑線程。而 interruptWorkers 不需要獲取鎖,強行中斷線程。實際上業(yè)務(wù)線程必須對 interrupt 做出響應(yīng)才能中斷線程,否則會一直等線程執(zhí)行結(jié)束才會銷毀。
  • 而 interruptIdleWorkers 和 interruptWorkers 都是 interrupt 所有線程, 因此大部分線程將立刻被中斷。之所以是大部分,而不是全部,是因為 interrupt() 方法能力有限。 如果線程中沒有 sleep 、wait、Condition、定時鎖等應(yīng)用, interrupt() 方法是無法中斷當(dāng)前的線程的。所以,ShutdownNow() 并不代表線程池就一定立即就能退出,它可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。 如下面這個線程永遠不會中斷,因為該線程沒有響應(yīng) Thread.interrupted() 或者是直接將 InterruptedException 異常 catch 了。

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容