Java線程池,知道這些就夠了

合理利用線程池能夠帶來三個好處。第一:降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。第二:提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。但是要做到合理的利用線程池,必須對其原理了如指掌。

一. Java中的ThreadPoolExecutor類

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類。因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實(shí)現(xiàn)源碼。

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

從構(gòu)造函數(shù)中,我們可以看出,創(chuàng)建一個線程池需要輸入幾個參數(shù):

  • corePoolSize: 核心池的大小,這個參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
  • maximumPoolSize: 線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是如果使用了無界的任務(wù)隊(duì)列這個參數(shù)就沒什么效果。
  • keepAliveTime: 表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達(dá)到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;
  • TimeUnit: 參數(shù)keepAliveTime的時間單位,有7種取值:

TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒

* **workQueue**: 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。可以選擇以下幾個阻塞隊(duì)列:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue

1. ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
2. LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊(duì)列。
3. SynchronousQueue:一個不存儲元素的阻塞隊(duì)列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊(duì)列。
4. PriorityBlockingQueue:一個具有優(yōu)先級得無限阻塞隊(duì)列。

* **ThreadFactory**:線程工廠,主要用來創(chuàng)建線程

* **RejectedExecutionHandler**:當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時拋出異常。以下是JDK1.5提供的四種策略。

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

具體參數(shù)的配置與線程池的關(guān)系將在下一節(jié)講述。

通過ThreadPoolExecutor的代碼我們看到,ThreadPoolExecutor繼承了AbstractExecutorService,AbstractExecutorService是一個抽象類,它實(shí)現(xiàn)了ExecutorService接口。ExecutorService又是繼承了Executor接口
Excutor 整體結(jié)構(gòu)如下:
![](http://upload-images.jianshu.io/upload_images/327713-11bfcfd3cf6369d0.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務(wù),交由線程池去執(zhí)行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒有對其進(jìn)行重寫,這個方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。

ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎(chǔ)上提供了支持定時調(diào)度的功能。線程任務(wù)可以在一定延時時間后才被觸發(fā)執(zhí)行。

##二. 深入剖析線程池實(shí)現(xiàn)原理
###1. 線程池狀態(tài)

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

當(dāng)創(chuàng)建線程池后,初始時,線程池處于RUNNING狀態(tài);
如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時線程池不能夠接受新的任務(wù),它會等待所有任務(wù)執(zhí)行完畢;
如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務(wù),并且會去嘗試終止正在執(zhí)行的任務(wù);
當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。

###2. ThreadPoolExecutor 原理
####2.1 ThreadPoolExecutor的幾個重要屬性

private final BlockingQueue<Runnable> workQueue; //任務(wù)緩存隊(duì)列,用來存放等待執(zhí)行的任務(wù)
private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態(tài)鎖,對線程池狀態(tài)(比如線程池大小
//、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集

private volatile long keepAliveTime; //線程存貨時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設(shè)置存活時間
private volatile int corePoolSize; //核心池的大?。淳€程池中的線程數(shù)目大于這個參數(shù)時,提交的任務(wù)會被放進(jìn)任務(wù)緩存隊(duì)列)
private volatile int maximumPoolSize; //線程池最大能容忍的線程數(shù)

private volatile int poolSize; //線程池中當(dāng)前的線程數(shù)

private volatile RejectedExecutionHandler handler; //任務(wù)拒絕策略

private volatile ThreadFactory threadFactory; //線程工廠,用來創(chuàng)建線程

private int largestPoolSize; //用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)

private long completedTaskCount; //用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個數(shù)

####2.2 ThreadPoolExecutor的內(nèi)部工作原理
線程池的主要工作流程如下圖:
![](http://upload-images.jianshu.io/upload_images/327713-9d0e1d992971ccef.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

1. 如果當(dāng)前池大小poolSize小于corePoolSize,則創(chuàng)建新線程執(zhí)行任務(wù);
2. 如果當(dāng)前池大小poolSize大于corePoolSize,且等待隊(duì)列未滿,則進(jìn)入等待隊(duì)列;
3. 如果當(dāng)前池大小poolSize大于corePoolSize,且等待隊(duì)列已滿,且小于maxmumPoolSize,則創(chuàng)建新線程執(zhí)行任務(wù);
4. 如果當(dāng)前池大小poolSize大于corePoolSize,且等待隊(duì)列已滿,且大于maxmumPoolSize,則調(diào)用拒絕策略來處理該任務(wù)
5. 線程池里的每個線程執(zhí)行完任務(wù)后不會立即退出,而是會去檢查下等待隊(duì)列中是否有任務(wù)等待執(zhí)行,如果在keepAliveTime里等不到新任務(wù),那么線程就退出了。

####下面看看代碼實(shí)現(xiàn):
線程池最重要的方法是由 Executor 接口定義的 execute 方法 , 是任務(wù)提交的入口。
我們看看 ThreadPoolExecutor.execute(Runnable cmd) 的實(shí)現(xiàn):

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}

當(dāng)提交一個新的 Runnable 任務(wù):
分支1 : 如果當(dāng)前池大小小于 corePoolSize, 執(zhí)行 addIfUnderCorePoolSize(command) , 如果線程池處于運(yùn)行狀態(tài)且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 會做如下事情,將 Runnable 任務(wù)封裝成 Worker 任務(wù) , 創(chuàng)建新的 Thread ,執(zhí)行 Worker 任務(wù)。如果不滿足條件,則返回 false 。
代碼如下:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù)
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

分支2 : 如果大于 corePoolSize 或 1 失敗失敗,則:
* 如果等待隊(duì)列未滿,把 Runnable 任務(wù)加入到 workQueue 等待隊(duì)列workQueue .offer(command)
* 如果等待隊(duì)列已經(jīng)滿了,調(diào)用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本類似,只不過判斷條件是 poolSize < maximumPoolSize 。如果大于 maximumPoolSize ,則把 Runnable 任務(wù)交由 RejectedExecutionHandler 來處理。

####2.3 線程池的初始化
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會創(chuàng)建線程。
在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個方法辦到:
    ?   prestartCoreThread():初始化一個核心線程;
    ?   prestartAllCoreThreads():初始化所有核心線程

####2.4 任務(wù)緩存隊(duì)列及排隊(duì)策略
在前面我們多次提到了任務(wù)緩存隊(duì)列,即workQueue,它用來存放等待執(zhí)行的任務(wù)。
workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:
  1)ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時必須指定大?。?  2)LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
  3)synchronousQueue:這個隊(duì)列比較特殊,它不會保存提交的任務(wù),而是將直接新建一個線程來執(zhí)行新來的任務(wù)。
 
####2.5 任務(wù)拒絕策略
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:
  1. ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
  2. ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
  4. ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
####2.6 線程池的關(guān)閉
ThreadPoolExecutor提供了兩個方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
    ?   shutdown():不會立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)
    ?   shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)

####2.7 線程池容量的動態(tài)調(diào)整
ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
    ?   setCorePoolSize:設(shè)置核心池大小
    ?   setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當(dāng)上述參數(shù)從小變大時,ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。


###3. ScheduledThreadPoolExecutor

ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的集成。增加了定時觸發(fā)線程任務(wù)的功能。需要注意:從內(nèi)部實(shí)現(xiàn)看, ScheduleThreadPoolExecutor 使用的是 corePoolSize 線程和一個無界隊(duì)列的固定大小的池,所以調(diào)整 maximumPoolSize 沒有效果。無界隊(duì)列是一個內(nèi)部自定義的 DelayedWorkQueue 。
ScheduleThreadPoolExecutor 線程池接收定時任務(wù)的方法是 schedule ,看看內(nèi)部實(shí)現(xiàn):

public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));

     delayedExecute(t);  
     return t;  

}

以上代碼會初始化一個 RunnableScheduledFuture 類型的任務(wù) t, 并交給 delayedExecute 方法。 delayedExecute(t) 方法實(shí)現(xiàn)如下:

private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}
if (getPoolSize() < getCorePoolSize())
prestartCoreThread();

            super.getQueue().add(command);  

}

如果當(dāng)前線程池大小 poolSize 小于 CorePoolSize ,則創(chuàng)建一個新的線程,注意這里創(chuàng)建的線程是空的,不會把任務(wù)直接交給線程來做,而是把線程任務(wù)放到隊(duì)列里。因?yàn)槿蝿?wù)是要定時觸發(fā)的,所以不能直接交給線程去執(zhí)行。
**那如何做到定時觸發(fā)呢?**
關(guān)鍵在于DelayedWorkQueue,它代理了 DelayQueue ??梢哉J(rèn)為 DelayQueue 是這樣一個隊(duì)列(具體可以去看下源碼,不詳細(xì)分析):
1. 隊(duì)列里的元素按照任務(wù)的 delay 時間長短升序排序, delay 時間短的在隊(duì)頭, delay 時間長的在隊(duì)尾。
2. 從 DelayQueue 里 FIFO 的獲取一個元素的時候,不會直接返回 head ??赡軙枞?,等到 head 節(jié)點(diǎn)到達(dá) delay 時間后才能被獲取??梢钥聪?DelayQueue 的 take 方法實(shí)現(xiàn):

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);//等待delay時間
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}


博客地址:[Java線程池,知道這些就夠了](http://www.wjd1024.com/2017/07/03/java%E7%BA%BF%E7%A8%8B%E6%B1%A0/>)
參考資料:
<http://www.cnblogs.com/dolphin0520/p/3932921.html>
<http://developer.51cto.com/art/201203/321885.htm>
<http://ifeve.com/java-threadpool/>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容