前言
線程并發(fā)系列文章:
Java 線程基礎
Java 線程狀態(tài)
Java “優(yōu)雅”地中斷線程-實踐篇
Java “優(yōu)雅”地中斷線程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有誤
Java Unsafe/CAS/LockSupport 應用與原理
Java 并發(fā)"鎖"的本質(一步步實現(xiàn)鎖)
Java Synchronized實現(xiàn)互斥之應用與源碼初探
Java 對象頭分析與使用(Synchronized相關)
Java Synchronized 偏向鎖/輕量級鎖/重量級鎖的演變過程
Java Synchronized 重量級鎖原理深入剖析上(互斥篇)
Java Synchronized 重量級鎖原理深入剖析下(同步篇)
Java并發(fā)之 AQS 深入解析(上)
Java并發(fā)之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 詳解
Java 并發(fā)之 ReentrantLock 深入分析(與Synchronized區(qū)別)
Java 并發(fā)之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(應用篇)
最詳細的圖文解析Java各種鎖(終極篇)
線程池必懂系列
線程池系列文章:
Java 線程池是面試、工作必須掌握的基礎之一,使用線程池能夠更好地規(guī)劃應用CPU占用率,提高應用運行的流暢度,本篇將來探索線程池的應用與原理。
通過本篇文章,你將了解到:
1、為什么需要線程池
2、自己實現(xiàn)簡單線程池
3、線程池原理
4、總結
1、為什么需要線程池
名詞由來
池,顧名思義:就是裝一堆東西的地方。當有需要的時候,從池子里拿東西,當不用的時候,就往池子里放??梢钥闯龃娣?、取用都很方便。
線程池類比
咱們在打疫苗的時候,若是啥時候想打啥時候過去,到了醫(yī)院,醫(yī)生需要換上防護服,戴上手套,準備疫苗,準備電腦記錄信息等。

接種者1 到了醫(yī)院,醫(yī)生進行各項準備工作,最后給接種者1打了疫苗后,卸下裝備。此時接種者2也到了醫(yī)院,醫(yī)生又需要換上裝備??梢钥闯鲠t(yī)生的準備工作耗時費力,若是在固定的時間接種疫苗,醫(yī)生就無需頻繁換裝備,既節(jié)約醫(yī)生時間,也縮短了接種者的等待時間。如下圖:

為什么需要線程池
從上面的例子可知,因為醫(yī)生的準備工作耗時費力,因此盡可能集中打一段時間再換裝備。而對于計算機里線程也是類似的:
1、線程切換需要切換上下文,切換上下文涉及到系統(tǒng)調用,占用系統(tǒng)資源。
2、線程切換需要時間,成功創(chuàng)建線程后才能真正做事。
3、線程開啟后無法有效管理線程。
基于以上原因,需要引入線程池。
2、自己實現(xiàn)簡單線程池
簡單Demo
private void createThread() {
Thread thread = new Thread(() -> {
System.out.println("thread running...");
});
thread.start();
}
該線程執(zhí)行完畢就結束了,現(xiàn)在想讓它不結束:
private void createThread() {
Thread thread = new Thread(() -> {
while (true) {
System.out.println("thread running...");
}
});
thread.start();
}
線程一直在運行著,外部想要提交Runnable 給它運行,那么需要有個共享的變量,選擇隊列作為共享變量:
class ThreadPool {
public static void main(String args[]) {
ThreadPool threadPool = new ThreadPool();
threadPool.createThread();
threadPool.startRunnable();
}
BlockingQueue<Runnable> shareQueue = new LinkedBlockingQueue<>();
private void createThread() {
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable targetRunnable = shareQueue.take();
targetRunnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
public void startRunnable() {
for (int i = 0; i < 10; i++) {
int finalI = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("執(zhí)行 runnable " + finalI);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
System.out.println("放入隊列 runnable " + i);
shareQueue.offer(runnable);
}
}
}
上述Demo里只開啟了一個線程,該線程是個死循環(huán)。線程體里從共享隊列shareQueue 取出Runnable,若有元素則拿出來執(zhí)行,若沒有則阻塞等待。
而外部調用者可以往共享隊列里存放Runnable,等待線程執(zhí)行該Runnable。
由此實現(xiàn)了只有一個線程在運行,卻是可以執(zhí)行不同的任務,也避免了每個任務都需要開啟線程執(zhí)行的情況。
3、線程池原理
線程池基本構成
上面雖然實現(xiàn)了線程池,但是是乞丐版的,很明顯地看出很多缺陷:
1、線程一直在運行,不能停下來。
2、只有一個線程在執(zhí)行任務,其它任務需要排隊。
3、隊列無限膨脹,消耗內存。
4、其它缺點...
作為通用的工具庫,來看看Java 線程池是如何實現(xiàn)的。
線程池的核心圍繞著一個原子變量:ctl
#ThreadPoolExecutor.java
//初始化狀態(tài):線程池處在運行狀態(tài),當前線程數(shù)為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//用來表示線程數(shù)的位數(shù)個數(shù),為29
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池線程最大個數(shù),(1<<29) - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 線程池的5種狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//ctl 為int 類型,總共32位,包含了兩個值,高3位表示線程池狀態(tài),低29位表示線程個數(shù)
//提取線程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取線程個數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
//將狀態(tài)和線程個數(shù)存儲在int 里
private static int ctlOf(int rs, int wc) { return rs | wc; }

為什么需要29位表示線程數(shù)呢,因為線程池狀態(tài)有5種,需要3位才能區(qū)分這5種狀態(tài)。
線程池執(zhí)行任務
ThreadPoolExecutor 是線程池最核心的類,實現(xiàn)了Executor 接口,并重寫了execute(Runnable) 方法。
當外部調用者需要線程池執(zhí)行任務時,只需要調用ThreadPoolExecutor.execute(Runnable)方法即可,線程池里的某個線程體里將會執(zhí)行Runnable,也即是執(zhí)行了一次任務。
#ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取ctl 變量值
int c = ctl.get();
//當前正在運行的線程數(shù)沒有超過核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//新加入的任務將會新建一個核心線程來執(zhí)行它
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果線程池還在運行,那么就往等待隊列里添加任務
if (isRunning(c) && workQueue.offer(command)) {
//再次獲取ctl 變量值
int recheck = ctl.get();
//再次檢查線程池是否在運行
if (! isRunning(recheck) && remove(command))
//若沒有運行了,那么將任務從隊列里移出,移出成功則執(zhí)行拒絕策略(類似定義的回調)
reject(command);
//若是當前沒有線程在運行
else if (workerCountOf(recheck) == 0)
//新增非核心線程執(zhí)行新的任務,注意此時任務是放在隊列里的,因此
//這里的第一個參數(shù)為null
addWorker(null, false);
}
//加入隊列失敗后,嘗試直接新建非核心線程執(zhí)行該任務
else if (!addWorker(command, false))
//執(zhí)行拒絕任務
reject(command);
}
可以看出有多次判斷ctl的值,這是因為可能存在多個線程同時操作ctl的值,而ctl是AtomicInteger類型,底層使用的是CAS,我們知道CAS是無鎖的,因此需要循環(huán)判斷其狀態(tài)。
線程池很多地方都是依賴CAS,可以說理解了CAS就大部分讀懂了線程池。
CAS 可移步查看:Java Unsafe/CAS/LockSupport 應用與原理
上述代碼用流程圖表示如下:

線程池幾個概念
上述流程涉及到幾個概念:
1、核心線程、核心線程數(shù)
2、非核心線程
3、最大線程數(shù)
3、等待隊列
計算機世界很多時候可以在現(xiàn)實世界找到類比關系,還是以打疫苗為例:
1、某個接種點只有4位醫(yī)生可以打疫苗,這四位醫(yī)生可類比4個線程,因為他們是常駐該接種點,因此核心醫(yī)生-->核心線程。
2、4位醫(yī)生同時只能接種4個人,當同時來的接種人數(shù)超過4人,那么就需要按順序排隊等候,這個等候隊伍稱為等待隊列。
3、某天該接種點突然來了很多待接種者,隊伍排得很長,實在排不下去了,于是需要其他地方的醫(yī)生支援。這些趕過來的醫(yī)生并非常駐此地,他們支援完成后就要回去了,因此可類比為:非核心線程。
4、核心線程 + 非核心線程 == 最大線程數(shù) (類似接種點里最多接種醫(yī)生的人數(shù))。
5、當然,有些接種點醫(yī)生資源比較緊張,隊伍太長了也沒其他地方的醫(yī)生過來支援,一天都打不完,于是剩下的待接種者被告知,今天打不了了你們回去吧,這個過程可類比:拒絕策略。即使有其他醫(yī)生來支援,但是待接種者還是很多,超過了接種點的接種能力,這個時候新來的待接種者也是不能接種的,還是會被拒絕。

核心線程和非核心線程有啥區(qū)別呢?
1、在沒有達到最大核心線程數(shù)的時候,有新的任務到來都會嘗試新建核心線程來執(zhí)行新任務。
2、核心線程常駐線程池,除非設置了超時銷毀,否則一直等待執(zhí)行新的任務。
3、非核心線程執(zhí)行完畢后,不管是否設置了超時銷毀,只要沒有任務執(zhí)行了,就會退出線程執(zhí)行。
實際上線程池里淡化了線程本身的概念,只關注任務是否得到執(zhí)行,而不關注任務被哪個線程執(zhí)行,因此核心線程、非核心線程最主要的區(qū)別在于是否常駐線程池。
核心線程、等待隊列、非核心線程 三者執(zhí)行任務順序:

線程池管理線程
上面提到過一個重要的方法:addWorker(xx)。
#ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
//跳轉標記,java里一般很少用,c/c++用得比較多
retry://--------------------(1)
for (;;) {
//死循環(huán),主要是用來判斷線程池狀態(tài)以及線程個數(shù)
int c = ctl.get();
//取出線程池狀態(tài)
int rs = runStateOf(c);
//如果線程池已經(jīng)關閉/關閉中,則無需再往里邊添加了
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//取出線程個數(shù)
int wc = workerCountOf(c);
//當前需要開啟核心線程數(shù),而核心線程數(shù)已經(jīng)滿了(其它線程成功添加了任務到核心線程)
//或者需要開啟非核心線程,但是超出了總的線程數(shù)
//這兩種情況下,將不能再添加任務到線程池
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 修改ctl,增加線程個數(shù)計數(shù)
//如果不成功(被別人改了),則繼續(xù)循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
//成功增加計數(shù)
c = ctl.get(); // Re-read ctl
//狀態(tài)發(fā)生改變了,則再重試
if (runStateOf(c) != rs)
//跳轉到開頭的標記
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創(chuàng)建Worker,將任務添加到Worker里,并創(chuàng)建線程(new Thread)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//線程創(chuàng)建成功
final ReentrantLock mainLock = this.mainLock;//-----------------(2)
//上鎖
mainLock.lock();
try {
//獲取線程池狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//添加worker到HashSet里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//標記添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//正式開啟線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
//線程池添加任務成功的標記
return workerStarted;
}
該方法有兩個參數(shù),第一個參數(shù)為Runnable,第二個為指示要添加的線程是否為核心線程。
上述標記了兩個重點:
(1)
為啥需要死循環(huán)?因為ctl變量可能存在多個線程修改的情況,而ctl 為AtomicInteger,底層為CAS,因此需要多次判斷直至條件滿足為止。
(2)
為啥需要鎖?因為worker聲明如下:
private final HashSet<Worker> workers = new HashSet<>();
該集合可能被多個線程操作,因此其添加(add)、刪除(remove)需要上鎖后才能安全操作。
Worker 繼承自AQS,利用AQS 實現(xiàn)了自己的一套鎖(類似ReentrantLock)。
其存放的主要信息如下:

線程對象存儲在workers里,而線程池通過管理workers來控制核心/非核心線程數(shù)。
線程池管理任務
線程已經(jīng)被創(chuàng)建,接下來看看如何來執(zhí)行任務。
Worker 實現(xiàn)了Runnable接口,因此必須要重寫run()方法,當構造Thread 對象時傳入了Worker作為Thread的Runnable,因此當Thread.start()后實際執(zhí)行的Runnable為Worker里的run()方法,而該方法調用了runWorker(xx)。
來看看其源碼:
#ThreadPoolExecutor.java
final void runWorker(Worker w) {
//當前線程
Thread wt = Thread.currentThread();
//取出任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//兩個條件------------>(1)
while (task != null || (task = getTask()) != null) {
//worker 上鎖
w.lock();------------>(2)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//如果線程池已經(jīng)被停止運行,并且該線程沒有被中斷過
//那么中斷該線程
wt.interrupt();
try {
//執(zhí)行任務前的回調(鉤子)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執(zhí)行任務后的回調
afterExecute(task, thrown);
}
} finally {
//置空
task = null;
//該線程完成任務計數(shù)
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
//線程退出執(zhí)行
processWorkerExit(w, completedAbruptly);
}
}
上面標記出了兩個重點:
(1)
在最開始的自己實現(xiàn)線程池Demo里,我們有演示過如何讓線程一直在運行,而此處有兩個判斷條件:
1、當前worker關聯(lián)的第一個任務是否存在,若是則取出運行。這種判斷對應于新增核心線程/非核心線程(非存放在隊列里)的場景。也就是說,當線程開啟后,若是關聯(lián)了任務則拿出執(zhí)行。
2、當線程沒有關聯(lián)到第一個任務,這種判斷對應于線程不是第一次執(zhí)行或是線程第一次執(zhí)行(有任務在隊列里),此時從隊列里取出任務執(zhí)行。
getTask()方法如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//死循環(huán)為了判斷線程池狀態(tài)
for (;;) {
int c = ctl.get();
//取出狀態(tài)
int rs = runStateOf(c);
//線程池關閉或是沒有任務在等待
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//減少線程計數(shù)
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//判斷是否需要超時等待
//1、allowCoreThreadTimeOut 外部設置了是否允許超時關閉核心線程
//2、當前線程數(shù)是否大于核心線程數(shù)(也就是開啟了非核心線程數(shù))
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//總共有四種組合判斷
//timeout 指的是獲取隊列元素是否已經(jīng)發(fā)生超時
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//若是超時發(fā)生,或者隊列為空,那么嘗試減少線程計數(shù)
//修改計數(shù)成功,則返回null
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//決定是否超時獲取元素
//若是設定了超時獲取元素,那么當超時時間耗盡后,poll方法將會返回,底層使用LockSupport
//若沒有設定超時,那么一直阻塞直到隊列有元素為止
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//走到這超時了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
可以看出:
1、借助于阻塞隊列的特性,在沒有設置超時的情況下,核心線程將一直會阻塞在BlockingQueue.take()方法,進而線程一直被阻塞于此,直到有新的任務被放入線程池請求執(zhí)行。這也是為什么核心線程能夠常駐線程池的原因。
2、當線程數(shù)超過了核心線程數(shù),那么timed=true,也就是此時線程是獲取隊列元素時設置了超時的,若超時過了沒有獲取到元素,那么就會退出線程執(zhí)行。這也是為什么非核心線程不能常駐線程池的原因。
(2)
此處為什么需要鎖?
前面提到過操作wokers集合需要鎖:mainLock,它的目的是為了多線程對集合進行添加/刪除操作的安全性進行考慮。每個woker同時只能由一個線程操作,似乎沒有加鎖的必要,其實Worker鎖更重要的作用體現(xiàn)在:
用來巧妙地判斷線程是否在運行中(忙碌中)。
當有任務執(zhí)行的時候,Runnable.run()方法被上鎖,執(zhí)行結束后釋放鎖。因此當判斷Woker沒有獲取鎖時,表明它正在等待獲取隊列的元素,此時它是空閑的。
判斷了線程的空閑與否可以用來給外部中斷線程池的執(zhí)行提供依據(jù)。
4、總結
至此,我們了解線程池的核心優(yōu)勢:不頻繁重復創(chuàng)建線程。依靠阻塞隊列特性使得線程常駐,用圖表示如下:

由于篇幅原因,剩下的線程池關閉、線程池一些重要的運行時狀態(tài)、簡單的創(chuàng)建線程池的幾種方式等將在下篇分析,敬請關注!
演示代碼 若是有幫助,給github 點個贊唄~