(十)深入理解Java并發(fā)編程之線程池、工作原理、復(fù)用原理及源碼分析

一、引言

  • 一般在開發(fā)過程中,一個(gè)功能是運(yùn)行時(shí)長太久了,一般是通過什么方式去優(yōu)化的?
    異步/多線程,對(duì)于一個(gè)業(yè)務(wù)方法而言,如果其中的調(diào)用鏈太長勢(shì)必會(huì)引起程序運(yùn)行時(shí)間延長,導(dǎo)致整個(gè)系統(tǒng)吞吐來量下降,而我們使用多線程方式來對(duì)該方法的調(diào)用鏈進(jìn)行優(yōu)化,對(duì)于一些耦合度不是特別高的調(diào)用關(guān)系可以直接通過多線程來走異步的方式進(jìn)行處理,大大的縮短了程序的運(yùn)行時(shí)長,但是如果我們的多線程創(chuàng)建方式是通過 new Thread();這種方式去進(jìn)行顯式創(chuàng)建的話它真的可以嗎?答案是不可以,Why?答案如下:

  • 如果在生產(chǎn)環(huán)境使用new Thread();這種方式去進(jìn)行顯式創(chuàng)建線程會(huì)帶來什么后果?

    • 1. OOM: 如果當(dāng)前方法突遇高并發(fā)情況,假設(shè)此時(shí)來了1000個(gè)請(qǐng)求,而按傳統(tǒng)的網(wǎng)絡(luò)模型是BIO,此時(shí)服務(wù)器會(huì)開1000個(gè)線程來處理這1000個(gè)請(qǐng)求(不考慮WEB容器的最大線程數(shù)配置),當(dāng)1000個(gè)請(qǐng)求執(zhí)行時(shí)又會(huì)發(fā)現(xiàn)此方法中存在new Thread();創(chuàng)建線程,此時(shí)每個(gè)執(zhí)行請(qǐng)求的線程又會(huì)創(chuàng)建一個(gè)線程,此時(shí)就會(huì)出現(xiàn)1000*2=2000個(gè)線程的情況出現(xiàn),而在一個(gè)程序中創(chuàng)建線程是需要向JVM申請(qǐng)內(nèi)存分配的,但是此時(shí)大量線程在同一瞬間向JVM申請(qǐng)分配內(nèi)存,此時(shí)會(huì)很容易造成內(nèi)存溢出(OOM)的情況發(fā)生。
    • 2. 資源開銷與耗時(shí): Java對(duì)象的生命周期大致包括三個(gè)階段:對(duì)象的創(chuàng)建,對(duì)象的使用,對(duì)象的清除。因此,對(duì)象的生命周期長度可用如下的表達(dá)式表示:Object = O1 + O2 +O3。其中O1表示對(duì)象的創(chuàng)建時(shí)間,O2表示對(duì)象的使用時(shí)間,而O3則表示其清除(垃圾回收)時(shí)間。由此,我們可以看出,只有O2是真正有效的時(shí)間,而O1、O3則是對(duì)象本身的開銷。當(dāng)我們?nèi)?chuàng)建一個(gè)線程時(shí)也是一樣,因?yàn)榫€程在Java中其實(shí)也是一個(gè)Thread類的實(shí)例,所以對(duì)于線程而言,其實(shí)它的創(chuàng)建(申請(qǐng)內(nèi)存分配、JVM向OS提交線程映射進(jìn)程申請(qǐng)、OS真實(shí)線程映射)和銷毀對(duì)資源是開銷非常大的并且非常耗時(shí)的。
    • 3. 不可管理性: 對(duì)于new Thread();的顯示創(chuàng)建出來的線程是無法管理的,一旦CPU調(diào)度成功,此線程的可管理性幾乎為零。
  • 那么我們使用線程池能給我們帶來什么好處?

      1. 降低資源消耗:通過重用已經(jīng)創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的消耗。
      1. 提高響應(yīng)速度:任務(wù)到達(dá)時(shí)不需要等待線程創(chuàng)建就可以立即執(zhí)行。
      1. 提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調(diào)優(yōu)和監(jiān)控。

而在Java中為我們提供四種原生線程池,它們都是基于ThreadPoolExecutor類實(shí)現(xiàn)的,所以ThreadPoolExecutor類這也是我們待會(huì)兒分析線程池原理時(shí)的重點(diǎn)~

二、JDK提供的原生線程池

在Java中,JDK通過Executors類為我們提供了四種封裝好的線程池類型(ForkJoinPool不在本章探討范圍之內(nèi)),源碼如下:

//創(chuàng)建一個(gè)定長的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
//創(chuàng)建一個(gè)單線程的線程池
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }
//創(chuàng)建一個(gè)可緩存支持靈活回收的線程池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
//創(chuàng)建一個(gè)支持周期執(zhí)行任務(wù)的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

在上面的源碼中,其實(shí)我們通過觀察發(fā)現(xiàn)JDK為我們提供的四種線程池內(nèi)部都是通過封裝ThreadPoolExecutor類的構(gòu)造函數(shù)來進(jìn)行線程池的初始化的,所以我們先來理清楚線程池“家族”體系。

image.png

從上圖中我們可以得知,線程池的最上層接口是Executor,而這個(gè)接口定義了一個(gè)核心方法execute(Runnable command),當(dāng)我們使用它時(shí),需要傳遞一個(gè)Runnable類型的異步任務(wù)作為參數(shù)。我們看一下Executor接口的定義:

public interface Executor {
    // 提交任務(wù)到線程池并執(zhí)行的方法
    void execute(Runnable command);
}

而Executor接口是一個(gè)函數(shù)式接口,其中只定義了一個(gè)方法,但是我們?cè)谑褂镁€程池的時(shí)候?yàn)槭裁茨軌蛘{(diào)用的方法卻會(huì)有那么多呢?因?yàn)檫€有一個(gè)ExecutorService接口,它繼承了Executor接口作為Executor接口的子接口,為Executor接口提供了很多拓展方法。我們接著看ExecutorService接口的實(shí)現(xiàn):   
```java
public interface ExecutorService extends Executor {
    // 等待線程池執(zhí)行完成已接收的任何后關(guān)閉線程池,將線程池置為SHUNTDOWM狀態(tài)
    void shutdown();
    // 嘗試主動(dòng)終止線程池中的所有正在執(zhí)行的任務(wù)并返回未執(zhí)行的任務(wù)列表,
    // 將線程池置為STOP狀態(tài)
    List<Runnable> shutdownNow();
    // 判斷線程池是否已關(guān)閉:線程池調(diào)用過shutdown或者shutdownNow后返回true
    boolean isShutdown();
    // 判斷線程池中的子線程是否已全部終止
    // 當(dāng)調(diào)用shutdown后全部任務(wù)執(zhí)行完成返回true或調(diào)用shutdownNow成功后返回true
    boolean isTerminated();
    // 配合shutdown使用,在調(diào)用shutdown后調(diào)用該方法,讓線程池在指定時(shí)間內(nèi)關(guān)閉,
    // 不管任務(wù)是否執(zhí)行完成,在指定時(shí)間內(nèi)還在執(zhí)行任務(wù)則拋出異常中斷線程
    // 注意:有時(shí)能夠關(guān)閉線程池單并不能完全保證線程池中子線程停止執(zhí)行
    // 比如子線程中用到 BufferedReader,那么需要配合shutdownNow主動(dòng)中斷所有子線程
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // 向線程池提交一個(gè)Callable類型的異步任務(wù),當(dāng)線程池執(zhí)行后返回執(zhí)行結(jié)果
    <T> Future<T> submit(Callable<T> task);
    // 向線程池提交一個(gè)Runnable類型的異步任務(wù),線程池執(zhí)行完成后將返回指定類型的執(zhí)行結(jié)果
    <T> Future<T> submit(Runnable task, T result);
    // 向線程池提交一個(gè)Runnable類型的異步任務(wù),線程池執(zhí)行完成后執(zhí)行的結(jié)果
    Future<?> submit(Runnable task);
    // 傳入一個(gè)Collection類型的異步任務(wù)集合,批量執(zhí)行并返回執(zhí)行結(jié)果
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    // 傳入一個(gè)Collection類型的異步任務(wù)集合,在指定的時(shí)間內(nèi)批量執(zhí)行并返回執(zhí)行
    // 結(jié)果,如果超時(shí)則拋出異常中斷線程
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    // 傳入一個(gè)Collection類型的異步任務(wù)集合,返回第一個(gè)執(zhí)行完成的結(jié)果并終止其他線程
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    // 傳入一個(gè)Collection類型的異步任務(wù)集合,在指定的時(shí)間內(nèi)返回第一個(gè)執(zhí)行完成的結(jié)果
    // 并終止其他線程,如果超時(shí)則拋出異常中斷線程
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

通過上面的代碼我們會(huì)發(fā)現(xiàn)ExecutorService的確繼承了Executor接口,作為Executor拓展接口提供了很多其他的方法以便于開發(fā)人員使用線程池,而Executor和ExecutorService接口中的方法實(shí)現(xiàn)全部都是由ThreadPoolExecutor類來完成的,而ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實(shí)現(xiàn):

public abstract class AbstractExecutorService implements ExecutorService {
    // 將異步任務(wù)包裝為Future,傳遞Runnable類型異步任務(wù),聲明返回類型,返回一個(gè)RunnableFuture
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    // 將異步任務(wù)包裝為Future,傳遞Callable類型異步任務(wù),返回一個(gè)RunnableFuture
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    // 在指定的時(shí)間內(nèi)執(zhí)行傳入的異步任務(wù)集合,返回最后一個(gè)任務(wù)執(zhí)行
    //執(zhí)行集合tasks結(jié)果是最后一個(gè)執(zhí)行結(jié)束的任務(wù)結(jié)果
    //可以設(shè)置超時(shí) timed為true并且nanos是未來的一個(gè)時(shí)間
    //任何一個(gè)任務(wù)完成都將會(huì)返回結(jié)果
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        //傳入的任務(wù)集合不能為null
        if (tasks == null)
            throw new NullPointerException();
        //傳入的任務(wù)數(shù)不能是0
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        //滿足上面的校驗(yàn)后將任務(wù)分裝到一個(gè)ArrayList中
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        //并且創(chuàng)建一個(gè)執(zhí)行器傳入this
        //這里簡單講述他的執(zhí)行原理,傳入this會(huì)使用傳入的this(類型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù),當(dāng)submit提交任務(wù)的時(shí)候回將任務(wù)
        //封裝為一個(gè)內(nèi)部的Future并且重寫他的done而此方法就是在future完成的時(shí)候調(diào)用的,而他的寫法則是將當(dāng)前完成的future添加到esc
        //維護(hù)的結(jié)果隊(duì)列中
        ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);

        try {
            //創(chuàng)建一個(gè)執(zhí)行異常,以便后面拋出
            ExecutionException ee = null;
            //如果開啟了超時(shí)則計(jì)算死線時(shí)間如果時(shí)間是0則代表沒有開啟執(zhí)行超時(shí)
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //獲取任務(wù)的迭代器
            Iterator<? extends Callable<T>> it = tasks.iterator();
            //先獲取迭代器中的第一個(gè)任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器
            futures.add(ecs.submit(it.next()));
            //前面記錄的任務(wù)數(shù)減一
            --ntasks;
            //當(dāng)前激活數(shù)為1
            int active = 1;
            //進(jìn)入死循環(huán)
            for (;;) {
                //獲取剛才提價(jià)的任務(wù)是否完成如果完成則f不是null否則為null
                Future<T> f = ecs.poll();
                //如果為null則代表任務(wù)還在繼續(xù)
                if (f == null) {
                    //如果當(dāng)前任務(wù)大于0 說明除了剛才的任務(wù)還有別的任務(wù)存在
                    if (ntasks > 0) {
                        //則任務(wù)數(shù)減一
                        --ntasks;
                        //并且再次提交新的任務(wù)
                        futures.add(ecs.submit(it.next()));
                        //當(dāng)前的存活的執(zhí)行任務(wù)加一
                        ++active;
                    }
                    //如果當(dāng)前存活任務(wù)數(shù)是0則代表沒有任務(wù)在執(zhí)行了從而跳出循環(huán)
                    else if (active == 0)
                        break;
                        //如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時(shí)時(shí)間
                    else if (timed) {
                        //則設(shè)置指定的超時(shí)時(shí)間獲取
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        //等待執(zhí)行超時(shí)還沒有獲取到則拋出超時(shí)異常
                        if (f == null)
                            throw new TimeoutException();
                        //否則使用當(dāng)前時(shí)間計(jì)算剩下的超時(shí)時(shí)間用于下一個(gè)循環(huán)使用
                        nanos = deadline - System.nanoTime();
                    }
                    //如果沒有設(shè)置超時(shí)則直接獲取任務(wù)
                    else
                        f = ecs.take();
                }
                //如果獲取到了任務(wù)結(jié)果f!=null
                if (f != null) {
                    //激活數(shù)減一
                    --active;
                    try {
                        //返回獲取到的結(jié)果
                        return f.get();
                        //如果獲取結(jié)果出錯(cuò)則包裝異常
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            //如果異常不是null則拋出如果是則創(chuàng)建一個(gè)
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //其他任務(wù)則設(shè)置取消
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, 
    TimeUnit unit) throws InterruptedException {
    };
}

(Executor接口有一個(gè)子接口ExecutorService,而AbstracExecutorService類又實(shí)現(xiàn)了ExecutorService接口,而ThreadPoolExcutor正是AbstrcExecutorService的子類)

到這里,大家應(yīng)該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個(gè)之間的關(guān)系了。

Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。

在ThreadPoolExecutor類中有幾個(gè)非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。

submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒有對(duì)其進(jìn)行重寫,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在以后章節(jié)講述)。

shutdown()和shutdownNow()是用來關(guān)閉線程池的。

還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。

而Executor接口最終被ThreadPoolExecutor類實(shí)現(xiàn)。而且ThreadPoolExecutor是線程池體系的核心類,此類的構(gòu)造方法如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

ThreadPoolExecutor類總共為我們提供了四個(gè)構(gòu)造方法,前面三個(gè)構(gòu)造方法都是調(diào)用最后一個(gè)全參的構(gòu)造函數(shù)來完成工作的,最后一個(gè)全參的構(gòu)造方法需要我們傳遞7個(gè)參數(shù),這七個(gè)參數(shù)的具體含義如下:

  • 構(gòu)造函數(shù)參數(shù)列表:
    • corePoolSize: 核心線程池的大小,如果核心線程池有空閑位置,這時(shí)新的任務(wù)就會(huì)被核心線程池新建一個(gè)線程執(zhí)行,執(zhí)行完畢后不會(huì)銷毀線程,線程會(huì)進(jìn)入緩存隊(duì)列等待再次被運(yùn)行。
    • maximunPoolSize: 線程池能創(chuàng)建最大的線程數(shù)量。如果核心線程池和緩存隊(duì)列都已經(jīng)滿了,新的任務(wù)進(jìn)來就會(huì)創(chuàng)建新的線程來執(zhí)行。但是數(shù)量不能超過maximunPoolSize,否側(cè)會(huì)采取拒絕接受任務(wù)策略,我們下面會(huì)具體分析。
    • keepAliveTime: 非核心線程能夠空閑的最長時(shí)間,超過時(shí)間,線程終止。這個(gè)參數(shù)默認(rèn)只有在線程數(shù)量超過核心線程池大小時(shí)才會(huì)起作用。只要線程數(shù)量不超過核心線程大小,就不會(huì)起作用(當(dāng)然如果設(shè)置了allowCoreThreadTimeOut(true)線程池中的核心線程也受該參數(shù)的影響)。
    • unit: 時(shí)間單位,和keepAliveTime配合使用,可選擇項(xiàng)如下:
      • TimeUnit.DAYS:天
      • TimeUnit.HOURS:小時(shí)
      • TimeUnit.MINUTES:分鐘
      • TimeUnit.SECONDS:秒
      • TimeUnit.MILLISECONDS:毫秒
      • TimeUnit.MICROSECONDS:微妙
      • TimeUnit.NANOSECONDS:納秒
    • workQueue: 任務(wù)隊(duì)列,用來存放等待被執(zhí)行的任務(wù),一般為阻塞隊(duì)列(BlockingQueue)三種常用為:(可自定義阻塞隊(duì)列)。
      • ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時(shí)必須指定大??;
      • LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
      • SynchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來執(zhí)行新來的任務(wù)。
    • threadFactory: 線程工廠,用來創(chuàng)建線程,一般有三種選擇策略(可自定義)。
    • handler: 任務(wù)拒絕策略,線程數(shù)量大于最大線程數(shù)就會(huì)采用拒絕處理策略。ThreadPoolExecutor中為我們提供了四種默認(rèn)策略可選擇(可自定義):
      • ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
      • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
      • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
      • ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

而當(dāng)我們需要使用線程池時(shí),我們可以通過調(diào)用Executors中為我們封裝好的方法創(chuàng)建線程池,也可以通過自己對(duì)于ThreadPoolExecutor的構(gòu)造方法進(jìn)行封裝自定義線程池(后面會(huì)詳細(xì)談到),示例如下:

public class ThreadPoolDemo {

    public static void main(String[] args) {

        /*
         *  創(chuàng)建可緩存的線程池
         *  優(yōu)點(diǎn):當(dāng)線程池中線程執(zhí)行完任務(wù)后會(huì)將線程緩存起來,默認(rèn)60s后空閑線程會(huì)自動(dòng)回收
         *  缺點(diǎn):任然存在由于并發(fā)過高導(dǎo)致瞬間創(chuàng)建大量線程產(chǎn)生的OOM
         */
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(() -> {
            System.out.println("我是遞交到cachedThreadPool的異步任務(wù)....竹子....");
        });

        /*
         *  創(chuàng)建定長的線程池
         *  優(yōu)點(diǎn):可以避免由于并發(fā)過高導(dǎo)致瞬間創(chuàng)建大量線程產(chǎn)生的OOM
         *  缺點(diǎn):
         *      1. 線程創(chuàng)建后永不釋放線程資源
         *      2. 任務(wù)隊(duì)列最大長度為Integer.MAX_VALUE,并發(fā)時(shí)會(huì)創(chuàng)建大量的任務(wù)導(dǎo)致OOM
         */
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        Future<?> futureResult = fixedThreadPool.submit(() -> {
            System.out.println("我是遞交到fixedThreadPool的異步任務(wù)....竹子...");
            return "竹子";
        });
        try {
            // 得到執(zhí)行后返回結(jié)果
            String str = (String) futureResult.get();
            System.out.println("我是遞交到fixedThreadPool的異步任務(wù)執(zhí)行完成后的返回結(jié)果:" + str);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        /*
         *  創(chuàng)建定長可支持周期調(diào)度的線程池
         *  優(yōu)點(diǎn):可以支持按時(shí)調(diào)度執(zhí)行任務(wù)
         *  缺點(diǎn):
         *      1. 線程創(chuàng)建后永不釋放線程資源
         *      2. 任務(wù)隊(duì)列最大長度為Integer.MAX_VALUE,并發(fā)時(shí)會(huì)創(chuàng)建大量的任務(wù)導(dǎo)致OOM
         */
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(()->{
            System.out.println("我是遞交到scheduledThreadPool十秒鐘之后執(zhí)行的異步任務(wù)....熊貓...");
        },10,TimeUnit.SECONDS);

        /*
         *  創(chuàng)建單線程的線程池
         *  優(yōu)點(diǎn):可以支持線程池任務(wù)的執(zhí)行按照遞交的順序先進(jìn)先出(FIFO)
         *  缺點(diǎn):單線程效率比不上前面的三種線程池(前面的線程池都存在多線程并行執(zhí)行任務(wù))
         */
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        List<Callable<String>> callables = new ArrayList<>();
        callables.add(()->{
            System.out.println("我是遞交到singleThreadExecutor的異步任務(wù)...熊貓1號(hào)...");
            return "熊貓一號(hào)";
        });
        callables.add(()->{
            System.out.println("我是遞交到singleThreadExecutor的異步任務(wù)...熊貓2號(hào)...");
            return "熊貓二號(hào)";
        });
        callables.add(()->{
            System.out.println("我是遞交到singleThreadExecutor的異步任務(wù)...熊貓3號(hào)...");
            return "熊貓三號(hào)";
        });
        try {
            // 接收批量執(zhí)行后的結(jié)果
            List<Future<String>> futures = singleThreadExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        cachedThreadPool.shutdown();
        fixedThreadPool.shutdown();
        scheduledThreadPool.shutdown();
        singleThreadExecutor.shutdown();

        /* 執(zhí)行結(jié)果:
         *  我是遞交到cachedThreadPool的異步任務(wù)....竹子....
         *
         *  我是遞交到fixedThreadPool的異步任務(wù)....竹子...
         *  我是遞交到fixedThreadPool的異步任務(wù)執(zhí)行完成后的返回結(jié)果:竹子
         *
         *  我是遞交到singleThreadExecutor的異步任務(wù)...熊貓1號(hào)...
         *  我是遞交到singleThreadExecutor的異步任務(wù)...熊貓2號(hào)...
         *  我是遞交到singleThreadExecutor的異步任務(wù)...熊貓3號(hào)...
         *
         *  我是遞交到scheduledThreadPool十秒鐘之后執(zhí)行的異步任務(wù)....熊貓...
         */
    }
}

在上面的案例中我們使用到了execute()、schedule()、submit()、invokeAll()等方法向線程池中遞交任務(wù),但是當(dāng)我們跟進(jìn)源碼分析會(huì)發(fā)現(xiàn),線程池遞交任務(wù)的核心就是Executor接口定義的核心方法execute(Runnabel command),所以我們?nèi)绻治鼍€程池原理的重點(diǎn)就在此方法。

三、深入源碼剖析線程池工作原理

在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實(shí)現(xiàn)原理,將從下面幾個(gè)方面講解:

  • 1. 線程池狀態(tài)控制參數(shù)ctl
    要了解線程池,我們首先要了解的線程池里面的狀態(tài)控制的參數(shù) ctl,這個(gè)線程池的狀態(tài)控制參數(shù)是一個(gè)原子操作的 AtomicInteger,這個(gè)ctl包含兩個(gè)參數(shù) :

    • runState:當(dāng)前線程池的狀態(tài)
    • workerCount:激活(工作)的線程數(shù)
  • 它的低29位用于存放當(dāng)前的線程數(shù), 因此一個(gè)線程池在理論上最大的線程數(shù)是 536870911; 高 3 位是用于表示當(dāng)前線程池的狀態(tài), 其中高三位的值和狀態(tài)對(duì)應(yīng)如下:

    • 111: RUNNING:線程池初始化(創(chuàng)建出來之后)處于此狀態(tài),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行處理。
    • 000: SHUTDOWN:當(dāng)調(diào)用shutdown()方法時(shí)改為此狀態(tài),在此狀態(tài)時(shí),不接收新任務(wù),但能處理已添加的任務(wù)。
    • 001: STOP:調(diào)用shutdownNow()方法時(shí)處于此狀態(tài),在此狀態(tài)時(shí),不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)嘗試中斷正在處理的任務(wù)。
    • 010: TIDYING:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也為空時(shí),就會(huì)由 SHUTDOWN -> TIDYING。|| 當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實(shí)現(xiàn)。
    • 110: TERMINATED:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING -> TERMINATED。線程池徹底終止,就變成TERMINATED狀態(tài)。
  • 為了能夠使用 ctl 線程池提供了三個(gè)方法:

    // 獲取線程池的狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }    
    // 獲取線程池的工作線程數(shù)
    private static int workerCountOf(int c)  { return c & CAPACITY; }    
    // 根據(jù)工作線程數(shù)和線程池狀態(tài)獲取 ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 2. 任務(wù)的執(zhí)行
    如果想使用線程池就必須通過 execute 這個(gè)方法來向線程池提交任務(wù),而這個(gè)方法也是線程池的核心,所以我們來看代碼:

execute:

public void execute(Runnable command) {        
    //如果傳遞的任務(wù)為空則拋出空指針異常
    if (command == null)           
        throw new NullPointerException();        
    int c = ctl.get();        
    //如果工作線程數(shù)小于核心線程數(shù),
    if (workerCountOf(c) < corePoolSize) {            
        //執(zhí)行addWork,提交為核心線程,提交成功return。提交失敗重新獲取ctl
        if (addWorker(command, true))                
        return;
        c = ctl.get();
    }        
    //如果工作線程數(shù)大于核心線程數(shù),則檢查線程池狀態(tài)是否是正在運(yùn)行,且將新線程向阻塞隊(duì)列提交。
    if (isRunning(c) && workQueue.offer(command)) {            
        //recheck 需要再次檢查,主要目的是判斷加入到阻塞隊(duì)里中的線程是否可以被執(zhí)行
        int recheck = ctl.get();               
        //如果線程池狀態(tài)不為running,將任務(wù)從阻塞隊(duì)列里面移除,啟用拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);            
        // 如果線程池的工作線程為零,則調(diào)用addWoker提交任務(wù)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }        
    //添加非核心線程失敗,拒絕
    else if (!addWorker(command, false))            
        reject(command);
}
image.png

addWoker:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:        
    for (;;) {            
        int c = ctl.get();            
        //獲取線程池狀態(tài)
        int rs = runStateOf(c);            
        // Check if queue empty only if necessary.
        // 判斷是否可以添加任務(wù)。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))                
            return false;            
        for (;;) {               
             //獲取工作線程數(shù)量
            int wc = workerCountOf(c);                
            //是否大于線程池上限,是否大于核心線程數(shù),或者最大線程數(shù)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))                    
                return false;                
            //CAS 增加工作線程數(shù)
            if (compareAndIncrementWorkerCount(c))                    
                break retry;
            c = ctl.get();  // Re-read ctl
            //如果線程池狀態(tài)改變,回到開始重新來
            if (runStateOf(c) != rs)                    
                continue retry;                
           // else CAS failed due to workerCount change; retry inner loop
        }
    }        
                
    boolean workerStarted = false;        
    boolean workerAdded = false;
    Worker w = null;        
    //上面的邏輯是考慮是否能夠添加線程,如果可以就cas的增加工作線程數(shù)量
    //下面正式啟動(dòng)線程
    try {            
        //新建worker
        w = new Worker(firstTask);            
        //獲取當(dāng)前線程
        final Thread t = w.thread;            
        if (t != null) {                
            //獲取可重入鎖
            final ReentrantLock mainLock = this.mainLock;                
            //鎖住
            mainLock.lock();                
            try {                    
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); 
                // rs < SHUTDOWN ==> 線程處于RUNNING狀態(tài)
                // 或者線程處于SHUTDOWN狀態(tài),且firstTask == null(可能是workQueue中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的worker線程執(zhí)行)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {                        
                    // 當(dāng)前線程已經(jīng)啟動(dòng),拋出異常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();                        
                    //workers 是一個(gè) HashSet 必須在 lock的情況下操作。
                    workers.add(w);                        
                    int s = workers.size();                        
                    //設(shè)置 largeestPoolSize 標(biāo)記workAdded
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }                
            //如果添加成功,啟動(dòng)線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {            
        //啟動(dòng)線程失敗,回滾。
        if (! workerStarted)
            addWorkerFailed(w);
    }        
    return workerStarted;
}
image.png

先看看 addWork()的兩個(gè)參數(shù),第一個(gè)是需要提交的線程Runnable firstTask,第二個(gè)參數(shù)是 boolean 類型,表示是否為核心線程。
execute() 中有三處調(diào)用了 addWork()我們逐一分析。

  • 第一次,條件if (workerCountOf(c) < corePoolSize)這個(gè)很好理解,工作線程數(shù)少于核心線程數(shù),提交任務(wù)。所以addWorker(command, true)。
  • 第二次,如果 workerCountOf(recheck) == 0如果worker的數(shù)量為0,那就 addWorker(null,false)。為什么這里是 null ?之前已經(jīng)把command提交到阻塞隊(duì)列了workQueue.offer(command)。所以提交一個(gè)空線程,直接從阻塞隊(duì)列里面取就可以了。
  • 第三次,如果線程池沒有RUNNING或者offer阻塞隊(duì)列失敗,addWorker(command,false),很好理解,對(duì)應(yīng)的就是,阻塞隊(duì)列滿了,將任務(wù)提交到,非核心線程池。與最大線程池比較。
    至此,重新歸納execute()的邏輯應(yīng)該是:
    如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)。
    如果運(yùn)行的線程等于或多于corePoolSize,將任務(wù)加入 BlockingQueue。
    如果加入 BlockingQueue成功,需要二次檢查線程池的狀態(tài)如果線程池沒有處于 Running,則從 BlockingQueue 移除任務(wù),啟動(dòng)拒絕策略。
    如果線程池處于 Running狀態(tài),則檢查工作線程(worker)是否為0。如果為0,則創(chuàng)建新的線程來處理任務(wù)。如果啟動(dòng)線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕策略拒絕。
    如果加入 BlockingQueue。失敗,則創(chuàng)建新的線程來處理任務(wù)。
    如果啟動(dòng)線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕策略拒絕。
    image.png

3. 線程池中的線程初始化
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。
在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個(gè)方法辦到:

prestartCoreThread():初始化一個(gè)核心線程;
prestartAllCoreThreads():初始化所有核心線程;

下面是這2個(gè)方法的實(shí)現(xiàn):

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進(jìn)去的參數(shù)是null
}
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null
        ++n;
    return n;
}

注意上面?zhèn)鬟M(jìn)去的參數(shù)是null,根據(jù)第2小節(jié)的分析可知如果傳進(jìn)去的參數(shù)為null,則最后執(zhí)行線程會(huì)阻塞在getTask方法中的r = workQueue.take();即等待任務(wù)隊(duì)列中有任務(wù)。

4. 任務(wù)緩存隊(duì)列及排隊(duì)策略
見線程池參數(shù),在選擇線程池任務(wù)隊(duì)列時(shí)的阻塞時(shí)隊(duì)列就決定了這個(gè)線程池的任務(wù)緩存及排隊(duì)策略。

5. 任務(wù)拒絕策略
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會(huì)采取任務(wù)拒絕策略,具體拒絕策略參考線程池參數(shù)列表。

6. 線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:

shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù);
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù);

7. 線程池容量的動(dòng)態(tài)調(diào)整
ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()setMaximumPoolSize()
setCorePoolSize:設(shè)置核心池大小
setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。

四、深入源碼分析線程池線程復(fù)用原理

通過前面分析線程池的工作原理我們可以得知一個(gè)結(jié)論:在線程池內(nèi)部關(guān)于線程的調(diào)度執(zhí)行都是被封裝成一個(gè)Worker對(duì)象來操作的。而當(dāng)我們使用Worker.thread.start()啟動(dòng)線程時(shí),JVM會(huì)調(diào)用Worker中重寫的run()方法執(zhí)行,而Worker.run()方法源碼如下:

/** Delegates main run loop to outer runWorker  */
// 將線程運(yùn)行主邏輯交給外部 Worker.runWorker()
public void run() {runWorker(this);}

我們進(jìn)一步跟進(jìn)Worker.runWorker()源碼:

// 線程執(zhí)行邏輯:執(zhí)行循環(huán)并反復(fù)從隊(duì)列獲取任務(wù)并執(zhí)行
final void runWorker(Worker w) {
    // 獲取當(dāng)前執(zhí)行線程
    Thread wt = Thread.currentThread();
    // 獲取當(dāng)前傳遞進(jìn)線程池的方法
    Runnable task = w.firstTask;
    // 將Worker.firstTask 置為空
    w.firstTask = null;
    // 允許發(fā)生線程中斷
    w.unlock(); // allow interrupts
    // 突然執(zhí)行完成標(biāo)志:是否因?yàn)楫惓L鲅h(huán)
    boolean completedAbruptly = true;
    try {
        // 1. 如果線程池外部傳遞了任務(wù)則直接執(zhí)行外部傳遞的任務(wù)
        // 2. 如果沒有獲取到外部傳遞進(jìn)來的任務(wù)則調(diào)用getTask()去隊(duì)列中獲取任務(wù)并執(zhí)行
        // 2.1. 如果在任務(wù)隊(duì)列中獲取到了任務(wù)則直接執(zhí)行已經(jīng)獲取的任務(wù)
        // 2.2. 如果任務(wù)隊(duì)列為空,沒有任務(wù)則反復(fù)執(zhí)行空循環(huán)阻塞當(dāng)前線程死亡
        while (task != null || (task = getTask()) != null) {
            // 禁止線程中斷(防止線程在執(zhí)行過程中中斷導(dǎo)致不可恢復(fù)的錯(cuò)誤)
            w.lock();
            // 二次確認(rèn)線程池以及當(dāng)前工作線程狀態(tài):
            // 如果線程池停止,確保當(dāng)前線程被中斷
            // If pool is stopping, ensure thread is interrupted;
            // 如果線程池為停止,請(qǐng)確保當(dāng)前線程未被中斷
            // if not, ensure thread is not interrupted.  This
            // 如果是第二種情況則需要重新檢測(cè)并且清除中斷
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 鉤子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 調(diào)用任務(wù)的run方法,而不是start()方法,因?yàn)閃orker本身就是一個(gè)線程類
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 鉤子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 執(zhí)行完成后將獲取的任務(wù)置空
                task = null;
                // 執(zhí)行完成后自增當(dāng)前工作線程執(zhí)行的任務(wù)數(shù)量
                w.completedTasks++; 
                // 釋放Worker中自實(shí)現(xiàn)的鎖
                w.unlock();
            }
        }
        // 如果線程能夠執(zhí)行到最后一行代表線程執(zhí)行過程中沒有由于發(fā)生異常導(dǎo)致跳出循環(huán),將 突然結(jié)束 標(biāo)志改為false
        completedAbruptly = false;
    } finally {
        // 執(zhí)行回收工作線程的邏輯
        processWorkerExit(w, completedAbruptly);
    }
}

如上就是關(guān)于線程池復(fù)用的原理,簡單來說就是通過一個(gè)死循環(huán)讓當(dāng)前線程一直處于運(yùn)行狀態(tài),阻止OS將當(dāng)前工作線程回收,從而做到線程的復(fù)用。而關(guān)于死循環(huán)的條件則比較簡單,判斷task是否為空,在調(diào)用方法執(zhí)行的時(shí)候會(huì)先獲取外部傳遞的任務(wù),如果沒有獲取到外部傳遞的任務(wù)則調(diào)用getTask()方法獲取任務(wù)隊(duì)列中的任務(wù)并執(zhí)行:

// 如果返回null,在runWorker方法中會(huì)執(zhí)行processWorkerExit,即關(guān)閉該線程。
private Runnable getTask() {
    // 表示上次從隊(duì)列獲取任務(wù)是否超時(shí)
    boolean timedOut = false; // Did the last poll() time out?
    // 死循環(huán)標(biāo)志位
    retry:
    for (;;) {
        int c = ctl.get(); // 獲取ctl
        int rs = runStateOf(c); // 解析ctl獲取當(dāng)前線程池運(yùn)行狀態(tài)

        // Check if queue empty only if necessary.
        // 如果rs >= STOP,或者 rs=SHUTDOWN且隊(duì)列為空,此時(shí)不再接收新任務(wù),將WorkerCount遞減并返回null。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();    // 自旋CAS遞減workerCount直到成功
            return null;
        }

        // timed用于判斷是否需要重試控制
        boolean timed;      // Are workers subject to culling?

        for (;;) {
            // allowCoreThreadTimeOut默認(rèn)是false,核心線程不進(jìn)行超時(shí)控制,
            // 當(dāng)線程數(shù)量大于corePoolSize時(shí)需要進(jìn)行超時(shí)控制
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 如果wc <= maximumPoolSize ,且上次從隊(duì)列獲取任務(wù)超時(shí)或本次需要進(jìn)行超時(shí)控制,
            // 則跳出內(nèi)層循環(huán)。
            // timedOut=true表示上次從隊(duì)列獲取元素超時(shí),說明隊(duì)列在上次獲取的keepAliveTime時(shí)間內(nèi)是空的。
            // timed=true說明線程數(shù)量大于corePoolSize。
            // 所以timedOut=true和timed=true同時(shí)滿足則說明當(dāng)前線程已經(jīng)空閑了keepAliveTime時(shí)間,
            // 并且線程池的數(shù)量大于corePoolSize。這時(shí)就需要關(guān)閉多余的空閑線程
            //(即compareAndDecrementWorkerCount并返回null)。
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            // 如果線程數(shù)量大于maximumPoolSize,或者上次從隊(duì)列獲取任務(wù)超時(shí)且本次需要進(jìn)行
            // 超時(shí)控制。需要遞減WorkerCount,如果遞減成功則返回null
            if (compareAndDecrementWorkerCount(c))
                return null;
            //檢查線程池運(yùn)行狀態(tài)是否改變。如果改變,那么繼續(xù)外層循環(huán),如果未改變,那么繼續(xù)內(nèi)層循環(huán)。
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                //超時(shí)方式獲取,注意keepAliveTime為超出corePoolSize大小的線程的空閑存活時(shí)間
                workQueue.take();    //阻塞方式獲取,如果隊(duì)列為空阻塞當(dāng)前線程
            if (r != null)
                return r;
            timedOut = true;    //如果超時(shí),繼續(xù)循環(huán)。
        } catch (InterruptedException retry) {
            //如果發(fā)生中斷,則將timedOut置為false,繼續(xù)循環(huán)
            timedOut = false;
        }
    }
}

在getTask()方法中的邏輯也比較簡單,前期效驗(yàn)線程池狀態(tài),一切正常時(shí)開始任務(wù)的獲取邏輯,但是值得注意的是這里使用的是阻塞時(shí)獲取方式,也就代表如果任務(wù)隊(duì)列中沒有任務(wù),當(dāng)前線程會(huì)阻塞等待,直到任務(wù)隊(duì)列中有新的任務(wù)時(shí)才會(huì)獲取并返回執(zhí)行,不過如果線程池設(shè)置了存活時(shí)間,那么當(dāng)前線程會(huì)阻塞到存活時(shí)間的閾值,如果超出存活時(shí)間會(huì)返回null。而如果返回null,則在runWorker方法中會(huì)執(zhí)行processWorkerExit,即關(guān)閉該工作線程,從而實(shí)現(xiàn)了線程池的另一個(gè)功能: 線程池內(nèi)線程空閑時(shí)間超過給定的存活時(shí)間時(shí)自動(dòng)回收該線程資源。

下面我們?cè)賮砜纯磒rocessWorkerExit方法的實(shí)現(xiàn):

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly=false,說明是由getTask返回null導(dǎo)致的,WorkerCount遞減的操作已經(jīng)執(zhí)行
    // 如果completedAbruptly=true,說明是由執(zhí)行任務(wù)的過程中發(fā)生異常導(dǎo)致,需要進(jìn)行WorkerCount遞減的操作
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 從workers中刪除當(dāng)前worker,對(duì)workers更新需要加mainLock鎖
        workers.remove(w);    
    } finally {
        mainLock.unlock();
    }
    // 根據(jù)線程池狀態(tài)判斷是否結(jié)束線程池
    tryTerminate();

    // 如果是異常結(jié)束(completedAbruptly=true),需要重新調(diào)用addWorker()增加一個(gè)線程,保持線程數(shù)量
    // 如果是由getTask()返回null導(dǎo)致的線程結(jié)束,需要進(jìn)行以下判斷:
    //    1)如果allowCoreThreadTimeOut=true且隊(duì)列不為空,那么需要至少保證有一個(gè)線程
    //    2)如果allowCoreThreadTimeOut=false,那么需要保證線程數(shù)大于等于corePoolSize
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

關(guān)于線程池中工作線程的銷毀則是由processWorkerExit()方法來完成的,在這個(gè)方法中首先會(huì)判斷當(dāng)前線程是因?yàn)閳?zhí)行出現(xiàn)異常還是超出存活時(shí)間導(dǎo)致需要發(fā)生回收的。如果是因?yàn)槌龃婊顣r(shí)間,先判斷線程池狀態(tài)之后再從工作集中移除當(dāng)前線程即可。如果是由于異常導(dǎo)致的則需要先對(duì)線程池的工作線程數(shù)進(jìn)行自減,然后再移除工作集中的工作線程,最后再調(diào)用addWorker()添加一個(gè)工作線程保證線程池內(nèi)工作線程的數(shù)量。在上面的源碼中我們也會(huì)看到tryTerminate()這個(gè)方法,那么我們也簡單分析一下它的源碼:

//根據(jù)線程池狀態(tài)判斷是否結(jié)束線程池
final void tryTerminate() {
    for (;;) {
        int c = ctl.get(); // 獲取ctl
        // 如果線程池運(yùn)行狀態(tài)是RUNNING,或者大于等于TIDYING,或者運(yùn)行狀態(tài)為
        // SHUTDOWN且隊(duì)列為空,則直接return返回
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 如果工作線程數(shù)不為0,則中斷一個(gè)空閑線程并return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 嘗試將線程池狀態(tài)設(shè)置為TIDYING狀態(tài)
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //如果CAS成功,執(zhí)行terminated()鉤子方法
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

如果線程池狀態(tài)不處于STOP或者TERMINATED狀態(tài)則直接返回,反之執(zhí)行terminated()鉤子函數(shù)。

到此關(guān)于線程池的復(fù)用原理就告一段落了,關(guān)于線程池的復(fù)用原理只需要理解死循環(huán)+getTask即可大致明白線程池復(fù)用的思維。

五、自定義線程池實(shí)戰(zhàn)

再前面我們?cè)岬?,JDK為我們提供的已經(jīng)封裝好的線程池實(shí)現(xiàn)在高并發(fā)情況下都會(huì)存在OOM的風(fēng)險(xiǎn),而通過前面分析我們也可以得知,JDK提供的線程池也是通過封裝ThreadPoolExecutor的構(gòu)造,所以我們?cè)谏a(chǎn)環(huán)境時(shí)更應(yīng)該自定義線程池來規(guī)避這些風(fēng)險(xiǎn)以及更好的操作線程池。注:在《阿里巴巴java開發(fā)規(guī)范手冊(cè)》中明確規(guī)定如下:


image.png

所以在一般生產(chǎn)環(huán)境使用創(chuàng)建線程都是通過自定義線程池來使用線程資源,代碼如下:

public static void main(String[] args){
     // 線程工廠可通過 implements ThreadFactory接口自定義
     // 任務(wù)拒絕策略可通過  implements RejectedExecutionHandler接口自定義
     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 0,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
                    Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    for (int i = 0; i < 10;i++){
         final int num = i;
         threadPoolExecutor.execute(()->{
              System.out.println("線程:" + Thread.currentThread().getName() + "正在執(zhí)行:" + num + "個(gè)任務(wù)");
        });
        System.out.println("線程池中線程數(shù)目:" + threadPoolExecutor.getPoolSize() + ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" + threadPoolExecutor.getQueue().size() + ",已執(zhí)行玩別的任務(wù)數(shù)目:"+threadPoolExecutor.getCompletedTaskCount());
    }
}

五、線程池參數(shù)合理配置

本節(jié)來討論一個(gè)比較重要的話題:如何合理配置線程池大小,參考如下:

image.png

六、參考

  • 《Java并發(fā)編程的藝術(shù)》
  • 《java并發(fā)編程實(shí)戰(zhàn)》
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 深入分析線程池 在前面的文章中,我們使用線程的時(shí)候就去創(chuàng)建一個(gè)線程,這樣實(shí)現(xiàn)起來非常簡便,但是就會(huì)有一個(gè)問題: 如...
    史路比閱讀 507評(píng)論 0 1
  • Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線...
    石家志遠(yuǎn)閱讀 1,440評(píng)論 0 6
  • 一、簡介 什么是線程池 線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動(dòng)啟動(dòng)這些任務(wù)。...
    靜默虛空閱讀 631評(píng)論 0 0
  • 1、ThreadPoolExecutor類 java.uitl.concurrent.ThreadPoolExec...
    豪大大大閱讀 3,007評(píng)論 0 0
  • 時(shí)間總是白駒過隙,轉(zhuǎn)瞬即逝。不想接受現(xiàn)實(shí),又不得不提早接受事實(shí)。????我的大一生涯就要結(jié)束了,和王老師的...
    李依純閱讀 468評(píng)論 0 1

友情鏈接更多精彩內(nèi)容