Java線程池_ThreadPoolExecutor原理分析

線程池中有一定數(shù)量的工作線程,工作線程會循環(huán)從任務(wù)隊(duì)列中獲取任務(wù),并執(zhí)行這個(gè)任務(wù)。那么怎么去停止這些工作線程呢?
這里就涉及到線程池兩個(gè)重要概念:工作線程數(shù)量和線程池狀態(tài)。

一.線程池狀態(tài)和工作線程數(shù)量

這本來是兩個(gè)不同的概念,但是在ThreadPoolExecutor中我們使用一個(gè)變量ctl來存儲這兩個(gè)值,這樣我們只需要維護(hù)這一個(gè)變量的并發(fā)問題,提高運(yùn)行效率。

    /**
     * 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
     * int類型是32位,它的高3位,表示線程池的狀態(tài),低29位表示W(wǎng)orker的數(shù)量
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // COUNT_BITS 29位,
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 表示線程池中創(chuàng)建Worker工作線程數(shù)量的最大值。即 0b0001.....1(29位1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

怎么使用一個(gè)變量ctl存儲兩個(gè)值呢?
就是利用int變量的高3位來儲存線程池狀態(tài),用int變量的低29位來儲存工作線程數(shù)量。

這樣就有兩個(gè)需要注意的地方:

  1. 工作線程數(shù)量最大值不能超過int類型29位的值CAPACITY 即0b0001.....1(29位1)
  2. 因?yàn)榫€程池狀態(tài)都是高3位儲存的,所以工作線程數(shù)量不會影響狀態(tài)值大小關(guān)系。

1.1 線程池狀態(tài)

    // 高3位值是111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高3位值是000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高3位值是001
    private static final int STOP       =  1 << COUNT_BITS;
    // 高3位值是010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高3位值是011
    private static final int TERMINATED =  3 << COUNT_BITS;

線程池狀態(tài)分析:

  1. RUNNING狀態(tài):線程池剛創(chuàng)建時(shí)的狀態(tài)。向任務(wù)隊(duì)列中添加任務(wù),并執(zhí)行任務(wù)隊(duì)列中的任務(wù)。因?yàn)楦?位值是111,即處于RUNNING狀態(tài)下的ctl值都是負(fù)數(shù)。
  2. SHUTDOWN狀態(tài): 調(diào)用shutdown方法,會將線程池設(shè)置成這個(gè)狀態(tài)。不能向任務(wù)隊(duì)列中添加任務(wù),但是可以執(zhí)行任務(wù)隊(duì)列中已添加的任務(wù)。并且處于SHUTDOWN狀態(tài)下正在運(yùn)行任務(wù)的工作線程不能中斷的,就是保證任務(wù)能夠執(zhí)行完成。
  3. STOP狀態(tài): 調(diào)用shutdownNow方法,會將線程池設(shè)置成這個(gè)狀態(tài)。不能向任務(wù)隊(duì)列中添加任務(wù),也不能再執(zhí)行任務(wù)隊(duì)列中已添加的任務(wù)。
  4. TIDYING狀態(tài): 調(diào)用tryTerminate方法,可能會將線程池設(shè)置成這個(gè)狀態(tài)。這個(gè)只是中斷過度狀態(tài),表示線程池即將變成TERMINATED狀態(tài)。
  5. TERMINATED狀態(tài): 調(diào)用tryTerminate方法,可能會將線程池設(shè)置成這個(gè)狀態(tài)。表示線程池已經(jīng)完全終止,即任務(wù)隊(duì)列為空,工作線程數(shù)量也是0.

線程池為什么要定義這么多狀態(tài)呢?按道理說線程池只應(yīng)該有運(yùn)行和終止這兩種狀態(tài)啊。
主要是因?yàn)榻K止線程池時(shí),要考慮正在執(zhí)行的任務(wù)和已經(jīng)添加到任務(wù)隊(duì)列中待執(zhí)行的任務(wù)該如何處理,否則的話,這些任務(wù)可能就會被丟失。

線程池提供了兩個(gè)方式處理:

  1. shutdown方法: 它會將線程池狀態(tài)變成SHUTDOWN 狀態(tài)。禁止向添加新的任務(wù),但是會讓任務(wù)隊(duì)列中的任務(wù)繼續(xù)執(zhí)行,最后釋放所有的工作線程,讓線程池狀態(tài)變成TERMINATED狀態(tài)。
  2. shutdownNow方法: 它會將線程池狀態(tài)變成STOP 狀態(tài)。禁止向添加新的任務(wù),也不會執(zhí)行任務(wù)隊(duì)列中的任務(wù),但是會返回這個(gè)任務(wù)集合,釋放所有的工作線程,讓線程池狀態(tài)變成TERMINATED狀態(tài)。

1.2 操作ctl的方法

1.2.1 獲取線程池的狀態(tài)

    /**
     * 獲取線程池的狀態(tài)。因?yàn)榫€程池的狀態(tài)是使用高3位儲存,所以屏蔽低29位就行了。
     * 所以就c與~CAPACITY(0b1110..0)進(jìn)行&操作,屏蔽低29位的值了。
     * 注意:這里是屏蔽低29位的值,而不是右移29位。
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

1.2.2 獲取工作線程數(shù)量

    /**
     * 獲取線程池中Worker工作線程的數(shù)量,
     * 因?yàn)橹皇褂玫?9位保存Worker的數(shù)量,只要屏蔽高3位的值就行了
     * 所以就c與CAPACITY(0b0001...1)進(jìn)行&操作,屏蔽高3位的值了。
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }

1.2.3 合并ctl的值

    /**
     * 得到ctl的值。
     * 接受兩個(gè)參數(shù)rs和wc。rs表示線程池的狀態(tài),wc表示W(wǎng)orker工作線程的數(shù)量。
     * 對于rs來說我們只需要高3位的值,對于wc來說我們需要低29位的值。
     * 所以我們將rs | wc就可以得到ctl的值了。
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

1.2.4 其他方法

    // 因?yàn)镽UNNING狀態(tài)高三位是111,所以狀態(tài)值rs與工作線程數(shù)量ws相與的結(jié)果值c一定是個(gè)負(fù)數(shù),
    // 而其他狀態(tài)值都是大于等于0的數(shù),所以c是負(fù)數(shù),那么表示當(dāng)前線程處于運(yùn)行狀態(tài)。
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 使用CAS函數(shù)將ctl值自增
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * 使用CAS函數(shù)將ctl值自減
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    /**
     * 使用CAS函數(shù)加循環(huán)方法這種樂觀鎖的方式,解決并發(fā)問題。
     * 保證使ctl值減一
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

二. 重要成員變量

    // 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // 任務(wù)線程的阻塞隊(duì)列,因?yàn)槭亲枞?duì)列,所以它是并發(fā)安全的
    private final BlockingQueue<Runnable> workQueue;

    // 獨(dú)占鎖,用來保證操作成員變量的并發(fā)安全問題
    private final ReentrantLock mainLock = new ReentrantLock();

    // 等待線程池完全終止的條件Condition,
    private final Condition termination = mainLock.newCondition();

    //-----------------  需要mainLock來保證并發(fā)安全-------------------------//
    // 線程池中工作線程集合。Worker中持有線程thread變量
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 線程池中曾擁有過的最大工作線程個(gè)數(shù)
    private int largestPoolSize;

    // 線程池完成過任務(wù)的總個(gè)數(shù)
    private long completedTaskCount;
    //-----------------  需要mainLock來保證并發(fā)安全-------------------------//

    // 創(chuàng)建線程的工廠類
    private volatile ThreadFactory threadFactory;

    // 當(dāng)任務(wù)被拒絕時(shí),用來處理這個(gè)被拒絕的任務(wù)
    private volatile RejectedExecutionHandler handler;
    // 工作線程空閑的超時(shí)時(shí)間keepAliveTime
    private volatile long keepAliveTime;

    // 是否允許核心池線程超時(shí)釋放
    private volatile boolean allowCoreThreadTimeOut;

    // 線程池核心池線程個(gè)數(shù)
    private volatile int corePoolSize;

    // 線程池最大的線程個(gè)數(shù)
    private volatile int maximumPoolSize;

成員變量的含義已經(jīng)標(biāo)注了:

  1. mainLock:使用mainLock來保證會發(fā)生變化成員變量的并發(fā)安全問題。會發(fā)生的成員變量有5個(gè):ctl、workQueue、workers、largestPoolSize和completedTaskCount。但是其中ctl和workQueue的類型本身就是多線程安全的,所以不用mainLock鎖保護(hù)。
  2. termination:等待線程池完全終止的條件,如果線程池沒有完全終止,調(diào)用它的awaitNanos方法,讓線程等待。當(dāng)線程池完全終止后,調(diào)用它的signalAll方法,喚醒所有等待termination條件的線程。
  3. workers:記錄所有的工作線程Worker
  4. workQueue:記錄所有待執(zhí)行的任務(wù)。使用阻塞隊(duì)列BlockingQueue,可以在隊(duì)列為空時(shí),線程等待,隊(duì)列有值時(shí),喚醒等待的線程。
  5. largestPoolSize:線程池中曾擁有過的最大工作線程個(gè)數(shù)
  6. completedTaskCount:線程池完成過任務(wù)的總個(gè)數(shù)
  7. threadFactory:創(chuàng)建線程的工廠類
  8. handler:當(dāng)任務(wù)被拒絕時(shí),用來處理這個(gè)被拒絕的任務(wù)
  9. keepAliveTime:工作線程允許空閑的超時(shí)時(shí)間,一般都是針對超過核心池?cái)?shù)量的工作線程。
  10. allowCoreThreadTimeOut: 是否允許核心池的工作線程超時(shí)釋放。
  11. corePoolSize:線程池核心池線程個(gè)數(shù)。
  12. maximumPoolSize: 線程池最大的線程個(gè)數(shù)。

這里注意一下兩個(gè)概念核心池個(gè)數(shù)和最大線程池個(gè)數(shù):

  1. 核心池個(gè)數(shù)就是線程池能夠維持的常用工作線程個(gè)數(shù),當(dāng)工作線程沒有執(zhí)行任務(wù)空閑時(shí),它不會被銷毀,而是在等待。但是如果設(shè)置allowCoreThreadTimeOut為true,那么核心池工作線程也是會被銷毀。
  2. 最大線程池個(gè)數(shù)就是線程池允許開啟的最大工作線程個(gè)數(shù)。最大線程池的意義就是當(dāng)核心池的工作線程不夠用,且任務(wù)隊(duì)列也已經(jīng)滿了,不能添加新的任務(wù)了,那么就要開啟新的工作線程來執(zhí)行任務(wù)。

三. 執(zhí)行任務(wù)execute方法

在線程池中如何執(zhí)行一個(gè)任務(wù)command,要分三種情況:

  1. 線程池中工作線程的數(shù)量沒有達(dá)到核心池個(gè)數(shù),那么線程池就應(yīng)該開啟新的工作線程來執(zhí)行任務(wù)。
  2. 線程池中工作線程的數(shù)量達(dá)到核心池個(gè)數(shù),那么就應(yīng)該將任務(wù)添加到任務(wù)隊(duì)列中,等待著工作線程去任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。
  3. 如果任務(wù)添加到任務(wù)隊(duì)列失敗,那么就要開啟新的工作線程來執(zhí)行任務(wù)。

    public void execute(Runnable command) {
        // 如果command為null,拋出異常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 分為三個(gè)步驟:
         * 1. 如果運(yùn)行的工作線程數(shù)量少于核心池?cái)?shù)量corePoolSize,
         * 那么就調(diào)用addWorker方法開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
         * 2. 如果開啟新的工作線程失敗,就將任務(wù)添加到任務(wù)隊(duì)列中。
         * 3. 添加到任務(wù)隊(duì)列失敗,
         * 那么仍然addWorker方法在最大池中開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。     
         */
        int c = ctl.get();
        // 運(yùn)行的工作線程數(shù)量少于核心池?cái)?shù)量corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            /**
             * 開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
             * 返回true,表示開啟工作線程成功,直接return。
             * 返回false,表示沒有開啟新線程。那么任務(wù)command就沒有運(yùn)行,所以要執(zhí)行下面代碼。
              */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 線程池處于運(yùn)行狀態(tài),
        // 且任務(wù)添加到任務(wù)阻塞隊(duì)列workQueue中成功,即workQueue隊(duì)列有剩余空間。
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次檢查線程池狀態(tài)和工作線程數(shù)量
            int recheck = ctl.get();
            /**
             * 如果線程池不在運(yùn)行狀態(tài),那么就調(diào)用remove方法移除workQueue隊(duì)列這個(gè)任務(wù)command,
             * 如果移除成功,那么調(diào)用reject(command)方法,進(jìn)行拒絕任務(wù)的處理。
             * 如果移除失敗,那么這個(gè)任務(wù)還是會被執(zhí)行,那么就不用調(diào)用reject(command)方法
             */
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果工作線程數(shù)量為0,但是workQueue隊(duì)列中我們添加過任務(wù),
            // 那么必須調(diào)用addWorker方法,開啟一個(gè)新的工作線程。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 調(diào)用addWorker方法,開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
        // 如果還是失敗,那么這個(gè)任務(wù)command就不會不可能執(zhí)行了,
        // 那么調(diào)用reject(command)方法拒絕這個(gè)任務(wù)
        else if (!addWorker(command, false))
            reject(command);
    }

方法流程上面已經(jīng)有標(biāo)注,注意有以下幾點(diǎn):

  1. addWorker(Runnable firstTask, boolean core):表示開啟一個(gè)新的工作線程執(zhí)行任務(wù)firstTask。core是用來判斷核心池還是最大池。返回false,表示開啟新線程失敗,即任務(wù)firstTask沒有機(jī)會執(zhí)行。
  2. isRunning(c)線程池處于RUNNING狀態(tài),只有處于RUNNING狀態(tài)下,才能將任務(wù)添加到任務(wù)隊(duì)列。
  3. reject(command) 當(dāng)任務(wù)command不能在線程池中執(zhí)行時(shí),就會調(diào)用這個(gè)方法,告訴調(diào)用值,線程池拒絕執(zhí)行這個(gè)任務(wù)。

四. 添加工作線程addWorker方法

就是利用任務(wù)task創(chuàng)建一個(gè)新的工作線程Work,然后將它添加到工作線程集合workers中。但是需要注意多線程并發(fā)問題。

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 利用死循環(huán)和CAS函數(shù),實(shí)現(xiàn)樂觀鎖,來實(shí)現(xiàn)多線程改變ctl值的并發(fā)問題
        // 因?yàn)閏tl值代表兩個(gè)東西,工作線程數(shù)量和線程池狀態(tài)。
        // 這里就用了兩個(gè)for循環(huán),一個(gè)是線程池狀態(tài)的for循環(huán),一個(gè)是工作線程數(shù)量的for循環(huán)
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取線程池運(yùn)行狀態(tài)rs,
            int rs = runStateOf(c);

            // 首先判斷線程池狀態(tài)和任務(wù)隊(duì)列狀態(tài),
            // 來判斷能否創(chuàng)建新的工作線程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            
            for (;;) {
                // 線程池中工作線程數(shù)量wc
                int wc = workerCountOf(c);
                // 當(dāng)線程池工作線程數(shù)量wc大于線程上限CAPACITY,
                // 或者用戶規(guī)定核心池?cái)?shù)量corePoolSize或用戶規(guī)定最大線程池?cái)?shù)量maximumPoolSize
                // 表示不能創(chuàng)建工作線程了,所以返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 使用CAS函數(shù),使工作線程數(shù)量wc加一
                if (compareAndIncrementWorkerCount(c))
                    // 跳出retry循環(huán)
                    break retry;
                // 來到這里表示CAS函數(shù)失敗,那么就要循環(huán)重新判斷
                // 但是c還代表線程狀態(tài),如果線程狀態(tài)改變,那么就必須跳轉(zhuǎn)到retry循環(huán)
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 工作線程是否開始,即調(diào)用了線程的start方法
        boolean workerStarted = false;
        // 工作線程是否添加到工作線程隊(duì)列workers中
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 創(chuàng)建一個(gè)Worker對象
            w = new Worker(firstTask);
            // 得到Worker所擁有的線程thread
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 并發(fā)鎖
                mainLock.lock();
                try {
                    //  獲取線程池運(yùn)行狀態(tài)rs
                    int rs = runStateOf(ctl.get());

                    // 當(dāng)線程池是運(yùn)行狀態(tài),或者是SHUTDOWN狀態(tài)但firstTask為null,
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果線程t已經(jīng)被開啟,就拋出異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將w添加到工作線程集合workers中
                        workers.add(w);
                        // 獲取工作線程集合workers的個(gè)數(shù)
                        int s = workers.size();
                        // 記錄線程池歷史最大的工作線程個(gè)數(shù)
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果已經(jīng)添加到工作線程隊(duì)列中,那么開啟線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果開啟工作線程失敗,那么這個(gè)任務(wù)也就沒有執(zhí)行
            // 因此移除這個(gè)任務(wù)w(如果隊(duì)列中有),減少工作線程數(shù)量,因?yàn)檫@個(gè)數(shù)量在之前已經(jīng)增加了
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

添加一個(gè)新的工作線程,就涉及到兩個(gè)成員變量的改變,一個(gè)是工作線程數(shù)量ctl,一個(gè)是工作線程集合workers。而ctl的類型是AtomicInteger,所以它可以使用樂觀鎖解決并發(fā)問題,workers就只能使用mainLock互斥鎖來保證并發(fā)安全問題。

4.1 更改工作線程數(shù)量ctl

因?yàn)閏tl儲存了兩個(gè)值,工作線程數(shù)量和線程池狀態(tài)。所以使用了兩個(gè)for循環(huán)來監(jiān)控多線程對這兩個(gè)值的更改。
用線程池狀態(tài)來判斷是否允許添加新的工作線程:

            //  是對addWorker中線程狀態(tài)if判斷的拆分
            // 當(dāng)線程池不是處于運(yùn)行狀態(tài)
            if (rs >= SHUTDOWN) {
                /**
                 * 線程池狀態(tài)不是SHUTDOWN,或者firstTask不為null,或者任務(wù)隊(duì)列為空,
                 * 都直接返回false,表示開啟新工作線程失敗。
                 * 只有當(dāng)線程池狀態(tài)是SHUTDOWN,firstTask為null,任務(wù)隊(duì)列不為空時(shí),
                 * 需要?jiǎng)?chuàng)建新的工作線程。
                 * 從execute(Runnable command)方法中分析,firstTask參數(shù)為空只有一種情況,
                 * 此時(shí)線程池中工作線程數(shù)量是0,而任務(wù)隊(duì)列不為空,
                 * 那么就要開啟一個(gè)新工作線程去執(zhí)行任務(wù)隊(duì)列中的任務(wù),否則這些任務(wù)會被丟失。
                 */
                if (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) {
                    return false;
                }
            }

由此可以得出,只有兩種情形允許添加新的工作線程:

  1. 線程池處于RUNNING狀態(tài)
  2. 線程池雖然處于SHUTDOWN狀態(tài),但是線程池工作線程個(gè)數(shù)是0(即這里的firstTask != null),且任務(wù)隊(duì)列workQueue不為空,那么就要開啟一個(gè)新工作線程去執(zhí)行任務(wù)隊(duì)列中的任務(wù)。

然后使用for循環(huán)和CAS函數(shù)方式,來給工作線程數(shù)量加一。注意此時(shí)工作線程還沒有創(chuàng)建,并添加到線程集合workers中,所以如果線程添加失敗,那么還要將工作線程數(shù)量減一。

4.2 添加工作線程集合workers

創(chuàng)建一個(gè)工作線程Worker,將它添加到線程集合workers中,然后開啟這個(gè)工作線程,使用mainLock獨(dú)占鎖保證成員變量workers的并發(fā)安全問題。

五. 內(nèi)部類Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** 該Worker所擁有的工作線程 */
        final Thread thread;
        /** Worker擁有的第一個(gè)任務(wù),初始化的時(shí)候賦值 */
        Runnable firstTask;
        /** 該工作線程Worker完成任務(wù)的數(shù)量 */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // 將state設(shè)置為-1,禁止發(fā)起中斷請求,
            // 直到調(diào)用過runWorker方法,即線程已經(jīng)運(yùn)行時(shí)。
            setState(-1);
            // 第一個(gè)任務(wù)
            this.firstTask = firstTask;
            // 創(chuàng)建一個(gè)thread線程對象,它的run方法就是本W(wǎng)orker的run方法
            // 這個(gè)thread就是Worker真正執(zhí)行任務(wù)的工作線程
            this.thread = getThreadFactory().newThread(this);
        }

        /** 復(fù)寫的是Runnable中的run方法,所以當(dāng)工作線程開啟運(yùn)行后,會調(diào)用這個(gè)方法。  */
        public void run() {
            runWorker(this);
        }

        // 當(dāng)前獨(dú)占鎖是否空閑
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 嘗試獲取獨(dú)占鎖
        protected boolean tryAcquire(int unused) {
            // 如果通過CAS函數(shù),可以將state值從0改變成1,那么表示獲取獨(dú)占鎖成功。
            // 否則獨(dú)占鎖被別的線程獲取了。
            if (compareAndSetState(0, 1)) {
                // 設(shè)置擁有獨(dú)占鎖的線程是當(dāng)前線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 釋放獨(dú)占鎖
        protected boolean tryRelease(int unused) {
            // 設(shè)置擁有獨(dú)占鎖的線程為null
            setExclusiveOwnerThread(null);
            // 設(shè)置獲取獨(dú)占鎖的次數(shù)是0,表示鎖是空閑狀態(tài)
            setState(0);
            return true;
        }

        // 獲取獨(dú)占鎖,如果鎖被別的獲取,就一直等待。
        public void lock()        { acquire(1); }
        // 嘗試獲取獨(dú)占鎖,如果鎖被別的獲取,就直接返回false,表示獲取失敗。
        public boolean tryLock()  { return tryAcquire(1); }
        // 釋放獨(dú)占鎖
        public void unlock()      { release(1); }
        // 當(dāng)前獨(dú)占鎖是否空閑
        public boolean isLocked() { return isHeldExclusively(); }

        // 如果Worker的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求。
        void interruptIfStarted() {
            Thread t;
            // getState() >= 0表示thread已經(jīng)開啟
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker實(shí)現(xiàn)了Runnable接口,那么就可以通過Worker對象創(chuàng)建一個(gè)新線程thread,這個(gè)thread就是Worker的工作線程,而任務(wù)都在run方法中執(zhí)行。
Worker還繼承自AbstractQueuedSynchronizer類。我們知道可以通過AQS類實(shí)現(xiàn)獨(dú)占鎖和共享鎖,而Worker中實(shí)現(xiàn)了tryAcquire和tryRelease方法,說明Worker對象也是個(gè)獨(dú)占鎖對象。我們可以考慮一下Worker這個(gè)獨(dú)占鎖的作用是什么?在后面會介紹到。

六. 工作線程運(yùn)行任務(wù)runWorker方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 將w的state狀態(tài)設(shè)置成0,這樣就允許對w的thread線程進(jìn)行中斷請求了。
        w.unlock();
        // completedAbruptly表示線程突然終結(jié)
        boolean completedAbruptly = true;
        try {
            // 通過getTask從任務(wù)隊(duì)列中獲取任務(wù)task執(zhí)行,這個(gè)方法是個(gè)阻塞方法。
            while (task != null || (task = getTask()) != null) {
                // 獲取w獨(dú)占鎖,保證當(dāng)本工作線程運(yùn)行任務(wù)時(shí),
                // 不能對該線程進(jìn)行中斷請求。
                w.lock();
                /**
                 * 如果線程池大于STOP狀態(tài),且Worker工作線程中斷標(biāo)志位是false,
                 * 那么就調(diào)用wt的interrupt方法發(fā)起中斷請求。
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // Worker工作線程發(fā)起中斷請求
                    wt.interrupt();
                try {
                    // 鉤子方法,提供給子類。在執(zhí)行任務(wù)之前調(diào)用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 調(diào)用run方法,執(zhí)行任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 鉤子方法,提供給子類。在執(zhí)行任務(wù)完成后調(diào)用
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 將task設(shè)置為null,進(jìn)行下一次循環(huán)
                    task = null;
                    // 將work完成的任務(wù)數(shù)completedTasks加一
                    w.completedTasks++;
                    // 釋放w獨(dú)占鎖
                    w.unlock();
                }
            }
            // completedAbruptly = false表示線程正常完成終結(jié)
            completedAbruptly = false;
        } finally {
            // 進(jìn)行一個(gè)工作線程完結(jié)后的后續(xù)操作
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法是在每個(gè)工作線程的run方法中調(diào)用,通過getTask()方法從任務(wù)隊(duì)列中獲取任務(wù)task執(zhí)行,這個(gè)方法可以阻塞當(dāng)前工作線程,如果getTask()方法返回null,那么工作線程就會運(yùn)行結(jié)束,釋放線程。

雖然runWorker方法運(yùn)行在每個(gè)工作線程中,但是對于一個(gè)Worker來說,只會有它的工作線程能夠運(yùn)行runWorker方法,而且改變的也是這個(gè)Worker的成員變量,且這些成員變量也只能在runWorker方法改變,那么它沒有多線程并發(fā)問題啊,那么為什么在這里加鎖呢?
這是因?yàn)閃orker中有一個(gè)變量是可以被其他線程改變的,就是它的工作線程thread的中斷請求,所以Worker獨(dú)占鎖的作用就是控制別的線程對它的工作線程thread中斷請求的。

最后調(diào)用processWorkerExit方法,進(jìn)行一個(gè)工作線程完結(jié)后的后續(xù)操作。

七. 獲取任務(wù)getTask方法

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // 獲取線程池狀態(tài)rs
            int rs = runStateOf(c);

            // 如果有需要檢查任務(wù)隊(duì)列workQueue是否為空
            // 即rs >= STOP或者rs == SHUTDOWN且workQueue為空,那么返回null,停止工作線程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 將工作線程數(shù)量減一
                decrementWorkerCount();
                return null;
            }

            // 獲取工作線程數(shù)量wc
            int wc = workerCountOf(c);

            /**
             * 如果allowCoreThreadTimeOut為true或者wc > corePoolSize時(shí),
             * 就要減少工作線程數(shù)量了。
             * 當(dāng)工作線程在keepAliveTime時(shí)間內(nèi),沒有獲取到可執(zhí)行的任務(wù),
             * 那么該工作線程就要被銷毀。
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 工作線程數(shù)量減一,返回null,銷毀工作線程。
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 從任務(wù)隊(duì)列workQueue中獲取了任務(wù)r,會阻塞當(dāng)前線程。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果r不為null,返回這個(gè)任務(wù)r
                if (r != null)
                    return r;
                // r是null,表示獲取任務(wù)超時(shí)
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

從阻塞任務(wù)隊(duì)列workQueue中獲取任務(wù)返回,因?yàn)槭亲枞蝿?wù)隊(duì)列,所以可以阻塞當(dāng)前線程。如果返回null,那么會完結(jié)調(diào)用getTask方法的那個(gè)工作線程。那么getTask方法在什么情況下返回null呢?

  1. 線程池的狀態(tài)大于等于STOP,或者線程狀態(tài)是SHUTDOWN且當(dāng)前任務(wù)隊(duì)列為空,那么返回null,停止工作線程。
  2. 獲取任務(wù)時(shí)間超時(shí),那么也會返回null,停止工作線程。因?yàn)榫€程池一般只維護(hù)一定數(shù)量的工作線程,如果超過這個(gè)數(shù)量,那么超過數(shù)量的工作線程,在空閑一定時(shí)間后,應(yīng)該被釋放。

八. 終止線程池的方法

8.1 shutdown和shutdownNow方法

    /**
     * 終止線程池。不能在添加新任務(wù)了,但是已經(jīng)添加到任務(wù)隊(duì)列的任務(wù)還是會執(zhí)行。
     * 且對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查是否擁有Shutdown的權(quán)限
            checkShutdownAccess();
            // 將線程池狀態(tài)變成SHUTDOWN狀態(tài)
            advanceRunState(SHUTDOWN);
            // 對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求
            interruptIdleWorkers();
            // 鉤子方法,提供給子類實(shí)現(xiàn)。表示線程池已經(jīng)shutdown了
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        // 嘗試去終結(jié)線程池
        tryTerminate();
    }

    /**
     * 終止線程池。不能在添加新任務(wù)了,也不會執(zhí)行已經(jīng)添加到任務(wù)隊(duì)列的任務(wù),只是將這些任務(wù)返回。
     * 且對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查是否擁有Shutdown的權(quán)限
            checkShutdownAccess();
            // 將線程池狀態(tài)變成STOP狀態(tài)
            advanceRunState(STOP);
            // 對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)
            interruptWorkers();
            // 返回阻塞隊(duì)列workQueue中未執(zhí)行任務(wù)的集合
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 嘗試去終結(jié)線程池
        tryTerminate();
        return tasks;
    }

shutdown和shutdownNow區(qū)別:

  1. shutdown方法將線程池設(shè)置成SHUTDOWN狀態(tài),shutdownNow將線程池設(shè)置成STOP狀態(tài)。
  2. shutdown方法調(diào)用之后不能在添加新任務(wù)了,但是已經(jīng)添加到任務(wù)隊(duì)列的任務(wù)還是會執(zhí)行。shutdownNow方法調(diào)用之后不能在添加新任務(wù)了,也不會執(zhí)行已經(jīng)添加到任務(wù)隊(duì)列的任務(wù),只是將這些任務(wù)返回。
  3. shutdown方法會對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求,shutdownNow方法會對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)。

8.2 advanceRunState方法

    private void advanceRunState(int targetState) {
        // 采用樂觀鎖的方法,來并發(fā)更改線程池狀態(tài)。
        for (;;) {
            int c = ctl.get();
            // 如果runStateAtLeast方法返回true,表示當(dāng)前線程池狀態(tài)已經(jīng)是目標(biāo)狀態(tài)targetState
            // 采用CAS函數(shù)嘗試更改線程池狀態(tài),如果失敗就循環(huán)繼續(xù)。
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

這個(gè)方法來改變線程池狀態(tài),使用樂觀鎖的方式保證并發(fā)安全。

8.3 中斷空閑狀態(tài)下的工作線程

    /**
     * 對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求。
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍歷工作線程Worker集合
            for (Worker w : workers) {
                Thread t = w.thread;
                // 如果工作線程中斷標(biāo)志位是false,
                // 且能夠獲取鎖,即當(dāng)前工作線程沒有運(yùn)行任務(wù)
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 發(fā)起中斷請求。
                        // 因?yàn)楂@取了鎖,所以在進(jìn)入中斷請求時(shí),worker工作線程不會執(zhí)行任務(wù)
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // 釋放鎖
                        w.unlock();
                    }
                }
                // 是否只進(jìn)行一個(gè)工作線程的中斷請求。
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

遍歷工作線程Worker集合,如果工作線程出于空閑狀態(tài),且沒有被中斷,那么就發(fā)起中斷請求。通過獨(dú)占鎖Worker知道,當(dāng)前工作線程是否在執(zhí)行任務(wù)。

8.4 對所有已開啟的工作線程發(fā)起中斷請求

    /**
     * 對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                 // 如果w的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求。
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

遍歷工作線程Worker集合,調(diào)用Worker的interruptIfStarted方法,如果工作線程已開啟,那么就會發(fā)起中斷。

8.5 嘗試完結(jié)線程池的方法

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 如果線程池是RUNNING狀態(tài),
             * 或者線程池是TIDYING狀態(tài)(是因?yàn)橐呀?jīng)有別的線程在終止線程池了)
             * 或者線程池是SHUTDOWN狀態(tài)且任務(wù)隊(duì)列不為空,
             * 線程池不能被terminate終止,直接return返回
             *
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 線程池中工作線程數(shù)量不是0,線程池不能被terminate終止,所以要return
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 鉤子方法,提供給子類實(shí)現(xiàn)。表示線程池已經(jīng)終止。
                        terminated();
                    } finally {
                        // 設(shè)置線程池狀態(tài)是TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

線程池在什么情況下算是完全停止了呢?有三個(gè)條件:

  1. 線程池不是RUNNING狀態(tài)。
  2. 線程池中工作線程數(shù)量是0。
  3. 線程池中任務(wù)隊(duì)列為空。

所以在看看tryTerminate()中,前面兩個(gè)if判斷條件,就可以理解了。

8.6 等待線程池完結(jié)的方法

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 如果是TERMINATED已終止?fàn)顟B(tài),那么就返回true
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                // 如果已經(jīng)超時(shí)就返回false
                if (nanos <= 0)
                    return false;
                // 讓當(dāng)前線程等待。并設(shè)置超時(shí)時(shí)間nanos
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

如果線程池不是TERMINATED狀態(tài),就讓當(dāng)前線程在termination條件上等待,直到線程池變成TERMINATED狀態(tài),或者等待時(shí)間超時(shí)才會被喚醒。

8.7 工作線程退出的方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果工作線程突然被終結(jié),那么工作線程的數(shù)量就沒有減一。
        if (completedAbruptly)
            // 將工作線程數(shù)量減一。
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將工作線程的任務(wù)完成數(shù)添加到線程池完成任務(wù)總數(shù)中
            completedTaskCount += w.completedTasks;
            // 從工作線程集合中移除本工作線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 因?yàn)橛幸粋€(gè)工作線程已經(jīng)完成被釋放,那么就去嘗試終結(jié)線程池。
        tryTerminate();

        int c = ctl.get();
        // 如果線程池狀態(tài)小于STOP,
        // 就要判斷終結(jié)了這個(gè)工作線程之后,線程池中工作線程數(shù)量是否滿足需求。
        if (runStateLessThan(c, STOP)) {
            // 如果工作線程正常終結(jié),
            // 那么要看線程池中工作線程數(shù)量是否滿足需求。
            if (!completedAbruptly) {
                // 不允許核心池線程釋放,那么最小值是corePoolSize,否則就可以為0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 但是如果任務(wù)隊(duì)列中還有任務(wù),那么工作線程數(shù)量最少為1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果工作線程數(shù)量小于min值,就要?jiǎng)?chuàng)建新的工作線程了。
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 開啟一個(gè)新的工作線程
            addWorker(null, false);
        }
    }

工作線程被釋放,有兩種情況,一種是運(yùn)行完成正常結(jié)束,一種是發(fā)生異常意外終止。
當(dāng)工作線程被釋放時(shí),需要將它從工作線程集合workers中移除,將該工作線程任務(wù)完成數(shù)添加到線程池完成任務(wù)總數(shù)中。調(diào)用tryTerminate方法嘗試終結(jié)線程池。
另外因?yàn)橛幸粋€(gè)工作線程被釋放,那么就要考慮線程池中當(dāng)前工作線程數(shù)量是否符合要求,要不要添加新的工作線程。

九. 創(chuàng)建線程池的方法。

上面分析完線程池的功能方法后,再來說說怎樣創(chuàng)建一個(gè)線程池。

9.1 構(gòu)造函數(shù)

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 判斷設(shè)置的核心池?cái)?shù)量corePoolSize、最大池?cái)?shù)量maximumPoolSize、
        // 與線程空閑存活時(shí)間keepAliveTime的值,是否符合要求
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();

        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();

        // 賦值
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor類一共有四個(gè)構(gòu)造函數(shù),前面三個(gè)構(gòu)造函數(shù)都是調(diào)用后面那個(gè)構(gòu)造函數(shù)來實(shí)現(xiàn)的。參數(shù)意義:

  1. corePoolSize: 線程池核心池線程個(gè)數(shù)。
  2. maximumPoolSize: 線程池允許最大的線程個(gè)數(shù)。
  3. keepAliveTime: 線程空閑時(shí),允許存活的時(shí)間。
  4. unit:輔助變量,用來將keepAliveTime參數(shù),轉(zhuǎn)成對應(yīng)納秒值。
  5. workQueue:儲存所有待執(zhí)行任務(wù)的阻塞隊(duì)列
  6. threadFactory:用來創(chuàng)建線程的工廠類
  7. handler:通過它來通知調(diào)用值,線程池拒絕了任務(wù)。
    注:有沒有注意到,沒有傳遞allowCoreThreadTimeOut這個(gè)參數(shù),那么怎么設(shè)置這個(gè)成語變量呢?通過allowCoreThreadTimeOut(boolean value)方法來設(shè)置。

一般我們不用自己來new ThreadPoolExecutor對象,而是通過Executors這個(gè)工具類來創(chuàng)建ThreadPoolExecutor實(shí)例。

9.2 創(chuàng)建固定數(shù)量的線程池

    // 創(chuàng)建固定數(shù)量的線程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    // 創(chuàng)建固定數(shù)量的線程池
    public static ExecutorService newFixedThreadPool(int nThreads, 
              ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

根據(jù)我們前面講解,要想線程池維持固定數(shù)量的工作線程,那么工作線程就不能被釋放,就要做到兩點(diǎn):

  1. allowCoreThreadTimeOut為false,這個(gè)是默認(rèn)的。keepAliveTime設(shè)置為0,這樣當(dāng)調(diào)用allowCoreThreadTimeOut(boolean value)方法修改allowCoreThreadTimeOut值時(shí),會拋出異常,不允許修改。
  2. 核心池?cái)?shù)量和最大池?cái)?shù)量一樣,防止添加新的工作線程池。任務(wù)隊(duì)列容量要足夠大,防止任務(wù)添加到任務(wù)隊(duì)列中失敗,不能執(zhí)行。

9.3 創(chuàng)建單個(gè)線程的線程池

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(
              ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

與固定數(shù)量的線程池相比:

  1. 將固定數(shù)量nThreads變成了1
  2. 使用了FinalizableDelegatedExecutorService這個(gè)代理類,主要作用就是當(dāng)對象被銷毀時(shí),會調(diào)用shutdown方法,停止線程池。

9.4 創(chuàng)建緩存線程池

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

什么叫做緩存線程池,當(dāng)有任務(wù)執(zhí)行時(shí),會創(chuàng)建工作線程來執(zhí)行任務(wù),當(dāng)任務(wù)執(zhí)行完畢后,工作線程會等待一段時(shí)間,如果還是沒有任務(wù)需要執(zhí)行,那么就會釋放工作線程。

十. ThreadPoolExecutor 重要參數(shù)方法

10.1 getActiveCount 正在執(zhí)行任務(wù)線程數(shù)

    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                // isLocked() 表示這個(gè) Worker 正在執(zhí)行任務(wù)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

這個(gè)方法獲取正在執(zhí)行任務(wù)的線程數(shù)量。w.isLocked() 表示正在執(zhí)行任務(wù)的 Worker 。

10.2 getCompletedTaskCount 返回完成任務(wù)大致數(shù)量

  public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和。
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

返回已完成執(zhí)行的任務(wù)的大致總數(shù)。由于任務(wù)和線程的狀態(tài)可能在計(jì)算過程中動(dòng)態(tài)變化,因此返回值只是一個(gè)近似值。

completedTaskCount 表示已完成 WorkercompletedTasks 數(shù)量之和。在 Worker 退出方法 processWorkerExit() 中進(jìn)行增加操作。

10.3 getTaskCount 返回已計(jì)劃執(zhí)行的任務(wù)的大致總數(shù)

    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和。
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                // w.isLocked() 表示一個(gè)任務(wù)正在執(zhí)行
                if (w.isLocked())
                    ++n;
            }
            // 再加上待執(zhí)行的任務(wù)數(shù)量
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

返回已計(jì)劃執(zhí)行的任務(wù)的大致總數(shù)。由于任務(wù)和線程的狀態(tài)可能在計(jì)算過程中動(dòng)態(tài)變化,因此返回值只是一個(gè)近似值。

10.4 getCorePoolSize 方法

    public int getCorePoolSize() {
        return corePoolSize;
    }

返回線程池核心線程數(shù)量。

10.5 getKeepAliveTime 方法

   public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

返回線程保持活動(dòng)時(shí)間。一是當(dāng)線程超過核心線程數(shù)時(shí),超過的線程超過 keepAliveTime 時(shí)間沒有執(zhí)行任務(wù),就會關(guān)閉;二是 allowCoreThreadTimeOut 值為 true,那么核心線程空閑事件超過 keepAliveTime 也會關(guān)閉。

10.6 getLargestPoolSize 方法

    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

獲取曾經(jīng)出現(xiàn)的最大線程數(shù)。

10.7 getMaximumPoolSize 方法

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

獲取最大線程數(shù)。

10.8 getPoolSize 方法

    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

返回池中的當(dāng)前線程數(shù)。

10.9 getQueue 方法

    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

返回待執(zhí)行任務(wù)隊(duì)列。

十一 總結(jié)

線程池有兩個(gè)概念核心池與最大池。

  1. 核心池:線程池應(yīng)該維持的工作線程數(shù)量,如果線程池中工作線程數(shù)量小于核心池?cái)?shù)量,就會創(chuàng)建新的工作線程添加到線程池中。
  2. 最大池: 線程池中臨時(shí)存在的工作線程,當(dāng)任務(wù)隊(duì)列不能添加新任務(wù)時(shí),就會創(chuàng)建新的工作線程添加到線程池中。執(zhí)行完任務(wù)后,超過一定時(shí)間沒有接受到新任務(wù),這個(gè)臨時(shí)工作線程就會被釋放。

兩者的區(qū)別:

  1. 線程釋放:最大池中的線程當(dāng)超過一定時(shí)間沒有接受到新任務(wù),就會被釋放,而核心池中的線程,一般不釋放,只有設(shè)置allowCoreThreadTimeOut為true,且超過一定時(shí)間沒有接受到新任務(wù),也會被釋放。
  2. 創(chuàng)建時(shí)機(jī):線程池中工作線程數(shù)量小于核心池?cái)?shù)量,就會創(chuàng)建核心池線程。但是對于最大池來說,只有任務(wù)隊(duì)列已滿,不能添加新任務(wù)時(shí),才會創(chuàng)建新線程,放入最大池中。
    注:一般稱小于等于corePoolSize數(shù)量的工作線程池是核心池中的線程,大于corePoolSize數(shù)量的工作線程池就是最大池中的線程。

11.1 線程池執(zhí)行任務(wù)流程

通過execute方法執(zhí)行新任務(wù)command,分為三個(gè)步驟:

  1. 線程池中工作線程數(shù)量小于核心池?cái)?shù)量,那么就開啟新的工作線程來執(zhí)行任務(wù)。
  2. 線程池中工作線程數(shù)量達(dá)到核心池?cái)?shù)量,那么就將新任務(wù)添加到任務(wù)隊(duì)列中。
  3. 如果新任務(wù)添加到任務(wù)隊(duì)列失敗,那么就開啟新的工作線程來執(zhí)行任務(wù)(這個(gè)線程就在最大池中了)。

在每個(gè)工作線程,會通過循環(huán),調(diào)用getTask方法,不斷地從任務(wù)隊(duì)列中獲取任務(wù)來執(zhí)行。如果任務(wù)隊(duì)列中沒有任務(wù),那么getTask方法會阻塞當(dāng)前工作線程。

但是工作線程被喚醒后,getTask方法返回null,那么就會跳出循環(huán),該工作線程運(yùn)行結(jié)束,準(zhǔn)備釋放。

11.2 終止線程池

線程池不可能立即就終止,因?yàn)樯婕暗骄€程池正在執(zhí)行任務(wù)的線程和任務(wù)隊(duì)列中等待執(zhí)行的任務(wù)該如何處理問題,有兩個(gè)方式:

  1. shutdown方法:不能再向線程池中添加新任務(wù)了,但是已經(jīng)添加到任務(wù)隊(duì)列的任務(wù)還是會執(zhí)行,也不會對正在執(zhí)行任務(wù)的線程發(fā)起中斷請求。等待任務(wù)隊(duì)列任務(wù)執(zhí)行完成,釋放線程池中所有線程,線程池進(jìn)入完全終止?fàn)顟B(tài)。
  2. shutdownNow方法:不能再向線程池中添加新任務(wù)了,也不會執(zhí)行已經(jīng)添加到任務(wù)隊(duì)列的任務(wù),但是會返回未執(zhí)行的任務(wù)集合。而且對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)。等待線程池中所有線程釋放,線程池進(jìn)入完全終止?fàn)顟B(tài)。

兩者的區(qū)別:

兩者都不能再向線程池中添加新任務(wù)了。shutdown方法還是會將已添加的任務(wù)都執(zhí)行完畢,而shutdownNow方法不會再執(zhí)行任何新任務(wù)了。
注:對于正在執(zhí)行的任務(wù)是可能執(zhí)行完成的,因?yàn)橹袛嗾埱笾荒苤袛嗵幱赪AITING與TIMED_WAITING狀態(tài)的線程,對于處于其他狀態(tài)的線程不起作用。

十二. 重要示例

12.1 正常運(yùn)行線程池

package com.zhang._22._5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class Run implements Runnable {

    private int index;

    public Run(int index) {
        this.index = index+1;
    }

    @Override
    public void run() {
        System.out.println("--"+Thread.currentThread().getName()+"開始運(yùn)行 任務(wù)"+index);
        try {
            int waitTime = 100 + index * 10;
            Thread.sleep(waitTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務(wù)"+index);
    }
}

class MyThreadFactory implements ThreadFactory {
    private int sequenceNumber = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "線程"+(++sequenceNumber));
    }
}

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new MyThreadFactory();

        // 固定數(shù)量的線程池
        ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);

//        // 單個(gè)線程的線程池
//        ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
//        // 緩存線程池
//        ExecutorService service = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 6; i++) {
            service.execute(new Run(i));
        }
    }
}

運(yùn)行結(jié)果:

--線程1開始運(yùn)行 任務(wù)1
--線程2開始運(yùn)行 任務(wù)2
--線程3開始運(yùn)行 任務(wù)3
=======線程1結(jié)束 任務(wù)1
--線程1開始運(yùn)行 任務(wù)4
=======線程2結(jié)束 任務(wù)2
--線程2開始運(yùn)行 任務(wù)5
=======線程3結(jié)束 任務(wù)3
--線程3開始運(yùn)行 任務(wù)6
=======線程1結(jié)束 任務(wù)4
=======線程2結(jié)束 任務(wù)5
=======線程3結(jié)束 任務(wù)6

這里使用的是固定數(shù)量的線程池,所以只有三個(gè)線程來執(zhí)行任務(wù),未執(zhí)行到的任務(wù)只能等待。
如果換成單個(gè)線程的線程池,那么只有一個(gè)線程在執(zhí)行任務(wù)。
而緩存線程池呢?你就會發(fā)現(xiàn)居然有六個(gè)線程在執(zhí)行任務(wù),就是有多少任務(wù)創(chuàng)建多少個(gè)線程。

運(yùn)行完任務(wù)后,你會發(fā)現(xiàn)程序沒有結(jié)束,那是因?yàn)榫€程池沒有被終止。

12.2 終止線程池

package com.zhang._22._5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class Run implements Runnable {

    private int index;

    public Run(int index) {
        this.index = index+1;
    }

    @Override
    public void run() {
        System.out.println("--"+Thread.currentThread().getName()+"開始運(yùn)行 任務(wù)"+index);
        try {
            int waitTime = 100 + index * 10;
            Thread.sleep(waitTime);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常  exception=="+e.getMessage());
        }
        System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務(wù)"+index);
    }
}

class MyThreadFactory implements ThreadFactory {
    private int sequenceNumber = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "線程"+(++sequenceNumber));
    }
}

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new MyThreadFactory();

        // 固定數(shù)量的線程池
        ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);

//        // 單個(gè)線程的線程池
//        ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
//        // 緩存線程池
//        ExecutorService service = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 6; i++) {
            service.execute(new Run(i));
        }
        // 還是會執(zhí)行完已經(jīng)添加的任務(wù)
        service.shutdown();
    }
}

運(yùn)行結(jié)果:

--線程1開始運(yùn)行 任務(wù)1
--線程3開始運(yùn)行 任務(wù)3
--線程2開始運(yùn)行 任務(wù)2
=======線程1結(jié)束 任務(wù)1
--線程1開始運(yùn)行 任務(wù)4
=======線程2結(jié)束 任務(wù)2
--線程2開始運(yùn)行 任務(wù)5
=======線程3結(jié)束 任務(wù)3
--線程3開始運(yùn)行 任務(wù)6
=======線程1結(jié)束 任務(wù)4
=======線程2結(jié)束 任務(wù)5
=======線程3結(jié)束 任務(wù)6

Process finished with exit code 0

使用shutdown方法,還是會執(zhí)行完已經(jīng)添加的任務(wù)。最后程序退出。

package com.zhang._22._5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class Run implements Runnable {

    private int index;

    public Run(int index) {
        this.index = index+1;
    }

    @Override
    public void run() {
        System.out.println("--"+Thread.currentThread().getName()+"開始運(yùn)行 任務(wù)"+index);
        try {
            int waitTime = 100 + index * 10;
            Thread.sleep(waitTime);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常  exception=="+e.getMessage());
        }
        System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務(wù)"+index);
    }
}

class MyThreadFactory implements ThreadFactory {
    private int sequenceNumber = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "線程"+(++sequenceNumber));
    }
}

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new MyThreadFactory();

        // 固定數(shù)量的線程池
        ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);

//        // 單個(gè)線程的線程池
//        ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
//        // 緩存線程池
//        ExecutorService service = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 6; i++) {
            service.execute(new Run(i));
        }

        service.shutdownNow();
    }
}

運(yùn)行結(jié)果:

--線程1開始運(yùn)行 任務(wù)1
--線程2開始運(yùn)行 任務(wù)2
--線程3開始運(yùn)行 任務(wù)3
線程2 發(fā)生中斷異常  exception==sleep interrupted
線程1 發(fā)生中斷異常  exception==sleep interrupted
=======線程1結(jié)束 任務(wù)1
=======線程2結(jié)束 任務(wù)2
線程3 發(fā)生中斷異常  exception==sleep interrupted
=======線程3結(jié)束 任務(wù)3

Process finished with exit code 0

使用shutdownNow方法,在任務(wù)隊(duì)列中等待的任務(wù)是不會執(zhí)行的,而且立即發(fā)起線程中斷請求。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容