ThreadPoolExecutor關(guān)閉線程池詳解

概述

在之前的一篇博客里談談ThreadPoolExecutor的實現(xiàn)已經(jīng)對ThreadPoolExecutor中的線程如何運行進行了簡單的介紹,本文將介紹線程池是如何進行結(jié)束的,并對上篇文章遺留問題進行解答。

功能介紹

java中線程池提供了兩個關(guān)閉方法shutdown和shutdownNow,兩個方法的具體使用如下:

//執(zhí)行該方法,線程處于shutdown狀態(tài),線程池不允許再提交任務,但是已提交的任務會繼續(xù)執(zhí)行直到結(jié)束
void shutdown();
//執(zhí)行該方法,線程會處于stop狀態(tài),線程池會試圖停止正在執(zhí)行的任務,并返回沒有執(zhí)行成功的任務列表
List<Runnable> shutdownNow();

源碼分析

我們知道,ThreadPoolExecutor在會將每個線程封裝為一個Worker對象,該對象會持有任務并且在執(zhí)行完其創(chuàng)建時的第一個任務firstTask后,會阻塞的從workQueue中獲取任務。前面我們講到shutdown方法會將已提交的任務執(zhí)行直到結(jié)束,同時會將空閑線程回收(可以先思考下,如何判斷線程是否空閑?)。我們進入源碼進行分析:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
       //全局加鎖,保證只會有一個線程內(nèi)執(zhí)行該方法
        mainLock.lock();
        try {
           //權(quán)限檢查,忽略
            checkShutdownAccess();
            //將線程池狀態(tài)置為SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中斷空閑線程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
 }
private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
 }
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷每個Worker對象
            for (Worker w : workers) {
                Thread t = w.thread;
                //如果沒有被中斷,并且持有Worker的互斥鎖,說明該woker對象為空閑線程并且沒有被中斷
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //中斷worker對象持有的線程,因為該線程可能正在阻塞在任務隊列中
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

我們可以回到我之前的博客談談ThreadPoolExecutor的實現(xiàn)查看Worker類在執(zhí)行任務之前會首先獲取到自身的互斥鎖,這樣如果獲取不到Worker的互斥鎖,則說明該worker正在執(zhí)行任務,這就回答了我們上面的問題,也是為什么Worker類實現(xiàn)AQS的原因(這里并不是為了并發(fā)安全,只是為了判斷線程是否正在執(zhí)行任務,下面會有更深刻的認識)。當我們在主線程中中斷了worker中持有的線程,woker線程中的runWorker方法會執(zhí)行最外圍的processWorkerExit函數(shù)銷毀線程,進而完全結(jié)束worker的生命周期。

而正在執(zhí)行的線程在執(zhí)行完其任務也會因為獲取不到任務進入processWorkerExit函數(shù),結(jié)束線程生命周期。

下面,我們繼續(xù)看下shutdownNow方法如何實現(xiàn)的,相比于shutdown方法,該方法簡直太狠了直接對所有的線程執(zhí)行中斷,具體代碼如下:

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //權(quán)限檢查
            checkShutdownAccess();
            //將線程池狀態(tài)置為STOP
            advanceRunState(STOP);
            //中斷所有線程包括
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
}
//這個函數(shù)就比較狠了,無論是否持有鎖,只要線程沒有被中斷,就中斷Worker持有的線程
private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
}
private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
           //循環(huán)遍歷中斷線程
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
}
void interruptIfStarted() {
            Thread t;
            //無論是否持有互斥鎖,只有線程沒有被中斷就執(zhí)行interrupt
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
  }

使用建議

在日常開發(fā)中,我們一般不會去使用shutdownNow,這個方法會導致部分任務無法執(zhí)行。我們通常會調(diào)用shutdown方法使線程池不接受新的任務,然后等正在執(zhí)行的任務執(zhí)行完成后再結(jié)束。下面我給出一個個人覺得比較優(yōu)雅的結(jié)束線程池的使用示例:

public class ThreadPoolExecutorTest {
    private static Logger logger = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);

    private static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String []args) {
        for (int i = 0; i< 5; i++) {
            executorService.submit(new SleepRunnable());
        }
        executorService.shutdown();//執(zhí)行該方法相當于通知線程池結(jié)束
        try {
           //阻塞等待線程池結(jié)束
            executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            //出現(xiàn)異常是強制結(jié)束
            List<Runnable> notExcuteRunnables = executorService.shutdownNow();
            logger.info("awaitTermination exception, notExcuteRunnables = {}", notExcuteRunnables, e);
        }
    }

    //測試任務,簡單的sleep 1s
    static class SleepRunnable implements Runnable {

        @Override
        public void run() {
            try {
                logger.info("sleep in thread = {}", Thread.currentThread().getName());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.info("exception in sleep", e);
            }
        }
    }
}

原文

袁瓊瓊的技術(shù)博客,歡迎指針
http://yuanqiongqiong.cn/2019/07/15/ThreadPoolExecutor%E5%85%B3%E9%97%AD%E7%BA%BF%E7%A8%8B%E6%B1%A0%E8%AF%A6%E8%A7%A3/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容