1.線程池是什么
線程池(Thread Pool)是一種基于池化思想管理線程的工具,經常出現(xiàn)在多線程服務器中,如MySQL。
我們都知道線程的創(chuàng)建和銷毀都需要一定的資源開銷,降低了計算機的整體性能。那么有沒有一種辦法能避免頻繁的線程創(chuàng)建和銷毀呢?基于此就引出了線程池的概念,使用線程池可以帶來一系列好處:
- 降低資源消耗:通過池化技術重復利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
- 提高響應速度:任務到達時,無需等待線程創(chuàng)建即可立即執(zhí)行。
- 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建,不僅會消耗系統(tǒng)資源,還會因為線程的不合理分布導致資源調度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進行統(tǒng)一的分配、調優(yōu)和監(jiān)控。
- 提供更多更強大的功能:線程池具備可拓展性,允許開發(fā)人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執(zhí)行或定期執(zhí)行。
2.線程池的設計
Java中JDK8的線程池核心實現(xiàn)類是ThreadPoolExecutor,我們首先來看一下ThreadPoolExecutor的UML類圖,了解下ThreadPoolExecutor的繼承關系。

ThreadPoolExecutor實現(xiàn)的頂層接口是Executor,頂層接口Executor提供了一種思想:將任務提交和任務執(zhí)行進行解耦。用戶無需關注如何創(chuàng)建線程,如何調度線程來執(zhí)行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執(zhí)行器(Executor)中,由Executor框架完成線程的調配和任務的執(zhí)行部分。ExecutorService接口增加了一些能力:(1)擴充執(zhí)行任務的能力,補充可以為一個或一批異步任務生成Future的方法;(2)提供了管控線程池的方法,比如停止線程池的運行。AbstractExecutorService則是上層的抽象類,將執(zhí)行任務的流程串聯(lián)了起來,保證下層的實現(xiàn)只需關注一個執(zhí)行任務的方法即可。最下層的實現(xiàn)類ThreadPoolExecutor實現(xiàn)最復雜的運行部分,ThreadPoolExecutor將會一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結合從而執(zhí)行并行任務。
3.線程池的實現(xiàn)
3.1. 通過Executors提供四種線程池:
1、newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收線程,若無可回收,則新建線程;線程池無限大,當執(zhí)行第二個任務時第一個任務已完成,會復用執(zhí)行第一個任務的線程,而不是新建線程。
2、newFixedThreadPool創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待;初始線程數(shù)和最大線程數(shù)一樣,如果要執(zhí)行的線程大于初始線程數(shù),則會將多余的線程任務加入到緩存隊列中等待執(zhí)行。
3、newScheduledThreadPool創(chuàng)建一個定長線程池,支持定時及周期性任務的執(zhí)行;
4、newSingleThreadExecutor創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO,LIFO,優(yōu)先級)執(zhí)行;
通過Executors創(chuàng)建的線程池有個致命缺點,以newCachedThreadPool來說
public class ThreadPoolDemo {
public static void main(String[] args)throws Exception {
ExecutorService threadPool = Executors.newCachedThreadPool();
}
}
我們點進newCachedThreadPool()方法會看到如下內容:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//我們發(fā)現(xiàn)第二個參數(shù)為Integer.MAX_VALUE,也就是最大的工作線程數(shù)為Integer.MAX_VALUE,如果超高并發(fā)過來會直接OOM。
所以我們得通過new ThreadPoolExecutor()來自己指定參數(shù)。
3.2通過ThreadPoolExecutor來創(chuàng)建線程池
構造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:表示核心線程的數(shù)量,就是線程池中最好要保證的線程數(shù)量。
- maximumPoolSize:最大線程數(shù)量,就是當任務很多時,能工作的最大線程數(shù)。
- keepAliveTime:線程的空閑時間。就是當實際工作的線程數(shù)小于最大線程數(shù)的時候,就有部分線程是處于空閑的狀態(tài),當這些空閑線程的空閑時間到達keepAliveTime就會被干掉。但是要保證最少線程數(shù)為corePoolSize。
- unit:空閑時間都單位。
- workQueue:工作隊列,就是當工作線程數(shù)達到corePoolSize之后的任務會放入到工作隊列中。
- threadFactory:線程工廠,用于創(chuàng)建線程。
- handler:拒絕處理器,就是當工作線程到達最大工作線程并且工作隊列已經滿了情況下,線程池應該怎么做。
3.3線程池的工作流程。
流程圖如下:

當任務過來時,首先會先去判斷線程池中工作的線程數(shù)量是否到達核心線程數(shù),如果沒達到就直接執(zhí)行,如果達到了就查看工作隊列是否滿。如果沒滿就將任務放入到工作隊列中,如果滿了就增加工作線程數(shù)來處理任務。如果工作線程和隊列都滿了的話就會用制定的策略去拒絕任務。
3.4 拒絕策略
AbortPolicy(默認):直接拋出異常RejectedExecutionException異常阻止系統(tǒng)正常運行。
CallerRunsPolicy:調用者運行是一種調節(jié)機制,該策略既不會拋棄任務,也不會拋棄異常,而是將某些任務回退到調用者,從而降低新任務的流量。
DiscardOldestPolicy:丟棄隊列中等待最久的任務,然后把當前任務加入隊列中嘗試再次提交當前任務。
DiscardPolicy:直接丟棄任務,不予處理也不拋出異常。如果允許任務丟失,這是最好的一種方案。
代碼演示拒絕策略:
1.AbortPolicy
public static void main(String[] args)throws Exception {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,5,1L,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
poolExecutor.shutdown();
}
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務");
}

也就是說明當工作線程和工作隊列都滿了之后線程池會拒絕任務直接報錯。
2.CallerRunsPolicy
public static void main(String[] args)throws Exception {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,5,1L,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 0; i < 10; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
poolExecutor.shutdown();
}
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務");
}

也就是說明當工作線程和工作隊列都滿了之后會將任務返還給調用線程池的人,讓他去處理。
3.DiscardOldestPolicy和DiscardPolicy
public static void main(String[] args)throws Exception {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,5,1L,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
for (int i = 0; i < 10; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
poolExecutor.shutdown();
}
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務");
}

一共輸出了10條記錄,說明有一條消息被拋棄了。