前言
使用線程池能夠提高線程的復用率,避免不必要的創(chuàng)建線程,能夠節(jié)約內(nèi)存空間和CPU運行時間。除此之外用線程池作為接口執(zhí)行任務能夠將任務的提交與執(zhí)行解耦,在線程池內(nèi)部決定任務的分配,具有良好的內(nèi)聚性。
常用線程池
java.util.concurrent.Executors是一個線程池工廠,負責創(chuàng)建常用的線程池,主要有如下幾種:
FixedThreadPool 一個固定線程數(shù)量的線程池:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
CachedThreadPool 不固定線程數(shù)量,且支持最大為Integer.MAX_VALUE的線程數(shù)量:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SingleThreadExecutor 可以理解為線程數(shù)量為1的FixedThreadPool:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ScheduledThreadPool 支持定時以指定周期循環(huán)執(zhí)行任務:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
其中前三種線程池是ThreadPoolExecutor不同配置的實例,最后一種是ScheduledThreadPoolExecutor的實例。
ThreadPoolExecutor
先來通過ThreadPoolExecutor的構造方法了解一下這個類:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
主要參數(shù)有:
corePoolSize 核心線程的數(shù)量,不超過這個參數(shù)數(shù)量的線程會被保留在線程池內(nèi),即使它們是空閑的,如果設置了allowCoreThreadTimeOut為true除外。
maximumPoolSize 線程池所允許擁有線程的最大數(shù)量,當任務隊列的任務已滿,線程數(shù)已達到最大數(shù)量,任務會被拒絕。
keepAliveTime 當線程池的線程數(shù)量超過核心線程的數(shù)量,這些非核心線程會嘗試在keepAliveTime內(nèi)獲取隊列內(nèi)的任務,如果獲取失敗則被線程池移除并終止。
unit 超時時間的單位。
workQueue 任務的阻塞隊列,緩存將要執(zhí)行的Runnable任務,由各線程輪詢該任務隊列獲取任務執(zhí)行。
threadFactory 線程創(chuàng)建的工廠。
handler 當任務由于線程數(shù)量或者任務隊列達到上限,會執(zhí)行該接口的方法處理任務的拒絕。
ThreadPoolExecutor的狀態(tài)變量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中ctl是ThreadPoolExecutor的同步狀態(tài)變量。
workerCountOf()方法取得當前線程池的線程數(shù)量,算法是將ctl的值取低29位。
runStateOf()方法取得線程池的狀態(tài),算法是將ctl的值取高3位:
- RUNNING 111 表示正在運行
- SHUTDOWN 000 表示拒絕接收新的任務
- STOP 001 表示拒絕接收新的任務并且不再處理任務隊列中剩余的任務,并且中斷正在執(zhí)行的任務。
- TIDYING 010 表示所有線程已停止,準備執(zhí)行terminated()方法。
- TERMINATED 011 表示已執(zhí)行完terminated()方法。
Executor.execute(Runnable command)
該方法將使用線程池執(zhí)行Runnable對象的run()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//情況1
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//情況2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //情況3
reject(command);
}
以上代碼對應了三種情況:
- 線程池的線程數(shù)量小于核心線程數(shù)量上限,開啟核心線程執(zhí)行任務。
- 線程池的線程數(shù)量不小于核心線程數(shù)量上限,或者開啟核心線程失敗,嘗試將任務以非阻塞的方式添加到任務隊列。
- 任務隊列以滿導致添加任務失敗,開啟新的非核心線程執(zhí)行任務。
回顧FixedThreadPool,因為它配置的corePoolSize與maximumPoolSize相等,所以不會執(zhí)行到情況3,并且因為workQueue為默認的LinkedBlockingQueue,其長度為Integer.MAX_VALUE,幾乎不可能出現(xiàn)任務無法被添加到workQueue的情況,所以FixedThreadPool的所有任務執(zhí)行在核心線程中。
而CachedThreadPool的corePoolSize為0,表示它不會執(zhí)行到情況1,因為它的maximumPoolSize為Integer.MAX_VALUE,所以幾乎沒有線程數(shù)量上限,因為它的workQueue為SynchronousQueue,所以當線程池里沒有閑置的線程SynchronousQueue就會添加任務失敗,因此會執(zhí)行到情況3添加新的線程執(zhí)行任務。
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//第一個循環(huán)輪詢線程池的狀態(tài),如果線程池處于SHUTDOWN及大于它的狀態(tài)則拒絕執(zhí)行任務
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//第二個循環(huán)就嘗試將當前線程數(shù)+1
//如果是核心線程當前線程數(shù)必須小于corePoolSize
//如果是非核心線程則當前線程數(shù)必須小于maximumPoolSize
//如果當前線程數(shù)小于線程池支持的最大線程數(shù)CAPACITY 也會返回失敗
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//這里已經(jīng)成功執(zhí)行了CAS操作將線程池數(shù)量+1,下面創(chuàng)建線程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //firstTask的優(yōu)先級高于任務隊列
final Thread t = w.thread; //Worker內(nèi)部有一個Thread,并且執(zhí)行Worker的run方法,因為Worker實現(xiàn)了Runnable
if (t != null) {
//這里必須同步在狀態(tài)為運行的情況下將Worker添加到workers中
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//如果添加成功則運行線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker這個方法先嘗試在線程池運行狀態(tài)為RUNNING并且線程數(shù)量未達上限的情況下通過CAS操作將線程池數(shù)量+1,接著在ReentrantLock同步鎖的同步保證下判斷線程池為運行狀態(tài),然后把Worker添加到HashSet workers中。如果添加成功則執(zhí)行Worker的內(nèi)部線程。
Worker構造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
這里指定了第一個要執(zhí)行的任務,并通過線程池的線程工廠創(chuàng)建線程??梢园l(fā)現(xiàn)這個線程的參數(shù)為this,即Worker對象,因為Worker實現(xiàn)了Runnable因此可以被當成任務執(zhí)行,執(zhí)行的即Worker實現(xiàn)的run方法:
public void run() {
runWorker(this);
}
runWorker方法
因為Worker為ThreadPoolExecutor的內(nèi)部類,因此runWorker方法實際是ThreadPoolExecutor定義的:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 因為Worker的構造函數(shù)中setState(-1)禁止了中斷,這里的unclock用于恢復中斷
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這個方法是線程池復用線程的核心代碼,注意Worker繼承了AbstractQueuedSynchronizer,在執(zhí)行每個任務前通過lock方法加鎖,執(zhí)行完后通過unlock方法解鎖,這種機制用來防止運行中的任務被中斷。在執(zhí)行任務時先嘗試獲取firstTask,即構造方法傳入的Runnable對象,然后嘗試從getTask方法中獲取任務隊列中的任務。在任務執(zhí)行前還要再次判斷線程池是否已經(jīng)處于STOP狀態(tài)或者線程被中斷。
注意這里w.lock方法是在獲取到任務后才執(zhí)行的,也就是如果線程獲取到任務前都未加鎖,這樣能保證showDown方法嘗試獲取該鎖中斷空閑的線程,詳見后面的解析。
當線程被中斷、拋出異常、不能及時得到任務,processWorkerExit方法用于最后將線程回收。
getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
還記得Executor.execute方法的情況2是將任務添加到任務隊列,getTask方法就是從任務隊列中同步地取出任務。
這個方法通過一個循環(huán)不斷輪詢?nèi)蝿贞犃杏袥]有任務到來,首先判斷線程池是否處于正常運行狀態(tài),通過超時配置有兩種方法取出任務:
- BlockingQueue.poll 阻塞指定的時間嘗試獲取任務,如果超過指定的時間還未獲取到任務就返回null。
- BlockingQueue.take 這種方法會在取到任務前一直阻塞。
FixedThreadPool使用的是take方法,所以會線程會一直阻塞等待任務。CachedThreadPool使用的是poll方法,也就是說CachedThreadPool中的線程如果在60秒內(nèi)未獲取到隊列中的任務就會被終止。
到此ThreadPoolExecutor是怎么執(zhí)行Runnable任務的分析結束。
ExecutorService.shutdown()
既然有任務的執(zhí)行,就少不了任務的終止。ExecutorService是Executor的子類,也是ThreadPoolExecutor的基類。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
首先通過mainLock加鎖同步修改線程池狀態(tài)為SHUTDOWN,然后通過interruptIdleWorkers方法中斷空閑線程,OnShowDown方法是留給子類去實現(xiàn)的。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
//未中斷的,且tryLock成功
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne) //只做了一次循環(huán)就結束了,因此最多只能中斷一個線程
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers(boolean onlyOne)方法也是先用mainLock加鎖同步,然后循環(huán)找出所有Worker中Thread未中斷的,通過tryLock方法嘗試獲取鎖。還記得上文的runWorker方法Worker的鎖是在獲取任務時才加的,interruptIdleWorkers方法通過競爭該鎖搶先中斷線程,這樣就導致未執(zhí)行任務的線程被中斷了,而正在執(zhí)行任務的線程不受影響,并且可以繼續(xù)執(zhí)行任務隊列中的任務。
ExecutorService.shutdownNow()
與ExecutorService.shutdown()不同的是,shutdownNow方法除了讓線程池拒絕接收新的任務,并且不再執(zhí)行任務隊列里未執(zhí)行的任務。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
首先mainLock同步將狀態(tài)改為STOP,然后中斷所有線程。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
interruptWorkers方法將對所有Worker執(zhí)行interruptIfStarted,即將所有運行中的線程中斷:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
還記得Worker的構造函數(shù)中執(zhí)行了setState(-1),而在runWorker方法中通過unlock將state改為0,因此可以被interruptWorkers方法中斷。
這里注意的是中斷并不意味著線程就一定停止工作,除非在任務中捕獲InterruptedException退出任務。
ExecutorService.submit(Callable<T> task)
使用該方法可以執(zhí)行一個帶返回值的Callable任務,通過該對象的call()方法定義線程要執(zhí)行的事情,同時call()方法的返回值也由開發(fā)者定義,該返回值可以通過ExecutorService.submit返回的Future對象的get方法阻塞獲取。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
submit方法的執(zhí)行過程中,實際是通過newTaskFor方法把Callable對象轉換為RunnableFuture對象,再由上述的execute(Runnable task)方法執(zhí)行的。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
newTaskFor方法實際是構造了一個FutureTask對象返回,RunnableFuture是FutureTask的超類,并且實現(xiàn)了Runnable和Future接口。所以execute方法必然會執(zhí)行FutureTask的run方法。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
...
}
FutureTask工作原理
首先來看下它的內(nèi)部狀態(tài):
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
當新建一個FutureTask,其初始狀態(tài)為NEW。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
FutureTask任務的執(zhí)行
下面分析FutureTask的run方法:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在FutureTask的run方法中,可以看到實際上調(diào)用了內(nèi)部Callable對象的call方法得到執(zhí)行結果。而當call方法執(zhí)行結束,如果沒有異常就執(zhí)行set(result)方法,有異常則執(zhí)行setException方法。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
兩個方法都通過UnSafe類的CAS方法將stateOffSet分別修改為NORMAL或EXCEPTIONAL,然后調(diào)用finishCompletion方法:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
該方法遍歷了一個waiters鏈表,取出WaitNode中封裝的線程,調(diào)用LockSupport.unpark喚醒線程。補充一下LockSupport是JDK中一個底層同步類,內(nèi)部通過UnSafe類實現(xiàn)park與unpark方法用來阻塞或者喚醒線程。
FutureTask得到任務的結果
剩下一個疑問,F(xiàn)utureTask調(diào)用get方法是如何阻塞等待結果的?
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
如果在任務成功執(zhí)行并返回后調(diào)用get,因為狀態(tài)已經(jīng)更新為NORMAL大于COMPLETING,直接返回report(s),否則執(zhí)行awaitDone方法。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
這段代碼需要配合finishCompletion方法理解:
1、當任務已完成,則返回狀態(tài)
2、當任務將要完成通過Thread.yield()將當前線程狀態(tài)從運行狀態(tài)變?yōu)榫途w狀態(tài),從而提高其他線程競爭運行的可能性,將當前狀態(tài)改為NORMAL。
3、如果任務未完成就創(chuàng)建一個WaitNode對象,內(nèi)部持有一個當前線程的引用,并且添加到waiters鏈表上,最后通過LockSupport.park阻塞線程。
因此任務完成后finishCompletion方法就作用就是將waiters鏈表的每個WaitNode中的線程喚醒,以便執(zhí)行report(s)。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report方法將會返回給調(diào)用FutureTask.get方法的線程Callable任務的結果或者異常。
FutureTask任務的取消
ExecutorService.submit方法會返回一個Future對象,上文已經(jīng)分析過,它實際是FutureTask對象向上轉型。
因此Future的cancel方法是在FutureTask中實現(xiàn)的。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
首先通過CAS操作嘗試將FutureTask狀態(tài)從NEW改為INTERRUPTING 或CANCELLED。如果FutureTask的狀態(tài)不為NEW,也就是任務已經(jīng)執(zhí)行完,cancel方法會被視為無效返回。因此cancel方法只適用于未開始執(zhí)行或者已經(jīng)開始執(zhí)行但未完成的任務。
參數(shù)mayInterruptIfRunning如果為true,則將狀態(tài)設置為INTERRUPTING,然后嘗試將運行任務的線程runner對象中斷。回顧run方法中的UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())就是利用CAS操作將runner對象賦值為任務的執(zhí)行線程。最后將FutureTask的狀態(tài)同步設置為INTERRUPTED。
如果mayInterruptIfRunning為false則不會嘗試中斷線程,而是將FutureTask的狀態(tài)同步設置為CANCELLED。
最終執(zhí)行finishCompletion方法,這個方法上文已經(jīng)分析過,它會將所有調(diào)用get方法阻塞等待結果的線程喚醒,并調(diào)用done方法。所以一旦調(diào)用cancel,已經(jīng)調(diào)用get方法的線程將被立即喚醒并返回CancellationException異常。
如果任務在隊列里未被執(zhí)行,cancel方法已經(jīng)將FutureTask的狀態(tài)改為INTERRUPTING或者CANCELLED,
當線程獲取到這個FutureTask執(zhí)行它的run方法時,判斷它的狀態(tài)已經(jīng)不是NEW,所以會直接返回,因此對于未執(zhí)行的任務執(zhí)行了cancel方法將不會被執(zhí)行。
對于已經(jīng)開始執(zhí)行但未執(zhí)行結束的線程,可以通過設置mayInterruptIfRunning為true嘗試將線程中斷從而捕獲異常退出。如果任務中沒有處理InterruptedException則會將任務執(zhí)行完,但是在執(zhí)行set方法返回結果時,CAS操作判斷狀態(tài)已經(jīng)不是NEW因此不會執(zhí)行后續(xù)操作。
ThreadPoolExecutor小結
分析了ThreadPoolExecutor執(zhí)行execute、submit、shutdown、shutdownNow等方法的執(zhí)行過程,并且衍生分析了FutureTask的工作原理。發(fā)現(xiàn)其中大量用到了以UnSafe類為基礎的同步鎖,后續(xù)會繼續(xù)探究ReenTrantLock、AbstractQueuedSynchronizer、LockSupport等同步類的原理。
限于篇幅ScheduledThreadPool 留在以后分析。
最后總結一下3個以ThreadPoolExecutor為實現(xiàn)類的線程池:
- SingleThreadPool 適合維護固定穩(wěn)定的單線程任務。
- FixedThreadPool 適合維護固定且穩(wěn)定的多個線程,而不用擔心任務數(shù)量過多導致的同時創(chuàng)建多個線程的問題。它的缺陷是要注意任務的阻塞問題,一旦線程池內(nèi)的所有線程都阻塞或者長時間被某個任務占用將不會創(chuàng)建新的線程來執(zhí)行任務,造成任務隊列里的剩余任務被長時間阻塞。
- CachedThreadPool 相比FixedThreadPool更適用于處理要求低延時的場景,不會受線程數(shù)量的約束而讓新的任務等待。但是如果任務過多會導致開啟的線程數(shù)量也很多,因此對內(nèi)存的開銷比FixedThreadPool更大,同時多線程調(diào)度也會更消耗CPU資源。但是一旦任務被執(zhí)行完,在等待一段時間后線程會被銷毀,因此收縮性較好。
如果以上三種線程不能滿足實際業(yè)務的需求,可以自定義參數(shù)創(chuàng)建更符合實際的ThreadPoolExecutor。