前言
本文重點(diǎn)分析了ThreadPoolExecutor兩個(gè)方法execute() 和 submit() 的執(zhí)行原理,并說明Future如何實(shí)現(xiàn)阻塞返回。
繼承關(guān)系圖

關(guān)鍵方法介紹
構(gòu)造方法
/**
* @param corePoolSize 核心線程數(shù)
* @param maximumPoolSize 最大線程數(shù)
* @param keepAliveTime 臨時(shí)線程保留時(shí)間
* @param unit 臨時(shí)線程保留時(shí)間單位
* @param workQueue 阻塞隊(duì)列
* @param threadFactory 線程工程
* @param handler 拒絕策略
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
為了方便區(qū)分,本文會(huì)將超過核心線程數(shù)創(chuàng)建的線程叫臨時(shí)線程,本質(zhì)上這兩類線程沒有任何區(qū)別,到期回收哪個(gè)線程完全是跟當(dāng)時(shí)線程池哪個(gè)線程先被空閑有關(guān),跟創(chuàng)建時(shí)間的先后無關(guān)
execute(Runnable command)
默認(rèn)參數(shù)
先介紹主要方法實(shí)現(xiàn)前,先說明一些靜態(tài)變量的含義和值。
ctl 官方給出的注釋是The main pool control state,這個(gè)值包含了兩部分,workerCount和runState。
int COUNT_BITS = Integer.SIZE - 3 = 29; 一共32位,高3位表示線程池的運(yùn)行狀態(tài),低29位表示線程池中的線程數(shù)量。是一種高低位的實(shí)現(xiàn)。
用一個(gè)變量去存儲(chǔ)兩個(gè)值,可避免在做相關(guān)決策時(shí),出現(xiàn)不一致的情況,不必為了維護(hù)兩者的一致,而占用鎖資源。通過閱讀線程池源代碼也可以發(fā)現(xiàn),經(jīng)常出現(xiàn)要同時(shí)判斷線程池運(yùn)行狀態(tài)和線程數(shù)量的情況。
int CAPACITY = (1 << COUNT_BITS) - 1 = 536870912;也就是從的線程容量是536870912個(gè)。
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 都是用高3位表示不同的含義。低29位都是0
具體值參考下表:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 RUNNING | 0 = -536870912 , 1110 0000 + 24位0
private static final int COUNT_BITS = Integer.SIZE - 3; //29 高3位表示狀態(tài) 低29表示線程數(shù)量
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //536870912 0001 1111 + 24位1
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // -536870912 1110 0000 + 24位0
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0 0000 0000 + 24位0
private static final int STOP = 1 << COUNT_BITS; // 536870912 0010 0000 + 24位0
private static final int TIDYING = 2 << COUNT_BITS; // 1073741824 0100 0000 + 24位0
private static final int TERMINATED = 3 << COUNT_BITS; // 1610612736 0110 0000 + 24位0
// Packing and unpacking ctl
// 如果c是默認(rèn)值-536870912,
// runStateOf = (-536870912 & ~29) = -536870912,
// workerCountOf = (-536870912 & 29) = 0
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
源碼分析
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//c = -536870912
int c = ctl.get();
// workerCountOf(c) = 0
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
首先,這個(gè)execute有三個(gè)主要的if判斷:
//判斷當(dāng)前線程池中的線程數(shù)量有沒有到核心線程數(shù),沒有就創(chuàng)建新的worker來處理任務(wù)。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//執(zhí)行到此處,說明此時(shí)線程池的線程數(shù)已經(jīng)超過了coolPoolSize。先判斷線程池狀態(tài),且嘗試將任務(wù)添加到阻塞隊(duì)列里。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 最后意味著此時(shí)阻塞隊(duì)列已滿,嘗試創(chuàng)建新的worker來處理,不能創(chuàng)建則執(zhí)行拒絕策略。
else if (!addWorker(command, false))
reject(command);
addWorker()
很長(zhǎng)的一個(gè)方法,注釋就不貼了,兩個(gè)參數(shù)分別是當(dāng)前要執(zhí)行的任務(wù)和core(表示要?jiǎng)?chuàng)建的是核心線程還是臨時(shí)線程)。
這里的worker是真正負(fù)責(zé)處理任務(wù)的對(duì)象,worker內(nèi)部封裝了所屬線程和待執(zhí)行的任務(wù).
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
接下來主要看addWorker方法的實(shí)現(xiàn)。
private boolean addWorker(Runnable firstTask, boolean core) {
//第一部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//第二部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
第一部分:
rs表示是線程池的狀態(tài),先校驗(yàn)線程池狀態(tài)和隊(duì)列數(shù)量。前文已經(jīng)提過,RUNNING的值是負(fù)數(shù),SHOTDOWN是0,其他值都是正數(shù)。
之后是for循環(huán),判斷容量和是否超過了預(yù)設(shè)的線程數(shù)量。
如果成功增加了workerCount的值就跳出循環(huán),開始執(zhí)行任務(wù)。
如果失敗,說明有并發(fā)情況,就重新獲取ctl,判斷rs狀態(tài)是否變了,從而決定是重新執(zhí)行一遍大或小循環(huán)。
for循環(huán)結(jié)束后,說明當(dāng)前可以增加worker對(duì)象。此時(shí)就真正創(chuàng)建對(duì)象開始執(zhí)行任務(wù)。
第二部分:
在創(chuàng)建worker對(duì)象時(shí),構(gòu)造方法中也創(chuàng)建了一個(gè)Thread。并通過lock來保證原子性,校驗(yàn)狀態(tài)之后將worker對(duì)象add到HashSet中。
private final HashSet<Worker> workers = new HashSet<Worker>();
添加后,釋放鎖并start線程。
如果在addWorker過程中失敗,且第一階段順利完成,就從hashSet中移除,并減少workerCount。
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
如果添加任務(wù)順利,則在t.start();執(zhí)行完成后,主要任務(wù)就完成了并返回true。此時(shí)線程會(huì)執(zhí)行worker對(duì)象內(nèi)的run方法。
worker內(nèi) run()
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
runWorker真正執(zhí)行,這個(gè)this只得是worker對(duì)象,task和線程都已經(jīng)封裝到worker內(nèi)了。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池已經(jīng)是STOP或TIDYING或TERMINATED,需要將線程也主動(dòng)中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里說明一些最核心的邏輯
執(zhí)行過程:
- 在while中判斷當(dāng)前的task和隊(duì)列中的task,如果當(dāng)前task != null,說明是線程是伴隨著任務(wù)一起創(chuàng)建的,直接調(diào)用task.run來執(zhí)行。
- 第一圈執(zhí)行完成后,task=null,第二次執(zhí)行while時(shí),需要從getTask中取task來執(zhí)行。
- 當(dāng)getTask() 返回null時(shí),while結(jié)束,設(shè)置completedAbruptly = false;表明任務(wù)時(shí)正常結(jié)束。最后調(diào)用processWorkerExit來退出線程。
這里提供了兩個(gè)方法:beforeExecute 和 afterExecute,task.run()的切面,我們可以定義worker的子類,來實(shí)現(xiàn)擴(kuò)展,比如加入一些監(jiān)控等。
getTask() 返回null就代表著線程可以正常結(jié)束,那么什么情況下會(huì)返回null?
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask() 的主要任務(wù)是從阻塞隊(duì)列中獲取task。通過判斷當(dāng)前的wc 是否超過了核心線程數(shù),來決定poll還是take來取任務(wù)。
如果超過了,說明此時(shí)已經(jīng)創(chuàng)建過了臨時(shí)線程,臨時(shí)線程的有效期就是等待從隊(duì)列返回的時(shí)間,超過這個(gè)時(shí)間沒有取到,則設(shè)置timeOut表示已經(jīng)超時(shí),在下一次for循環(huán)的if判斷中,返回null,讓這個(gè)臨時(shí)線程自動(dòng)結(jié)束。
如果沒超過,說明此時(shí)還處在核心線程的階段,可以take長(zhǎng)期等待。
至此,run方法的執(zhí)行過程就此完成。
任務(wù)是如何添加到隊(duì)列中的,還得回到execute方法。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
如果已經(jīng)達(dá)到核心線程數(shù),就不能在繼續(xù)addWorker,而是要offer到workQueue中,并再次檢查線程池狀態(tài)。
如果offer失敗,說明阻塞隊(duì)列已滿,此時(shí)需要繼續(xù)創(chuàng)建新的worker來完成任務(wù)。
else if (!addWorker(command, false))
reject(command);
這里的false代表 創(chuàng)建時(shí)和最大線程數(shù)進(jìn)行比較,如果超過了最大線程數(shù),則調(diào)用reject來執(zhí)行拒絕策略。
reject()
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
4種默認(rèn)的拒絕策略
AbortPolicy : 直接拋出異常(默認(rèn)策略)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy : 什么也不處理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
DiscardOldestPolicy : 把當(dāng)前最早在隊(duì)列的任務(wù)丟棄,并將再次執(zhí)行此任務(wù)(可能會(huì)直接執(zhí)行,也可能被加到隊(duì)列中)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
CallerRunsPolicy : 由當(dāng)前線程來直接執(zhí)行run,不再交給線程池。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
submit 源碼分析
submit()
Future<?> future = Executors.newCachedThreadPool().submit(new Thread());
...
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit方法可用于帶返回值的任務(wù)執(zhí)行??梢苑祷谾uture來獲取線程的執(zhí)行結(jié)果,具體的實(shí)現(xiàn)定義在AbstractExecutorService中。
首先創(chuàng)建了一個(gè)FutureTask對(duì)象,傳入了要執(zhí)行的任務(wù)。把封裝后的FutureTask交給execute來執(zhí)行。
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable 要執(zhí)行的任務(wù)
* @param 返回的默認(rèn)值
* @param <T> the type of the given value
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask
繼承關(guān)系圖和構(gòu)造方法

/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// FutureTask 可能的狀態(tài)列表
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 執(zhí)行的任務(wù) */
private Callable<V> callable;
/** get() 的返回值,即最終的執(zhí)行結(jié)果 */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
// 單項(xiàng)列表的node
private volatile WaitNode waiters;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask 既然將任務(wù)封裝到了callable屬性中,且它自身還是一個(gè)Runnable,那么真正執(zhí)行一定在run方法中。而get() 是一個(gè)阻塞方法,當(dāng)執(zhí)行完成后,可以獲取返回值,否則就等待。
那重點(diǎn)看下run() 和 get() 的實(shí)現(xiàn)。
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
state 有多種狀態(tài),用來標(biāo)記當(dāng)前任務(wù)的執(zhí)行情況,如果已經(jīng)是完成狀態(tài),通過report方法直接返回outcome即可。
如果還未到達(dá)完成態(tài),就說明當(dāng)前任務(wù)還在執(zhí)行,此時(shí)需要await等待,也就是awaitDone。
awaitDone()
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
awaitDone的兩個(gè)參數(shù)分別用于表示是否有等待時(shí)間,以及等待時(shí)間的納秒數(shù)。
如果有等待時(shí)間,deadline就是截止時(shí)間。
下面則是主要邏輯:
一般來說,這里的for循環(huán)會(huì)執(zhí)行3圈,(不考慮已經(jīng)執(zhí)行完成和中斷的情況)。
- 第一圈:因?yàn)閃aitNode q 最初被賦值為null,在run執(zhí)行完之前,state是NEW,所以for循環(huán)會(huì)執(zhí)行q=null的邏輯,先創(chuàng)建一個(gè)WaitNode對(duì)象。
- 第二圈:因?yàn)閝此時(shí)有值,但queued是false,此時(shí)for循環(huán)執(zhí)行! queued的邏輯,如果設(shè)置成功,則queued = true。
- 第三圈:LockSupport.park(this); (如果有deadline,就判斷是否超時(shí)了)此時(shí)線程進(jìn)入阻塞狀態(tài)等待喚醒。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
//背景:
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
重點(diǎn)就是這一句。
這一句是做了兩個(gè)事情:
- 構(gòu)建waiters的Node單向鏈表
- 如果添加隊(duì)列成功就返回true。
這里為什么要構(gòu)建單向鏈表?
一般來說,一個(gè)task通過一個(gè)get()方法等待獲取就OK了,是一個(gè)單任務(wù)。但如果,同一個(gè)FutureTask的get() 方法被多個(gè)線程調(diào)用時(shí),多個(gè)線程(可能)會(huì)同時(shí)處于阻塞狀態(tài),這時(shí)就需要一個(gè)存儲(chǔ)介質(zhì)來存儲(chǔ)這些等待線程,這里是通過單鏈表來實(shí)現(xiàn)。
構(gòu)建單向鏈表的過程如下:
- 第一次調(diào)用get():
當(dāng)前waiters = null;q.next = waiters(null); waiters = q; 即waiters的頭節(jié)點(diǎn)是q,q.next是null。 - 第二次調(diào)用get(); 如果當(dāng)前的任務(wù)命名為p;
當(dāng)前waiters = q; p.next = waiter(q); waiters = p; 即構(gòu)建了一個(gè) p -> q的鏈表結(jié)構(gòu),waiters是頭節(jié)點(diǎn)p。 - 第三次調(diào)用get(); 如果當(dāng)前的任務(wù)命名為r;
最后的效果是 r -> p -> q; 可以看出來是頭插法。
run()
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
run() 比較簡(jiǎn)單,如果當(dāng)前FutureTask是NEW的狀態(tài),就調(diào)用callable.call(),將執(zhí)行完成的result通過set方法設(shè)置到outcome中。
且無論成功失敗,都將runner線程置為null,并判斷執(zhí)行過程中是否被其他線程中斷,如果因?yàn)橹袛喽。瑒t此線程一直交出時(shí)間片,直到狀態(tài)從INTERRUPTING變成INTERRUPTED。
如果成功執(zhí)行且沒有被中斷過,則通過set方法進(jìn)行返回值的設(shè)置。
set()
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
先判斷此時(shí)狀態(tài)是NEW,則改成COMPLETING,設(shè)置outcome后,狀態(tài)改成NORMAL(完成態(tài)),調(diào)用finishCompletion來喚醒等待中的線程。
finishCompletion()
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// done是一個(gè)空方法,給子類重寫用。
done();
callable = null; // to reduce footprint
}
這個(gè)方法比較簡(jiǎn)單,可以看到就是在遍歷waiters單鏈表,依次喚醒內(nèi)部的阻塞線程。(阻塞的發(fā)起點(diǎn)是get方法)。
總結(jié)
execute()
實(shí)現(xiàn)思想:
- task因?yàn)榻挥删€程池來執(zhí)行,線程池的線程直接調(diào)用task中的run,而不是執(zhí)行task.start()。
- 如果當(dāng)前線程池中的線程數(shù) < corePoolSize ,就創(chuàng)建新的線程添加到線程池中(HashSet存儲(chǔ))。
- 如果當(dāng)前的線程數(shù) > corePoolSize 就先存放到阻塞隊(duì)列里
- 如果阻塞隊(duì)列已滿,且 < maximumPoolSize,就創(chuàng)建新的線程添加到線程池中(HashSet存儲(chǔ)),當(dāng)keepAliveTime的時(shí)間沒有處理任務(wù),則銷毀(也就是讓run方法結(jié)束)。
- 如果已經(jīng)超過maximumPoolSize,則根據(jù)拒絕策略執(zhí)行。
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
submit()
實(shí)現(xiàn)思想:
任務(wù)執(zhí)行的思想還是execute,阻塞等待返回值的思想是通過Future完成。實(shí)現(xiàn)類是FutureTask。
- get()返回值時(shí)如果還未完成,將當(dāng)前線程封裝成WaiterNode,進(jìn)行LockSupport.park,并將所有park的線程按照頭插法構(gòu)建一個(gè)單向鏈表。
- run() 執(zhí)行完成后,將內(nèi)部的outcome屬性設(shè)置成當(dāng)前FutureTask的返回值,并unpark單鏈表中的所有阻塞線程,這些線程的get()會(huì)直接返回outcome的值。