線程池源碼之execute
execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//workerCountOf 獲取線程數(shù)量,isRunning獲取線程池狀態(tài)
int c = ctl.get();
//如果小于核心線程池數(shù)量直接添加到線程池里
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//大于核心線程數(shù)量,加入隊列,會做一個dobbule check,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果當(dāng)前線程池狀態(tài)不是RUNNING則從隊列刪除任務(wù),并執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//再次嘗試添加線程,失敗的話執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
總體來說就是:
- 如果線程的數(shù)量小于線程池的核心線程數(shù)直接創(chuàng)建線程執(zhí)行;
- 如果加入線程池失敗,說明超過核心線程數(shù)量嘗試加入阻塞隊列;
- 如果加入阻塞隊列失敗了,嘗試重新加入線程池如果也失敗了就說明超過了最大線程數(shù)量,執(zhí)行拒絕策略。
addWorker:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// check 線程池的狀態(tài)和阻塞隊列是否為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//這段for循環(huán)的代碼含義是:往線程池里面添加一個線程
for (;;) {
int wc = workerCountOf(c);
// check 線程數(shù)量是否超出
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 添加成功直接退出循環(huán),添加失敗的話再次嘗試
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//這段代碼的含義是:把當(dāng)前線程加入到hashSet里面,加入成功后啟動線程,加入失敗的話,從set里面刪除,并且嘗試停止線程池
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//把線程封裝成Worker,worker是繼承了Aqs的
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// check 線程池狀態(tài)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加入到set里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//加入成功啟動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//加入失敗,刪除線程,嘗試停止線程池
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
總體來說分為兩步:
- 第一步:嘗試給線程池的數(shù)量加一,失敗繼續(xù)嘗試,成功退出。
- 第二步:線程池數(shù)量成功加一之后,采用加鎖的方式把線程加入到set里,然后啟動線程,如果啟動失敗刪除線程,嘗試停止線程池。
來看一下work對象:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
//構(gòu)造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
- 首先,這個work對象實現(xiàn)了runnable接口,就有了線程的屬性。
- 其次,這個work繼承了AbstractQueuedSynchronizer就有了鎖的功能
- 最后,我們來看一下runWorker這個方法。
runWorker:
//運行當(dāng)前線程
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//執(zhí)行前操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//運行當(dāng)前線程
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//執(zhí)行后操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 主要功能就是從隊列里面獲取線程
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 獲取任務(wù)
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
線程池源碼之shutdown
先來搞清楚幾個概念:
interrupt():為當(dāng)前線程打上停止的標記
interrupted():測試當(dāng)前是否中斷。此方法具有清除功能
isInterrupted(): 測試線程是否中斷。此方法不會清除
shutdown:停止線程池,優(yōu)雅停掉,等隊列里面的線程執(zhí)行完再停掉。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//將線程池的狀態(tài)設(shè)置為SHUTDOWN
advanceRunState(SHUTDOWN);
//為所有線程打上停止標記
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
//為所有線程打上停止標記
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
// 嘗試tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
線程池源碼之shutdownNow
shutdownNow:比較粗暴的停掉,不會等阻塞隊列執(zhí)行完再停掉。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//把線程池狀態(tài)改為STOP
advanceRunState(STOP);
interruptWorkers();
// 把隊列的任務(wù)取出來
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
//把隊列的任務(wù)放入到list里面返回
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
// 打上停止的標記
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}