一、執(zhí)行任務(wù)
public void execute(Runnable command) {
// 如果任務(wù)為空,直接拋異常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 獲取線程池運行狀態(tài)
int c = ctl.get();
// 如果運行的線程數(shù)小于核心數(shù),添加worker
if (workerCountOf(c) < corePoolSize) {
// 添加worker,并將core設(shè)為true,表示是核心線程
if (addWorker(command, true))
return;
// 如果添加失敗重新獲取線程池運行狀態(tài)
c = ctl.get();
}
// 如果線程池在運行且隊列未滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池不在運行且刪除任務(wù)成功,執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池工作線程為0,添加空的任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 走到這里,說明核心線程數(shù)用完且任務(wù)隊列已滿,那么啟用非核心線程數(shù),如果失敗,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
- 首先用核心線程執(zhí)行任務(wù),如果核心線程已滿,將任務(wù)添加到任務(wù)隊列;如果隊列也滿了,那么用非核心線程執(zhí)行任務(wù)
- addWorker(Runnable firstTask, boolean core)第一個參數(shù)是執(zhí)行的任務(wù),第二個參數(shù)如果為true,表示用的是核心線程,false表示用的是非核心線程
- 成員變量ctl是AtomicInteger類型,用來表示線程運行狀態(tài)和線程數(shù),高3位表示運行狀態(tài),低29位表示運行線程數(shù)
private boolean addWorker(Runnable firstTask, boolean core) {
//retry用來判斷是否可以添加任務(wù),并更新線程數(shù)
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池已經(jīng)關(guān)閉且沒有任務(wù),直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取工作線程數(shù)
int wc = workerCountOf(c);
// 如果工作線程數(shù)大于等于最大線程數(shù),直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 更新線程數(shù),如果成功,跳出retry
if (compareAndIncrementWorkerCount(c))
break retry;
// 走到這,說明更新線程數(shù)失敗了,重新獲取線程池狀態(tài)
c = ctl.get(); // Re-read ctl
// 如果線程池狀態(tài)變化了,從retry重新執(zhí)行;如果線程池狀態(tài)沒有變化,繼續(xù)for循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建Worker對象
w = new Worker(firstTask);
// 獲取線程
final Thread t = w.thread;
// 如果線程不為空,啟動線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 再次獲取狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 因為線程還沒啟動,所以這里線程是alive,說明是不正常的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將新創(chuàng)建的worker加入到workers集合
workers.add(w);
int s = workers.size();
// 更新largestPoolSize值
if (s > largestPoolSize)
largestPoolSize = s;
// workerAdded標記為true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果任務(wù)添加完成,啟動線程且將workerStarted標記為true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果啟動線程失敗,從workers刪除新創(chuàng)建的任務(wù),且執(zhí)行tryTerminate
if (! workerStarted)
addWorkerFailed(w);
}
// 最后返回線程是否啟動成功
return workerStarted;
}
- 首先去更新工作的線程數(shù)
- 創(chuàng)建Worker對象,此時會創(chuàng)建Thread類型的成員變量thread,Worker對象會傳入到該線程,因為Worker對象實現(xiàn)了Runnable方法,所以啟動線程thread時,會執(zhí)行Worker的run方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
二、執(zhí)行任務(wù)
1. 發(fā)起任務(wù)執(zhí)行
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 用來表示是否正常執(zhí)行任務(wù),true表示被打斷了,false表示未被打斷
boolean completedAbruptly = true;
try {
// 如果worker對象有傳入了任務(wù)或者任務(wù)隊列有任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池狀態(tài)是停止以上的級別
// 或者
// 線程已經(jīng)被中斷且狀態(tài)是停止以上的級別且當前線程還不是打斷狀態(tài)
// 那么中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 方法執(zhí)行前,空方法,留給子類實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行完后,空方法,留給子類實現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 被中斷標記設(shè)為false
completedAbruptly = false;
} finally {
// 任務(wù)完成后的操作
processWorkerExit(w, completedAbruptly);
}
}
- 首先執(zhí)行傳入worker對象里的任務(wù),如果為空,則從任務(wù)隊列里獲取任務(wù)
- 判斷是否需要打斷線程
- 執(zhí)行任務(wù)
- 任務(wù)完成相關(guān)的操作
2. 獲取任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 獲取線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池狀態(tài)關(guān)閉且任務(wù)隊列為空
// 或者
// 線程池狀態(tài)是停止
// 那么將線程數(shù)減1,返回空任務(wù)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 獲取線程數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果allowCoreThreadTimeOut為真,表示核心線程也有超時時間,一般默認為false
// 或者
// 工作線程數(shù)超過核心線程數(shù)
// 那么將timed設(shè)為真,設(shè)為真的目的是為了沒任務(wù)的時候,減少工作線程的數(shù)量
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作線程大于最大線程數(shù)或者超時了
// 且
// 工作線程數(shù)大于1或者任務(wù)隊列不為空
// 那么工作線程減1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果有超時設(shè)置,那么帶有超時時間獲取任務(wù),否則就阻塞獲取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果任務(wù)不為空,返回任務(wù)
if (r != null)
return r;
// 如果任務(wù)為空,將超時設(shè)置為true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 這部分代碼能說明線程復(fù)用,超時未獲取到任務(wù)減少線程的原理
- 如果設(shè)置了核心線程有超時時間或者線程數(shù)超過了核心線程數(shù),那么采用帶超時的方式獲取任務(wù),如果沒有獲取到任務(wù),那么線程數(shù)會減1;如果不采用帶超時的方式獲取任務(wù),那么一直等待,知道從任務(wù)隊列里獲取了任務(wù)
3. 退出任務(wù)執(zhí)行
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從任務(wù)集合中刪除任務(wù)
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試終止任務(wù)
tryTerminate();
int c = ctl.get();
// 如果線程池狀態(tài)小于STOP,說明還需要工作線程
if (runStateLessThan(c, STOP)) {
// 如果任務(wù)執(zhí)行被中斷了,保證至少還有1個工作線程在執(zhí)行
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 相當于線程還在工作
addWorker(null, false);
}
}
- 從任務(wù)集合中刪除當前任務(wù)
- 嘗試終止線程池
- 如果線程池狀態(tài)是小于STOP,保證有工作線程在工作
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// isRunning(c) 表示在運行,不能停止
// runStateAtLeast(c, TIDYING)表示已經(jīng)停止了,沒必要停止
// (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())表示關(guān)閉但是有任務(wù),不能停止
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作線程數(shù)不等于0,停止1個空閑的工作線程,通過tryLock判斷是否空閑
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 走到這,說明工作線程是0了,將狀態(tài)改為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 終止線程池,空方法,留給子類實現(xiàn)
terminated();
} finally {
// 最后將狀態(tài)改為TERMINATED,通知等待線程池終止的線程
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 如果線程池狀態(tài)不在運行,且工作線程數(shù)為0,那么最終將線程池狀態(tài)改為TERMINATED
三、拒絕策略
1. 直接拋異常(默認策略)
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2. 調(diào)用者的線程執(zhí)行任務(wù)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3. 丟掉最早的任務(wù)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
4. 空的策略
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
四、線程工廠
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
// 設(shè)置線程組
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 設(shè)置線程前綴名
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
// 創(chuàng)建線程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
五、常見四種線程池
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 可自定義線程數(shù)和線程工廠,核心線程數(shù)與最大線程數(shù)相等,這樣的話線程一旦創(chuàng)建,就會一直運行
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- 只可以自定義線程工廠,核心線程數(shù)為0,最大線程數(shù)為Integer最大值
- 相當于沒有限制工作線程數(shù),任務(wù)量大的時候,會影響機器性能
- 空任務(wù)的時候,會有1個線程在運行
3.newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
- 只創(chuàng)建一個工作線程,可自定義線程工廠,可以定時執(zhí)行任務(wù)
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
- 可定義線程工廠和核心工作線程數(shù),最大工作線程數(shù)為Integer.MAX_VALUE
- 可定時執(zhí)行任務(wù)