概述
? 線程作為系統(tǒng)稀缺資源,如果在應(yīng)用中進(jìn)行頻繁的創(chuàng)建和銷毀,會(huì)為我們的應(yīng)用帶來災(zāi)難性的體驗(yàn),增大系統(tǒng)負(fù)荷,降低效率。池化技術(shù)為該問題的解決提供了一種有效的思路,通過建立一個(gè)線程池,每次線程的時(shí)候從池中取出一個(gè)空閑的線程,這樣就省去了線程創(chuàng)建和銷毀。java的線程池實(shí)現(xiàn)是在jdk1.5開始引入的,本文將對(duì)其中最常用的ThreadPoolExecutor的實(shí)現(xiàn)進(jìn)行詳細(xì)的介紹,系統(tǒng)可以通過本文了解到如何去實(shí)現(xiàn)一個(gè)線程池,并向Doug Lea大神致敬。
使用
? 我們先看下面的線程池使用的例子,在該例子中我聲明一個(gè)核心線程數(shù)是2,最大線程數(shù)是5,非核心線程線程存活時(shí)間1s,阻塞隊(duì)列大小為1,拒絕策略為AbortPolicy,我們會(huì)輸出程序執(zhí)行過程中的線程池達(dá)到的最大線程數(shù)以及在所有任務(wù)執(zhí)行結(jié)束后線程池中線程的數(shù)量。代碼如下:
/**
* Created by yuanqiongqiong on 2019/4/10.
*/
public class ThreadPoolExecutorTest {
private static Logger LOGGER = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
//聲明一個(gè)線程池
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
TimeUnit.SECONDS,
new ArrayBlockingQueue(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String []args) {
for (int i = 0; i< 7;i++) {
String runnableName = "test" + i;
PersonRunnable personRunnable = new PersonRunnable(runnableName);
try {
threadPoolExecutor.execute(personRunnable);
} catch (Exception e) {
LOGGER.error("執(zhí)行{}任務(wù)異常", runnableName, e);
}
}
try {
Thread.sleep(500);
LOGGER.info("線程池當(dāng)前線程數(shù)目 = {}", threadPoolExecutor.getPoolSize());
Thread.sleep(2000);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
LOGGER.info("線程池中達(dá)到的最大線程數(shù)目 = {}", threadPoolExecutor.getLargestPoolSize());
LOGGER.info("線程池當(dāng)前線程數(shù)目 = {}", threadPoolExecutor.getPoolSize());
LOGGER.info("線程池已經(jīng)完成的任務(wù)數(shù)量 = {}", threadPoolExecutor.getCompletedTaskCount());
}
static class PersonRunnable implements Runnable {
private String name;
public PersonRunnable(String name) {
this.name = name;
}
@Override
public void run() {
LOGGER.info("我是" + name + "我在線程" + Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (Exception e) {
LOGGER.error("任務(wù){(diào)}執(zhí)行異常", Thread.currentThread().getName(), e);
}
}
}
}
? 輸出結(jié)果如下:
20:22:26.571 [pool-1-thread-3] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test3我在線程pool-1-thread-3
20:22:26.571 [pool-1-thread-2] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test1我在線程pool-1-thread-2
20:22:26.571 [pool-1-thread-4] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test4我在線程pool-1-thread-4
20:22:26.571 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test5我在線程pool-1-thread-5
20:22:26.571 [pool-1-thread-1] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test0我在線程pool-1-thread-1
20:22:26.576 [main] ERROR com.meituan.campaign.ThreadPoolExecutorTest - 執(zhí)行test6任務(wù)異常
java.util.concurrent.RejectedExecutionException: Task com.meituan.campaign.ThreadPoolExecutorTest$PersonRunnable@46f7f36a rejected from java.util.concurrent.ThreadPoolExecutor@421faab1[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.meituan.campaign.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:30)
20:22:26.681 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test2我在線程pool-1-thread-5
20:22:27.082 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池當(dāng)前線程數(shù)目 = 5
20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池中達(dá)到的最大線程數(shù)目 = 5
20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池當(dāng)前線程數(shù)目 = 2
20:22:29.085 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 線程池已經(jīng)完成的任務(wù)數(shù)量 = 6
? 由于我們代碼設(shè)置了最大線程數(shù)是5個(gè),并且阻塞隊(duì)列大小是1,所以同一時(shí)間最多會(huì)有6個(gè)任務(wù)被執(zhí)行,其中1個(gè)任務(wù)放在阻塞隊(duì)列中。線程池達(dá)到的最大線程數(shù)目是5個(gè),因?yàn)榫€程池設(shè)置了maximumPoolSize=5。非核心線程會(huì)在1s空閑后被回收,因此最終線程池線程數(shù)目還是2個(gè)。
實(shí)現(xiàn)分析
? 拋開ThreadPoolExecutor,我們先想下實(shí)現(xiàn)一個(gè)線程池需要哪些成員變量,個(gè)人感覺以下變量是必不可少的:(1) 一個(gè)存放線程的容器或數(shù)組;(2) 一個(gè)隊(duì)列用來在線程池線程不足是存放排隊(duì)的任務(wù);(3) 一個(gè)狀態(tài)字段表示線程池的狀態(tài),用來表示線程池不同生命周期狀態(tài)。下面,我們看下ThreadPoolExecutor的成員變量:
//表示線程狀態(tài)和線程數(shù),高三位代表線程狀態(tài),低29位代表線程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//值為29
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池最大線程數(shù),大概為5億,可以肯定不會(huì)達(dá)到這么多線程
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//線程池處于運(yùn)行狀態(tài)可以接收新任務(wù)并執(zhí)行任務(wù)隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//該狀態(tài)下線程池不再接收新任務(wù),但是會(huì)把任務(wù)隊(duì)列中的任務(wù)執(zhí)行完成,調(diào)用shutDown()會(huì)進(jìn)入該狀態(tài)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//該狀態(tài)下線程池不接受新任務(wù)并拋棄任務(wù)隊(duì)列中的任務(wù)中斷所有正在執(zhí)行的線程,調(diào)用shutDownNoW()會(huì)進(jìn)入該狀態(tài)
private static final int STOP = 1 << COUNT_BITS;
//已經(jīng)沒有任務(wù)可以執(zhí)行,會(huì)從SHUTDOWN和STOP狀態(tài)變換為該狀態(tài)
private static final int TIDYING = 2 << COUNT_BITS;
//在執(zhí)行完terminated()操作后會(huì)進(jìn)入該狀態(tài)
private static final int TERMINATED = 3 << COUNT_BITS;
//任務(wù)阻塞隊(duì)列,存放排隊(duì)任務(wù)
private final BlockingQueue<Runnable> workQueue;
//存放線程的hashset
private final HashSet<Worker> workers = new HashSet<Worker>();
//線程工廠,生成新線程
private volatile ThreadFactory threadFactory;
//拒絕策略
private volatile RejectedExecutionHandler handler;
//線程池核心線程數(shù)
private volatile int corePoolSize;
//線程池最大線程數(shù)
private volatile int maximumPoolSize;
? 上述代碼的注釋給出了線程池各個(gè)狀態(tài)的含義,我們看下各個(gè)狀態(tài)之間的狀態(tài)轉(zhuǎn)換關(guān)系,具體如下:
(1) RUNNING -> SHUTDOWN:調(diào)用了shutdown()函數(shù);
(2) (RUNNING or SHUTDOWN) -> STOP:調(diào)用了shutdownNow();
(3)SHUTDOWN -> TIDYING:當(dāng)線程池線程為空并者任務(wù)隊(duì)列為空;
(4)STOP -> TIDYING:當(dāng)線程池線程為空;
(5)TIDYING -> TERMINATED:當(dāng)調(diào)用了terminated()方法;
? 如上示例,我們把一個(gè)任務(wù)放入線程池的execute()函數(shù)中,線程池會(huì)為我們選擇一個(gè)線程來執(zhí)行我們提交的任務(wù)。在這個(gè)選擇線程的過程中,如果線程池中線程數(shù)量小于corePoolSize,那么將創(chuàng)建新線程執(zhí)行任務(wù);當(dāng)線程池?cái)?shù)量大于等于corePoolSize并且小于maximumPoolSize,線程池會(huì)把任務(wù)放到阻塞隊(duì)列workQueue中直到workQueue滿了去創(chuàng)建新線程;當(dāng)線程池線程數(shù)量等于maximumPoolSize并且workQueue滿時(shí)會(huì)執(zhí)行拒絕策略。下面我們通過execute()函數(shù)的邏輯來理解上述過程:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果線程數(shù)小于核心線程數(shù),那么創(chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)command
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果線程數(shù)大于等于核心線程數(shù),線程數(shù)處于RUNNING狀態(tài)(可以將任務(wù)加入阻塞隊(duì)列)并且加入阻塞隊(duì)列成功(即阻塞隊(duì)列未滿),那么任務(wù)就被加入阻塞隊(duì)列等待空閑線程。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次檢查線程池狀態(tài),如果不是RUNNING狀態(tài),從阻塞隊(duì)列中移除任務(wù),執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//線程處RUNNING狀態(tài)并且線程數(shù)是0,則創(chuàng)建個(gè)空閑新線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果線程數(shù)大于核心線程并且阻塞隊(duì)列已滿,則以maximumPoolSize為線程數(shù)最大值進(jìn)行處理
else if (!addWorker(command, false))
//線程池中線程達(dá)到最大線程數(shù)并且阻塞隊(duì)列已經(jīng)滿執(zhí)行拒絕策略
reject(command);
}
? 看到上面的代碼邏輯,我們會(huì)發(fā)現(xiàn)主要的邏輯還是在addWorker里,這個(gè)函數(shù)主要功能就是為任務(wù)分配線程并執(zhí)行,我們在看這塊邏輯之前需要取看一個(gè)重要的Worker類。該類封裝了線程及任務(wù),可以在內(nèi)部執(zhí)行任務(wù),具體定義如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//具體線程
final Thread thread;
//線程要執(zhí)行的任務(wù)
Runnable firstTask;
//線程完成的任務(wù)數(shù)
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//調(diào)用線程工廠創(chuàng)建新的線程,threadFactory由我們的線程池構(gòu)造函數(shù)傳入,沒有指定則使用默認(rèn)的,這塊會(huì)創(chuàng)建一個(gè)新的線程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
//可以看出Worker實(shí)現(xiàn)了AQS,其本身也是不可重入鎖
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
? 思考為什么Worker要繼承AQS實(shí)現(xiàn)一個(gè)獨(dú)占鎖?這個(gè)問題我們后面分析。
? 了解了worker的構(gòu)成,我們就可以具體看下addWorker函數(shù)的執(zhí)行邏輯了,具體如下:
//core為true,那么創(chuàng)建線程是以corePoolSize作為線程數(shù)最大值,否則以maximumPoolSize作為線程數(shù)最大值
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 線程狀態(tài)是非RUNNING狀態(tài)不再進(jìn)行任務(wù)提交處理,其中SHUTDOWN狀態(tài)下已經(jīng)提交進(jìn)行任務(wù)和阻塞隊(duì)列 中的任務(wù)要繼續(xù)處理
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//線程池中線程大于最大線程數(shù)或者大于要求的閾值,返回失敗
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//符合要求后,CAS增大線程數(shù),跳出自旋,走下面的線程創(chuàng)建邏輯
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;//標(biāo)記線程是否啟動(dòng)
boolean workerAdded = false;//標(biāo)記線程是否添加成功
Worker w = null;
try {
//創(chuàng)建新的線程并封裝為一個(gè)Work對(duì)象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//對(duì)線程池創(chuàng)建的線程狀態(tài)進(jìn)行檢查
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//如果新線程檢查成功,將新線程加入workers中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
//更新全局變量,線程池達(dá)到的最大線程數(shù),該值可以輸出作為線程池參數(shù)設(shè)定的指標(biāo)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//這塊重要了,線程創(chuàng)建成功后,開始執(zhí)行任務(wù)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
? 我們繼續(xù)跟著上述代碼思路走,看下任務(wù)如何執(zhí)行,t.start()的會(huì)調(diào)用Worker類run()方法,而該方法會(huì)調(diào)用runWorker來從任務(wù)隊(duì)列中獲取任務(wù),執(zhí)行任務(wù),具體看下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果Worker中創(chuàng)建時(shí)存在任務(wù),則執(zhí)行;否則,調(diào)用getTask從阻塞隊(duì)列中獲取任務(wù),當(dāng)阻塞隊(duì)列中沒有任務(wù)并且線程不應(yīng)該被回收時(shí),線程會(huì)一直阻塞等待獲取任務(wù),具體在getTask方法中分析
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 上面的英文注釋很清楚了,這塊為了處理調(diào)用shutdownNow時(shí)需要停止所有的線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//這個(gè)函數(shù)可以自己實(shí)現(xiàn),默認(rèn)為空
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行具體任務(wù)的業(yè)務(wù)邏輯
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//這個(gè)函數(shù)可以自己實(shí)現(xiàn),默認(rèn)為空
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執(zhí)行線程銷毀過程
processWorkerExit(w, completedAbruptly);
}
}
? 在getTask方法中, Worker線程會(huì)一直循環(huán)的從阻塞隊(duì)列中獲取任務(wù),直到遇到以下情況會(huì)返回null,進(jìn)而執(zhí)行上面的線程銷毀過程processWorkerExit:
(1) 線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空;
(2) 線程數(shù)狀態(tài)變大于SHUTDOWN (STOP TIDYING TERMINATED);
(3) 線程池線程數(shù)大于最大線程數(shù)或者線程超時(shí)未獲取任務(wù)的情況下,任務(wù)隊(duì)列為空或者工作線程數(shù)大于1;
這塊邏輯具體代碼如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 線程池狀態(tài)為STOP或者(狀態(tài)為SHUTDOWN&&任務(wù)隊(duì)列為空),這個(gè)時(shí)候無需在執(zhí)行任務(wù)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//設(shè)置了允許核心線程超過keepAliveTime空閑后銷毀線程 或者 線程數(shù)大于核心線程數(shù)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//從阻塞隊(duì)列中獲取任務(wù),如果進(jìn)行超時(shí)控制,則調(diào)用poll方法,否則調(diào)用take一直阻塞到隊(duì)列中有任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
? 以上就是線程池中任務(wù)執(zhí)行的大致過程,接下來我們對(duì)線程池結(jié)束及其中實(shí)現(xiàn)的一些細(xì)節(jié)進(jìn)行分析。
原文
袁瓊瓊的技術(shù)博客,歡迎指針
http://yuanqiongqiong.cn/2019/04/10/%E8%B0%88%E8%B0%88ThreadPoolExecutor%E7%9A%84%E5%AE%9E%E7%8E%B0/