深入淺出Java(Android )線程池ThreadPoolExecutor

前言

關(guān)于線程池
在Java/Android開發(fā)中,設(shè)計到并發(fā)的請求,那基本上是離不開線程池了。用線程池的好處:

  • 1、減少線程頻繁創(chuàng)建、銷毀的開銷;
  • 2、好控制并發(fā)量,降低OOM的可能,至于原因文中會說;
  • 3、提高程序的響應(yīng)速度,因為可以省去部分創(chuàng)建的過程;

要不要深度學習線程池

  • 對于服務(wù)端的同學來說應(yīng)該會比較重視這一塊,因為需要做高并發(fā);而移動端的同學可能比較容易忽略這一塊。有些人覺得平時也用不到,移動端沒有那么大并發(fā)量,或者說第三方框架中已經(jīng)完成了,比如OkHtttp;其實只能說有這種想法的同學還沒有遇到大一點的項目或者說沒有太多多線程優(yōu)化的經(jīng)驗。如果你真的遇到了這種項目瓶頸,你連線程池的運行原理都不知道,那又如何解決項目問題呢?

  • 如果你要尋求一份中高級開發(fā)工程師的工作,那線程池是基本是必問題目之一,而且還要有一定深度。

如何深度學習線程池
這也是我們今天的重點,本文將從下面幾點帶大家快速掌握線程池的要點:

  • 1、從API使用到原碼解析,基于JDK1.8版本;
  • 2、從源碼閱讀(深入)中總結(jié)出(淺出)線程池工作原理;
  • 3、對應(yīng)用場景的分析以及異常處理

預(yù)覽

線程池類圖.png

先對線程池的部分核心類/接口做個簡介,大家有個印象就好。
Executor接口

public interface Executor {

    /**
     * 就一個方法,用來執(zhí)行線程任務(wù)的,類似于Thread的start()方法
     */
    void execute(Runnable command);
}

由于Executor是一個接口,所以execute是由具體的實現(xiàn)類來完成的,調(diào)用這個方法,可能會出現(xiàn)如下情況:

  • 1.創(chuàng)建一個新線程并立即啟動;
  • 2.復(fù)用線程池中空閑的線程來執(zhí)行任務(wù);
  • 3.進入一個阻塞隊列中排隊;
  • 4.拋出異常/拒絕接收該任務(wù),這個要看具體的拒絕策略,默認拋出異常。

ExecutorService接口
繼承自Executor接口,我們常用的很多方法就是在這個接口中定義的。主要涉及到:提交任務(wù)、關(guān)閉線程獲取結(jié)果。


public interface ExecutorService extends Executor {

    /**
     * 關(guān)閉線程池,新提交的任務(wù)會被拒絕,但是已經(jīng)提交的任務(wù)會繼續(xù)執(zhí)行
     */
    void shutdown();

    /**
     * 關(guān)閉線程池,新提交的任務(wù)會被拒絕,并且嘗試關(guān)閉正在執(zhí)行的任務(wù)
     */
    List<Runnable> shutdownNow();

    /**
     * 線程池是否已關(guān)閉
    */
    boolean isShutdown();

    /**
     * 如果調(diào)用了shutdown或者shutdownNow之后,所有的任務(wù)都結(jié)束了,那么返回true,否則返回false
     */
    boolean isTerminated();

    /**
     * 當調(diào)用shutdown 或 shutdownNow之后,再調(diào)用這個方法,會
     *等待所有的任務(wù)執(zhí)行完成,直到超時(超過timeout)或者說當前的線程被中斷了
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    /**
     * 提交一個Runnable 任務(wù)
     */
    Future<?> submit(Runnable task);

    /**
     * 執(zhí)行所有任務(wù),返回 Future 類型的一個 list
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}

AbstractExecutorService
抽象類,實現(xiàn)了ExecutorService接口。主要封裝了通過submit方式提交任務(wù)的一些操作。

注意: 不需要獲取結(jié)果,可以用 execute 方法;需要獲取結(jié)果(FutureTask)用 submit 方法。

由于篇幅有限,本文只針對execute方式做講解,想了解submit 方式的同學可以參考深度解讀 java 線程池設(shè)計思想及源碼實現(xiàn)

Executors
這是大多數(shù)人最常用的一個類,實質(zhì)上就是一個工具類。可以快速的構(gòu)建一個線程池對象,常見的操作有如下:

   /**
     * 創(chuàng)建一個固定大小的線程池,而且全是核心線程,
     * 會一直存活,除非特別設(shè)置了核心線程的超時時間
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

   /**
     * 創(chuàng)建了一個沒有大小限制的線程池,全是非核心線程;如果線程
     * 空閑的時間超過60s就會被移除
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

   /**
     * 這個線程池只有1個唯一的核心線程
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   /**
     * 創(chuàng)建一個定長的線程池,可以執(zhí)行周期性的任務(wù)
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

可以看出這幾種方式最后都是通過ThreadPoolExecutor來實現(xiàn)的,所以下面就來研究一下今天的主角ThreadPoolExecutor,等理解了這個類,也就可以掌握線程池等工作原理,甚至可以根據(jù)自己的策略來自定義線程池。

ThreadPoolExecutor(關(guān)鍵類)

繼承與抽象方法AbstractExecutorService,也就間接實現(xiàn)了ExecutorService、Executor等接口。

從構(gòu)造方法談起

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:
    核心線程數(shù)量,所謂核心線程就是一直保留在線程池中,及時處于空閑狀態(tài)也不會銷毀等線程,除非手動調(diào)用allowCoreThreadTimeOut才可以在超時銷毀。
  • maximumPoolSize:
    線程池允許創(chuàng)建的最大線程數(shù)量。
  • keepAliveTime:
    線程池中除了有核心線程之外,還有非核心線程,非核心線程處于空閑的時候會在一定時間范圍內(nèi)被關(guān)閉,而這個空閑的時間就是keepAliveTime。
  • unit:
    keepAliveTime的時間單位,比如秒、分、時等
  • workQueue:
    保存待執(zhí)行任務(wù)的阻塞隊列。如果一個任務(wù)進入線程池之后,如果核心線程滿了的話,就會先嘗試添加到隊列中,當然未必添加成功,而且隊列也有多種實現(xiàn),具體的后面再說,先簡單理解為排隊即可。
  • threadFactory:
    如果沒有設(shè)置的話,使用默認的ThreadFactory來創(chuàng)建線程;當然你也可以通過ThreadFactory自己創(chuàng)建線程,比如設(shè)置線程名稱,優(yōu)先級等
  • handler:
    當達到線程池的最大容量時的拒絕策略。當線程池飽和,繼續(xù)提交任務(wù),需要一種策略來處理該任務(wù)。線程池提供了4種策略:

    AbortPolicy:直接拋出異常,這是默認策略;
    CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
    DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當前任務(wù);
    DiscardPolicy:直接丟棄任務(wù);

一些重要屬性和方法

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

這里很關(guān)鍵,一定要認真看,后面分析任務(wù)執(zhí)行execute()方法就需要用到這些基礎(chǔ)。

Integer.SIZE =32,這代表了java中,int最大是32位,所以COUNT_BITS等于29;CAPACITY等于1*2^29 -1,用它來表示線程池的最大容量是足夠了的。

從線程池的生命周期來看,線程池有5種狀態(tài):

  • RUNNING: 能接受新的任務(wù),也能處理隊列中的任務(wù);
  • SHUTDOWN:不接受新的任務(wù),但是會處理隊列中的任務(wù);
  • STOP: 不接受新的任務(wù),也不處理隊列中的任務(wù),而且會中斷正在進行的任務(wù);
  • TIDYING: 所有的任務(wù)都完成了,workCount等于 0。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時,會執(zhí)行鉤子方法 terminated()
  • TERMINATED: terminated() 方法結(jié)束后,線程池的狀態(tài)就會變成這個

關(guān)于狀態(tài)轉(zhuǎn)換

  • RUNNING -> SHUTDOWN:
    當調(diào)用shutdown()方法后,會發(fā)生這個狀態(tài)轉(zhuǎn)換;
  • (RUNNING or SHUTDOWN) -> STOP:
    當調(diào)用 shutdownNow() 后,會發(fā)生這個狀態(tài)轉(zhuǎn)換;
  • SHUTDOWN -> TIDYING:
    當隊列和線程池都變成空的時候,會發(fā)生這個狀態(tài)轉(zhuǎn)換;
  • STOP -> TIDYING:
    當線程池是空的時候,會發(fā)生這個狀態(tài)轉(zhuǎn)換;
  • TIDYING -> TERMINATED:
    當terminated() 方法結(jié)束后,會發(fā)生這個狀態(tài)轉(zhuǎn)換。
線程狀態(tài)轉(zhuǎn)換.png

關(guān)于狀態(tài)轉(zhuǎn)換就講完了,特別是前2個狀態(tài)轉(zhuǎn)換,更是常用。還有一個關(guān)鍵的屬性ctl需要講一下,初學者可能不太好理解,需要一點計算機基礎(chǔ)。

首先ctl是一個AtomicInteger類型的對象,它其實是對int的包裝,可以在多線程并發(fā)的情況下保證原子性,它傳入的參數(shù)就是它表示的值。這里是通過ctlOf()方法來計算的。

在計算之前先補充2個小知識點:

1、 <<:是移位運算符,具體倆說是左移;右移用>>表示。左移的意思是將一個二進制數(shù)向左邊移動1位,那么左移1位就等于這個數(shù)乘以2,左移n位的話就是乘以2^n;右移的話就是除以;

2、 由于10進制數(shù)有正負之分,所以轉(zhuǎn)換成二進制數(shù)的時候,需要在最高位加上0/1來表示正負,正數(shù)用0表示;負數(shù)用1來表示。

3、原碼:加上符號為之后的二進制數(shù);反碼:正數(shù)的反碼是其本身,負數(shù)的反碼:符號位不變,其余各位取反;補碼:正數(shù)的補碼就是其本身;負數(shù)的補碼:即在反碼的基礎(chǔ)上+1。

4、針對二進制數(shù)的&、|、~。與(&)運算:2個都是1才為1,否則為0;或(|)運算:只要有1個為1就是1,否則為0;非(~)運算:取反,1變0,0變1。

首先RUNNING = -1 << COUNT_BITS;其中-1的二進制數(shù)1001,那轉(zhuǎn)換成補碼就是1111,然后左移29位就變成1110 0000 0000 0000 0000 0000 0000 0000,因為int最多32位,所以高位的1沒了;然后再和0做或運算,所以結(jié)果還是它本身,所以ctl初始值為1110 0000 0000 0000 0000 0000 0000 0000,其中高3位存放線程狀態(tài),后面29位存放線程數(shù)量。

還有幾個方法

  • runStateOf:獲取運行狀態(tài);
  • workerCountOf:取出ctl低29位,來表示當前線程數(shù);
  • ctlOf:獲取運行狀態(tài)和活動線程數(shù)的值。

execute(關(guān)鍵方法)

這是線程池的關(guān)鍵方法,用來提交任務(wù)的。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //表示 “線程池狀態(tài)” 和 “線程數(shù)量” 的整數(shù)
        int c = ctl.get();
        /*
         * 如果當前活躍線程數(shù)小于核心線程數(shù),就會添加一個worker來執(zhí)行任務(wù);
         * 具體來說,新建一個核心線程放入線程池中,并把任務(wù)添加到該線程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker()如果返回true表示添加成功,線程池會執(zhí)行這個任務(wù),那么本方法可以結(jié)束了,返回 false 代表線程池不允許提交任務(wù),那么就會執(zhí)行后面的方法。
             */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

      //程序執(zhí)行到這里,說明要么活躍線程數(shù)大于核心線程數(shù);要么addWorker()失敗

        /*
         * 如果當前線程池是運行狀態(tài),會把任務(wù)添加到隊列
         */
        if (isRunning(c) && workQueue.offer(command)) {
            /*
            *這里的邏輯比較有意思,又重新檢查了線程狀態(tài)和數(shù)量;
            *如果線程不處于 RUNNING 狀態(tài),就會移除剛才添加到隊列中的任務(wù);
            *如果線程池還是 RUNNING 狀態(tài),并且線程數(shù)為 0,那么開啟新的線程;
            * addWorker(null, false)參數(shù)分析:
            * 1. 第一個參數(shù)為null,表示在線程池中創(chuàng)建一個線程,但不去啟動;
            * 2. 第二個參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize,
            * 添加線程時根據(jù)maximumPoolSize來判斷; 
          */

            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

 //程序執(zhí)行到這里,說明要么線程狀態(tài)不是RUNNING;要么workQueue隊列已經(jīng)滿了

         /*
          * 這時,再調(diào)用addWorker方法去創(chuàng)建線程,
          * 會把線程池的線程 數(shù)量的上限設(shè)置為maximum;
          * 如果失敗,說明當前線程數(shù)已經(jīng)達到 maximumPoolSize,執(zhí)行拒絕策略
          */
        else if (!addWorker(command, false))
            reject(command);
    }

為什么當任務(wù)添加到隊列后,內(nèi)部還執(zhí)行了那么復(fù)雜的判斷?

因為擔心任務(wù)提交到隊列中了,但是線程池卻關(guān)閉了。

當執(zhí)行execute方法提交一個任務(wù)的時候,如果線程池一直處于RUNNING狀態(tài),那流程如下:

  • 1、當工作線程數(shù)量 < 核心線程數(shù)量,會嘗試創(chuàng)建一個核心線程并提交任務(wù);
  • 2、當工作線程數(shù)量 >= 核心線程數(shù)量,如果阻塞隊列沒有滿,則把任務(wù)添加到隊列中;如果隊列滿了,則嘗試啟動一個新的非核心線程來提交任務(wù);
  • 3、當工作線程數(shù)量 > maximumPoolSize,則根據(jù)拒絕策略來處理該任務(wù), 默認的處理方式是直接拋異常。

注意:
addWorker(null, false);也是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,當worker在執(zhí)行的時候,會直接從workQueue中獲取任務(wù)。在workerCountOf(recheck) == 0時執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。

關(guān)于Worker類和addWorker方法

addWorker()是嘗試在線程池中創(chuàng)建一個線程并執(zhí)行任務(wù),firstTask表示作為新創(chuàng)建的線程的第一個任務(wù),core參數(shù)為true的時候,會用核心線程數(shù)做創(chuàng)建線程的邊界;如果為false,會用最大線程數(shù)maximumPoolSize做為邊界。如果addWorker()返回true,表示創(chuàng)建線程成功

private boolean addWorker(Runnable firstTask, boolean core) {}
任務(wù)執(zhí)行流程.png

業(yè)務(wù)場景(分析2種常用的線程池)

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定大小的線程池。最大線程數(shù)與核心線程數(shù)相等,keepAliveTime的設(shè)置無效,因為核心線程默認不會銷毀,阻塞隊列為LinkedBlockingQueue,它是無界隊列。這種線程池適合CPU密集型任務(wù).

關(guān)于CPU密集型任務(wù)和IO密集型任務(wù)可以參考這篇文章
線程池核心線程數(shù)多少最為合適(IO密集型和CPU密集型)?

工作流程:

  • 提交任務(wù)
  • 如果線程數(shù)小于核心線程數(shù),創(chuàng)建核心線程并執(zhí)行任務(wù)
  • 如果線程數(shù)大于核心線程,把任務(wù)添加到LinkedBlockingQueue阻塞隊列
  • 如果線程執(zhí)行完任務(wù),去阻塞隊列取任務(wù),繼續(xù)執(zhí)行。

雖然線程數(shù)量是固定的,但是由于使用了無界隊列LinkedBlockingQueue,如果線程的并發(fā)量比較大,任務(wù)的執(zhí)行時間比較長,那還是可能會OOM的。適用于CPU密集型的任務(wù),也就是那種長期的任務(wù)。

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

核心線程數(shù)為 0,最大線程數(shù)為 Integer.MAX_VALUE,所有線程空閑時間 為 60 秒,任務(wù)隊列采用 SynchronousQueue。

用它去處理那種并發(fā)量很大的任務(wù)就不合適,由于空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。適用于那種任務(wù)可以快速完成的任務(wù)。

總結(jié)

線程池的內(nèi)容其實是很多的,絕不是1,2篇文章就能講完的。本文也主要是針對提交任務(wù)之后線程池的工作原理以及線程狀態(tài)變化來做講解。核心要義如下:
1、Executors這個工具類下創(chuàng)建的幾種線程池的工作原理。

newFixedThreadPool、newCachedThreadPool等,需要注意每一個的優(yōu)缺點和使用場景。

2、ThreadPoolExecutor這個類的構(gòu)造方法和一些關(guān)鍵成員屬性

Executors創(chuàng)建的多種線程池都是通過它的構(gòu)造方法來實現(xiàn)的,讀者需要熟悉它的參數(shù)的意義,這樣的話,就可以自定義滿足個性化需求的線程池。在文中列舉出的一些成員屬性也很重要,后面對線程池的各種操作離不開它們。

3、理解深刻線程池中的線程創(chuàng)建時機
主要是那個execute()方法和addWorker()方法,主要是根據(jù)線程池狀態(tài)、當前線程數(shù)、核心線程數(shù)、隊列大小、線程池最大線程數(shù)來結(jié)合來判斷。

4、拒絕策略

添加任務(wù)到線程池,不一定會被接受。主要看一下哪些情況會執(zhí)行reject(command)方法;還有幾種不同的拒絕策略,默認是拋異常。

5、異常處理

如果某個任務(wù)執(zhí)行出現(xiàn)異常,那么執(zhí)行任務(wù)的線程會被關(guān)閉。

感謝以下作者

優(yōu)雅的使用Java線程池
深入理解 Java 線程池:ThreadPoolExecutor
面試必備:Java線程池解析
Java線程池中的核心線程是如何被重復(fù)利用的?
Java線程池中的核心線程是如何被重復(fù)利用的?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容