個人博客地址 http://dandanlove.com/
再使用線程池之前,我們應(yīng)該了解為什么需要使用線程池。進行執(zhí)行任務(wù)(task)的時候我們一般情況是new Thread進行執(zhí)行,如果進行大量的并發(fā)任務(wù)的時候呢?
大多數(shù)并發(fā)應(yīng)用程序是圍繞執(zhí)行任務(wù)(task)進行管理的。所謂的任務(wù)就是抽象、離散的工作單元(unit of work)。
當(dāng)我們進行大量并發(fā)操作的時候,每任務(wù)每線程(thread-per-task)方法存在一些實際的缺陷,尤其是在需要創(chuàng)建大量的線程時會更加突出。
- 線程生命周期的消耗(創(chuàng)建和銷毀)
- 資源消耗量(內(nèi)存占用和管理,競爭CPU資源)
- 穩(wěn)定性(線程數(shù)量超過JVM限制是,可能會出現(xiàn)OutofMemoryError)
在這種情況下為線程管理出現(xiàn)了線程池(ThreadPool),它是管理一個工作者線程的同構(gòu)池(homogeneous pool)。線程池是與工作隊列(work queue)緊密綁定的。
Executor是一個簡單的借口,它是基于生產(chǎn)者-消費者模式,用于異步任務(wù)執(zhí)行,而且支持很多不同類型的任務(wù)執(zhí)行策略。
Executor框架使用的是Runnable作為其任務(wù)的基本表達形式。Runnable只是相當(dāng)有限的抽象,但是Run不能反悔一個值或者拋出受檢查的異常。Callable恰好可以將Runnable做封裝返回其執(zhí)行結(jié)果。
一個Executor執(zhí)行的任務(wù)的生命周期有4個階段:創(chuàng)建、提交、開始和完成。Future可以描述任務(wù)的生命周期,并提供了相關(guān)的方法來獲得、取消以及驗證任務(wù)是否已經(jīng)完成還是被取消。
public interface Executor {
void execute(Runnable var1);
}
@FunctionalInterface
public interface Runnable {
void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean var1);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
- corePoolSize:線程池的核心線程數(shù),一般情況下不管有沒有任務(wù)都會一直在線程池中一直存活,只有在 ThreadPoolExecutor 中的方法allowCoreThreadTimeOut(boolean value) 設(shè)置為 true 時,閑置的核心線程會存在超時機制,如果在指定時間沒有新任務(wù)來時,核心線程也會被終止,而這個時間間隔由第3個屬性 keepAliveTime 指定。
- maximumPoolSize:線程池所能容納的最大線程數(shù),當(dāng)活動的線程數(shù)達到這個值后,后續(xù)的新任務(wù)將會被阻塞。
- keepAliveTime:控制線程閑置時的超時時長,超過則終止該線程。一般情況下用于非核心線程,只有在 ThreadPoolExecutor 中的方法 allowCoreThreadTimeOut(boolean value) 設(shè)置為 true時,也作用于核心線程。
- unit:用于指定 keepAliveTime 參數(shù)的時間單位,TimeUnit 是個 enum 枚舉類型,常用的有:TimeUnit.HOURS(小時)、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。
- workQueue:線程池的任務(wù)隊列,通過線程池的 execute(Runnable command) 方法會將任務(wù) Runnable 存儲在隊列中(阻塞隊列)。
1)ArrayBlockingQueue:
是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2)LinkedBlockingQueue:
一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3)SynchronousQueue:
一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。
4)PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列。- threadFactory:線程工廠,它是一個接口,用來為線程池創(chuàng)建新線程的。
newCachedThreadPool
newCachedThreadPool是一個可根據(jù)需要創(chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時將重用它們。對于執(zhí)行很多短期異步任務(wù)的程序而言,這些線程池通??商岣叱绦蛐阅堋U{(diào)用 execute() 將重用以前構(gòu)造的線程(如果線程可用)。如果現(xiàn)有線程沒有可用的,則創(chuàng)建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構(gòu)造方法創(chuàng)建具有類似屬性但細節(jié)不同(例如超時參數(shù))的線程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
newSingleThreadExecutor 創(chuàng)建是一個單線程池,也就是該線程池只有一個線程在工作,所有的任務(wù)是串行執(zhí)行的,如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它,此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newFixedThreadPool
newFixedThreadPool 創(chuàng)建固定大小的線程池,每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小,線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newScheduledThreadPool
newScheduledThreadPool 創(chuàng)建一個大小無限的線程池,此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}