知其然而知所以然
Java線程池早已稱為面試官必考的知識,作為被面試的你想必不想被這個簡單的問題難倒吧,來一起學(xué)習(xí)探索吧~
線程池是什么?
通俗可以理解為一個維護(hù)線程的池子。
為什么需要線程池?為什么不直接創(chuàng)建使用線程?
進(jìn)程需要執(zhí)行任務(wù)時,會將任務(wù)交給線程來處理,所以我們需要創(chuàng)建一個線程,處理完任務(wù)后,線程需要銷毀,完成一個線程的生命周期。 如果需要執(zhí)行多個任務(wù),那么就需要創(chuàng)建多個線程去執(zhí)行,由于創(chuàng)建線程和銷毀的過程比較耗時,所以我們可以用一個池子來維護(hù)這些線程,需要的時候去池子里拿,用完了還給池子,線程不會銷毀,省去了線程重復(fù)創(chuàng)建和銷毀的耗時操作。
基本使用
Executors
JDK為我們提供了Executors工具來創(chuàng)建和使用線程池,我們可以通過以下方法來創(chuàng)建不同的線程池:
- newFixedThreadPool
可以設(shè)置固定大小線程的線程池,也是就核心線程數(shù)是固定的
/**
* 創(chuàng)建一個固定大小的線程池
* @param nThreads 線程個數(shù)
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor
/**
* 創(chuàng)建只有一個線程的線程池
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool
線程緩存器,因為線程池沒有初始核心線程,同時使用的是SynchronousQueue阻塞隊列,所以在有任務(wù)處理的時候才會創(chuàng)建線程,線程空閑后還能存活60秒
/**
* 創(chuàng)建一個類似緩存的線程池
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
細(xì)心的我們發(fā)現(xiàn)以上三個方法創(chuàng)建線程池最終都會通過ThreadPoolExecutor的構(gòu)造方法來創(chuàng)建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
所以搞清楚這個構(gòu)造函數(shù)每個參數(shù)的作用非常重要。
實(shí)現(xiàn)原理
1. 原理流程
線程池的組成

上圖可以看到線程池的主要組成,在使用過程中我們也是通過調(diào)整線程數(shù)和隊列以及其他的參數(shù)來定制符合我們場景的線程池。
核心線程數(shù):線程池創(chuàng)建成功之后會創(chuàng)建核心線程數(shù),隨著線程池的關(guān)閉時銷毀,伴隨著線程池的整個生命周期。
非核心線程數(shù):核心線程數(shù)和隊列資源不足時會創(chuàng)建非核心線程數(shù),執(zhí)行完任務(wù)一定時間之后會被銷毀。
最大線程數(shù):最大線程數(shù) = 核心線程數(shù) + 非核心線程數(shù)。
阻塞隊列:當(dāng)核心線程不足以執(zhí)行任務(wù)時,會將任務(wù)先丟到阻塞隊列里等待,起到緩沖的作用。
執(zhí)行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
以下流程圖解讀了以上代碼執(zhí)行的流程,這也是線程池執(zhí)行流程的核心

生命周期

| 狀態(tài) | 描述 |
|---|---|
| RUNNING | 能接受新提交的任務(wù),并且也能處理阻塞隊列中的任務(wù) |
| SHUTDOWN | 關(guān)閉狀態(tài),不在接受新的任務(wù),但卻可以繼續(xù)處理阻塞隊列中的任務(wù) |
| STOP | 關(guān)閉狀態(tài),不在接受新的任務(wù),也不在繼續(xù)處理隊列中的任務(wù),中斷正在處理任務(wù)的線程 |
| TIDYING | 所有的任務(wù)都已經(jīng)終止,workCount(有效線程數(shù))為0 |
| TERMINATED | 在terminated( ) 方法執(zhí)行完后進(jìn)入該狀態(tài) |
2. 參數(shù)介紹
以下為創(chuàng)建線程池最終的構(gòu)造方法,我們想創(chuàng)建一個符合業(yè)務(wù)的線程池,就需要對下列的參數(shù)了如指掌。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize 核心線程數(shù)
maximumPoolSize 線程池最大線程數(shù)
keepAliveTime 非核心線程數(shù)非活躍時存活的時間
unit keepAliveTime值的單位
workQueue 阻塞隊列
| 名稱 | 說明 |
|---|---|
| ArrayBlockingQueue | 有界阻塞消息隊列,創(chuàng)建時指定隊列大小,創(chuàng)建后不能修改 |
| LinkedBlockingQueue | 無界阻塞隊列,其實(shí)也是有界隊列,最大值為Integer.MAX |
| PriorityBlockingQueue | 優(yōu)先級阻塞隊列 |
| SynchronousQueue | 只有一個容量的阻塞隊列 |
threadFactory 線程工廠
handler 拒絕策略
- AbortPolicy(默認(rèn)策略) 不在接受新的任務(wù),當(dāng)新的任務(wù)來時總是拋異常。
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy 由調(diào)用線程去執(zhí)行任務(wù)
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardPolicy 默默的將任務(wù)丟棄,啥也不做
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy 丟棄隊列中長時間未處理的任務(wù)(靠近表頭的任務(wù)),并嘗試將新的任務(wù)放入隊列中
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
閱讀到這,你對線程池一定有深刻的印象,接下來我們看一下實(shí)際開發(fā)正確使用的姿勢。
實(shí)際應(yīng)用
對于新手來說,使用線程池的方式可能如下:
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
int finalI = i;
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(finalI);
}
});
}
}
我們通過Executors工具創(chuàng)建一個固定大小為10的線程池,使用的是無界的LinkedBlockingQueue阻塞隊列,以上代碼看起來沒有任何問題,很快執(zhí)行結(jié)束。
我們稍微改動一下:
新增sleep模擬耗時的操作。
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
final int[] i = {0};
while (true){
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i[0]++);
}
});
}
}
以上代碼一直執(zhí)行,最終會出現(xiàn)產(chǎn)生內(nèi)存溢出的問題。
因為newFixedThreadPool使用的是LinkedBlockingQueue(無界阻塞隊列),當(dāng)線程池源源不斷的接受新的任務(wù),自己出來不過來的時候,會將任務(wù)暫存到隊列里,由于處理耗時,所以阻塞隊列里的數(shù)據(jù)會越來越多,最終把內(nèi)存打爆。
根據(jù)阿里的代碼規(guī)范,不允許通過Executors來創(chuàng)建線程池,必須通過new ThreadPoolExcutor構(gòu)造方法來創(chuàng)建線程池,如果你的Idea安裝了阿里的代碼檢查插件,會看到如下提示:

所以正確的姿勢是:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor. DiscardPolicy());
while (true){
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("running");
}
});
}
}
我們創(chuàng)建了核心線程數(shù)為10,且最大線程數(shù)為10,阻塞隊列容量為100,拒絕策略為丟棄新的任務(wù) 的線程池,實(shí)際使用時還得需要根據(jù)業(yè)務(wù)場景和部署的機(jī)器來設(shè)置線程數(shù)。
不推薦使用Executors來創(chuàng)建線程池,通過ThreadPoolExecutor來創(chuàng)建。
題外話
當(dāng)一個服務(wù)里有一些比較耗時的操作,比如批量導(dǎo)入、導(dǎo)出等耗時的IO操作,我們需要將這些操作的線程池和其他業(yè)務(wù)的線程池區(qū)分開,否則我們在進(jìn)行導(dǎo)入導(dǎo)出操作時,會影響到其他的業(yè)務(wù)。所以我們會定制多個不同的線程池來處理不同類型的任務(wù)。
以上為個人的理解,如果差錯,煩請不吝嗇指出。