轉(zhuǎn)《Java并發(fā)編程的藝術(shù)》
1.線程池的實(shí)現(xiàn)原理


ThreadPoolExecutor執(zhí)行execute方法分下面4種情況。
1)如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
2)如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
3)如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
4)如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設(shè)計(jì)思路,是為了在執(zhí)行execute()方法時,盡可能 地避免獲取全局鎖(那將會是一個嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后 (當(dāng)前運(yùn)行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。
2.源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果線程數(shù)小于基本線程數(shù),則創(chuàng)建線程并執(zhí)行當(dāng)前任務(wù)
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗,則將當(dāng)前任務(wù)放到工作隊(duì)列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
// 如果線程池不處于運(yùn)行中或任務(wù)無法放入隊(duì)列,并且當(dāng)前線程數(shù)量小于最大允許的線程數(shù)量,
// 則創(chuàng)建一個線程執(zhí)行任務(wù)。
else if (!addIfUnderMaximumPoolSize(command))
// 拋出RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}
工作線程:線程池創(chuàng)建線程時,會將線程封裝成工作線程Worker,Worker在執(zhí)行完任務(wù)
后,還會循環(huán)獲取工作隊(duì)列里的任務(wù)來執(zhí)行。我們可以從Worker類的run()方法里看到這點(diǎn)。
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
ThreadPoolExecutor中線程執(zhí)行任務(wù)的示意圖如圖所示。

線程池中的線程執(zhí)行任務(wù)分兩種情況,如下。
1)在execute()方法中創(chuàng)建一個線程時,會讓這個線程執(zhí)行當(dāng)前任務(wù)。
2)這個線程執(zhí)行完上圖中1的任務(wù)后,會反復(fù)從BlockingQueue獲取任務(wù)來執(zhí)行。
3.線程池的使用
3.1 線程池的創(chuàng)建
我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池。
new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,milliseconds,runnableTaskQueue, handler);
創(chuàng)建一個線程池時需要輸入幾個參數(shù),如下。
1)corePoolSize(線程池的基本大小):當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程。
2)runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列??梢赃x擇以下幾個阻塞隊(duì)列。
- ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
- LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊(duì)列。
- SynchronousQueue:一個不存儲元素的阻塞隊(duì)列。每個插入操作必須等到另一個線程調(diào)用 移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊(duì)列。
- PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊(duì)列。
3)maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如果使用了無界的任務(wù)隊(duì)列這個參數(shù)就沒什么效果。
4)ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè) 置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線 程設(shè)置有意義的名字,代碼如下。new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
5)RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略?!bortPolicy:直接拋出異常?!allerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)?!iscardOldestPolicy:丟棄隊(duì)列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)?!iscardPolicy:不處理,丟棄掉。 當(dāng)然,也可以根據(jù)應(yīng)用場景需要來實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任務(wù)。
6)AliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以, 如果任務(wù)很多,并且每個任務(wù)執(zhí)行的時間比較短,可以調(diào)大時間,提高線程的利用率。
7)TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS),千分之一微秒。
3.2 向線程池提交任務(wù)
可以使用兩個方法向線程池提交任務(wù),分別為execute()和submit()方法。
execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。 通過以下代碼可知execute()方法輸入的任務(wù)是一個Runnable類的實(shí)例。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
submit()方法用于提交需要返回值的任務(wù)。線程池會返回一個future類型的對象,通過這個 future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執(zhí)行任務(wù)異常
} finally {
// 關(guān)閉線程池
executor.shutdown();
}
3.3 關(guān)閉線程池
可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池。它們的原理是遍歷線 程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而 shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true。當(dāng)所有的任務(wù) 都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。
4. 合理地配置線程池
要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來分析。
- 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
CPU密集型任務(wù): 主要是執(zhí)行計(jì)算任務(wù),響應(yīng)時間很快,cpu一直在運(yùn)行,這種任務(wù)cpu的利用率很高
IO密集型任務(wù):主要是進(jìn)行IO操作,執(zhí)行IO操作的時間較長,這是cpu出于空閑狀態(tài),導(dǎo)致cpu的利用率不高
CPU密集型:線程個數(shù)為CPU核數(shù)。這幾個線程可以并行執(zhí)行,不存在線程切換到開銷,提高了cpu的利用率的同時也減少了切換線程導(dǎo)致的性能損耗
IO密集型:線程個數(shù)為CPU核數(shù)的兩倍。到其中的線程在IO操作的時候,其他線程可以繼續(xù)用cpu,提高了cpu的利用率
任務(wù)的優(yōu)先級:高、中和低。
任務(wù)的執(zhí)行時間:長、中和短。
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能少的 線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進(jìn)行分解。
可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊(duì)列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高
的任務(wù)先執(zhí)行。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊(duì)列,讓
執(zhí)行時間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時間越
長,則CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU。
建議使用有界隊(duì)