一、簡介
什么是線程池
線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務。
為什么要用線程池
如果并發(fā)請求數(shù)量很多,但每個線程執(zhí)行的時間很短,就會出現(xiàn)頻繁的創(chuàng)建和銷毀線程。如此一來,會大大降低系統(tǒng)的效率,可能頻繁創(chuàng)建和銷毀線程的時間、資源開銷要大于實際工作的所需。
正是由于這個問題,所以有必要引入線程池。使用 線程池的好處 有以下幾點:
- 降低資源消耗 - 通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應速度 - 當任務到達時,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性 - 線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。但是要做到合理的利用線程池,必須對其原理了如指掌。
二、Executor 框架
Executor 框架是一個根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制。
核心 API 概述
Executor 框架核心 API 如下:
-
Executor- 運行任務的簡單接口。 -
ExecutorService- 擴展了Executor接口。擴展能力:- 支持有返回值的線程;
- 支持管理線程的生命周期。
-
ScheduledExecutorService- 擴展了ExecutorService接口。擴展能力:支持定期執(zhí)行任務。 -
AbstractExecutorService-ExecutorService接口的默認實現(xiàn)。 -
ThreadPoolExecutor- Executor 框架最核心的類,它繼承了AbstractExecutorService類。 -
ScheduledThreadPoolExecutor-ScheduledExecutorService接口的實現(xiàn),一個可定時調(diào)度任務的線程池。 -
Executors- 可以通過調(diào)用Executors的靜態(tài)工廠方法來創(chuàng)建線程池并返回一個ExecutorService對象。
Executor
Executor 接口中只定義了一個 execute 方法,用于接收一個 Runnable 對象。
public interface Executor {
void execute(Runnable command);
}
ExecutorService
ExecutorService 接口繼承了 Executor 接口,它還提供了 invokeAll、invokeAny、shutdown、submit 等方法。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
從其支持的方法定義,不難看出:相比于 Executor 接口,ExecutorService 接口主要的擴展是:
- 支持有返回值的線程 -
sumbit、invokeAll、invokeAny方法中都支持傳入Callable對象。 - 支持管理線程生命周期 -
shutdown、shutdownNow、isShutdown等方法。
ScheduledExecutorService
ScheduledExecutorService 接口擴展了 ExecutorService 接口。
它除了支持前面兩個接口的所有能力以外,還支持定時調(diào)度線程。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
其擴展的接口提供以下能力:
-
schedule方法可以在指定的延時后執(zhí)行一個Runnable或者Callable任務。 -
scheduleAtFixedRate方法和scheduleWithFixedDelay方法可以按照指定時間間隔,定期執(zhí)行任務。
三、ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor 類是 Executor 框架中最核心的類。所以,本文將著重講述一下這個類。
重要字段
ThreadPoolExecutor 有以下重要字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
參數(shù)說明:
-
ctl- 用于控制線程池的運行狀態(tài)和線程池中的有效線程數(shù)量。它包含兩部分的信息:- 線程池的運行狀態(tài) (
runState) - 線程池內(nèi)有效線程的數(shù)量 (
workerCount) - 可以看到,
ctl使用了Integer類型來保存,高 3 位保存runState,低 29 位保存workerCount。COUNT_BITS就是 29,CAPACITY就是 1 左移 29 位減 1(29 個 1),這個常量表示workerCount的上限值,大約是 5 億。
- 線程池的運行狀態(tài) (
- 運行狀態(tài) - 線程池一共有五種運行狀態(tài):
-
RUNNING- 運行狀態(tài)。接受新任務,并且也能處理阻塞隊列中的任務。 -
SHUTDOWN- 關(guān)閉狀態(tài)。不接受新任務,但可以處理阻塞隊列中的任務。- 在線程池處于
RUNNING狀態(tài)時,調(diào)用shutdown方法會使線程池進入到該狀態(tài)。 -
finalize方法在執(zhí)行過程中也會調(diào)用shutdown方法進入該狀態(tài)。
- 在線程池處于
-
STOP- 停止狀態(tài)。不接受新任務,也不處理隊列中的任務。會中斷正在處理任務的線程。在線程池處于RUNNING或SHUTDOWN狀態(tài)時,調(diào)用shutdownNow方法會使線程池進入到該狀態(tài)。 -
TIDYING- 整理狀態(tài)。如果所有的任務都已終止了,workerCount(有效線程數(shù)) 為 0,線程池進入該狀態(tài)后會調(diào)用terminated方法進入TERMINATED狀態(tài)。 -
TERMINATED- 已終止狀態(tài)。在terminated方法執(zhí)行完后進入該狀態(tài)。默認terminated方法中什么也沒有做。進入TERMINATED的條件如下:- 線程池不是
RUNNING狀態(tài); - 線程池狀態(tài)不是
TIDYING狀態(tài)或TERMINATED狀態(tài); - 如果線程池狀態(tài)是
SHUTDOWN并且workerQueue為空; -
workerCount為 0; - 設置
TIDYING狀態(tài)成功。
- 線程池不是
-
構(gòu)造方法
ThreadPoolExecutor 有四個構(gòu)造方法,前三個都是基于第四個實現(xiàn)。第四個構(gòu)造方法定義如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
參數(shù)說明:
-
corePoolSize- 核心線程數(shù)量。當有新任務通過execute方法提交時 ,線程池會執(zhí)行以下判斷:- 如果運行的線程數(shù)少于
corePoolSize,則創(chuàng)建新線程來處理任務,即使線程池中的其他線程是空閑的。 - 如果線程池中的線程數(shù)量大于等于
corePoolSize且小于maximumPoolSize,則只有當workQueue滿時才創(chuàng)建新的線程去處理任務; - 如果設置的
corePoolSize和maximumPoolSize相同,則創(chuàng)建的線程池的大小是固定的。這時如果有新任務提交,若workQueue未滿,則將請求放入workQueue中,等待有空閑的線程去從workQueue中取任務并處理; - 如果運行的線程數(shù)量大于等于
maximumPoolSize,這時如果workQueue已經(jīng)滿了,則使用handler所指定的策略來處理任務; - 所以,任務提交時,判斷的順序為
corePoolSize=>workQueue=>maximumPoolSize。
- 如果運行的線程數(shù)少于
-
maximumPoolSize- 最大線程數(shù)量。- 如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務。
- 值得注意的是:如果使用了無界的任務隊列這個參數(shù)就沒什么效果。
-
keepAliveTime:線程保持活動的時間。- 當線程池中的線程數(shù)量大于
corePoolSize的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime。 - 所以,如果任務很多,并且每個任務執(zhí)行的時間比較短,可以調(diào)大這個時間,提高線程的利用率。
- 當線程池中的線程數(shù)量大于
-
unit-keepAliveTime的時間單位。有 7 種取值??蛇x的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。 -
workQueue- 等待執(zhí)行的任務隊列。用于保存等待執(zhí)行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。-
ArrayBlockingQueue- 有界阻塞隊列。- 此隊列是基于數(shù)組的先進先出隊列(FIFO)。
- 此隊列創(chuàng)建時必須指定大小。
-
LinkedBlockingQueue- 無界阻塞隊列。- 此隊列是基于鏈表的先進先出隊列(FIFO)。
- 如果創(chuàng)建時沒有指定此隊列大小,則默認為
Integer.MAX_VALUE。 - 吞吐量通常要高于
ArrayBlockingQueue。 - 使用
LinkedBlockingQueue意味著:maximumPoolSize將不起作用,線程池能創(chuàng)建的最大線程數(shù)為corePoolSize,因為任務等待隊列是無界隊列。 -
Executors.newFixedThreadPool使用了這個隊列。
-
SynchronousQueue- 不會保存提交的任務,而是將直接新建一個線程來執(zhí)行新來的任務。- 每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。
- 吞吐量通常要高于
LinkedBlockingQueue。 -
Executors.newCachedThreadPool使用了這個隊列。
-
PriorityBlockingQueue- 具有優(yōu)先級的無界阻塞隊列。
-
-
threadFactory- 線程工廠。可以通過線程工廠給每個創(chuàng)建出來的線程設置更有意義的名字。 -
handler- 飽和策略。它是RejectedExecutionHandler類型的變量。當隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務。線程池支持以下策略:-
AbortPolicy- 丟棄任務并拋出異常。這也是默認策略。 -
DiscardPolicy- 丟棄任務,但不拋出異常。 -
DiscardOldestPolicy- 丟棄隊列最前面的任務,然后重新嘗試執(zhí)行任務(重復此過程)。 -
CallerRunsPolicy- 只用調(diào)用者所在的線程來運行任務。 - 如果以上策略都不能滿足需要,也可以通過實現(xiàn)
RejectedExecutionHandler接口來定制處理策略。如記錄日志或持久化不能處理的任務。
-
execute 方法
默認情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創(chuàng)建線程。
提交任務可以使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執(zhí)行。
execute 方法工作流程如下:
- 如果
workerCount < corePoolSize,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務; - 如果
workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務添加到該阻塞隊列中; - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務; - 如果
workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊列已滿,則根據(jù)拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
其他重要方法
在 ThreadPoolExecutor 類中還有一些重要的方法:
-
submit- 類似于execute,但是針對的是有返回值的線程。submit方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn)。ThreadPoolExecutor直接復用AbstractExecutorService的submit方法。 -
shutdown- 不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執(zhí)行完后才終止,但再也不會接受新的任務。- 將線程池切換到
SHUTDOWN狀態(tài); - 并調(diào)用
interruptIdleWorkers方法請求中斷所有空閑的 worker; - 最后調(diào)用
tryTerminate嘗試結(jié)束線程池。
- 將線程池切換到
-
shutdownNow- 立即終止線程池,并嘗試打斷正在執(zhí)行的任務,并且清空任務緩存隊列,返回尚未執(zhí)行的任務。與shutdown方法類似,不同的地方在于:- 設置狀態(tài)為
STOP; - 中斷所有工作線程,無論是否是空閑的;
- 取出阻塞隊列中沒有被執(zhí)行的任務并返回。
- 設置狀態(tài)為
-
isShutdown- 調(diào)用了shutdown或shutdownNow方法后,isShutdown方法就會返回 true。 -
isTerminaed- 當所有的任務都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回 true。 -
setCorePoolSize- 設置核心線程數(shù)大小。 -
setMaximumPoolSize- 設置最大線程數(shù)大小。 -
getTaskCount- 線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務總數(shù); -
getCompletedTaskCount- 線程池已完成的任務數(shù)量,該值小于等于taskCount; -
getLargestPoolSize- 線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過,也就是達到了maximumPoolSize; -
getPoolSize- 線程池當前的線程數(shù)量; -
getActiveCount- 當前線程池中正在執(zhí)行任務的線程數(shù)量。
使用示例
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("線程池中線程數(shù)目:%s,隊列中等待執(zhí)行的任務數(shù)目:%s,已執(zhí)行玩別的任務數(shù)目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}
static class MyThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
}
}
四、Executors
JDK 的 Executors 類中提供了幾種具有代表性的線程池,這些線程池 都是基于 ThreadPoolExecutor 的定制化實現(xiàn)。
在實際使用線程池的場景中,我們往往不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具有代表性的線程池實例。
newSingleThreadExecutor
創(chuàng)建一個單線程的線程池。
只會創(chuàng)建唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。 如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它 。
單工作線程最大的特點是:可保證順序地執(zhí)行各個任務。
示例:
public class SingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
});
}
executorService.shutdown();
}
}
newFixedThreadPool
創(chuàng)建一個固定大小的線程池。
每次提交一個任務就會新創(chuàng)建一個工作線程,如果工作線程數(shù)量達到線程池最大線程數(shù),則將提交的任務存入到阻塞隊列中。
FixedThreadPool 是一個典型且優(yōu)秀的線程池,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時所耗的開銷的優(yōu)點。但是,在線程池空閑時,即線程池中沒有可運行任務時,它不會釋放工作線程,還會占用一定的系統(tǒng)資源。
示例:
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
});
}
executorService.shutdown();
}
}
newCachedThreadPool
創(chuàng)建一個可緩存的線程池。
- 如果線程池長度超過處理任務所需要的線程數(shù),就會回收部分空閑的線程;
- 如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為 1 分鐘),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創(chuàng)建一個工作線程。
- 此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說 JVM)能夠創(chuàng)建的最大線程大小。 因此,使用
CachedThreadPool時,一定要注意控制任務的數(shù)量,否則,由于大量線程同時運行,很有會造成系統(tǒng)癱瘓。
示例:
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
});
}
executorService.shutdown();
}
}
newScheduleThreadPool
創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務的需求。
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
schedule();
scheduleAtFixedRate();
}
private static void schedule() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
}, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
}, 1, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
}