夢(mèng)想在沒(méi)有實(shí)現(xiàn)之前,不必對(duì)他人講。
先從全局看問(wèn)題總是沒(méi)錯(cuò)的,線(xiàn)程池的繼承體系:

Executors 是一個(gè)用來(lái)生產(chǎn)線(xiàn)程池的靜態(tài)工廠,可以通過(guò)該類(lèi)生產(chǎn)ExecutorService、ScheduledExecutorService等對(duì)象。
在 Executors 這個(gè)類(lèi)里面,定義了這么幾種常用的線(xiàn)程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
這幾種線(xiàn)程池都構(gòu)造了ThreadPoolExecutor類(lèi),只是參數(shù)不同,所以看一下這個(gè)ThreadPoolExecutor類(lèi)。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor參數(shù)描述如下:
-
corePoolSize線(xiàn)程池核心線(xiàn)程數(shù)。當(dāng)提交一個(gè)任務(wù)時(shí),線(xiàn)程池會(huì)新創(chuàng)建一個(gè)新線(xiàn)程執(zhí)行任務(wù),直到線(xiàn)程數(shù)達(dá)到corePoolSize;之后繼續(xù)提交的任務(wù)會(huì)被保存到阻塞隊(duì)列中。 -
maximumPoolSize線(xiàn)程池最大線(xiàn)程數(shù)。這個(gè)參數(shù)只有在隊(duì)列有界的情況下才有效。當(dāng)前阻塞隊(duì)列滿(mǎn)了的情況下,繼續(xù)提交任務(wù)時(shí),則會(huì)繼續(xù)創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù),直到線(xiàn)程數(shù)達(dá)到maximumPoolSize。之后再提交任務(wù),會(huì)執(zhí)行拒絕策略。 -
keepAliveTime空閑隊(duì)列存活時(shí)間。大于corePoolSize的空閑線(xiàn)程在該時(shí)間之后會(huì)被銷(xiāo)毀 -
unitkeepAliveTime 的單位 -
workQueue阻塞隊(duì)列,一般有如下幾種阻塞隊(duì)列
- ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列
- inkedBlockingQuene:基于隊(duì)列的無(wú)界阻塞隊(duì)列
- SynchronousQuene:不實(shí)際存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作,反之亦然。如果使用該隊(duì)列,提交的任務(wù)不會(huì)保存,而總是將新任務(wù)提交給線(xiàn)程執(zhí)行,如果沒(méi)有空閑線(xiàn)程,則嘗試創(chuàng)建新的線(xiàn)程,如果線(xiàn)程已達(dá)最大值,則執(zhí)行拒絕策略。
- priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列
-
threadFactory線(xiàn)程工廠 -
handler拒絕策略,當(dāng)隊(duì)列已滿(mǎn),且沒(méi)有空閑線(xiàn)程時(shí),會(huì)執(zhí)行一種拒絕策略,JDK一共有四種拒絕策略
- AbortPolicy:直接拋出異常
- CallerRunsPolicy :在調(diào)用者線(xiàn)程中運(yùn)行任務(wù)
- DiscardOldestPolicy: 丟棄最早的一個(gè)請(qǐng)求,再次提交該任務(wù)
- DiscardPolicy: 直接丟棄,不做任何處理
結(jié)合之前的代碼可以看到,當(dāng)corePoolSize 等于maximumPoolSize 時(shí),構(gòu)造的就是newFixedThreadPool,這兩個(gè)都為1 時(shí),構(gòu)造的是newSingleThreadExecutor。newCachedThreadPool線(xiàn)程池在沒(méi)有任務(wù)執(zhí)行時(shí),數(shù)量為0,其數(shù)量會(huì)動(dòng)態(tài)變化,最大值為Integer.MAX_VALUE`
ScheduledThreadPoolExecutor 繼承了ThreadPoolExecutor,構(gòu)造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor增加了一些定時(shí)任務(wù)的功能,這里使用到了DelayedWorkQueue,這個(gè)隊(duì)列也很有意思,模擬了二叉查找樹(shù)的性質(zhì),用來(lái)存放有序的計(jì)劃任務(wù)。
主要方法如下:
//在指定的時(shí)間后,對(duì)任務(wù)進(jìn)行一次調(diào)度
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//對(duì)任務(wù)進(jìn)行周期性調(diào)度,以開(kāi)始時(shí)間計(jì)算,周期性調(diào)度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//對(duì)任務(wù)進(jìn)行周期性調(diào)度,以結(jié)束時(shí)間計(jì)算,經(jīng)過(guò)延遲后,才進(jìn)行下一次
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
那么在線(xiàn)程池中的線(xiàn)程是如何調(diào)度的,線(xiàn)程池的原理是什么呢?
先看一下線(xiàn)程池的狀態(tài)表示:
//這個(gè)原子類(lèi)非常強(qiáng)大,其中的高3為表示線(xiàn)程池狀態(tài),后29位表示線(xiàn)程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//高3位為111,表示線(xiàn)程池能接受新任務(wù),并且可以運(yùn)行隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//高3位000,表示線(xiàn)程池不再接受新任務(wù),但可以處理隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高3為001,表示線(xiàn)程池不再接受新任務(wù),不再執(zhí)行隊(duì)列中的任務(wù),而且要中斷正在處理的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
//高3位010,表示線(xiàn)程池位為空,準(zhǔn)備關(guān)閉
private static final int TIDYING = 2 << COUNT_BITS;
//高3位011,表示線(xiàn)程池已關(guān)閉
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//將高3位,低29位保存在一個(gè)int里
private static int ctlOf(int rs, int wc) { return rs | wc; }
接下來(lái)分析線(xiàn)程池的調(diào)度代碼,當(dāng)我們用線(xiàn)程池執(zhí)行一個(gè)任務(wù)的時(shí)候,會(huì)執(zhí)行以下方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取ctl值,上面的分析知道,這個(gè)值包含了高3位的線(xiàn)程池狀態(tài)和低29位的線(xiàn)程池?cái)?shù)量
int c = ctl.get();
//拿到線(xiàn)程數(shù)量和核心線(xiàn)程數(shù)比較
if (workerCountOf(c) < corePoolSize) {
// 如果當(dāng)前線(xiàn)程數(shù)量< 核心線(xiàn)程數(shù),則執(zhí)行addWorker 方法,這個(gè)方法會(huì)新建線(xiàn)程并執(zhí)行任務(wù)
if (addWorker(command, true))
return;
//如果執(zhí)行失敗,再拿一次ctl的值
c = ctl.get();
}
// 當(dāng)線(xiàn)程數(shù)大于核心線(xiàn)程,或上邊任務(wù)添加失敗時(shí)
// 在線(xiàn)程池可用的時(shí)候,會(huì)將任務(wù)添加到阻塞隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次確認(rèn)線(xiàn)程池狀態(tài),若線(xiàn)程池停止了,將任務(wù)刪除,并執(zhí)行拒絕策略
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//如果線(xiàn)程數(shù)量為0,則放入一個(gè)空任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果隊(duì)列無(wú)法放入,則再新建線(xiàn)程執(zhí)行任務(wù),如果失敗,執(zhí)行 拒接策略
// 這里就是從core 到 max 的擴(kuò)展
else if (!addWorker(command, false))
reject(command);
}
下面看一下addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取線(xiàn)程池狀態(tài)
int rs = runStateOf(c);
// 如果線(xiàn)程池不在運(yùn)行狀態(tài),則不再處理提交的任務(wù),直接返回 , 但可以繼續(xù)執(zhí)行隊(duì)列中已有的任務(wù)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//這里的死循環(huán)是為了CAS 線(xiàn)程數(shù)量,直到成功之后跳出外層循環(huán)
for (;;) {
// 獲取線(xiàn)程數(shù)
int wc = workerCountOf(c);
//判斷線(xiàn)程數(shù)是否已達(dá)最大值,超過(guò)容量直接返回
if (wc >= CAPACITY ||
//判斷是核心線(xiàn)程還是最大線(xiàn)程
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加線(xiàn)程數(shù),跳出外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 檢查線(xiàn)程池狀態(tài),如果與開(kāi)始不同,則從外層循環(huán)重新開(kāi)始
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 用傳進(jìn)來(lái)的任務(wù)構(gòu)造一個(gè)worker ,該類(lèi)繼承了AQS,實(shí)現(xiàn)了Runnable
w = new Worker(firstTask);
// 獲取worker中創(chuàng)建的線(xiàn)程
final Thread t = w.thread;
if (t != null) {
//加鎖 ,HashSet線(xiàn)程不安全
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// 檢測(cè)線(xiàn)程池狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//確認(rèn)創(chuàng)建的線(xiàn)程還沒(méi)開(kāi)始運(yùn)行
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將線(xiàn)程加入集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功之后,啟動(dòng)worker線(xiàn)程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
//返回值標(biāo)識(shí)線(xiàn)程是否啟動(dòng)
return workerStarted;
}
看一下線(xiàn)程是怎么啟動(dòng)的:
// worker類(lèi)
Worker(Runnable firstTask) {
//在運(yùn)行之前不允許中斷
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
線(xiàn)程啟動(dòng)執(zhí)行的是runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//由于在worker構(gòu)造方法中抑制了中斷,這里解除抑制
w.unlock(); // allow interrupts
//默認(rèn)為true,說(shuō)明發(fā)生了異常
boolean completedAbruptly = true;
try {
//先執(zhí)行傳進(jìn)來(lái)的任務(wù),之后從隊(duì)列獲取任務(wù)執(zhí)行
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//在任務(wù)執(zhí)行之前,可以做一些事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任務(wù)的真正的執(zhí)行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任務(wù)執(zhí)行完,可以做些事情,注意:這里可以拿到任務(wù)運(yùn)行時(shí)的異常
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 如果一切正常,置為false , 清理時(shí)會(huì)做判斷
completedAbruptly = false;
} finally {
//清理工作,同時(shí) 任務(wù)如果有異常,會(huì)通過(guò)這個(gè)方法擦屁股
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
// 兩種情況:
// 1.RUNING狀態(tài)
// 2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
// 執(zhí)行到這里說(shuō)明線(xiàn)程已超核心線(xiàn)程數(shù)并且超時(shí),這時(shí)返回null回收線(xiàn)程
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//如果核心線(xiàn)程允許超時(shí),或者線(xiàn)程數(shù)已達(dá)到核心線(xiàn)程數(shù),則執(zhí)行poll
//poll方法在規(guī)定時(shí)間內(nèi)沒(méi)返回會(huì)返回null,在下一輪循環(huán)的時(shí)候,會(huì)返回null,線(xiàn)程會(huì)被銷(xiāo)毀
// 否則,執(zhí)行take方法,該方法會(huì)阻塞直到隊(duì)列中有任務(wù),所以當(dāng)線(xiàn)程數(shù)在核心線(xiàn)程數(shù)以下的線(xiàn)程不會(huì)被銷(xiāo)毀
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
最后看一下runWorker中的清理工作:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果非正常結(jié)束,將線(xiàn)程數(shù)減一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//從線(xiàn)程池中移出異常和超時(shí)的線(xiàn)程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試關(guān)閉線(xiàn)程池
tryTerminate();
int c = ctl.get();
//線(xiàn)程池狀態(tài)在RUNNING或SHUTDOWN時(shí)
if (runStateLessThan(c, STOP)) {
// 線(xiàn)程正常結(jié)束
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果線(xiàn)程為0 但是隊(duì)列中還有任務(wù)要執(zhí)行
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//線(xiàn)程數(shù)量滿(mǎn)足條件,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//新建空的任務(wù),假如隊(duì)列中有任務(wù)的話(huà),這里保證能執(zhí)行
//如果線(xiàn)程是因?yàn)楫惓M顺龅模@里進(jìn)行補(bǔ)充
addWorker(null, false);
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 線(xiàn)程池正在運(yùn)行時(shí)
// 線(xiàn)程池是SHUTDOWN狀態(tài),但是隊(duì)列還有任務(wù)時(shí)
// 線(xiàn)程池已經(jīng)準(zhǔn)備停止時(shí) 直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//下面的代碼說(shuō)明線(xiàn)程池真的需要關(guān)閉了
//如果線(xiàn)程數(shù)量不為0,說(shuō)明需要將線(xiàn)程中斷,這里只中斷一個(gè)線(xiàn)程就可以(為啥呢?)
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//執(zhí)行關(guān)閉操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 使用 CAS 設(shè)置狀態(tài)位
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
到這里,線(xiàn)程池的基本原理基本能明白一二吧...