為什么說頻繁的創(chuàng)建和銷毀線程會浪費大量的系統(tǒng)資源?
線程的創(chuàng)建需要開辟虛擬機棧、本地方法棧、程序計數(shù)器等線程私有的內(nèi)存空間。在線程銷毀時需要回收這些系統(tǒng)資源。頻繁的創(chuàng)建和銷毀線程會浪費大量的系統(tǒng)資源
線程池的作用
- 利用線程池管理并復(fù)用線程、控制最大并發(fā)數(shù)等
- 實現(xiàn)任務(wù)線程隊列緩存策略和拒絕機制
- 實現(xiàn)某些與時間相關(guān)的功能,如定時執(zhí)行、周期執(zhí)行等
- 隔離線程環(huán)境。通過配置獨立的線程池,將一些服務(wù)隔開,避免個服務(wù)相互影響
合理的使用線程池能夠帶來三個好處
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀的消耗
- 提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行
- 提高線程的可管理性。
1. 自定義線程池-阻塞隊列

public class ThreadPool {
public static void main(String[] args) {
Pool pool = new Pool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
//拒絕策略
//queue.put(task); //死等
//queue.offer(task,500,TimeUnit.MILLISECONDS); //帶超時等待
//System.out.println("放棄");
});
for (int i = 0; i < 3; i++) {
int j = i;
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " execute " + j);
});
}
}
}
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
class Pool {
private BlockingQueue<Runnable> taskQueue;
//線程集合
private HashSet<Worker> workers = new HashSet<>();
//線程數(shù)
private int coreSize;
//獲取任務(wù)的超時時間
private long timeout;
private TimeUnit unit;
private RejectPolicy<Runnable> rejectPolicy;
public Pool(int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.unit = unit;
taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//執(zhí)行的方法
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println(Thread.currentThread().getName() + " add workers" + task);
workers.add(worker);
worker.start();
} else {
//taskQueue.put(task);
//策略模式
//1 死等 2. 帶超時等待 3. 放棄任務(wù)執(zhí)行 4. 拋出異常 5. 讓調(diào)用者自己執(zhí)行任務(wù)
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//while (task!= null || (task = taskQueue.take()) != null){
while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
try {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行 " + task );
task.run();
} catch (Exception e) {
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println(Thread.currentThread().getName() + " 移除");
workers.remove(this);
}
}
}
}
class BlockingQueue<T> {
Deque<T> queue = new ArrayDeque<>();
int capacity;
Lock lock = new ReentrantLock();
Condition emptyWaitSet = lock.newCondition();
Condition fullWaitSet = lock.newCondition();
public BlockingQueue(int queueCapacity) {
this.capacity = queueCapacity;
}
//添加帶超時的等待
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
//返回的是剩余的時間
if (nanos <= 0) {
System.out.println(Thread.currentThread().getName() + " 沒等到,返回");
return null; //沒等到
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
System.out.println(Thread.currentThread().getName() + " 出隊" + task);
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
//取任務(wù)
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
System.out.println(Thread.currentThread().getName() + " 隊列為空,等待");
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
System.out.println(Thread.currentThread().getName() + " 出隊 " + task);
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
//添加任務(wù)
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println(Thread.currentThread().getName() + " 隊列已滿,等待進入隊列");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
System.out.println(Thread.currentThread().getName() + " 入隊 " + task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
//帶超時的阻塞添加
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
System.out.println(Thread.currentThread().getName() + " 隊列已滿,等待進入隊列");
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
System.out.println(Thread.currentThread().getName() + " 入隊 " + task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判斷隊列是否已滿
if (queue.size() == capacity) {
rejectPolicy.reject(this, task);
} else {
queue.addLast(task);
emptyWaitSet.signal();
System.out.println(Thread.currentThread().getName() + " 入隊 " + task);
}
} finally {
lock.unlock();
}
}
}
執(zhí)行結(jié)果:
(放棄任務(wù))

- 自定義拒絕策略接口
@FunctionalInterface
interface RejectPolicy{
void reject(BlockingQueue<T> queue, T task);
}
2. ThreadPoolExecutor
2.1線程池狀態(tài)
ThreadPoolExecutor 使用int 高三位來表示線程池狀態(tài),低29位表示線程池數(shù)量
| 狀態(tài)名 | 高3位 | 接受新任務(wù) | 處理阻塞隊列任務(wù) | 說明 |
|---|---|---|---|---|
| RUNNING | 111 | Y | Y | |
| SHUTDOWN | 000 | N | Y | 不會接受新任務(wù),會處理阻塞隊列剩余任務(wù) |
| STOP | 001 | N | N | 會中斷正在執(zhí)行的任務(wù),并拋棄阻塞隊列任務(wù) |
| TIDYING | 010 | - | - | 任務(wù)全執(zhí)行完畢,活動線程為0即將進入終結(jié) |
| TERMINATED | 011 | - | - | 終結(jié)狀態(tài) |
這些信息存儲在一個原子變量ctl中,目的是將線程池狀態(tài)與線程個數(shù)合二為一,這樣就可以用一次CAS原子操作進行賦值
//c為舊值,ctlOf返回的結(jié)果為新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
//rs為高三位代表線程池狀態(tài),wc為低29位代表線程個數(shù),ctl是合并它們
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2 構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心線程數(shù)目
- maximumPoolSize 最大線程數(shù)目
- keepAliveTime 生存時間--針對救急線程
- unit 時間單位--針對救急線程
- workQueue 阻塞隊列
- threadFactory 線程工廠--可以為線程創(chuàng)建時起名字
- handler 拒絕策略
最大線程數(shù)= 核心線程數(shù)+ 救急線程數(shù)
線程池中剛開始沒有線程,當(dāng)一個任務(wù)提交給線程池后,線程池會創(chuàng)建一個新線程來執(zhí)行任務(wù)
當(dāng)線程數(shù)達到corePoolSize 并沒有線程空閑,這時再加入任務(wù),新加的任務(wù)會被加入workQueue隊列排隊,直到有空閑的線程
如果隊列選擇了有界隊列,那么任務(wù)超過了隊列大小時,會創(chuàng)建maximunPoolSize-corePoolSize數(shù)目的線程來救急
-
如果線程達到了maximunPoolSize 仍然有新任務(wù)這時會執(zhí)行拒絕策略。拒絕策略jdk提供了四種實現(xiàn),其實著名框架也提供了實現(xiàn)
- AbortPolicy 讓調(diào)用者拋出RejectExecutionException異常,這是默認策略
- CallerRunsPolicy 讓調(diào)用者運行任務(wù)
- DiscardPolicy 放棄本次任務(wù)
- DiscardOldestPolicy 放棄隊列中最早的任務(wù),本任務(wù)取而代之
- Dubbo 的實現(xiàn),在拋出 RejectExecutionException 異常之前會記錄日志,并dump線程棧信息,方便定位問題
- Netty 的實現(xiàn),是創(chuàng)建一個新線程來執(zhí)行任務(wù)
- ActiveMQ的實現(xiàn),帶超時等待(60s)嘗試放入隊列,類似我們之前自定義的拒絕策略
- PinPoint 的實現(xiàn),它使用了一個拒絕策略鏈,會逐一嘗試策略鏈中的每種拒絕策略
當(dāng)高峰過去后,超過coreSize的救急線程如果一段時間沒有任務(wù)做,需要結(jié)束資源,這個時間由keepAliveTime 和 unit來控制。
根據(jù)這個構(gòu)造方法,JDK Executors類中提供了眾多工廠方法來創(chuàng)建各種用途的線程池
2.3 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特點:
- 核心線程數(shù)== 最大線程數(shù)(沒有救急線程被創(chuàng)建),因此無需超時時間
- 阻塞隊列是無界的,可以放任意數(shù)量的任務(wù)
評價:
- 適合于任務(wù)量已知,相對耗時的任務(wù)
2.4 newCacheThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特點:
- 核心線程數(shù)是0,最大線程數(shù)是Integer.MAX_VALUE,救急線程的空閑生存時間是60s,意味著
- 全部是救急線程(60s后可以回收)
- 救急線程可以無限創(chuàng)建
- 隊列采用了SynchronousQueue實現(xiàn)的特點是,它沒有容量,沒有線程來取是放不進去的(一手交錢一手交貨)
評價:
- 整個線程池表現(xiàn)為線程數(shù)會根據(jù)任務(wù)量不斷增長,沒有上限,當(dāng)任務(wù)執(zhí)行完畢,空閑1分鐘后釋放線程。
- 適合任務(wù)數(shù)比較密集,但每個任務(wù)執(zhí)行時間短的情況
2.5 newSingleThreadPool
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
使用場景:
希望多個任務(wù)排隊執(zhí)行。線程數(shù)固定為1,任務(wù)數(shù)多于1時,會放入無界隊列排隊。任務(wù)執(zhí)行完成,這唯一的線程不會被釋放
區(qū)別:
- 自己創(chuàng)建一個單線程串行執(zhí)行任務(wù),如果任務(wù)執(zhí)行失敗而終止那么沒有任何補救措施,而線程池還會新建一個線程,保證池的正常工作
- 線程個數(shù)始終為1,不能修改
- FinalizableDelegateExecutorService應(yīng)用的是裝飾器模式,只對外暴露了ExecutorService接口,因此不能調(diào)用ThreadPoolExecutor 中特有的方法
- Executors.newFixedThreadPool(1) 初始為1,后面還可以修改
- 對外暴露的是ThreadPoolExecutor對象,可以強轉(zhuǎn)后調(diào)用setCorePoolSize等方法進行修改
2.6 提交任務(wù)
//執(zhí)行任務(wù)
void execute(Runnable command);
//提交任務(wù)task, 用返回值Future 獲得任務(wù)執(zhí)行結(jié)果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任務(wù)
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//提交tasks中所有任務(wù),帶超時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//提交tasks中所有任務(wù),哪個任務(wù)先成功執(zhí)行完畢,返回此任務(wù)執(zhí)行結(jié)果,其他任務(wù)取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//提交tasks中所有任務(wù),哪個任務(wù)先成功執(zhí)行完畢,返回此任務(wù)執(zhí)行結(jié)果,其他任務(wù)取消,帶超時時間
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2.7 關(guān)閉線程池
shutdown
/**
* 線程池狀態(tài)變?yōu)镾HUTDOWN
* 不會接收新任務(wù)
* 但已提交任務(wù)會執(zhí)行完
* 此方法不會阻塞調(diào)用線程的執(zhí)行
*/
void shutdown();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改線程池狀態(tài)
advanceRunState(SHUTDOWN);
//僅會打斷空閑線程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//嘗試終結(jié)(沒有運行的線程可以立刻終結(jié),如果還有運行的線程也不會等)
tryTerminate();
}
shutdownNow
/**
* 線程池狀態(tài)變?yōu)镾TOP
* 不會接收新任務(wù)
* 會將隊列中的任務(wù)返回
* 并用 interrupt 的方式中斷正在執(zhí)行的任務(wù)
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改線程池狀態(tài)
advanceRunState(STOP);
//打斷所有線程
interruptWorkers();
//獲取隊列中剩余任務(wù)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
其他方法
//不在RUNNING 狀態(tài)的線程池,此方法返回true
boolean isShutDown();
//線程池狀態(tài)是否是TERMINATED
boolean isTerminated();
//調(diào)用shutdown 后,由于調(diào)用線程并不會等待所有任務(wù)運行結(jié)束,因此如果它想在線程池TERMINATED 后做一些事情,可以用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
public class ShutDownTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future1 = pool.submit(()->{
System.out.println("running 1");
Thread.sleep(1000);
System.out.println("finish 1");
return "1";
});
Future<String> future2 = pool.submit(()->{
System.out.println("running 2");
Thread.sleep(1000);
System.out.println("finish 2");
return "2";
});
Future<String> future3 = pool.submit(()->{
System.out.println("running 3");
Thread.sleep(1000);
System.out.println("finish 3");
return "3";
});
System.out.println("shutdown");
pool.shutdown();
}
}
執(zhí)行結(jié)果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
public class ShutDownTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future1 = pool.submit(()->{
System.out.println("running 1");
Thread.sleep(1000);
System.out.println("finish 1");
return "1";
});
Future<String> future2 = pool.submit(()->{
System.out.println("running 2");
Thread.sleep(1000);
System.out.println("finish 2");
return "2";
});
Future<String> future3 = pool.submit(()->{
System.out.println("running 3");
Thread.sleep(1000);
System.out.println("finish 3");
return "3";
});
System.out.println("shutdown");
try {
pool.awaitTermination(3, TimeUnit.SECONDS); //不用,因為不知道等多久合適
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("other...");
}
}
執(zhí)行結(jié)果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
other...
public class ShutDownTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> future1 = pool.submit(()->{
System.out.println("running 1");
Thread.sleep(1000);
System.out.println("finish 1");
return "1";
});
Future<String> future2 = pool.submit(()->{
System.out.println("running 2");
Thread.sleep(1000);
System.out.println("finish 2");
return "2";
});
Future<String> future3 = pool.submit(()->{
System.out.println("running 3");
Thread.sleep(1000);
System.out.println("finish 3");
return "3";
});
System.out.println("shutdownNow");
List<Runnable> task = pool.shutdownNow();//返回隊列中任務(wù)
}
}
執(zhí)行結(jié)果:
shutdownNow
running 1
running 2
2.8. 池大小
- 過小會導(dǎo)致程序不能充分利用系統(tǒng)資源、容易導(dǎo)致饑餓
- 過大會導(dǎo)致更多的線程上下文切換,占用更多的資源
2.8.1 CPU密集型
通常采用 CPU核數(shù) + 1 能夠?qū)崿F(xiàn)最優(yōu)的CPU利用率,+1 是保證當(dāng)線程由于頁缺失故障(操作系統(tǒng))或其他原因?qū)е聲和r,額外的這個線程就能頂上去,保證CPU時鐘周期不被浪費
2.8.2 I/O密集型
CPU不總是處于繁忙狀態(tài),例如,當(dāng)你執(zhí)行業(yè)務(wù)計算時,這時候會使用CPU資源,但當(dāng)你執(zhí)行I/O操作時,遠程RPC調(diào)用時,包括進行數(shù)據(jù)庫操作時,這時候CPU就閑下來了,你可以利用多線程提高他的利用率
經(jīng)驗公式如下
線程數(shù) = 核數(shù) * CPU利用率 * 總時間(CPU計算時間 + 等待時間)/ CPU計算時間
例如:4核CPU計算時間是50%,其他等待時間是50%,期望CPU被100%利用,套用公式
4*100% *100% / 50% = 8
2.9 任務(wù)調(diào)度線程池
在任務(wù)調(diào)度線程池功能加入之前,可以使用java.util.Timer來實現(xiàn)定時功能,Timer 的優(yōu)點在于簡單易用,但由于所有的任務(wù)都是由同一個線程來調(diào)度,因此所有任務(wù)都是串行執(zhí)行的,同一時間只能有一個任務(wù)在執(zhí)行,前一個任務(wù)的延遲或異常都將會影響到之后的任務(wù)。
public class TimerTest {
public static void main(String[] args){
Timer timer = new Timer();
TimerTask timerTask1 = new TimerTask() {
@Override
public void run() {
System.out.println(" task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask timerTask2 = new TimerTask() {
@Override
public void run() {
System.out.println(" task2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//使用Timer添加兩個任務(wù),希望他們都在 1s 后執(zhí)行
//Timer 內(nèi)只有一個線程來順序執(zhí)行隊列中的任務(wù)
timer.schedule(timerTask1, 1000);
timer.schedule(timerTask2, 1000);
}
}
2.9.1 使用任務(wù)調(diào)度線程池改進
public class TimerTest {
public static void main(String[] args){
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
pool.schedule(()->{
System.out.println(" task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
pool.schedule(()->{
System.out.println(" task2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
}
}
3. Fork/Join
Fork/Join 是JDK 1.7 新加入的線程池實現(xiàn),