Java 線程池 Thread Pool

更多 Java 并發(fā)編程方面的文章,請參見文集《Java 并發(fā)編程》


線程組 Thread Group

線程的集合,不推薦使用。

線程池 Thread Pool 介紹

所在包:java.util.concurrent

線程池用于多線程處理中,它可以根據(jù)系統(tǒng)的情況,可以有效控制線程執(zhí)行的數(shù)量,優(yōu)化運行效果。
線程池做的工作主要是控制運行的線程的數(shù)量,處理過程中將任務(wù)放入隊列,然后在線程創(chuàng)建后啟動這些任務(wù),如果線程數(shù)量超過了最大數(shù)量,超出數(shù)量的線程排隊等候,等其它線程執(zhí)行完畢,再從隊列中取出任務(wù)來執(zhí)行。

線程池的3個優(yōu)點:

  • 線程復(fù)用,不需要頻繁的創(chuàng)建和銷毀線程
  • 控制最大并發(fā)數(shù),提高系統(tǒng)資源利用率,同時避免過多的資源競爭,避免堵塞
  • 管理線程

目的:減少大量創(chuàng)建和銷毀線程帶來的時間和空間上的消耗。

線程池的組成

一般的線程池主要分為以下4個組成部分:

  • 線程池管理器:用于創(chuàng)建并管理線程池
  • 工作線程:線程池中的線程
  • 任務(wù)接口:每個任務(wù)必須實現(xiàn)的接口,用于工作線程調(diào)度其運行
  • 任務(wù)隊列:用于存放待處理的任務(wù),提供一種緩沖機制

線程池框架

Java 中的線程池是通過 Executor 框架實現(xiàn)的,該框架中用到了 Executor,ExecutorsExecutorService,ThreadPoolExecutorCallableFutureFutureTask 這幾個類。

  • Executor:所有線程池的接口,只有一個方法
  • ExecutorsExecutor 的工廠類,提供了創(chuàng)建各種不同線程池的方法,返回的線程池都實現(xiàn)了ExecutorService 接口
  • ThreadPoolExecutor:線程池的具體實現(xiàn)類,一般所有的線程池都是基于這個類實現(xiàn)的

ThreadPoolExecutor 的構(gòu)造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

其中:

  • corePoolSize
    • 線程池的核心線程數(shù),線程池中運行的線程數(shù)也永遠不會超過 corePoolSize
    • 在沒有設(shè)置 allowCoreThreadTimeOuttrue 的情況下,核心線程會在線程池中一直存活,即使處于閑置狀態(tài)
  • maximumPoolSize
    • 線程池中允許的最大線程數(shù)
    • 當活動線程(核心線程+非核心線程)達到這個數(shù)值后,后續(xù)任務(wù)將會根據(jù) RejectedExecutionHandler 來進行拒絕策略處理
  • keepAliveTime
    • 非核心線程 閑置時的超時時長。超過該時長,非核心線程就會被回收。
    • 若線程池通過 allowCoreThreadTimeOut() 方法設(shè)置 allowCoreThreadTimeOut 屬性為 true,則該時長同樣會作用于核心線程,AsyncTask 配置的線程池就是這樣設(shè)置的
  • unit
    • 是一個枚舉,它表示的是 keepAliveTime 的單位
  • workQueue
    • 工作隊列,用于存放不能被及時處理掉的任務(wù)的一個隊列,通過線程池的 execute() 方法提交的 Runnable 對象會存儲在該隊列中
    • 它是一個 BlockingQueue 類型。關(guān)于 BlockingQueue,雖然它是 Queue 的子接口,但是它的主要作用并不是容器,而是作為線程同步的工具,他有一個特征,當生產(chǎn)者試圖向 BlockingQueue 放入(put)元素,如果隊列已滿,則該線程被阻塞;當消費者試圖從 BlockingQueue 取出(take)元素,如果隊列已空,則該線程被阻塞。
  • ThreadFactory
    • 線程工廠,功能很簡單,就是為線程池提供創(chuàng)建新線程的功能
    • 這是一個接口,可以通過自定義,做一些自定義線程名的操作
    • 默認使用 DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
  • RejectedExecutionHandler:當任務(wù)無法被執(zhí)行時(超過線程最大容量 maximumPoolSize 并且 workQueue 已經(jīng)被排滿了)的處理策略,這里有四種任務(wù)拒絕類型
    • CallerRunsPolicy:當線程池中的數(shù)量等于最大線程數(shù)時,重試添加當前的任務(wù);它會自動重復(fù)調(diào)用execute() 方法。
    • AbortPolicy: 默認策略,當線程池中的數(shù)量等于最大線程數(shù)時拋 java.util.concurrent.RejectedExecutionException 異常
    • DiscardPolicy:當線程池中的數(shù)量等于最大線程數(shù)時,默默丟棄不能執(zhí)行的新加任務(wù),不報任何異常。
    • DiscardOldestPolicy:當線程池中的數(shù)量等于最大線程數(shù)時,拋棄線程池中工作隊列頭部的任務(wù)(即等待時間最久的任務(wù)),并執(zhí)行新傳入的任務(wù)。

線程池的工作過程

  • 線程池剛創(chuàng)建時,里面沒有一個線程。
  • 當調(diào)用 execute() 方法添加一個任務(wù)時,線程池會做如下判斷:
    • 如果正在運行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建 核心線程 運行這個任務(wù);
    • 如果正在運行的線程數(shù)量大于或等于 corePoolSize,那么將這個任務(wù)放入工作隊列 workQueue
      • 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于 maximumPoolSize,那么還是要創(chuàng)建 非核心線程 立刻運行這個任務(wù);
      • 如果隊列滿了,而且正在運行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會拋出異常RejectExecutionException。
  • 當一個線程完成任務(wù)時,它會從工作隊列 workQueue 中取下一個任務(wù)來執(zhí)行。
  • 當一個 非核心線程 無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數(shù)大于 corePoolSize,那么這個 非核心線程 就被停掉。所以線程池的所有任務(wù)完成后,它最終會收縮到 corePoolSize 的大小。

常見的 Java 線程池

Executors:提供一系列工廠方法來創(chuàng)建線程池,返回的線程池都實現(xiàn)了 ExecutorService 接口:

  • SingleThreadExecutor
    • 單個線程的線程池,即線程池中每次只有一個線程在運行,單線程串行執(zhí)行任務(wù)。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
    • Executors 類中方法定義:public static ExecutorService newSingleThreadExecutor()
    • 實現(xiàn):
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • FixedThreadPool
    • 固定數(shù)量的線程池,只有核心線程,每提交一個任務(wù)就是一個線程,直到達到線程池的最大數(shù)量,然后后面進入等待隊列,直到前面的任務(wù)完成才繼續(xù)執(zhí)行。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。
    • Executors 類中方法定義:public static ExecutorService newFixedThreadPool(int nThreads)
    • 實現(xiàn):
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • CachedThreadPool
    • 可緩存線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。其中,SynchronousQueue 是一個是緩沖區(qū)為1的阻塞隊列。
    • Executors 類中方法定義:public static ExecutorService newCachedThreadPool()
    • 實現(xiàn):
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • ScheduledThreadPool
    • 核心線程池固定,大小無限制的線程池,支持定時和周期性的執(zhí)行線程。創(chuàng)建一個周期性執(zhí)行任務(wù)的線程池。如果閑置,非核心線程池會在 DEFAULT_KEEPALIVEMILLIS 時間內(nèi)回收。
  • Executors 類中方法定義:public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
    • 實現(xiàn):
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

注意事項:

  • 線程池 ExecutorService 中可以使用 execute(t) 或者 submit(t) 方法加入多個任務(wù),任務(wù)的執(zhí)行順序與加入順序無關(guān)
  • 如果線程池中某個線程因為執(zhí)行異常而結(jié)束,線程池會自動補充一個新線程

示例:

public class ThreadPool_Test {
    public static void main(String[] args) {
        ExecutorService es = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 5; i++) {
            es.execute(new MyThread());
        }

        es.shutdown();
    }

    static class MyThread extends Thread {
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }
}

以上代碼的輸出如下:
因為創(chuàng)建的線程池中只有一個線程,因此每次只會有一個任務(wù)執(zhí)行,getName() 都會得到相同的結(jié)果。

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

pool-1-thread-1

如果將 Executors.newSingleThreadExecutor() 更改為 Executors.newFixedThreadPool(2),創(chuàng)建一個有兩個線程的線程池,則輸出如下:

pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2

Thread Factory

在我們通過 Executors 來創(chuàng)建線程池的時候,還可以傳入一個參數(shù) ThreadFactory,例如:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

An object that creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread.

ThreadFactory 遵循了工廠模式,提供 newThread() 方法用來創(chuàng)建線程。
示例:我們希望創(chuàng)建一個線程,它能夠在產(chǎn)生 不能捕獲的 Runtime Exception 時能自動 Logging:

public class ThreadFactory_Test {
    public static void main(String[] args) {
        ExecutorService es = Executors.newSingleThreadExecutor(new MyThreadFactory());

        es.execute(new MyThread());

        es.shutdown();
    }

    static class MyThread extends Thread {
        public void run() {
            // Runtime exception
            int i = 1 / 0;
        }
    }

    static class MyThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);

            t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    System.err.println(t.getName() + e.getMessage());
                }
            });

            return t;
        }
    }
}

線程池的關(guān)閉

可以通過調(diào)用線程池的 shutdownshutdownNow 方法來關(guān)閉線程池,但是它們的實現(xiàn)原理不同:

  • shutdown的原理是只是將線程池的狀態(tài)設(shè)置成 SHUTDOWN 狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程
  • shutdownNow 的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的 interrupt 方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠無法終止。shutdownNow 會首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表

只要調(diào)用了這兩個關(guān)閉方法的其中一個,isShutdown 方法就會返回 true。當所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用 isTerminaed 方法會返回 true。

線程池的監(jiān)控

通過線程池提供的參數(shù)進行監(jiān)控。
線程池里有一些屬性在監(jiān)控線程池的時候可以使用:

  • public long getTaskCount():線程池需要執(zhí)行的任務(wù)數(shù)量。
  • public long getCompletedTaskCount():線程池在運行過程中已完成的任務(wù)數(shù)量。小于或等于taskCount。
  • public int getPoolSize():線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
  • public int getLargestPoolSize():線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
  • public int getActiveCount():獲取活動的線程數(shù)。

通過擴展線程池進行監(jiān)控。通過繼承線程池并重寫線程池的 beforeExecuteafterExecuteterminated方法,我們可以在任務(wù)執(zhí)行前,執(zhí)行后和線程池關(guān)閉前干一些事情。如監(jiān)控任務(wù)的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等。這幾個方法在線程池里是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }

合理的配置線程池

任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
任務(wù)的優(yōu)先級:高,中和低。
任務(wù)的執(zhí)行時間:長,中和短。
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理:

  • CPU密集型任務(wù)配置盡可能少的線程數(shù)量,如配置 Ncpu+1個線程的線程池。
  • IO密集型任務(wù)則由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu。
  • 混合型的任務(wù),如果可以拆分,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。
    我們可以通過 Runtime.getRuntime().availableProcessors() 方法獲得當前設(shè)備的CPU個數(shù)。

優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列 PriorityBlockingQueue 來處理。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠不能執(zhí)行。

執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。

依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交 SQL 后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長 CPU 空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。


引用:
ThreadFactory usage in Java
Java線程池的簡介以及使用
關(guān)于線程池的執(zhí)行原則及配置參數(shù)詳解
聊聊并發(fā)(3):Java線程池的分析和使用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前段時間遇到這樣一個問題,有人問微信朋友圈的上傳圖片的功能怎么做才能讓用戶的等待時間較短,比如說一下上傳9張圖片,...
    加油碼農(nóng)閱讀 1,285評論 0 2
  • 為何要使用線程池和任務(wù)隊列 在實際應(yīng)用場景中,通常會有大量的任務(wù)請求需要處理,如果應(yīng)用輪詢處理到達的請求,而通常請...
    BigBug77閱讀 1,642評論 0 0
  • 系統(tǒng)啟動一個新線程的成本是比較高的,因為它涉及到與操作系統(tǒng)的交互。在這種情況下,使用線程池可以很好的提供性能,尤其...
    我是嘻哈大哥閱讀 456評論 0 2
  • 本篇文章講述Java中的線程池問題,同樣適用于Android中的線程池使用。本篇文章參考:Java線程池分析,Ja...
    Android進階與總結(jié)閱讀 869評論 0 5
  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,018評論 1 19

友情鏈接更多精彩內(nèi)容