ThreadPoolExecutor解析
Java里線程池的基本接口是 Executor:
public interface Executor {
void execute(Runnable command);
}
實(shí)現(xiàn)線程池的類是ThreadPoolExecutor,最主要的構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
參數(shù)解析:
-
corePoolSize核心線程數(shù) -
maximumPoolSize最大線程數(shù) -
keepAliveTime線程空閑以后存活時(shí)間。通常在線程數(shù)大于核心線程數(shù)時(shí)才生效,直到存活線程數(shù)等于核心線程數(shù) -
unit時(shí)間單位 -
workQueue存儲任務(wù)的阻塞隊(duì)列 -
threadFactory用來創(chuàng)建線程的線程工廠 -
handler拒絕任務(wù)時(shí)的策略,通常有以下幾種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
成員變量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是原子的整數(shù)值,它用來記錄線程池的狀態(tài) 和 當(dāng)前線程池中線程 數(shù)量,初始值為RUNNING狀態(tài),線程數(shù)為0
private static final int COUNT_BITS = Integer.SIZE - 3;
Integer.SIZE表示int類型數(shù)據(jù)字節(jié)數(shù)(32),COUNT_BITS表示線程數(shù)量占據(jù)的位數(shù)(29)
//線程池最大容量(線程數(shù)) 00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//線程池狀態(tài)
// runState is stored in the high-order bits
//接受新任務(wù)并且處理阻塞隊(duì)列里的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//拒絕新任務(wù)但是處理阻塞隊(duì)列里的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//拒絕新任務(wù)并且拋棄阻塞隊(duì)列里的任務(wù)同時(shí)會中斷正在處理的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
//所有任務(wù)都執(zhí)行完(包含阻塞隊(duì)列里面任務(wù))當(dāng)前線程池活動線程為0,將要調(diào)用terminated方法
private static final int TIDYING = 2 << COUNT_BITS;
//終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)
private static final int TERMINATED = 3 << COUNT_BITS;
private static int ctlOf(int rs, int wc) { return rs | wc; }
可以看到,表示狀態(tài)的數(shù)據(jù)全部左移29位,存儲在int數(shù)值的前三位,然后通過ctlOf方法,將狀態(tài)和線程數(shù)兩個(gè)值位或操作結(jié)合起來,這樣就得到了ctl值,也就是說上面的ctl后29位用來存儲線程數(shù),前3位存儲線程池狀態(tài)。
執(zhí)行過程:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取當(dāng)前線程池的狀態(tài)
int c = ctl.get();
//判斷線程數(shù)是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//尚未達(dá)到核心線程數(shù),直接添加線程執(zhí)行任務(wù)
if (addWorker(command, true))
return;
//如果添加線程失敗,重新獲取線程池狀態(tài)
c = ctl.get();
}
//已經(jīng)達(dá)到核心線程數(shù),或者添加線程失敗,繼續(xù)執(zhí)行:
//判斷線程池是否處于Running狀態(tài),如果是,添加任務(wù)到隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
//再次檢查線程池狀態(tài)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果不是處于Running狀態(tài),就移除任務(wù),移除成功以后執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
//如果是處于非Running狀態(tài),但是任務(wù)移除失敗,或者處于Running狀態(tài),但是線程數(shù)為0,新建線程
addWorker(null, false);
}
//不是Running狀態(tài),或者任務(wù)添加失敗了(隊(duì)列已滿),進(jìn)入addWorker,執(zhí)行command
else if (!addWorker(command, false))
//添加線程失敗則拒絕任務(wù)
reject(command);
}
private static int workerCountOf(int c) { return c & CAPACITY; }
addWorker方法接受兩個(gè)參數(shù),command為待執(zhí)行的任務(wù)對象,core為true表示添加核心線程,false表示不需要使用核心線程。
execute執(zhí)行流程如下:
如果
commad為空,直接拋出異常獲取線程池狀態(tài),如果線程數(shù)小于核心線程數(shù),直接調(diào)用
addWorker方法,創(chuàng)建核心線程來執(zhí)行任務(wù)核心線程創(chuàng)建成功,則直接返回。如果核心線程創(chuàng)建失敗,說明當(dāng)前線程池核心線程已經(jīng)滿了或者線程池被關(guān)閉了等異常情況,任務(wù)沒有辦法執(zhí)行,那么重新獲取線程池狀態(tài),執(zhí)行下一步。
再次判斷線程池是否處于
Running狀態(tài)。如果是,添加command到任務(wù)隊(duì)列中,執(zhí)行下面的步驟5;如果不是Running狀態(tài)或者添加command失敗,跳到步驟9任務(wù)添加成功以后,等待線程來執(zhí)行它,此時(shí)會再次檢查狀態(tài),進(jìn)入下面6,7,8三個(gè)步驟
如果此時(shí)線程池不是
Running狀態(tài)了,把剛剛添加的任務(wù)移除掉,執(zhí)行拒絕策略,execute流程結(jié)束,任務(wù)執(zhí)行失敗。如果線程池仍然是
Running狀態(tài),或者線程池不是Running狀態(tài),但是移除任務(wù)失?。⊿hutdown時(shí),不接受新任務(wù),但是該任務(wù)還沒有執(zhí)行完,移除不掉),此時(shí)仍然有任務(wù)需要執(zhí)行,但是池內(nèi)的線程數(shù)為0,則調(diào)用addWorker方法添加一個(gè)null對象,創(chuàng)建一個(gè)非核心線程來執(zhí)行任務(wù),execute流程結(jié)束,任務(wù)會被執(zhí)行,但是線程池可能會在任務(wù)執(zhí)行完畢之后結(jié)束。如果線程池仍然是Running狀態(tài),而且線程池內(nèi)的線程數(shù)也不是0,那么
execute流程到此結(jié)束,任務(wù)添加到workQueue中,線程池狀態(tài)正常。線程池處于非
Running狀態(tài),或者任務(wù)添加失?。ū热珀?duì)列已滿),調(diào)用addWorker方法,通過非核心線程來處理command
線程數(shù)=worker數(shù),worker執(zhí)行完當(dāng)前任務(wù)以后會從隊(duì)列里取出任務(wù)來執(zhí)行,沒有任務(wù)的時(shí)候只保留核心線程數(shù)的worker運(yùn)行。
addWorker的執(zhí)行步驟:
private boolean addWorker(Runnable firstTask, boolean core) {
//該循環(huán)用來更新ctl的值(自增),addWorker可能會被多線程同時(shí)調(diào)用,所以更新ctl可能會失敗,如果失敗則重新讀取,繼續(xù)更新ctl值,更新成功則跳出循環(huán),開始創(chuàng)建線程
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//檢查線程池狀態(tài),分為幾種情況:
//1.當(dāng)前狀態(tài)大于Shutdown,直接返回false,說明線程池已經(jīng)被關(guān)閉了
//2.當(dāng)前狀態(tài)等于Shutdown,說明線程池正在關(guān)閉中,此時(shí)來判斷傳入的commad是否為null,并且! workQueue.isEmpty(),如果三項(xiàng)有一項(xiàng)不滿足,直接返回false。這種情況對應(yīng)的是execute中的addWorker(null, false);這一句指令,線程在關(guān)閉過程中,但是還有任務(wù)在等待執(zhí)行的情況。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//到這里說明線程池狀態(tài)更新成功了,開始創(chuàng)建線程執(zhí)行任務(wù)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創(chuàng)建一個(gè)持有當(dāng)前任務(wù)的worker
w = new Worker(firstTask);
//Worker里的線程是通過制定的 ThreadFactory 來創(chuàng)建的
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//此處對workers對象進(jìn)行操作,并發(fā)訪問的時(shí)候必須加鎖,保證不會重復(fù)添加線程
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//線程添加成功,開始執(zhí)行任務(wù)
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//最終會走到這里,判斷任務(wù)有沒有成功執(zhí)行,未成功的話說明線程沒有成功運(yùn)行起來,回滾ctl的狀態(tài)。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
該方法主要分為兩個(gè)步驟,第一步是對ctl狀態(tài)更新,第二步是狀態(tài)更新完成以后,創(chuàng)建線程并執(zhí)行任務(wù)。
第一步主要為兩個(gè)循環(huán),外層的循環(huán)用來獲取線程池狀態(tài),并判斷是否滿足創(chuàng)建線程的條件,如果滿足,進(jìn)入內(nèi)部的循環(huán),嘗試對ctl進(jìn)行自增操作(線程數(shù)+1)
考慮到并發(fā)問題,自增可能會失敗,失敗以后,再一次自增之前,線程池的狀態(tài)可能發(fā)生變化,所以需要再次獲取線程池狀態(tài),如果沒有變化再次嘗試自增,如果已經(jīng)變了,回到外部的循環(huán),重新判斷是否滿足自增條件。
滿足自增條件有幾下幾種情況:
線程池狀態(tài) > shutdown: 說明線程池已經(jīng)被關(guān)閉了,直接返回false,不再創(chuàng)建線程
線程池 = shutdown,此時(shí)再判斷
firstTask == null && !workQueue.isEmpty(),其實(shí)對應(yīng)的是execute方法中的addWorker(null, false);語句,說明線程池正在關(guān)閉,但是還有未執(zhí)行的任務(wù),此時(shí)也需要創(chuàng)建線程。傳入的command不是null,或者任務(wù)隊(duì)列為空,也會直接返回false,不再創(chuàng)建線程。
確定需要創(chuàng)建線程并且自增操作成功,線程成狀態(tài)更新完成以后,開始真正的線程創(chuàng)建和任務(wù)執(zhí)行工作
需要注意的是workers變量,它存儲的對象為Worker,實(shí)際上是對任務(wù)和線程的包裝,workers是一個(gè)HashSet,是線程不安全的,對它的操作需要加鎖。
第二步線程創(chuàng)建和執(zhí)行的步驟如下:
新建包含當(dāng)前任務(wù)的
Worker對象,獲取到該對象包含的線程(在Worker的構(gòu)造函數(shù)中,通過指定的線程工廠創(chuàng)建,創(chuàng)建對象時(shí)已經(jīng)生成了)。如果線程t不為空,說明創(chuàng)建線程成功,開始獲取鎖,檢查線程池和線程狀態(tài),更新
workers變量最后執(zhí)行t.start開始執(zhí)行任務(wù)
關(guān)鍵的t.start方法是怎樣執(zhí)行任務(wù)的,還要看接下來的Worker對象:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
上邊的源碼只列出了幾個(gè)關(guān)鍵部分,可以看到Worker繼承了Runnable,并重寫了run方法,而構(gòu)造函數(shù)中創(chuàng)建線程時(shí)傳入的對象就是Worker本身,所以t.start方法首先執(zhí)行的是Worker的run方法,run方法里只有一句runWorker(this),實(shí)際上最終執(zhí)行任務(wù)是通過runWorker來執(zhí)行的。
下面是runWorker的源碼:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
簡單來說,runWorker的作用如下:
如果是新創(chuàng)建的
worker,第一次啟動會執(zhí)行當(dāng)前worker內(nèi)的任務(wù),執(zhí)行完之后會依次從workQueue中取出任務(wù)執(zhí)行。如果
workQueue為空,那么等待keepAliveTime時(shí)間,workQueue仍然為空,結(jié)束循環(huán),線程也就結(jié)束了。
Executors類
Executors是Java線程池的工具類,它內(nèi)部實(shí)現(xiàn)了4中常用的線程池:
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可緩存線程池:
沒有核心線程,可以創(chuàng)建無上限的普通線程,如果某個(gè)線程超過60s沒有任務(wù),結(jié)束該線程
任務(wù)隊(duì)列為SynchronousQueue,它內(nèi)部不存儲數(shù)據(jù),前一個(gè)數(shù)據(jù)被取走之后,后一個(gè)才能存進(jìn)來。也就是說,execute方法執(zhí)行的時(shí)候,如果command添加到隊(duì)列失敗,說明SynchronousQueue中的任務(wù)沒有被取走,直接新開一個(gè)線程運(yùn)行任務(wù),如果添加成功,任務(wù)會在隊(duì)列中等待空閑進(jìn)程來取走它。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定長度線程池:
核心線程和最大線程數(shù)都是指定數(shù)值,不設(shè)置超時(shí)(設(shè)置也沒有意義,核心線程不會退出,除非設(shè)置了allowCoreThreadTimeOut),任務(wù)大于線程數(shù)時(shí)放在LinkedBlockingQueue中排隊(duì)。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
可指定核心線程數(shù),最大線程數(shù)無上限,有默認(rèn)的超時(shí)時(shí)間,可以執(zhí)行周期性的任務(wù),它執(zhí)行任務(wù)主要調(diào)用的是schedule方法:
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
創(chuàng)建一個(gè)單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序執(zhí)行.