為什么使用線程池
池化技術相比大家已經屢見不鮮了,線程池、數據庫連接池、Http 連接池等等都是對這個思想的應用。池化技術的思想主要是為了減少每次獲取資源的消耗,提高對資源的利用率。
線程池提供了一種限制和管理資源(包括執(zhí)行一個任務)。每個線程池還維護一些基本統計信息,例如已完成任務的數量。
使用線程池的好處:
- 降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統資源,還會降低系統的穩(wěn)定性,使用線程池可以進行統一的分配,調優(yōu)和監(jiān)控。
四種線程池
線程池創(chuàng)建最終是通過ThreadPoolExecutor創(chuàng)建的。根據構造方法傳參數的不同又可以分為不同的線程池。
ThreadPoolExecutor 類圖結構:

ThreadPoolExecutor構造方法為:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
參數說明
corePoolSize:線程池中核心線程數的數量maximumPoolSize:在線程池中允許存在的最大線程數keepAliveTime:當存在的線程數大于corePoolSize,那么會找到空閑線程去銷毀,此參數是設置空閑多久的線程才被銷毀。unit:時間單位 TimeUnit工具類-
workQueue:工作隊列,線程池中的當前線程數大于核心線程的話,那么接下來的任務會放入到隊列中- ArrayBlockingQueue:基于數組有界阻塞隊列
- LinkedBlockingQueue:基于鏈表阻塞隊列
- SynchronousQueue:不存儲元素的阻塞隊列(讀寫須等待一并進行)
- PriorityBlockingQueue:支持優(yōu)先級的無界隊列
threadFactory:在創(chuàng)建線程的時候,通過工廠模式來生產線程。這個參數就是設置我們自定義的線程創(chuàng)建工廠。-
handler:拒絕策略,如果超過了最大線程數,那么就會執(zhí)行我們設置的拒絕策略- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執(zhí)行當前任務。
- DiscardPolicy:不處理,丟棄掉。
線程池處理邏輯:
- 前
corePoolSize個任務時,來一個任務就創(chuàng)建一個線程 - 如果當前線程池的線程數大于了
corePoolSize那么接下來再來的任務就會放入到我們上面設置的workQueue隊列中 - 如果此時
workQueue也滿了,那么再來任務時,就會新建臨時線程,那么此時如果我們設置了keepAliveTime或者設置了allowCoreThreadTimeOut,那么系統就會進行線程的活性檢查,一旦超時便銷毀線程 - 如果此時線程池中的當前線程大于了
maximumPoolSize最大線程數,那么就會執(zhí)行我們剛才設置的handler拒絕策略

線程池分類:

newFixedThreadPool---固定大小的線程池
java.util.concurrent.Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
使用newFixedThreadPool方法創(chuàng)建出來的線程池為固定大小的線程池,可以通過第一個靜態(tài)方法指定線程池的大小,該線程池corePoolSize和maximumPoolSize相等,阻塞隊列使用的是LinkedBlockingQueue,理論上大小為整數最大值。
使用newFixedThreadPool方法創(chuàng)建出來的線程池中的線程數量始終不變,當有新任務提交時,線程池中有空閑線程則會立即執(zhí)行,如果沒有,則會暫存到阻塞隊列。
對于固定大小的線程池,不存在線程數量的變,同時使用無界的LinkedBlockingQueue來存放執(zhí)行的任務。
存在的問題:
當任務提交十分頻繁的時候,LinkedBlockingQueue迅速增大,存在著耗盡系統資源的問題。而且在線程池空閑時,即線程池中沒有可運行任務時,它也不會釋放工作線程,還會占用一定的系統資源,需要shutdown。
ExecutorService fixPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
fixPool.execute(() -> System.out.println(Thread.currentThread().getName() + " : " + atomicInteger.getAndIncrement()));
}
fixPool.shutdown();
// 打印結果 5個線程
pool-1-thread-1 : 0
pool-1-thread-5 : 4
pool-1-thread-4 : 3
pool-1-thread-3 : 2
pool-1-thread-2 : 1
pool-1-thread-3 : 8
pool-1-thread-4 : 7
pool-1-thread-5 : 6
pool-1-thread-1 : 5
pool-1-thread-2 : 9
newSingleThreadExecutor---單個線程的線程池
java.util.concurrent.Executors
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
使用newSingleThreadExecutor方法創(chuàng)建的線程池為單個線程線程池,只有一個線程的線程池,阻塞隊列使用的是LinkedBlockingQueue,若有多余的任務提交到線程池中,則會被暫存到阻塞隊列,待空閑時再去執(zhí)行。按照先入先出的順序執(zhí)行任務。
ExecutorService singlePool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singlePool.execute(() -> System.out.println(Thread.currentThread().getName() + " : " + atomicInteger.getAndIncrement()));
}
singlePool.shutdown();
// 打印結果 只有一個線程
pool-1-thread-1 : 0
pool-1-thread-1 : 1
pool-1-thread-1 : 2
pool-1-thread-1 : 3
pool-1-thread-1 : 4
pool-1-thread-1 : 5
pool-1-thread-1 : 6
pool-1-thread-1 : 7
pool-1-thread-1 : 8
pool-1-thread-1 : 9
newCachedThreadPool---緩存線程池
java.util.concurrent.Executors
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, // 緩存的線程默認存活時間
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,// 緩存的線程默認存活時間
new SynchronousQueue<Runnable>(),
threadFactory);
}
使用newCachedThreadPool方法創(chuàng)建的線程池為緩存線程池。
緩存線程池,緩存的線程默認存活60秒。線程的核心池corePoolSize大小為0,核心池最大為Integer.MAX_VALUE,阻塞隊列使用的是SynchronousQueue。
SynchronousQueue是一個直接提交的阻塞隊列, SynchronousQueue總會迫使線程池增加新的線程去執(zhí)行新的任務。在沒有任務執(zhí)行時,當線程的空閑時間超過keepAliveTime(60秒),則工作線程將會終止被回收,當提交新任務時,如果沒有空閑線程,則創(chuàng)建新線程執(zhí)行任務,會導致一定的系統開銷。
存在的問題:
如果同時又大量任務被提交,而且任務執(zhí)行的時間不是特別快,那么線程池便會新增出等量的線程池處理任務,這很可能會很快耗盡系統的資源。
ExecutorService cachedPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedPool.execute(() -> System.out.println(Thread.currentThread().getName() + " : " + atomicInteger.getAndIncrement()));
}
//打印結果
pool-1-thread-1 : 0
pool-1-thread-4 : 3
pool-1-thread-3 : 2
pool-1-thread-2 : 1
pool-1-thread-6 : 5
pool-1-thread-5 : 4
pool-1-thread-7 : 6
pool-1-thread-8 : 7
pool-1-thread-8 : 8
pool-1-thread-2 : 9
newScheduledThreadPool---定時線程池
java.util.concurrent.Executors
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
使用newScheduledThreadPool方法創(chuàng)建的線程池為定時線程池。
定時線程池,可用于周期性地去執(zhí)行任務,通常用于周期性的同步數據。
ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
//5個線程每兩秒執(zhí)行一次 隨機哪個線程執(zhí)行
ses.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName() + " : " + "執(zhí)行了"), 0, 2, TimeUnit.SECONDS);
打印結果: 5個線程每兩秒執(zhí)行一次 隨機哪個線程執(zhí)行
pool-1-thread-1 : 執(zhí)行了
pool-1-thread-1 : 執(zhí)行了
pool-1-thread-2 : 執(zhí)行了
pool-1-thread-1 : 執(zhí)行了
pool-1-thread-3 : 執(zhí)行了
pool-1-thread-3 : 執(zhí)行了
pool-1-thread-3 : 執(zhí)行了
... ...
線程池的使用
線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。
說明:Executors各個方法的弊端:
- 1:newFixedThreadPool和newSingleThreadExecutor:
??主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。- 2:newCachedThreadPool和newScheduledThreadPool:
??主要問題是線程數最大數是Integer.MAX_VALUE,可能會創(chuàng)建數量非常多的線程,甚至OOM。
使用實例:
package com.thread.study.pool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadTest {
public static void main(String[] args) throws InterruptedException {
int corePoolSize = 3; // 線程池中核心線程數的數量
int maximumPoolSize = 6;// 在線程池中允許存在的最大線程數
long keepAliveTime = 10;// 存活時間
TimeUnit unit = TimeUnit.SECONDS; // 時間單位 秒
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3); // 工作隊列 基于鏈表阻塞隊列
ThreadFactory threadFactory = new NameTreadFactory();// 自定義線程工廠
RejectedExecutionHandler handler = new MyIgnorePolicy(); // 自定義拒絕策略
/*
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
executor.prestartAllCoreThreads(); // 預啟動所有核心線程
for (int i = 1; i <= 10; i++) {
executor.execute(new MyTask(String.valueOf(i)));
}
System.out.println("當前線程池中存活的線程數為: "+executor.getActiveCount());
Thread.sleep(5000);
System.out.println("當前線程池中存活的線程數為: "+executor.getActiveCount());
executor.shutdown();
}
// 自定義線程工廠
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
//拒絕策略
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志記錄等
System.err.println(r.toString() + " rejected!" + " 當前線程池中存活的線程數為: " + e.getActiveCount());
}
}
// 線程
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() +" : "+ this.toString() + " is running!");
Thread.sleep(1000); //讓任務執(zhí)行慢點
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
打印結果:
my-thread-1 has been created
my-thread-2 has been created
my-thread-3 has been created
my-thread-4 has been created
my-thread-3 : MyTask [name=3] is running!
my-thread-1 : MyTask [name=1] is running!
my-thread-2 : MyTask [name=2] is running!
my-thread-5 has been created
my-thread-6 has been created
my-thread-4 : MyTask [name=4] is running!
my-thread-5 : MyTask [name=8] is running!
my-thread-6 : MyTask [name=9] is running!
當前線程池中存活的線程數為: 6
MyTask [name=10] rejected! 當前線程池中存活的線程數為: 6
my-thread-3 : MyTask [name=7] is running!
my-thread-2 : MyTask [name=5] is running!
my-thread-1 : MyTask [name=6] is running!
當前線程池中存活的線程數為: 0
線程池使用場景
使用線程池異步操作常見場景
批量操作
批量操作不是實時顯示效果的操作,比如批量導入配置,批量刪除或作廢,批量導出查詢結果等,只需要搭建一個任務,在任務中處理即可,有錯誤對應處理錯誤
日志
異步寫操作對功能影響不大的業(yè)務邏輯是常見的場景,日志對與功能來說重要性并么有那么高
使用第三方接口
比如發(fā)郵箱,發(fā)短信,發(fā)消息隊列信息,推送搜索引擎數據,異步調用外圍接口等處理數據。