
核心屬性
- corePoolSize :核心線程數,一般情況下,該數量的核心線程創(chuàng)建好之后,會常駐在線程池中,不會應空閑而關閉,可以設置allowCoreThreadTimeOut=true使核心線程空閑關閉
- maximumPoolSize :最大線程數,>核心線程數。
- keepAliveTime : 空閑時間,當線程獲取任務時,超過keepAliveTime仍然獲取不到任務,那么線程執(zhí)行完所有邏輯后,自動消亡,workerSet也會移除該worker對象
- unit : 空閑事件keepAliveTime 的單位
- BlockingQueue<Runnable> workQueue : 任務的阻塞隊列,當前提交任務時,工作線程已經>= 核心線程數, 則會將任務 推入阻塞隊列中。如果阻塞隊列達到最大長度,則會在工作線程數 不超過最大線程數maximumPoolSize的情況下,繼續(xù)創(chuàng)建空閑線程來處理任務。
- ThreadFactory threadFactory: 創(chuàng)建線程的工廠
- RejectedExecutionHandler handler :任務的拒絕策略。當線程數任務阻塞隊列滿了,且工作線程數 大于等于 最大線程數了, 則 線程池無法調度線程則處理任務,調用構造方法傳入的RejectedExecutionHandler實例的rejectedExecution()方法來拒絕任務。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
線程池狀態(tài)
ThreadPoolExecutor 使用 int 的高 3 位來表示線程池狀態(tài),低 29 位表示線程數量
| 狀態(tài) | value | 說明 |
|---|---|---|
| RUNNING(當線程池創(chuàng)建出來的初始狀態(tài)) | 111 | 能接受任務,能執(zhí)行阻塞任務 |
| SHUTDOWN(調用shutdown方法) | 000 | 不接受新任務,能執(zhí)行阻塞任務 肯定可以 執(zhí)行正在執(zhí)行的任務 |
| STOP(調用shutDownNow) | 001 | 不接受新任務,打斷正在執(zhí)行的任務,丟棄阻塞任務 |
| TIDYING(中間狀態(tài)) | 010 | 任務全部執(zhí)行完,活動線程也沒了 |
| TERMINATED(終結狀態(tài)) | 011 | 線程池終結 |
常用api
execute()
執(zhí)行任務,無返回值
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// -100000000000000000000000000000 | 0 = -100000000000000000000000000000
int c = ctl.get();
// 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
// workerCountOf(c) : 11111111111111111111111111111 & -100000000000000000000000000000
if (workerCountOf(c) < corePoolSize) {
// 則創(chuàng)建新增核心線程,并執(zhí)行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
// 如果 阻塞隊列還沒有滿,則是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
// 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
else if (!addWorker(command, false))
// 第四 : 種情況如果 >= maximumPoolSize,執(zhí)行拒絕策略
reject(command);
}
submit(Runable)
會返回Future對象,調用Future對象的get(),會阻塞,直到拿到返回值返回
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
doInvokeAny
返回最快執(zhí)行完的任務的結果,集合中其他正在執(zhí)行的線程會被關閉
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
invokeAll
執(zhí)行所有任務,返回List<Future<T>>
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
shutdown()
不會接收新的任務,但是已經運行和在隊列中的任務會執(zhí)行完,然后在關閉線程
線程狀態(tài)變成SHUTDOWN
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
awaitTermination(long timeout, TimeUnit unit)
等待線程池關閉,會提前也會超時
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
shutdownNow
打斷所有所有正在執(zhí)行的任務,返回隊列中的任務
線程狀態(tài)變成STOP
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 打斷正在工作的線程
interruptWorkers();
// 從隊列中取出等待的任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 返回隊列中等待的任務
return tasks;
}
常用的線程池配置
jdk的Executors類提供了4個創(chuàng)建線程池的配置方法, 通過之前的原理,我們來分析下這些線程池的不同
1. newFixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
創(chuàng)建一個 通過操作一個共享的無界隊列來復用固定數量的線程的線程池。
首先看構造方法,核心線程數和最大線程數是一樣的 ,說明不存在線程池擴容的情況
空閑有效時間為0 毫秒, 由于只存在核心線程,所以不存在 線程被注銷的情況
LinkedBlockingQueue 是一個無界隊列,默認大小為int的最大值,所以不會出現(xiàn) 隊列長度不夠而導致 創(chuàng)建空閑線程的情況,也就不會出現(xiàn) 拒絕策略。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
總結 :
- 線程數固定,
- 沒有多余線程線程回收,
- 不會出現(xiàn)因線程不夠,隊列裝不下而拒絕任務的情況
2.newSingleThreadExecutor
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
只有一個線程,無界隊列
3.newCachedThreadPool
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最大線程數很大,沒有核心線程,但是空閑時間 較長有1分鐘, 說明適用于 任務周期較短, 很多線程都可以快速處理完任務,并被復用,超過一分鐘線程就被注銷
4.newScheduledThreadPool
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
定時任務線程池
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
延遲執(zhí)行
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
重復執(zhí)行
自定義
schedule(){
// dosomething
// 遞歸
schedule();
}
api
scheduleWithFixedDelay
執(zhí)行完任務再計算延遲時間
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleAtFixedRate
從任務開始執(zhí)行就計算延遲時間
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
案例 每周三22點執(zhí)行
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
// 計算當前時間距離目標時間還有多久
// 初始化延遲時間 = 目標時間 - 當前時間
// 周期 = 7天
int period = 7;
scheduled.scheduleAtFixedRate(() -> {
//doSomething
}, 初始化延遲時間, period, TimeUnit.DAYS)
}
源碼解析
流程圖

一 、任務的執(zhí)行以及線程的創(chuàng)建 : execute(Runnable task):
傳入Runnable 對象作為要執(zhí)行的任務。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// -100000000000000000000000000000 | 0 = -100000000000000000000000000000
int c = ctl.get();
// 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
// workerCountOf(c) : 11111111111111111111111111111 & -100000000000000000000000000000
if (workerCountOf(c) < corePoolSize) {
// 則創(chuàng)建新增核心線程,并執(zhí)行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
// 如果 阻塞隊列還沒有滿,則是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 判斷工作線程是否 = 0,這種情況是 核心線程數為0的時候,需要創(chuàng)建空閑線程來處理隊列中的任務,比如CachedThreadPool,第一次進來
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
// 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
else if (!addWorker(command, false))
// 第四 : 種情況如果 >= maximumPoolSize,執(zhí)行拒絕策略
reject(command);
}
1. 當前工作線程小于核心線程數,則創(chuàng)建Worker對象,加入到workerSet中

addWorker(command, true)
創(chuàng)建Worker對象(持有線程),并調用持有線程的start方法,在run方法中執(zhí)行runnable
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取當前工作線程數
int wc = workerCountOf(c);
// 如果當前工作線程數 大于最大線程數 2 ^ 29 -1 ,或者大于 (根據當前添加工作線程的類型) 核心線程數 還是 線程池最大線程數
// 核心線程 判斷是否 > corePoolSize,空閑線程,判斷 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas,工作線程數+1,退出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下面走創(chuàng)建線程的邏輯
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 線程一個worker, Worker 是Thread的子類,傳入runnable對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 不是關閉狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加到工作線程集合中
workers.add(w);
// 當前工作線程集合大小
int s = workers.size();
// 更新 線程池 線程數量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 執(zhí)行task
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker.Run方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 執(zhí)行任務
public void run() {
runWorker(this);
}
}
在run方法中調用runWorker方法,傳入當前對象
- 該worker對象第一次執(zhí)行任務時,w.firstTask是!= null的,所以可以進入while的循環(huán)體, 執(zhí)行Runnable的run方法
- 第二次進來則從阻塞隊列中拿任務。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// task第一次被創(chuàng)建時,構造方法傳入了Runnable對象,所以現(xiàn)在是!= null的
Runnable task = w.firstTask;
// 之后清空,第二次進來是null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 無限循環(huán),當前有任務未執(zhí)行 或者 阻塞隊列中有任務
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行runnable的run方法執(zhí)行業(yè)務邏輯
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 允許核心線程超時關閉 或者 當前工作線程數 > 核心線程數
// 線程關閉前,從worderSet中移除worker對象
processWorkerExit(w, completedAbruptly);
}
}
2. 當前工作線程數已達到核心線程數了,但是阻塞隊列還沒滿
則會往workQueue 存入Runnable對象
如果 隊列長度還沒達到上限,則offer方法會成功存入Runnable對象,返回true
如果 隊列長度已達到上限,則返回false,說明當前工作線程從隊列中拿task,處理task的速度還不夠,會創(chuàng)建非工作線程。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// -100000000000000000000000000000 | 0 = -100000000000000000000000000000
int c = ctl.get();
// 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
// workerCountOf(c) : 11111111111111111111111111111 & -100000000000000000000000000000
if (workerCountOf(c) < corePoolSize) {
// 則創(chuàng)建新增核心線程,并執(zhí)行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
// 如果 阻塞隊列還沒有滿,則是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
// 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
else if (!addWorker(command, false))
// 如果 >= maximumPoolSize,執(zhí)行拒絕策略
reject(command);
}
3. 阻塞隊列已滿,添加task失敗,就會嘗試創(chuàng)建空閑線程
創(chuàng)建空閑線程的方法和創(chuàng)建核心線程的方法都是addWorker(runnable,boolean core),只不過傳入的core參數是false,表示是空閑線程
如果是空閑線程創(chuàng)建,則會判斷當前工作線程數是否 > 最大線程數maximumPoolSize, 如果是創(chuàng)建核心線程則判斷的是 核心線程數
如果大于 最大線程數maximumPoolSize 就會創(chuàng)建線程失敗
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取當前工作線程,
int wc = workerCountOf(c);
// 如果當前工作線程數 大于最大線程數 2 ^ 29 -1 ,或者大于 (根據當前添加工作線程的類型) 核心線程數 還是 線程池最大線程數
// 核心線程 判斷是否 > corePoolSize,空閑線程,判斷 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 后面創(chuàng)建線程Worker對象和創(chuàng)建核心線程是一模一樣的
}
4. 任務拒絕:阻塞隊列滿了,并且 工作線程數已經達到最大線程數了, 則嘗試創(chuàng)建空閑線程會失敗,走任務的拒絕策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// -100000000000000000000000000000 | 0 = -100000000000000000000000000000
int c = ctl.get();
// 第一種情況 :計算當前工作線程數是否小于 所配置的核心線程數
// workerCountOf(c) : 11111111111111111111111111111 & -100000000000000000000000000000
if (workerCountOf(c) < corePoolSize) {
// 則創(chuàng)建新增核心線程,并執(zhí)行task
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二種情況 :當前工作線程數已經 大于等于核心線程數了,嘗試往阻塞隊列中添加task
// 如果 阻塞隊列還沒有滿,則是添加成功的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 第三種情況 :如果滿了,阻塞隊列添加task失敗,就會嘗試創(chuàng)建空閑線程
// 會判斷 當前工作線程數是否 < 最大線程數 maximumPoolSize, 如果小于就可以創(chuàng)建空閑線程
else if (!addWorker(command, false))
// 如果 >= maximumPoolSize,執(zhí)行拒絕策略
reject(command);
}
jdk提供的拒絕策略類 :

-
AbortPolicy 拋異常
/** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } -
DiscardPolicy 丟棄 = 啥也不干
/** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } DiscardOldestPolicy 推出并忽略阻塞隊列中的第一個任務,嘗試執(zhí)行當前任務
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// poll出阻塞隊列中的第一個任務,并忽略掉
e.getQueue().poll();
// 以執(zhí)行當前任務
e.execute(r);
}
}
}
................................................
二 、線程的維護
線程池中的作用維護線程,避免頻繁創(chuàng)建,銷毀線程而帶來系統(tǒng)資源的浪費。
核心線程默認(可以配置allowCoreThreadTimeOut = true 來設置 注銷核心線程 )是不會在執(zhí)行完某一個任務后被注銷的
空閑線程 在空閑時間達到keepAliveTime 后, 會自動注銷(執(zhí)行完run方法)。
1. 線程的阻塞 :
Worker.runWorker(Worker w)
線程的執(zhí)行方法中,用while的方式,判斷 當前是否有任務(第一次被創(chuàng)建出來) 或者 從阻塞隊列中拿任務
- 判斷 當前是否有任務(第一次被創(chuàng)建出來)
- 阻塞隊列中有任務 :execute任務時,當工作線程數 > 大于核心線程數時且 阻塞隊列沒有滿時, 會把任務存入阻塞隊列
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// task第一次被創(chuàng)建時,構造方法傳入了Runnable對象,所以現(xiàn)在是!= null的
Runnable task = w.firstTask;
// 之后清空,第二次進來是null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 無限循環(huán),當前有任務未執(zhí)行 或者 阻塞隊列中有任務
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行runnable的run方法執(zhí)行業(yè)務邏輯
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 允許核心線程超時關閉 或者 當前工作線程數 > 核心線程數
// 線程關閉前,從worderSet中移除worker對象
processWorkerExit(w, completedAbruptly);
}
}
如果可以不停的獲取任務,處理任務,這種情況下 所有線程都不會被注銷,因為無法退出while循環(huán)
2. 線程的注銷
但是當沒有任務提交時,也就是當前任務沒有,阻塞隊列里也拿不到任務,線程則處于空閑狀態(tài),空閑線程 空閑狀態(tài)下的時間達到keepAliveTime ,則會退出while循環(huán),結束線程。
而核心線程則會在getTask中(如果沒配置allowCoreThreadTimeOut=true) 阻塞住, 不返回結果,直到阻塞隊列中可以獲取到任務, 再進入while循環(huán)體。
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否允許核心線程超時關閉 或者 當 工作線程數 > 核心線程數了
// 當 線程中 只剩下核心線程的時候, wc > corePoolSize 就不會返回true,則會workQueue.take()阻塞住
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果允許核心線程超時注銷 或者 當前工作線程數 > 核心線程數, 則調阻塞隊列的 poll,超時返回null
// 否則調take()方法,一直拿不到就一直阻塞
// 這就說明,只有允許核心線程超時注銷,或者 當 當前工作線程數 > 核心線程數時,才會調 阻塞隊列會超時的poll方法,
// runWorker方法才會退出while循環(huán)體, 結束線程
// 如果allowCoreThreadTimeOut被設置為true,則所有線程從隊列中拿任務調用的都是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,所有線程在poll超時之后,仍然沒獲取到任務,則會返回 null ,退出循環(huán)體, 結束線程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
從workers移除線程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從workers移除線程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果允許核心線程超時關閉,則為0,否則為corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果當前工作線程數 > 最小的線程數量
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 小于最小的線程數量,添加worker
addWorker(null, false);
}
}