更多 Java 并發(fā)編程方面的文章,請參見文集《Java 并發(fā)編程》
線程組 Thread Group
線程的集合,不推薦使用。
線程池 Thread Pool 介紹
所在包:java.util.concurrent
線程池用于多線程處理中,它可以根據(jù)系統(tǒng)的情況,可以有效控制線程執(zhí)行的數(shù)量,優(yōu)化運行效果。
線程池做的工作主要是控制運行的線程的數(shù)量,處理過程中將任務(wù)放入隊列,然后在線程創(chuàng)建后啟動這些任務(wù),如果線程數(shù)量超過了最大數(shù)量,超出數(shù)量的線程排隊等候,等其它線程執(zhí)行完畢,再從隊列中取出任務(wù)來執(zhí)行。
線程池的3個優(yōu)點:
- 線程復(fù)用,不需要頻繁的創(chuàng)建和銷毀線程
- 控制最大并發(fā)數(shù),提高系統(tǒng)資源利用率,同時避免過多的資源競爭,避免堵塞
- 管理線程
目的:減少大量創(chuàng)建和銷毀線程帶來的時間和空間上的消耗。
線程池的組成
一般的線程池主要分為以下4個組成部分:
- 線程池管理器:用于創(chuàng)建并管理線程池
- 工作線程:線程池中的線程
- 任務(wù)接口:每個任務(wù)必須實現(xiàn)的接口,用于工作線程調(diào)度其運行
- 任務(wù)隊列:用于存放待處理的任務(wù),提供一種緩沖機制
線程池框架
Java 中的線程池是通過 Executor 框架實現(xiàn)的,該框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 這幾個類。
-
Executor:所有線程池的接口,只有一個方法 -
Executors:Executor的工廠類,提供了創(chuàng)建各種不同線程池的方法,返回的線程池都實現(xiàn)了ExecutorService接口 -
ThreadPoolExecutor:線程池的具體實現(xiàn)類,一般所有的線程池都是基于這個類實現(xiàn)的
ThreadPoolExecutor 的構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中:
-
corePoolSize:- 線程池的核心線程數(shù),線程池中運行的線程數(shù)也永遠不會超過
corePoolSize個 - 在沒有設(shè)置
allowCoreThreadTimeOut為true的情況下,核心線程會在線程池中一直存活,即使處于閑置狀態(tài)
- 線程池的核心線程數(shù),線程池中運行的線程數(shù)也永遠不會超過
-
maximumPoolSize:- 線程池中允許的最大線程數(shù)
- 當活動線程(核心線程+非核心線程)達到這個數(shù)值后,后續(xù)任務(wù)將會根據(jù)
RejectedExecutionHandler來進行拒絕策略處理
-
keepAliveTime:- 非核心線程 閑置時的超時時長。超過該時長,非核心線程就會被回收。
- 若線程池通過
allowCoreThreadTimeOut()方法設(shè)置allowCoreThreadTimeOut屬性為true,則該時長同樣會作用于核心線程,AsyncTask配置的線程池就是這樣設(shè)置的
-
unit:- 是一個枚舉,它表示的是
keepAliveTime的單位
- 是一個枚舉,它表示的是
-
workQueue:- 工作隊列,用于存放不能被及時處理掉的任務(wù)的一個隊列,通過線程池的
execute()方法提交的Runnable對象會存儲在該隊列中 - 它是一個
BlockingQueue類型。關(guān)于BlockingQueue,雖然它是Queue的子接口,但是它的主要作用并不是容器,而是作為線程同步的工具,他有一個特征,當生產(chǎn)者試圖向BlockingQueue放入(put)元素,如果隊列已滿,則該線程被阻塞;當消費者試圖從BlockingQueue取出(take)元素,如果隊列已空,則該線程被阻塞。
- 工作隊列,用于存放不能被及時處理掉的任務(wù)的一個隊列,通過線程池的
-
ThreadFactory:- 線程工廠,功能很簡單,就是為線程池提供創(chuàng)建新線程的功能
- 這是一個接口,可以通過自定義,做一些自定義線程名的操作
- 默認使用
DefaultThreadFactory:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
-
RejectedExecutionHandler:當任務(wù)無法被執(zhí)行時(超過線程最大容量maximumPoolSize并且workQueue已經(jīng)被排滿了)的處理策略,這里有四種任務(wù)拒絕類型-
CallerRunsPolicy:當線程池中的數(shù)量等于最大線程數(shù)時,重試添加當前的任務(wù);它會自動重復(fù)調(diào)用execute()方法。 -
AbortPolicy: 默認策略,當線程池中的數(shù)量等于最大線程數(shù)時拋java.util.concurrent.RejectedExecutionException異常 -
DiscardPolicy:當線程池中的數(shù)量等于最大線程數(shù)時,默默丟棄不能執(zhí)行的新加任務(wù),不報任何異常。 -
DiscardOldestPolicy:當線程池中的數(shù)量等于最大線程數(shù)時,拋棄線程池中工作隊列頭部的任務(wù)(即等待時間最久的任務(wù)),并執(zhí)行新傳入的任務(wù)。
-
線程池的工作過程
- 線程池剛創(chuàng)建時,里面沒有一個線程。
- 當調(diào)用
execute()方法添加一個任務(wù)時,線程池會做如下判斷:- 如果正在運行的線程數(shù)量小于
corePoolSize,那么馬上創(chuàng)建 核心線程 運行這個任務(wù); - 如果正在運行的線程數(shù)量大于或等于
corePoolSize,那么將這個任務(wù)放入工作隊列workQueue;- 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于
maximumPoolSize,那么還是要創(chuàng)建 非核心線程 立刻運行這個任務(wù); - 如果隊列滿了,而且正在運行的線程數(shù)量大于或等于
maximumPoolSize,那么線程池會拋出異常RejectExecutionException。
- 如果這時候隊列滿了,而且正在運行的線程數(shù)量小于
- 如果正在運行的線程數(shù)量小于
- 當一個線程完成任務(wù)時,它會從工作隊列
workQueue中取下一個任務(wù)來執(zhí)行。 - 當一個 非核心線程 無事可做,超過一定的時間(
keepAliveTime)時,線程池會判斷,如果當前運行的線程數(shù)大于corePoolSize,那么這個 非核心線程 就被停掉。所以線程池的所有任務(wù)完成后,它最終會收縮到corePoolSize的大小。
常見的 Java 線程池
Executors:提供一系列工廠方法來創(chuàng)建線程池,返回的線程池都實現(xiàn)了 ExecutorService 接口:
-
SingleThreadExecutor
- 單個線程的線程池,即線程池中每次只有一個線程在運行,單線程串行執(zhí)行任務(wù)。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
-
Executors類中方法定義:public static ExecutorService newSingleThreadExecutor() - 實現(xiàn):
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
-
FixedThreadPool
- 固定數(shù)量的線程池,只有核心線程,每提交一個任務(wù)就是一個線程,直到達到線程池的最大數(shù)量,然后后面進入等待隊列,直到前面的任務(wù)完成才繼續(xù)執(zhí)行。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。
-
Executors類中方法定義:public static ExecutorService newFixedThreadPool(int nThreads) - 實現(xiàn):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
CachedThreadPool
-
可緩存線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。其中,
SynchronousQueue是一個是緩沖區(qū)為1的阻塞隊列。 -
Executors類中方法定義:public static ExecutorService newCachedThreadPool() - 實現(xiàn):
-
可緩存線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。其中,
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
ScheduledThreadPool
-
核心線程池固定,大小無限制的線程池,支持定時和周期性的執(zhí)行線程。創(chuàng)建一個周期性執(zhí)行任務(wù)的線程池。如果閑置,非核心線程池會在
DEFAULT_KEEPALIVEMILLIS時間內(nèi)回收。
-
核心線程池固定,大小無限制的線程池,支持定時和周期性的執(zhí)行線程。創(chuàng)建一個周期性執(zhí)行任務(wù)的線程池。如果閑置,非核心線程池會在
-
Executors類中方法定義:public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)- 實現(xiàn):
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
注意事項:
- 線程池 ExecutorService 中可以使用
execute(t)或者submit(t)方法加入多個任務(wù),任務(wù)的執(zhí)行順序與加入順序無關(guān)- 關(guān)于
execute(t)和submit(t)的區(qū)別,請參見 Java ExecutorService
- 關(guān)于
- 如果線程池中某個線程因為執(zhí)行異常而結(jié)束,線程池會自動補充一個新線程
示例:
public class ThreadPool_Test {
public static void main(String[] args) {
ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
es.execute(new MyThread());
}
es.shutdown();
}
static class MyThread extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
}
以上代碼的輸出如下:
因為創(chuàng)建的線程池中只有一個線程,因此每次只會有一個任務(wù)執(zhí)行,getName() 都會得到相同的結(jié)果。
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
如果將 Executors.newSingleThreadExecutor() 更改為 Executors.newFixedThreadPool(2),創(chuàng)建一個有兩個線程的線程池,則輸出如下:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
Thread Factory
在我們通過 Executors 來創(chuàng)建線程池的時候,還可以傳入一個參數(shù) ThreadFactory,例如:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
An object that creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread.
ThreadFactory 遵循了工廠模式,提供 newThread() 方法用來創(chuàng)建線程。
示例:我們希望創(chuàng)建一個線程,它能夠在產(chǎn)生 不能捕獲的 Runtime Exception 時能自動 Logging:
public class ThreadFactory_Test {
public static void main(String[] args) {
ExecutorService es = Executors.newSingleThreadExecutor(new MyThreadFactory());
es.execute(new MyThread());
es.shutdown();
}
static class MyThread extends Thread {
public void run() {
// Runtime exception
int i = 1 / 0;
}
}
static class MyThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.println(t.getName() + e.getMessage());
}
});
return t;
}
}
}
線程池的關(guān)閉
可以通過調(diào)用線程池的 shutdown 或 shutdownNow 方法來關(guān)閉線程池,但是它們的實現(xiàn)原理不同:
-
shutdown的原理是只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程 -
shutdownNow的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠無法終止。shutdownNow會首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表
只要調(diào)用了這兩個關(guān)閉方法的其中一個,isShutdown 方法就會返回 true。當所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用 isTerminaed 方法會返回 true。
線程池的監(jiān)控
通過線程池提供的參數(shù)進行監(jiān)控。
線程池里有一些屬性在監(jiān)控線程池的時候可以使用:
-
public long getTaskCount():線程池需要執(zhí)行的任務(wù)數(shù)量。 -
public long getCompletedTaskCount():線程池在運行過程中已完成的任務(wù)數(shù)量。小于或等于taskCount。 -
public int getPoolSize():線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。 -
public int getLargestPoolSize():線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。 -
public int getActiveCount():獲取活動的線程數(shù)。
通過擴展線程池進行監(jiān)控。通過繼承線程池并重寫線程池的 beforeExecute,afterExecute 和 terminated方法,我們可以在任務(wù)執(zhí)行前,執(zhí)行后和線程池關(guān)閉前干一些事情。如監(jiān)控任務(wù)的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等。這幾個方法在線程池里是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }
合理的配置線程池
任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
任務(wù)的優(yōu)先級:高,中和低。
任務(wù)的執(zhí)行時間:長,中和短。
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理:
- CPU密集型任務(wù)配置盡可能少的線程數(shù)量,如配置 Ncpu+1個線程的線程池。
- IO密集型任務(wù)則由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu。
- 混合型的任務(wù),如果可以拆分,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。
我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設(shè)備的CPU個數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列 PriorityBlockingQueue 來處理。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠不能執(zhí)行。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交 SQL 后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長 CPU 空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。
引用:
ThreadFactory usage in Java
Java線程池的簡介以及使用
關(guān)于線程池的執(zhí)行原則及配置參數(shù)詳解
聊聊并發(fā)(3):Java線程池的分析和使用