線程池使用的一個(gè)例子
這里約定,細(xì)節(jié)在代碼中注釋說(shuō)明,代碼外正文的描述文字展示程序的大體流程。
int nThreads = 10;
ExecutorService exec = Executors.newFixedThreadPool(nThreads);
Runnable task = new Runnable(){
public void run(){
//do something
}
};
exec.execute(task);
這里的工廠方法newFixedThreadPool()初始化的是ThreadPoolExecutor實(shí)例,在線程池實(shí)現(xiàn)的工廠方法newSingleThreadExecutor()與newFixedThreadPool()一樣構(gòu)建底層為L(zhǎng)inkedBlockingQueue的ThreadPoolExecutor。而newCachedThreadPool()初始化的是底層為SynchronousQueue的ThreadPoolExecutor實(shí)例。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ThreadPoolExecutor的execute()
任務(wù)到來(lái)執(zhí)行execute()方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 執(zhí)行過(guò)程分為三步:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 1. 如果少于corePoolSize條線程在運(yùn)行,那么試圖啟動(dòng)一條新的線程,并且把當(dāng)前的命令作
* 為這條線程將要執(zhí)行的第一個(gè)任務(wù)。addWorker()方法原子性檢查runState和workerCount
* 的狀態(tài),以防止不應(yīng)該增加線程時(shí)添加了線程。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 2. 如果一個(gè)任務(wù)成功入隊(duì),我們需要進(jìn)行雙重的校驗(yàn),校驗(yàn)我們是否應(yīng)該增加一個(gè)線程(因
* 為從上次校驗(yàn)之后存在的線程可能已經(jīng)死掉了)或者檢查線程池是否關(guān)閉了。所以需要再次
* 檢驗(yàn),如果需要進(jìn)行回滾出隊(duì)?;蛘撸绻麤](méi)有線程,就開(kāi)始一條新的線程。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*
* 3. 如果任務(wù)入隊(duì)不成功,我們?cè)噲D增加新的進(jìn)程。仍舊失敗的話,直接拒絕執(zhí)行任務(wù)。
*/
int c = ctl.get();
//①
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//②
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//③
else if (!addWorker(command, false))
reject(command);
}
上面的execute()方法的源代碼中注釋的標(biāo)號(hào),對(duì)應(yīng)著上面的英文注釋,中文翻譯在對(duì)應(yīng)英文下面。先大體看流程,execute()方法執(zhí)行新的任務(wù),當(dāng)一個(gè)任務(wù)到來(lái)后,做必要的校驗(yàn),根據(jù)檢驗(yàn)的不同結(jié)果,執(zhí)行不同的邏輯實(shí)現(xiàn)。但是任務(wù)的執(zhí)行都是通過(guò)addWorker()方法完成的,下面流程走的addWorker()方法中。
ThreadPoolExecutor的addWorker()
addWorker()方法代碼片段
private boolean addWorker(Runnable firstTask, boolean core) {
....省略一些校驗(yàn)邏輯.....
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//①
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//②
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//③
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代碼片段中有三點(diǎn)重要之處,分別標(biāo)注了序號(hào)。
- 到來(lái)的任務(wù),被封裝在一個(gè)Worker的對(duì)象中。Worker對(duì)象字段thread賦值給變量t;
- 對(duì)線程池狀態(tài)的參數(shù)設(shè)置和進(jìn)一步的最終校驗(yàn)需要加鎖,保證并發(fā)時(shí)的線程安全,和數(shù)據(jù)的一致性。
- Thread變量t執(zhí)行start()方法。
梳理上面的代碼片段,到來(lái)的任務(wù)會(huì)被先封裝Worker對(duì)象,然后Worker對(duì)象返回一個(gè)Thread對(duì)象,這個(gè)Thread對(duì)象最終執(zhí)行。下面走到Worker類的源代碼。
ThreadPoolExecutor的Worker類
//Worker的構(gòu)造函數(shù)
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
構(gòu)造Worker對(duì)象時(shí),Runnable任務(wù)作為當(dāng)前線程的第一個(gè)任務(wù)賦值到firstTask,如果確實(shí)是第一個(gè)任務(wù)firstTask不為null,否則為null,然后去隊(duì)列中取已經(jīng)入隊(duì)的任務(wù)。字段thread指向通過(guò)ThreadPoolExecutor中定義的線程工廠,生產(chǎn)的封裝了Worker對(duì)象自己的線程。字段thread指向的對(duì)象執(zhí)行start()方法(上面addWorker()方法代碼中的③)時(shí),底層調(diào)用的就是Worker對(duì)象的run()方法。
Worker類的run()方法
run()方法調(diào)用的是runWorker()方法。
public void run() {
runWorker(this);
}
Worker類的runWorker()方法
在runWorker()方法最重要的語(yǔ)句就是那一條while語(yǔ)句(下面代碼中標(biāo)識(shí)①的語(yǔ)句)。涉及如下三點(diǎn):
- 如果任務(wù)task不為null,執(zhí)行當(dāng)前的task,此task是當(dāng)前線程的第一個(gè)任務(wù);
- 如果task為null,getTask()方法從任務(wù)隊(duì)列中獲取排隊(duì)的任務(wù)進(jìn)行執(zhí)行;
- while循環(huán)因?yàn)闂l件中g(shù)etTask()方法會(huì)進(jìn)行自旋,所以當(dāng)任務(wù)隊(duì)列中沒(méi)有任務(wù)時(shí),當(dāng)前線程一直處于阻塞等待狀態(tài),利用阻塞隊(duì)列也可以限時(shí)等待,這也是線程池中線程可以復(fù)用的原因。因?yàn)榫€程會(huì)一直等待隊(duì)列中有任務(wù)。當(dāng)然可以通過(guò)ThreadPoolExecutor的keepAliveTime字段對(duì)線程生存時(shí)間進(jìn)行設(shè)定。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//①
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
自此,完成了任務(wù)到來(lái),提交給線程池,線程池創(chuàng)建線程或者復(fù)用等待中的線程執(zhí)行任務(wù)。復(fù)用線程可以減少創(chuàng)建和銷毀線程產(chǎn)生的開(kāi)銷,在高并發(fā)系統(tǒng)中線程池使用可以取得很好的性能效果。更多細(xì)節(jié)可以參考Java線程池ThreadPoolExecutor的實(shí)現(xiàn)和參數(shù)