一、引言
一般在開發(fā)過程中,一個(gè)功能是運(yùn)行時(shí)長太久了,一般是通過什么方式去優(yōu)化的?
異步/多線程,對(duì)于一個(gè)業(yè)務(wù)方法而言,如果其中的調(diào)用鏈太長勢(shì)必會(huì)引起程序運(yùn)行時(shí)間延長,導(dǎo)致整個(gè)系統(tǒng)吞吐來量下降,而我們使用多線程方式來對(duì)該方法的調(diào)用鏈進(jìn)行優(yōu)化,對(duì)于一些耦合度不是特別高的調(diào)用關(guān)系可以直接通過多線程來走異步的方式進(jìn)行處理,大大的縮短了程序的運(yùn)行時(shí)長,但是如果我們的多線程創(chuàng)建方式是通過new Thread();這種方式去進(jìn)行顯式創(chuàng)建的話它真的可以嗎?答案是不可以,Why?答案如下:-
如果在生產(chǎn)環(huán)境使用
new Thread();這種方式去進(jìn)行顯式創(chuàng)建線程會(huì)帶來什么后果?-
1. OOM: 如果當(dāng)前方法突遇高并發(fā)情況,假設(shè)此時(shí)來了1000個(gè)請(qǐng)求,而按傳統(tǒng)的網(wǎng)絡(luò)模型是BIO,此時(shí)服務(wù)器會(huì)開1000個(gè)線程來處理這1000個(gè)請(qǐng)求(不考慮WEB容器的最大線程數(shù)配置),當(dāng)1000個(gè)請(qǐng)求執(zhí)行時(shí)又會(huì)發(fā)現(xiàn)此方法中存在
new Thread();創(chuàng)建線程,此時(shí)每個(gè)執(zhí)行請(qǐng)求的線程又會(huì)創(chuàng)建一個(gè)線程,此時(shí)就會(huì)出現(xiàn)1000*2=2000個(gè)線程的情況出現(xiàn),而在一個(gè)程序中創(chuàng)建線程是需要向JVM申請(qǐng)內(nèi)存分配的,但是此時(shí)大量線程在同一瞬間向JVM申請(qǐng)分配內(nèi)存,此時(shí)會(huì)很容易造成內(nèi)存溢出(OOM)的情況發(fā)生。 - 2. 資源開銷與耗時(shí): Java對(duì)象的生命周期大致包括三個(gè)階段:對(duì)象的創(chuàng)建,對(duì)象的使用,對(duì)象的清除。因此,對(duì)象的生命周期長度可用如下的表達(dá)式表示:Object = O1 + O2 +O3。其中O1表示對(duì)象的創(chuàng)建時(shí)間,O2表示對(duì)象的使用時(shí)間,而O3則表示其清除(垃圾回收)時(shí)間。由此,我們可以看出,只有O2是真正有效的時(shí)間,而O1、O3則是對(duì)象本身的開銷。當(dāng)我們?nèi)?chuàng)建一個(gè)線程時(shí)也是一樣,因?yàn)榫€程在Java中其實(shí)也是一個(gè)Thread類的實(shí)例,所以對(duì)于線程而言,其實(shí)它的創(chuàng)建(申請(qǐng)內(nèi)存分配、JVM向OS提交線程映射進(jìn)程申請(qǐng)、OS真實(shí)線程映射)和銷毀對(duì)資源是開銷非常大的并且非常耗時(shí)的。
-
3. 不可管理性: 對(duì)于
new Thread();的顯示創(chuàng)建出來的線程是無法管理的,一旦CPU調(diào)度成功,此線程的可管理性幾乎為零。
-
1. OOM: 如果當(dāng)前方法突遇高并發(fā)情況,假設(shè)此時(shí)來了1000個(gè)請(qǐng)求,而按傳統(tǒng)的網(wǎng)絡(luò)模型是BIO,此時(shí)服務(wù)器會(huì)開1000個(gè)線程來處理這1000個(gè)請(qǐng)求(不考慮WEB容器的最大線程數(shù)配置),當(dāng)1000個(gè)請(qǐng)求執(zhí)行時(shí)又會(huì)發(fā)現(xiàn)此方法中存在
-
那么我們使用線程池能給我們帶來什么好處?
- 降低資源消耗:通過重用已經(jīng)創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的消耗。
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí)不需要等待線程創(chuàng)建就可以立即執(zhí)行。
- 提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調(diào)優(yōu)和監(jiān)控。
而在Java中為我們提供四種原生線程池,它們都是基于ThreadPoolExecutor類實(shí)現(xiàn)的,所以ThreadPoolExecutor類這也是我們待會(huì)兒分析線程池原理時(shí)的重點(diǎn)~
二、JDK提供的原生線程池
在Java中,JDK通過Executors類為我們提供了四種封裝好的線程池類型(ForkJoinPool不在本章探討范圍之內(nèi)),源碼如下:
//創(chuàng)建一個(gè)定長的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//創(chuàng)建一個(gè)單線程的線程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
//創(chuàng)建一個(gè)可緩存支持靈活回收的線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
//創(chuàng)建一個(gè)支持周期執(zhí)行任務(wù)的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
在上面的源碼中,其實(shí)我們通過觀察發(fā)現(xiàn)JDK為我們提供的四種線程池內(nèi)部都是通過封裝ThreadPoolExecutor類的構(gòu)造函數(shù)來進(jìn)行線程池的初始化的,所以我們先來理清楚線程池“家族”體系。

從上圖中我們可以得知,線程池的最上層接口是Executor,而這個(gè)接口定義了一個(gè)核心方法execute(Runnable command),當(dāng)我們使用它時(shí),需要傳遞一個(gè)Runnable類型的異步任務(wù)作為參數(shù)。我們看一下Executor接口的定義:
public interface Executor {
// 提交任務(wù)到線程池并執(zhí)行的方法
void execute(Runnable command);
}
而Executor接口是一個(gè)函數(shù)式接口,其中只定義了一個(gè)方法,但是我們?cè)谑褂镁€程池的時(shí)候?yàn)槭裁茨軌蛘{(diào)用的方法卻會(huì)有那么多呢?因?yàn)檫€有一個(gè)ExecutorService接口,它繼承了Executor接口作為Executor接口的子接口,為Executor接口提供了很多拓展方法。我們接著看ExecutorService接口的實(shí)現(xiàn):
```java
public interface ExecutorService extends Executor {
// 等待線程池執(zhí)行完成已接收的任何后關(guān)閉線程池,將線程池置為SHUNTDOWM狀態(tài)
void shutdown();
// 嘗試主動(dòng)終止線程池中的所有正在執(zhí)行的任務(wù)并返回未執(zhí)行的任務(wù)列表,
// 將線程池置為STOP狀態(tài)
List<Runnable> shutdownNow();
// 判斷線程池是否已關(guān)閉:線程池調(diào)用過shutdown或者shutdownNow后返回true
boolean isShutdown();
// 判斷線程池中的子線程是否已全部終止
// 當(dāng)調(diào)用shutdown后全部任務(wù)執(zhí)行完成返回true或調(diào)用shutdownNow成功后返回true
boolean isTerminated();
// 配合shutdown使用,在調(diào)用shutdown后調(diào)用該方法,讓線程池在指定時(shí)間內(nèi)關(guān)閉,
// 不管任務(wù)是否執(zhí)行完成,在指定時(shí)間內(nèi)還在執(zhí)行任務(wù)則拋出異常中斷線程
// 注意:有時(shí)能夠關(guān)閉線程池單并不能完全保證線程池中子線程停止執(zhí)行
// 比如子線程中用到 BufferedReader,那么需要配合shutdownNow主動(dòng)中斷所有子線程
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 向線程池提交一個(gè)Callable類型的異步任務(wù),當(dāng)線程池執(zhí)行后返回執(zhí)行結(jié)果
<T> Future<T> submit(Callable<T> task);
// 向線程池提交一個(gè)Runnable類型的異步任務(wù),線程池執(zhí)行完成后將返回指定類型的執(zhí)行結(jié)果
<T> Future<T> submit(Runnable task, T result);
// 向線程池提交一個(gè)Runnable類型的異步任務(wù),線程池執(zhí)行完成后執(zhí)行的結(jié)果
Future<?> submit(Runnable task);
// 傳入一個(gè)Collection類型的異步任務(wù)集合,批量執(zhí)行并返回執(zhí)行結(jié)果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 傳入一個(gè)Collection類型的異步任務(wù)集合,在指定的時(shí)間內(nèi)批量執(zhí)行并返回執(zhí)行
// 結(jié)果,如果超時(shí)則拋出異常中斷線程
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 傳入一個(gè)Collection類型的異步任務(wù)集合,返回第一個(gè)執(zhí)行完成的結(jié)果并終止其他線程
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 傳入一個(gè)Collection類型的異步任務(wù)集合,在指定的時(shí)間內(nèi)返回第一個(gè)執(zhí)行完成的結(jié)果
// 并終止其他線程,如果超時(shí)則拋出異常中斷線程
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
通過上面的代碼我們會(huì)發(fā)現(xiàn)ExecutorService的確繼承了Executor接口,作為Executor拓展接口提供了很多其他的方法以便于開發(fā)人員使用線程池,而Executor和ExecutorService接口中的方法實(shí)現(xiàn)全部都是由ThreadPoolExecutor類來完成的,而ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實(shí)現(xiàn):
public abstract class AbstractExecutorService implements ExecutorService {
// 將異步任務(wù)包裝為Future,傳遞Runnable類型異步任務(wù),聲明返回類型,返回一個(gè)RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
// 將異步任務(wù)包裝為Future,傳遞Callable類型異步任務(wù),返回一個(gè)RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
// 在指定的時(shí)間內(nèi)執(zhí)行傳入的異步任務(wù)集合,返回最后一個(gè)任務(wù)執(zhí)行
//執(zhí)行集合tasks結(jié)果是最后一個(gè)執(zhí)行結(jié)束的任務(wù)結(jié)果
//可以設(shè)置超時(shí) timed為true并且nanos是未來的一個(gè)時(shí)間
//任何一個(gè)任務(wù)完成都將會(huì)返回結(jié)果
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//傳入的任務(wù)集合不能為null
if (tasks == null)
throw new NullPointerException();
//傳入的任務(wù)數(shù)不能是0
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//滿足上面的校驗(yàn)后將任務(wù)分裝到一個(gè)ArrayList中
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//并且創(chuàng)建一個(gè)執(zhí)行器傳入this
//這里簡單講述他的執(zhí)行原理,傳入this會(huì)使用傳入的this(類型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù),當(dāng)submit提交任務(wù)的時(shí)候回將任務(wù)
//封裝為一個(gè)內(nèi)部的Future并且重寫他的done而此方法就是在future完成的時(shí)候調(diào)用的,而他的寫法則是將當(dāng)前完成的future添加到esc
//維護(hù)的結(jié)果隊(duì)列中
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
//創(chuàng)建一個(gè)執(zhí)行異常,以便后面拋出
ExecutionException ee = null;
//如果開啟了超時(shí)則計(jì)算死線時(shí)間如果時(shí)間是0則代表沒有開啟執(zhí)行超時(shí)
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//獲取任務(wù)的迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();
//先獲取迭代器中的第一個(gè)任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器
futures.add(ecs.submit(it.next()));
//前面記錄的任務(wù)數(shù)減一
--ntasks;
//當(dāng)前激活數(shù)為1
int active = 1;
//進(jìn)入死循環(huán)
for (;;) {
//獲取剛才提價(jià)的任務(wù)是否完成如果完成則f不是null否則為null
Future<T> f = ecs.poll();
//如果為null則代表任務(wù)還在繼續(xù)
if (f == null) {
//如果當(dāng)前任務(wù)大于0 說明除了剛才的任務(wù)還有別的任務(wù)存在
if (ntasks > 0) {
//則任務(wù)數(shù)減一
--ntasks;
//并且再次提交新的任務(wù)
futures.add(ecs.submit(it.next()));
//當(dāng)前的存活的執(zhí)行任務(wù)加一
++active;
}
//如果當(dāng)前存活任務(wù)數(shù)是0則代表沒有任務(wù)在執(zhí)行了從而跳出循環(huán)
else if (active == 0)
break;
//如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時(shí)時(shí)間
else if (timed) {
//則設(shè)置指定的超時(shí)時(shí)間獲取
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
//等待執(zhí)行超時(shí)還沒有獲取到則拋出超時(shí)異常
if (f == null)
throw new TimeoutException();
//否則使用當(dāng)前時(shí)間計(jì)算剩下的超時(shí)時(shí)間用于下一個(gè)循環(huán)使用
nanos = deadline - System.nanoTime();
}
//如果沒有設(shè)置超時(shí)則直接獲取任務(wù)
else
f = ecs.take();
}
//如果獲取到了任務(wù)結(jié)果f!=null
if (f != null) {
//激活數(shù)減一
--active;
try {
//返回獲取到的結(jié)果
return f.get();
//如果獲取結(jié)果出錯(cuò)則包裝異常
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//如果異常不是null則拋出如果是則創(chuàng)建一個(gè)
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//其他任務(wù)則設(shè)置取消
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,
TimeUnit unit) throws InterruptedException {
};
}
(Executor接口有一個(gè)子接口ExecutorService,而AbstracExecutorService類又實(shí)現(xiàn)了ExecutorService接口,而ThreadPoolExcutor正是AbstrcExecutorService的子類)
到這里,大家應(yīng)該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個(gè)之間的關(guān)系了。
Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個(gè)非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒有對(duì)其進(jìn)行重寫,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在以后章節(jié)講述)。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。
而Executor接口最終被ThreadPoolExecutor類實(shí)現(xiàn)。而且ThreadPoolExecutor是線程池體系的核心類,此類的構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
ThreadPoolExecutor類總共為我們提供了四個(gè)構(gòu)造方法,前面三個(gè)構(gòu)造方法都是調(diào)用最后一個(gè)全參的構(gòu)造函數(shù)來完成工作的,最后一個(gè)全參的構(gòu)造方法需要我們傳遞7個(gè)參數(shù),這七個(gè)參數(shù)的具體含義如下:
-
構(gòu)造函數(shù)參數(shù)列表:
- corePoolSize: 核心線程池的大小,如果核心線程池有空閑位置,這時(shí)新的任務(wù)就會(huì)被核心線程池新建一個(gè)線程執(zhí)行,執(zhí)行完畢后不會(huì)銷毀線程,線程會(huì)進(jìn)入緩存隊(duì)列等待再次被運(yùn)行。
- maximunPoolSize: 線程池能創(chuàng)建最大的線程數(shù)量。如果核心線程池和緩存隊(duì)列都已經(jīng)滿了,新的任務(wù)進(jìn)來就會(huì)創(chuàng)建新的線程來執(zhí)行。但是數(shù)量不能超過maximunPoolSize,否側(cè)會(huì)采取拒絕接受任務(wù)策略,我們下面會(huì)具體分析。
- keepAliveTime: 非核心線程能夠空閑的最長時(shí)間,超過時(shí)間,線程終止。這個(gè)參數(shù)默認(rèn)只有在線程數(shù)量超過核心線程池大小時(shí)才會(huì)起作用。只要線程數(shù)量不超過核心線程大小,就不會(huì)起作用(當(dāng)然如果設(shè)置了allowCoreThreadTimeOut(true)線程池中的核心線程也受該參數(shù)的影響)。
-
unit: 時(shí)間單位,和keepAliveTime配合使用,可選擇項(xiàng)如下:
- TimeUnit.DAYS:天
- TimeUnit.HOURS:小時(shí)
- TimeUnit.MINUTES:分鐘
- TimeUnit.SECONDS:秒
- TimeUnit.MILLISECONDS:毫秒
- TimeUnit.MICROSECONDS:微妙
- TimeUnit.NANOSECONDS:納秒
-
workQueue: 任務(wù)隊(duì)列,用來存放等待被執(zhí)行的任務(wù),一般為阻塞隊(duì)列(BlockingQueue)三種常用為:(可自定義阻塞隊(duì)列)。
- ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時(shí)必須指定大??;
- LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
- SynchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來執(zhí)行新來的任務(wù)。
- threadFactory: 線程工廠,用來創(chuàng)建線程,一般有三種選擇策略(可自定義)。
-
handler: 任務(wù)拒絕策略,線程數(shù)量大于最大線程數(shù)就會(huì)采用拒絕處理策略。ThreadPoolExecutor中為我們提供了四種默認(rèn)策略可選擇(可自定義):
- ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
而當(dāng)我們需要使用線程池時(shí),我們可以通過調(diào)用Executors中為我們封裝好的方法創(chuàng)建線程池,也可以通過自己對(duì)于ThreadPoolExecutor的構(gòu)造方法進(jìn)行封裝自定義線程池(后面會(huì)詳細(xì)談到),示例如下:
public class ThreadPoolDemo {
public static void main(String[] args) {
/*
* 創(chuàng)建可緩存的線程池
* 優(yōu)點(diǎn):當(dāng)線程池中線程執(zhí)行完任務(wù)后會(huì)將線程緩存起來,默認(rèn)60s后空閑線程會(huì)自動(dòng)回收
* 缺點(diǎn):任然存在由于并發(fā)過高導(dǎo)致瞬間創(chuàng)建大量線程產(chǎn)生的OOM
*/
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
cachedThreadPool.execute(() -> {
System.out.println("我是遞交到cachedThreadPool的異步任務(wù)....竹子....");
});
/*
* 創(chuàng)建定長的線程池
* 優(yōu)點(diǎn):可以避免由于并發(fā)過高導(dǎo)致瞬間創(chuàng)建大量線程產(chǎn)生的OOM
* 缺點(diǎn):
* 1. 線程創(chuàng)建后永不釋放線程資源
* 2. 任務(wù)隊(duì)列最大長度為Integer.MAX_VALUE,并發(fā)時(shí)會(huì)創(chuàng)建大量的任務(wù)導(dǎo)致OOM
*/
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
Future<?> futureResult = fixedThreadPool.submit(() -> {
System.out.println("我是遞交到fixedThreadPool的異步任務(wù)....竹子...");
return "竹子";
});
try {
// 得到執(zhí)行后返回結(jié)果
String str = (String) futureResult.get();
System.out.println("我是遞交到fixedThreadPool的異步任務(wù)執(zhí)行完成后的返回結(jié)果:" + str);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
/*
* 創(chuàng)建定長可支持周期調(diào)度的線程池
* 優(yōu)點(diǎn):可以支持按時(shí)調(diào)度執(zhí)行任務(wù)
* 缺點(diǎn):
* 1. 線程創(chuàng)建后永不釋放線程資源
* 2. 任務(wù)隊(duì)列最大長度為Integer.MAX_VALUE,并發(fā)時(shí)會(huì)創(chuàng)建大量的任務(wù)導(dǎo)致OOM
*/
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(()->{
System.out.println("我是遞交到scheduledThreadPool十秒鐘之后執(zhí)行的異步任務(wù)....熊貓...");
},10,TimeUnit.SECONDS);
/*
* 創(chuàng)建單線程的線程池
* 優(yōu)點(diǎn):可以支持線程池任務(wù)的執(zhí)行按照遞交的順序先進(jìn)先出(FIFO)
* 缺點(diǎn):單線程效率比不上前面的三種線程池(前面的線程池都存在多線程并行執(zhí)行任務(wù))
*/
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
List<Callable<String>> callables = new ArrayList<>();
callables.add(()->{
System.out.println("我是遞交到singleThreadExecutor的異步任務(wù)...熊貓1號(hào)...");
return "熊貓一號(hào)";
});
callables.add(()->{
System.out.println("我是遞交到singleThreadExecutor的異步任務(wù)...熊貓2號(hào)...");
return "熊貓二號(hào)";
});
callables.add(()->{
System.out.println("我是遞交到singleThreadExecutor的異步任務(wù)...熊貓3號(hào)...");
return "熊貓三號(hào)";
});
try {
// 接收批量執(zhí)行后的結(jié)果
List<Future<String>> futures = singleThreadExecutor.invokeAll(callables);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.shutdown();
fixedThreadPool.shutdown();
scheduledThreadPool.shutdown();
singleThreadExecutor.shutdown();
/* 執(zhí)行結(jié)果:
* 我是遞交到cachedThreadPool的異步任務(wù)....竹子....
*
* 我是遞交到fixedThreadPool的異步任務(wù)....竹子...
* 我是遞交到fixedThreadPool的異步任務(wù)執(zhí)行完成后的返回結(jié)果:竹子
*
* 我是遞交到singleThreadExecutor的異步任務(wù)...熊貓1號(hào)...
* 我是遞交到singleThreadExecutor的異步任務(wù)...熊貓2號(hào)...
* 我是遞交到singleThreadExecutor的異步任務(wù)...熊貓3號(hào)...
*
* 我是遞交到scheduledThreadPool十秒鐘之后執(zhí)行的異步任務(wù)....熊貓...
*/
}
}
在上面的案例中我們使用到了execute()、schedule()、submit()、invokeAll()等方法向線程池中遞交任務(wù),但是當(dāng)我們跟進(jìn)源碼分析會(huì)發(fā)現(xiàn),線程池遞交任務(wù)的核心就是Executor接口定義的核心方法execute(Runnabel command),所以我們?nèi)绻治鼍€程池原理的重點(diǎn)就在此方法。
三、深入源碼剖析線程池工作原理
在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實(shí)現(xiàn)原理,將從下面幾個(gè)方面講解:
-
1. 線程池狀態(tài)控制參數(shù)ctl
要了解線程池,我們首先要了解的線程池里面的狀態(tài)控制的參數(shù) ctl,這個(gè)線程池的狀態(tài)控制參數(shù)是一個(gè)原子操作的 AtomicInteger,這個(gè)ctl包含兩個(gè)參數(shù) :runState:當(dāng)前線程池的狀態(tài)workerCount:激活(工作)的線程數(shù)
-
它的低29位用于存放當(dāng)前的線程數(shù), 因此一個(gè)線程池在理論上最大的線程數(shù)是 536870911; 高 3 位是用于表示當(dāng)前線程池的狀態(tài), 其中高三位的值和狀態(tài)對(duì)應(yīng)如下:
111: RUNNING:線程池初始化(創(chuàng)建出來之后)處于此狀態(tài),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行處理。000: SHUTDOWN:當(dāng)調(diào)用shutdown()方法時(shí)改為此狀態(tài),在此狀態(tài)時(shí),不接收新任務(wù),但能處理已添加的任務(wù)。001: STOP:調(diào)用shutdownNow()方法時(shí)處于此狀態(tài),在此狀態(tài)時(shí),不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)嘗試中斷正在處理的任務(wù)。010: TIDYING:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也為空時(shí),就會(huì)由 SHUTDOWN -> TIDYING。|| 當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實(shí)現(xiàn)。110: TERMINATED:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING -> TERMINATED。線程池徹底終止,就變成TERMINATED狀態(tài)。
為了能夠使用 ctl 線程池提供了三個(gè)方法:
// 獲取線程池的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取線程池的工作線程數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據(jù)工作線程數(shù)和線程池狀態(tài)獲取 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
-
2. 任務(wù)的執(zhí)行
如果想使用線程池就必須通過 execute 這個(gè)方法來向線程池提交任務(wù),而這個(gè)方法也是線程池的核心,所以我們來看代碼:
execute:
public void execute(Runnable command) {
//如果傳遞的任務(wù)為空則拋出空指針異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果工作線程數(shù)小于核心線程數(shù),
if (workerCountOf(c) < corePoolSize) {
//執(zhí)行addWork,提交為核心線程,提交成功return。提交失敗重新獲取ctl
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果工作線程數(shù)大于核心線程數(shù),則檢查線程池狀態(tài)是否是正在運(yùn)行,且將新線程向阻塞隊(duì)列提交。
if (isRunning(c) && workQueue.offer(command)) {
//recheck 需要再次檢查,主要目的是判斷加入到阻塞隊(duì)里中的線程是否可以被執(zhí)行
int recheck = ctl.get();
//如果線程池狀態(tài)不為running,將任務(wù)從阻塞隊(duì)列里面移除,啟用拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池的工作線程為零,則調(diào)用addWoker提交任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加非核心線程失敗,拒絕
else if (!addWorker(command, false))
reject(command);
}

addWoker:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取線程池狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判斷是否可以添加任務(wù)。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取工作線程數(shù)量
int wc = workerCountOf(c);
//是否大于線程池上限,是否大于核心線程數(shù),或者最大線程數(shù)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 增加工作線程數(shù)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果線程池狀態(tài)改變,回到開始重新來
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
//上面的邏輯是考慮是否能夠添加線程,如果可以就cas的增加工作線程數(shù)量
//下面正式啟動(dòng)線程
try {
//新建worker
w = new Worker(firstTask);
//獲取當(dāng)前線程
final Thread t = w.thread;
if (t != null) {
//獲取可重入鎖
final ReentrantLock mainLock = this.mainLock;
//鎖住
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 線程處于RUNNING狀態(tài)
// 或者線程處于SHUTDOWN狀態(tài),且firstTask == null(可能是workQueue中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的worker線程執(zhí)行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 當(dāng)前線程已經(jīng)啟動(dòng),拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers 是一個(gè) HashSet 必須在 lock的情況下操作。
workers.add(w);
int s = workers.size();
//設(shè)置 largeestPoolSize 標(biāo)記workAdded
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功,啟動(dòng)線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//啟動(dòng)線程失敗,回滾。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

先看看 addWork()的兩個(gè)參數(shù),第一個(gè)是需要提交的線程Runnable firstTask,第二個(gè)參數(shù)是 boolean 類型,表示是否為核心線程。
execute() 中有三處調(diào)用了 addWork()我們逐一分析。
- 第一次,條件
if (workerCountOf(c) < corePoolSize)這個(gè)很好理解,工作線程數(shù)少于核心線程數(shù),提交任務(wù)。所以addWorker(command, true)。 - 第二次,如果
workerCountOf(recheck) == 0如果worker的數(shù)量為0,那就addWorker(null,false)。為什么這里是 null ?之前已經(jīng)把command提交到阻塞隊(duì)列了workQueue.offer(command)。所以提交一個(gè)空線程,直接從阻塞隊(duì)列里面取就可以了。 - 第三次,如果線程池沒有
RUNNING或者offer阻塞隊(duì)列失敗,addWorker(command,false),很好理解,對(duì)應(yīng)的就是,阻塞隊(duì)列滿了,將任務(wù)提交到,非核心線程池。與最大線程池比較。
至此,重新歸納execute()的邏輯應(yīng)該是:
如果當(dāng)前運(yùn)行的線程,少于corePoolSize,則創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)。
如果運(yùn)行的線程等于或多于corePoolSize,將任務(wù)加入BlockingQueue。
如果加入BlockingQueue成功,需要二次檢查線程池的狀態(tài)如果線程池沒有處于Running,則從BlockingQueue移除任務(wù),啟動(dòng)拒絕策略。
如果線程池處于Running狀態(tài),則檢查工作線程(worker)是否為0。如果為0,則創(chuàng)建新的線程來處理任務(wù)。如果啟動(dòng)線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕策略拒絕。
如果加入BlockingQueue。失敗,則創(chuàng)建新的線程來處理任務(wù)。
如果啟動(dòng)線程數(shù)大于maximumPoolSize,任務(wù)將被拒絕策略拒絕。
image.png
3. 線程池中的線程初始化
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。
在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個(gè)方法辦到:
prestartCoreThread():初始化一個(gè)核心線程;
prestartAllCoreThreads():初始化所有核心線程;
下面是這2個(gè)方法的實(shí)現(xiàn):
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意傳進(jìn)去的參數(shù)是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null
++n;
return n;
}
注意上面?zhèn)鬟M(jìn)去的參數(shù)是null,根據(jù)第2小節(jié)的分析可知如果傳進(jìn)去的參數(shù)為null,則最后執(zhí)行線程會(huì)阻塞在getTask方法中的r = workQueue.take();即等待任務(wù)隊(duì)列中有任務(wù)。
4. 任務(wù)緩存隊(duì)列及排隊(duì)策略
見線程池參數(shù),在選擇線程池任務(wù)隊(duì)列時(shí)的阻塞時(shí)隊(duì)列就決定了這個(gè)線程池的任務(wù)緩存及排隊(duì)策略。
5. 任務(wù)拒絕策略
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會(huì)采取任務(wù)拒絕策略,具體拒絕策略參考線程池參數(shù)列表。
6. 線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù);
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù);
7. 線程池容量的動(dòng)態(tài)調(diào)整
ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設(shè)置核心池大小
setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。
四、深入源碼分析線程池線程復(fù)用原理
通過前面分析線程池的工作原理我們可以得知一個(gè)結(jié)論:在線程池內(nèi)部關(guān)于線程的調(diào)度執(zhí)行都是被封裝成一個(gè)Worker對(duì)象來操作的。而當(dāng)我們使用Worker.thread.start()啟動(dòng)線程時(shí),JVM會(huì)調(diào)用Worker中重寫的run()方法執(zhí)行,而Worker.run()方法源碼如下:
/** Delegates main run loop to outer runWorker */
// 將線程運(yùn)行主邏輯交給外部 Worker.runWorker()
public void run() {runWorker(this);}
我們進(jìn)一步跟進(jìn)Worker.runWorker()源碼:
// 線程執(zhí)行邏輯:執(zhí)行循環(huán)并反復(fù)從隊(duì)列獲取任務(wù)并執(zhí)行
final void runWorker(Worker w) {
// 獲取當(dāng)前執(zhí)行線程
Thread wt = Thread.currentThread();
// 獲取當(dāng)前傳遞進(jìn)線程池的方法
Runnable task = w.firstTask;
// 將Worker.firstTask 置為空
w.firstTask = null;
// 允許發(fā)生線程中斷
w.unlock(); // allow interrupts
// 突然執(zhí)行完成標(biāo)志:是否因?yàn)楫惓L鲅h(huán)
boolean completedAbruptly = true;
try {
// 1. 如果線程池外部傳遞了任務(wù)則直接執(zhí)行外部傳遞的任務(wù)
// 2. 如果沒有獲取到外部傳遞進(jìn)來的任務(wù)則調(diào)用getTask()去隊(duì)列中獲取任務(wù)并執(zhí)行
// 2.1. 如果在任務(wù)隊(duì)列中獲取到了任務(wù)則直接執(zhí)行已經(jīng)獲取的任務(wù)
// 2.2. 如果任務(wù)隊(duì)列為空,沒有任務(wù)則反復(fù)執(zhí)行空循環(huán)阻塞當(dāng)前線程死亡
while (task != null || (task = getTask()) != null) {
// 禁止線程中斷(防止線程在執(zhí)行過程中中斷導(dǎo)致不可恢復(fù)的錯(cuò)誤)
w.lock();
// 二次確認(rèn)線程池以及當(dāng)前工作線程狀態(tài):
// 如果線程池停止,確保當(dāng)前線程被中斷
// If pool is stopping, ensure thread is interrupted;
// 如果線程池為停止,請(qǐng)確保當(dāng)前線程未被中斷
// if not, ensure thread is not interrupted. This
// 如果是第二種情況則需要重新檢測(cè)并且清除中斷
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 調(diào)用任務(wù)的run方法,而不是start()方法,因?yàn)閃orker本身就是一個(gè)線程類
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子方法
afterExecute(task, thrown);
}
} finally {
// 執(zhí)行完成后將獲取的任務(wù)置空
task = null;
// 執(zhí)行完成后自增當(dāng)前工作線程執(zhí)行的任務(wù)數(shù)量
w.completedTasks++;
// 釋放Worker中自實(shí)現(xiàn)的鎖
w.unlock();
}
}
// 如果線程能夠執(zhí)行到最后一行代表線程執(zhí)行過程中沒有由于發(fā)生異常導(dǎo)致跳出循環(huán),將 突然結(jié)束 標(biāo)志改為false
completedAbruptly = false;
} finally {
// 執(zhí)行回收工作線程的邏輯
processWorkerExit(w, completedAbruptly);
}
}
如上就是關(guān)于線程池復(fù)用的原理,簡單來說就是通過一個(gè)死循環(huán)讓當(dāng)前線程一直處于運(yùn)行狀態(tài),阻止OS將當(dāng)前工作線程回收,從而做到線程的復(fù)用。而關(guān)于死循環(huán)的條件則比較簡單,判斷task是否為空,在調(diào)用方法執(zhí)行的時(shí)候會(huì)先獲取外部傳遞的任務(wù),如果沒有獲取到外部傳遞的任務(wù)則調(diào)用getTask()方法獲取任務(wù)隊(duì)列中的任務(wù)并執(zhí)行:
// 如果返回null,在runWorker方法中會(huì)執(zhí)行processWorkerExit,即關(guān)閉該線程。
private Runnable getTask() {
// 表示上次從隊(duì)列獲取任務(wù)是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
// 死循環(huán)標(biāo)志位
retry:
for (;;) {
int c = ctl.get(); // 獲取ctl
int rs = runStateOf(c); // 解析ctl獲取當(dāng)前線程池運(yùn)行狀態(tài)
// Check if queue empty only if necessary.
// 如果rs >= STOP,或者 rs=SHUTDOWN且隊(duì)列為空,此時(shí)不再接收新任務(wù),將WorkerCount遞減并返回null。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 自旋CAS遞減workerCount直到成功
return null;
}
// timed用于判斷是否需要重試控制
boolean timed; // Are workers subject to culling?
for (;;) {
// allowCoreThreadTimeOut默認(rèn)是false,核心線程不進(jìn)行超時(shí)控制,
// 當(dāng)線程數(shù)量大于corePoolSize時(shí)需要進(jìn)行超時(shí)控制
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果wc <= maximumPoolSize ,且上次從隊(duì)列獲取任務(wù)超時(shí)或本次需要進(jìn)行超時(shí)控制,
// 則跳出內(nèi)層循環(huán)。
// timedOut=true表示上次從隊(duì)列獲取元素超時(shí),說明隊(duì)列在上次獲取的keepAliveTime時(shí)間內(nèi)是空的。
// timed=true說明線程數(shù)量大于corePoolSize。
// 所以timedOut=true和timed=true同時(shí)滿足則說明當(dāng)前線程已經(jīng)空閑了keepAliveTime時(shí)間,
// 并且線程池的數(shù)量大于corePoolSize。這時(shí)就需要關(guān)閉多余的空閑線程
//(即compareAndDecrementWorkerCount并返回null)。
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
// 如果線程數(shù)量大于maximumPoolSize,或者上次從隊(duì)列獲取任務(wù)超時(shí)且本次需要進(jìn)行
// 超時(shí)控制。需要遞減WorkerCount,如果遞減成功則返回null
if (compareAndDecrementWorkerCount(c))
return null;
//檢查線程池運(yùn)行狀態(tài)是否改變。如果改變,那么繼續(xù)外層循環(huán),如果未改變,那么繼續(xù)內(nèi)層循環(huán)。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//超時(shí)方式獲取,注意keepAliveTime為超出corePoolSize大小的線程的空閑存活時(shí)間
workQueue.take(); //阻塞方式獲取,如果隊(duì)列為空阻塞當(dāng)前線程
if (r != null)
return r;
timedOut = true; //如果超時(shí),繼續(xù)循環(huán)。
} catch (InterruptedException retry) {
//如果發(fā)生中斷,則將timedOut置為false,繼續(xù)循環(huán)
timedOut = false;
}
}
}
在getTask()方法中的邏輯也比較簡單,前期效驗(yàn)線程池狀態(tài),一切正常時(shí)開始任務(wù)的獲取邏輯,但是值得注意的是這里使用的是阻塞時(shí)獲取方式,也就代表如果任務(wù)隊(duì)列中沒有任務(wù),當(dāng)前線程會(huì)阻塞等待,直到任務(wù)隊(duì)列中有新的任務(wù)時(shí)才會(huì)獲取并返回執(zhí)行,不過如果線程池設(shè)置了存活時(shí)間,那么當(dāng)前線程會(huì)阻塞到存活時(shí)間的閾值,如果超出存活時(shí)間會(huì)返回null。而如果返回null,則在runWorker方法中會(huì)執(zhí)行processWorkerExit,即關(guān)閉該工作線程,從而實(shí)現(xiàn)了線程池的另一個(gè)功能: 線程池內(nèi)線程空閑時(shí)間超過給定的存活時(shí)間時(shí)自動(dòng)回收該線程資源。
下面我們?cè)賮砜纯磒rocessWorkerExit方法的實(shí)現(xiàn):
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly=false,說明是由getTask返回null導(dǎo)致的,WorkerCount遞減的操作已經(jīng)執(zhí)行
// 如果completedAbruptly=true,說明是由執(zhí)行任務(wù)的過程中發(fā)生異常導(dǎo)致,需要進(jìn)行WorkerCount遞減的操作
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從workers中刪除當(dāng)前worker,對(duì)workers更新需要加mainLock鎖
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根據(jù)線程池狀態(tài)判斷是否結(jié)束線程池
tryTerminate();
// 如果是異常結(jié)束(completedAbruptly=true),需要重新調(diào)用addWorker()增加一個(gè)線程,保持線程數(shù)量
// 如果是由getTask()返回null導(dǎo)致的線程結(jié)束,需要進(jìn)行以下判斷:
// 1)如果allowCoreThreadTimeOut=true且隊(duì)列不為空,那么需要至少保證有一個(gè)線程
// 2)如果allowCoreThreadTimeOut=false,那么需要保證線程數(shù)大于等于corePoolSize
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
關(guān)于線程池中工作線程的銷毀則是由processWorkerExit()方法來完成的,在這個(gè)方法中首先會(huì)判斷當(dāng)前線程是因?yàn)閳?zhí)行出現(xiàn)異常還是超出存活時(shí)間導(dǎo)致需要發(fā)生回收的。如果是因?yàn)槌龃婊顣r(shí)間,先判斷線程池狀態(tài)之后再從工作集中移除當(dāng)前線程即可。如果是由于異常導(dǎo)致的則需要先對(duì)線程池的工作線程數(shù)進(jìn)行自減,然后再移除工作集中的工作線程,最后再調(diào)用addWorker()添加一個(gè)工作線程保證線程池內(nèi)工作線程的數(shù)量。在上面的源碼中我們也會(huì)看到tryTerminate()這個(gè)方法,那么我們也簡單分析一下它的源碼:
//根據(jù)線程池狀態(tài)判斷是否結(jié)束線程池
final void tryTerminate() {
for (;;) {
int c = ctl.get(); // 獲取ctl
// 如果線程池運(yùn)行狀態(tài)是RUNNING,或者大于等于TIDYING,或者運(yùn)行狀態(tài)為
// SHUTDOWN且隊(duì)列為空,則直接return返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作線程數(shù)不為0,則中斷一個(gè)空閑線程并return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 嘗試將線程池狀態(tài)設(shè)置為TIDYING狀態(tài)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//如果CAS成功,執(zhí)行terminated()鉤子方法
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
如果線程池狀態(tài)不處于STOP或者TERMINATED狀態(tài)則直接返回,反之執(zhí)行terminated()鉤子函數(shù)。
到此關(guān)于線程池的復(fù)用原理就告一段落了,關(guān)于線程池的復(fù)用原理只需要理解死循環(huán)+getTask即可大致明白線程池復(fù)用的思維。
五、自定義線程池實(shí)戰(zhàn)
再前面我們?cè)岬?,JDK為我們提供的已經(jīng)封裝好的線程池實(shí)現(xiàn)在高并發(fā)情況下都會(huì)存在OOM的風(fēng)險(xiǎn),而通過前面分析我們也可以得知,JDK提供的線程池也是通過封裝ThreadPoolExecutor的構(gòu)造,所以我們?cè)谏a(chǎn)環(huán)境時(shí)更應(yīng)該自定義線程池來規(guī)避這些風(fēng)險(xiǎn)以及更好的操作線程池。注:在《阿里巴巴java開發(fā)規(guī)范手冊(cè)》中明確規(guī)定如下:

所以在一般生產(chǎn)環(huán)境使用創(chuàng)建線程都是通過自定義線程池來使用線程資源,代碼如下:
public static void main(String[] args){
// 線程工廠可通過 implements ThreadFactory接口自定義
// 任務(wù)拒絕策略可通過 implements RejectedExecutionHandler接口自定義
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10;i++){
final int num = i;
threadPoolExecutor.execute(()->{
System.out.println("線程:" + Thread.currentThread().getName() + "正在執(zhí)行:" + num + "個(gè)任務(wù)");
});
System.out.println("線程池中線程數(shù)目:" + threadPoolExecutor.getPoolSize() + ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" + threadPoolExecutor.getQueue().size() + ",已執(zhí)行玩別的任務(wù)數(shù)目:"+threadPoolExecutor.getCompletedTaskCount());
}
}
五、線程池參數(shù)合理配置
本節(jié)來討論一個(gè)比較重要的話題:如何合理配置線程池大小,參考如下:

六、參考
- 《Java并發(fā)編程的藝術(shù)》
- 《java并發(fā)編程實(shí)戰(zhàn)》
