ThreadPoolExecutor源碼解析

1. 簡介

在引入線程池之前,我們先來了解幾個事情:

  • 線程的創(chuàng)建和銷毀是有代價的,如線程創(chuàng)建需要時間和相關(guān)計算資源。如果在Web服務器上為每個來到的請求都創(chuàng)建一個線程,而大多數(shù)請求都是輕量級的處理過程。那么創(chuàng)建線程的代價與請求處理的代價相比就非常大了,導致影響整體性能。
  • 當線程數(shù)量達到能讓CPU忙綠起來的時候,此時再創(chuàng)建線程,線程也基本處于閑置狀態(tài),這時候多出來的線程除了占用內(nèi)存外,還可能因為與其他線程爭用CPU資源導致出現(xiàn)其他性能開銷.
  • 在可創(chuàng)建線程的數(shù)量上存在一個限制,如果超過這個限制,可能會拋出OutOfMemoryError異常。

這時候如果能出現(xiàn)一個東西能夠?qū)€程的生命周期進行管理,對現(xiàn)有的線程重復利用,并且能夠以一種簡單的方式將任務的提交與執(zhí)行相解耦。沒錯,這就是線程池(Thread Pool),在要了解Java中的線程池,首先必須了解ThreadPoolExecutor這個類。

2. ThreadPoolExecutor詳解

類繼承圖

ThreadPoolExecutor類繼承

構(gòu)造函數(shù)

/線程池配置信息,volatile修飾保證變量在多線程下的可見性
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile long keepAliveTime;
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

private static final RuntimePermission shutdownPerm =
    new RuntimePermission("modifyThread");

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
            
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
            
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

從上面的JDK中ThreadPoolExecutor類的構(gòu)造函數(shù)源碼看出該構(gòu)造函數(shù)一共有7個參數(shù),下面介紹七個參數(shù)的含義:

參數(shù) 含義
corePoolSize 基本大小,即線程池中的核心線程數(shù)
maximumPoolSize 最大大小,即線程池中允許的最大線程數(shù)
keepAliveTime 存活時間,當線程的沒執(zhí)行任務時,空閑的時間超過了這個時間就會被標記為可回收,直到線程池的大小超過基本大小,被標記的線程就會被終止
unit keepAliveTime的單位,有DAYS、HOURS、MINUTES、SECONDSMILLISECONDS、MICROSECONDS、NANOSECONDS7個單位可選
workQueue 工作隊列,一個用來保存等待被執(zhí)行的任務的阻塞隊列
threadFactory 線程工廠。線程池在創(chuàng)建線程時通過調(diào)用線程工廠的Thread newThread(Runnable r)來創(chuàng)建線程
handler 飽和策略。當阻塞隊列已滿、線程池當前的線程數(shù)已達到最大值且沒有線程處于空閑狀態(tài)時,此時對于提交過來的任務將執(zhí)行飽和策略。(如果某個任務提交到一個已關(guān)閉的Executor時,也會執(zhí)行飽和策略)

ThreadPoolExecutor類中有四個重載的構(gòu)造函數(shù),每個構(gòu)造函數(shù)都必須指定上表中的前5個參數(shù),最后兩個參數(shù)可以隨意指定,不指定的話構(gòu)造函數(shù)會使用默認的線程工廠飽和策略

線程工廠(ThreadFactory)

線程池創(chuàng)建線程都是通過的ThreadFactoryThread newThread(Runnable r)方法來創(chuàng)建的。下面是Executors類里的默認線程工廠方法的源碼。

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

從上面可以看出默認線程工廠創(chuàng)建出的是一個非守護、優(yōu)先級為Thread.NORM_PRIORITY 的線程。如果想要自己定制線程工廠滿足需求,只需實現(xiàn)ThreadFactory接口的Thread newThread(Runnable r)方法。

飽和策略(RejectedExecutionHandler)

JDK中的ThreadPoolExecutor類提供了4種不同的RejectedExecutionHandler實現(xiàn):

  • AbortPolicy
    默認的飽和策略,該策略拋出未檢查(運行時異常)的RejectedExecutionException。
  • DiscardPolicy 不執(zhí)行任何操作,直接拋棄任務
  • CallerRunsPolicy 在調(diào)用者線程中執(zhí)行該任務
  • DiscardOldestPolicy 丟棄阻塞隊列中的第一個任務, 然后重新將該任務交給線程池執(zhí)行

同樣的,可以通過實現(xiàn)RejectedExecutionHandler接口自定義飽和策略。

線程池狀態(tài)和線程數(shù)量

/代表線程池當前狀態(tài)和線程數(shù)量的原子變量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  /COUNT_BITS為29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; /CAPACITY為能表示的最大線程數(shù)。

/線程池狀態(tài)
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

/對線程池狀態(tài)和線程數(shù)量進行打包和拆包的函數(shù):
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

/判斷線程池狀態(tài)的三個函數(shù)
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/線程數(shù)量增1,成功返回true,失敗返回false
private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

/線程數(shù)量減1,成功返回true,失敗返回false
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

/線程數(shù)量減1,失敗則重試直到成功
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

AtomicInteger類型的變量ctl用高3位來表示當前線程池狀態(tài),低29位來表示當前的線程數(shù)。

Java線程池有5種不同的狀態(tài),分別為運行(RUNNING)、關(guān)閉(SHUTDOWN)、停止(STOP)、整理(TIDYING)、結(jié)束(TERMINATED)。
ThreadPoolExecutor里由5個整型常量表示,每個整型常量的都由高3位表示狀態(tài):

  • RUNNING 高3位為111,該狀態(tài)的線程池會接收新任務,并處理阻塞隊列中的任務
  • SHUTDOWN 高3位為000,該狀態(tài)的線程池不會接收新任務,但會處理阻塞隊列中的任務。調(diào)用void shutdown()方法實現(xiàn)
  • STOP 高3位為001,該狀態(tài)的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務。調(diào)用List<Runnable> shutdownNow()實現(xiàn)。
  • TIDYING 高3位為010,當線程池關(guān)閉后阻塞隊列的任務已完成或線程池停止,然后workerCount(當前線程數(shù)量)為0,線程池進入該狀態(tài)后會調(diào)用terminated()方法進入TERMINATED狀態(tài)。
  • TERMINATED 高3位為011

啟動線程池

當創(chuàng)建完一個ThreadPoolExecutor對象后,線程池里并沒有線程。一般都是調(diào)用void execute(Runnable command)執(zhí)行任務時才創(chuàng)建線程并啟動,不過可以通過調(diào)用如下方法預先創(chuàng)建核心線程并啟動(在addWorker方法里啟動):

public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

執(zhí)行過程

執(zhí)行過程

如上圖所示,當調(diào)用void execute(Runnable command)這個方法執(zhí)行任務時:

  1. 判斷當前線程池線程數(shù)量是否小于核心線程池大小,是則創(chuàng)建線程并啟動,否則到第2步
  2. 判斷任務隊列是否已滿,未滿則將任務加入阻塞隊列,已滿則到第3步
  3. 判斷當前線程池線程數(shù)量是否小于最大線程池大小,是則創(chuàng)建線程并啟動,否則執(zhí)行飽和策略
public void execute(Runnable command) {
    /任務為空,拋出空指針異常
    if (command == null)
        throw new NullPointerException();
   
    int c = ctl.get();
   
   /判斷當前線程數(shù)量是否小于核心線程數(shù)
    if (workerCountOf(c) < corePoolSize) {  
        /是則添加一個核心線程(true表示核心線程)到線程池,并且啟動線程執(zhí)行任務(addWorker方法里會啟動)
        if (addWorker(command, true))   
            return; /添加成功則返回
        c = ctl.get(); 
    }
    
    /線程池處于運行狀態(tài)則向阻塞隊列添加該任務
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        /判斷線程池是否處于運行狀態(tài),不是就移除剛才添加的任務
        if (! isRunning(recheck) && remove(command))
            /移除成功就執(zhí)行飽和策略,這樣整個方法就結(jié)束了
            reject(command);
        /否則若處于運行狀態(tài)或移除失敗,這時無論處于哪種情況任務都在阻塞隊列里,判斷當前線程數(shù)量是否為0
        else if (workerCountOf(recheck) == 0)
            若是則添加一個線程并啟動
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}   

addWorker方法

boolean addWorker(Runnable firstTask, boolean core)方法的作用就是創(chuàng)建Worker對象并啟動這個對象里的線程(Worker里一個Thread類型的字段)。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        /如果線程池不處于運行狀態(tài),理論上不應該添加一個執(zhí)行該任務的線程,但如果滿足下面三個條件的話就可以通過:
        1. 線程池狀態(tài)是關(guān)閉
        2. 要執(zhí)行的任務為空
        3. 阻塞隊列不為空
        因為線程池關(guān)閉后不允許提交任務,但關(guān)閉后會執(zhí)行完阻塞隊列的任務,所以允許添加一個firstTask為空的線程
        來幫助執(zhí)行完阻塞隊列里的任務
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            /若當前線程池的線程數(shù)量達到了線程池所允許的最大線程數(shù)或所指定要添加線程類型的線程數(shù)量則返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /到這里前面的限制條件都通過,現(xiàn)在嘗試將線程數(shù)量增一,成功則退出最外層的循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            /失敗則重新獲取線程池狀態(tài),狀態(tài)改變則從最外層循環(huán)開始執(zhí)行,不變則從內(nèi)循環(huán)開始執(zhí)行
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        /構(gòu)造一個Worker對象,每個Worker對象綁定一個線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                /若線程池處于運行狀態(tài)或處于關(guān)閉且firstTask為null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    線程提前啟動,則拋出異常
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    /將w加到Worker的集合里
                    workers.add(w);
                    獲取Worker集合大小,若大小比largestPoolSize大小大,則更新一下
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    /添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            /若添加成功則啟動線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        /若啟動失?。╰線程為空或添加過程中拋出異常)則執(zhí)行addWorkerFailed方法
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker類

線程池維護的線程其實是一組Worker對象,Worker封裝了線程也繼承了AbstractQueuedSynchronizer類并實現(xiàn)了Runnable接口,重寫了void run()方法。至于為什么要繼承AbstractQueuedSynchronizer類,請看下面的runWorker方法講解。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    /綁定這個對象線程已執(zhí)行完成的任務數(shù)
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {
        /阻止中斷,在任務獲取前不允許中斷
        setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /線程啟動時執(zhí)行的方法
    public void run() {
        runWorker(this);
    }
    
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    /獲取鎖,不可重入
    public void lock()        { acquire(1); }
    /嘗試獲取鎖
    public boolean tryLock()  { return tryAcquire(1); }
    /釋放鎖
    public void unlock()      { release(1); }
    /判斷鎖是否被獨占
    public boolean isLocked() { return isHeldExclusively(); }

    /中斷已開始執(zhí)行的線程,這個就是為什么要設置setState(-1)的一個原因了,這個方法會被`shutdownNow()`方法調(diào)用。
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker方法

上面說到為什么Worker類要繼承AbstractQueuedSynchronizer,其實是要用鎖的狀態(tài)來區(qū)分空閑線程和非空閑線程,在執(zhí)行runWorker方法中:

  • 獲取任務時沒有加鎖(空閑狀態(tài),可中斷線程)
  • 要執(zhí)行任務時才加鎖(不允許中斷線程)

在調(diào)用void tryTerminate()void shutdown()這兩個方法時,會中斷空閑線程,所以沒有在執(zhí)行任務的線程就可能被中斷。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); /允許中斷,與Worker構(gòu)造函數(shù)的setState(-1)是一對的
    boolean completedAbruptly = true;
    try {
        /獲取到任務才進入循環(huán)
        while (task != null || (task = getTask()) != null) {
            /加鎖,表示非空閑狀態(tài)
            w.lock();
            /1. 如果線程池狀態(tài)大于等于STOP并且本線程未中斷,則應該執(zhí)行中斷方法
             2. 或者執(zhí)行Thread.interrupted()方法判斷本線程是否中斷并且清除中斷狀態(tài),
                如果發(fā)現(xiàn)線程池狀態(tài)大于等于STOP則執(zhí)行中斷方法。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
                
            try {
                /ThreadPoolExecutor中的beforeExecute(wt, task)方法一個空方法,用來留給繼承ThreadPoolExecutor的類
                 來重寫該方法并在任務執(zhí)行前執(zhí)行
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    /執(zhí)行獲取到的任務
                    task.run();
![](https://user-gold-cdn.xitu.io/2017/12/30/160a72c7c9f35844?w=977&h=318&f=png&s=14007)
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    /ThreadPoolExecutor中的afterExecute(task,thrown)方法也是一個空方法,用來留給繼承
                    ThreadPoolExecutor的類來重寫該方法并在任務執(zhí)行后執(zhí)行
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                /該線程執(zhí)行的任務加1,即使拋出異常
                w.completedTasks++;
                /釋放鎖,表示回到空閑狀態(tài)
                w.unlock();
            }
        }
        /執(zhí)行到這一步表示是由于獲取不到任務而正常退出的,所以completedAbruptly為false
        completedAbruptly = false;
    } finally {
        /無論怎樣退出都要執(zhí)行
        processWorkerExit(w, completedAbruptly);
    }
}

getTask方法

private Runnable getTask() {
    /表示獲取任務是否已超時
    boolean timedOut = false; 
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /1. 若線程池狀態(tài)大于等于停止狀態(tài),此時線程池不再處理隊列的任務,并且會回收所有線程(不管空不空閑),
            所以此時應該把線程池線程數(shù)量減1,并且獲取的任務為空
         /2. 處于關(guān)閉狀態(tài)且任務隊列為空,表示任務隊列為空且不會有任務提交,所以線程數(shù)減1,并且獲取的任務為空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        /是否啟用超時機制。當允許核心線程超時或當前線程數(shù)超過核心線程則啟用
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        /如果線程數(shù)量超過線程池所允許的最大線程數(shù)或者啟用超時機制情況下獲取任務超時,理論上應該回收線程。
         但是如果該線程是線程池中的最后一個線程且任務隊列不為空就可以不回收,繼續(xù)運行,要是還有其他線程或者任務隊列為空則回收該線程。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            /嘗試將線程數(shù)量減1,成功返回null,失敗繼續(xù)從循環(huán)開始處開始。這里為什么不是用decrementWorkerCount()
            這種不會失敗的方法減1而采用這種方式。是因為 wc > 1,如果線程池不只有一個線程它們互相發(fā)現(xiàn)不只一個線程,
            且它們同時執(zhí)行不會失敗的將線程數(shù)量減一的方法,到時線程池線程數(shù)量可能就為0了,哪么隊列中的任務就沒線程執(zhí)行了。
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /1. 如果啟用超時機制就執(zhí)行poll()方法,在keepAliveTime納秒內(nèi)還沒獲取就返回null。
             2. 如果未啟用超時機制就執(zhí)行take()方法,隊列沒任務就一直阻塞直到有任務。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            /到這里就是因為超時獲取不到任務
            timedOut = true;
        } catch (InterruptedException retry) {
            /在執(zhí)行take()過程中被中斷并不算超時
            timedOut = false;
        }
    }
}

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /由于不是獲取不到任務而正常退出的,得在這里將線程數(shù)減1,正常退出的在getTask()方法有這個減1操作
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    /加鎖,因為HashSet和completedTaskCount不是線程安全的
    mainLock.lock();
    try {
        /將線程執(zhí)行的任務數(shù)統(tǒng)一加到線程池維護的completedTaskCount字段
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    /嘗試將線程池設置為結(jié)束狀態(tài)
    tryTerminate();

    int c = ctl.get();
     /滿足當前線程池狀態(tài)小于STOP(運行或關(guān)閉狀態(tài))才繼續(xù)
    if (runStateLessThan(c, STOP)) {
        若線程是異常退出runWorker方法就直接添加一個沒有帶初始任務的非核心線程
        if (!completedAbruptly) {
            /這三行代碼找出當前線程池所至少存在的線程數(shù)
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            /如果當前線程數(shù)已經(jīng)大于等于min,就直接返回,否則添加一個沒有帶初始任務的非核心線程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

下圖是向線程池提交任務后,線程池的正常執(zhí)行過程:


線程池正常運行過程

tryTerminate方法

線程池狀態(tài)轉(zhuǎn)換

terminate(結(jié)束)是線程池的最后一個狀態(tài),只能由關(guān)閉或停止狀態(tài)轉(zhuǎn)變?yōu)榻Y(jié)束狀態(tài)。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
         /如果滿足下面任意一個條件就沒辦法到達結(jié)束狀態(tài)
         1. 線程池處于運行狀態(tài)
         2. 線程池狀態(tài)是TIDYING或已經(jīng)是結(jié)束狀態(tài)
         3. 線程池處于關(guān)閉狀態(tài)且任務隊列不為空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
         /當前線程數(shù)量不為0也無法到達結(jié)束狀態(tài)
        if (workerCountOf(c) != 0) { 
            /中斷一個空閑線程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
             /嘗試將線程池狀態(tài)設置為TIDYING,失敗重循環(huán)開始處開始
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    /terminated()是一個空方法,留給繼承ThreadPoolExecutor的類覆蓋
                    terminated();
                } finally {
                    /嘗試將線程池狀態(tài)設置為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        
    }
}

關(guān)閉操作

我們可以通過調(diào)用void shutdown()方法關(guān)閉線程池,關(guān)閉后線程池后不允許接受新任務

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        / 安全策略判斷
        checkShutdownAccess();
        /設置線程池狀態(tài)為SHUTDOWN狀態(tài)
        advanceRunState(SHUTDOWN);
        /中斷所有空閑線程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    /嘗試結(jié)束線程池
    tryTerminate();
}

停止操作

我們可以在運行和關(guān)閉狀態(tài)下通過調(diào)用void shutdownNow()方法停止線程池,停止后線程池后不允許接受新任務,也不會執(zhí)行阻塞隊列里的任務,還會中斷當前所有的線程。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        / 安全策略判斷  
        checkShutdownAccess();
        /設置線程池狀態(tài)為STOP狀態(tài)
        advanceRunState(STOP);
        /中斷所有線程,不管是空閑還是非空閑
        interruptWorkers();
        /取出阻塞隊列的所有任務
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    /嘗試結(jié)束線程池
    tryTerminate();
    return tasks;
}

3. 線程池的配置

Executors提供了四種靜態(tài)工廠方法來創(chuàng)建四種不同配置的線程池:

  • newFixedThreadPool(int nThreads)

    接受一個int類型的nThreads變量,創(chuàng)建一個核心線程數(shù)最大線程數(shù)都為nThreads的線程池(即最大線程數(shù)為nThreads),且使用一個無界的阻塞隊列LinkedBlockingQueue。如果不設置核心線程超時的話,創(chuàng)建的線程是不會超時的。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor()

    創(chuàng)建一個核心線程數(shù)最大線程數(shù)都為1的線程池(即最大線程數(shù)為1),且使用一個無界的阻塞隊列LinkedBlockingQueue,不設置核心線程超時的話,創(chuàng)建的線程也是不會超時的。唯一線程可以保證任務的順序執(zhí)行,如果這個唯一的線程執(zhí)行過程中因為異常而結(jié)束的話,在processWorkerExit方法最后會判斷是否因異常而結(jié)束而創(chuàng)建一個新線程繼續(xù)運行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • newCachedThreadPool()

    創(chuàng)建一個核心線程數(shù)為0,最大線程數(shù)Integer.MAX_VALUE的線程池,超時時間為60秒,所以線程空閑時間超過60秒就會被回收。使用了一個同步隊列作為阻塞隊列,同步隊列不存儲元素,且在一端進行插入,另一端要有移除操作插入才會成功,否則插入操作會阻塞等待。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • newScheduledThreadPool()

    創(chuàng)建一個核心線程數(shù)為corePoolSize的線程池,用于指定的時間內(nèi)周期性的執(zhí)行所的任務。ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容