1. 線程復(fù)用
我們知道Thread.start執(zhí)行之后,線程就不能再次執(zhí)行了,那ThreadPoolExecutor是如何做到線程復(fù)用的呢?
原理很簡(jiǎn)單,在實(shí)際執(zhí)行的線程外部套一個(gè)Thread,外層Thread的run方法while循環(huán)執(zhí)行實(shí)際執(zhí)行線程的run方法,實(shí)現(xiàn)線程的復(fù)用并且執(zhí)行之后不銷毀。下面是偽代碼:
// 任務(wù)等待隊(duì)列
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue();
new Thread(() -> {
for (;;){
Runnable runnable = taskQueue.poll();//隊(duì)列里拿
runnable.run();//同步執(zhí)行
}
}).start();// 異步while執(zhí)行
下面是ThreadPoolExecutor的重點(diǎn)代碼:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 阻塞獲取等待任務(wù)
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 實(shí)際阻塞執(zhí)行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2. 線程銷毀
我們知道,在創(chuàng)建線程池的時(shí)候有超時(shí)參數(shù)keepAliveTime,那么線程池是如何實(shí)現(xiàn)精確的超時(shí)銷毀呢?
這個(gè)是結(jié)合BlockingQueue的阻塞超時(shí)來(lái)實(shí)現(xiàn)的,下面是源碼:
/**
* ...
* @return task, or null if the worker must exit, in which case workerCount is decremented
* 翻譯: 返回task,如果worker必須退出,則返回null,在這種情況下workerCount遞減
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 重點(diǎn)在這,如果超時(shí)沒(méi)有獲取到任務(wù),則返回null,銷毀線程。
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3. 為什么用BlockingQueue
- 獲取等待任務(wù)的時(shí)候,直接用阻塞代替通知輪詢,提高性能,減少代碼復(fù)雜度。
- 復(fù)用阻塞超時(shí)獲取等待任務(wù)實(shí)現(xiàn)線程超時(shí)銷毀,設(shè)計(jì)精巧。
- 本身就是支持并發(fā)操作的,不用額外維護(hù)線程安全。