線程池作用
相對于為每個請求都創(chuàng)建一個線程,線程池通過重用現(xiàn)有的線程而不是創(chuàng)建新線程,可以在處理多個請求時分?jǐn)傇诰€程創(chuàng)建和銷毀過程中產(chǎn)生的巨大開銷,當(dāng)請求到達時,工作線程通過已經(jīng)存在,不會由于等待創(chuàng)建線程而延遲任務(wù)的執(zhí)行,從而提高響應(yīng)性。通過適當(dāng)調(diào)整線程池的大小,可以創(chuàng)建足夠多的線程以便使處理器保持忙碌狀態(tài),同時還可以防止過多線程相互競爭資源而使應(yīng)用程序耗盡內(nèi)存或失敗
線程池處理流程
1)判斷核心線程池里的線程是否都在執(zhí)行任務(wù)。如果不是,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進入下個流程
2)判斷工作隊列是否已經(jīng)滿。如果工作隊列沒有滿,則將新提交的任務(wù)存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程
3)判斷線程池的線程是否都處于工作狀態(tài)。如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來處理這個任務(wù)
示意圖:
創(chuàng)建線程池
ThreadPoolExecutor構(gòu)造方法:
? ? public ThreadPoolExecutor(int corePoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {
? ? ? ? ...? ? //代碼省略
? ? }
一共七個參數(shù):
corePoolSize
線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使有其他空閑的核心線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,直到線程數(shù)等于corePoolSize就不再創(chuàng)建,繼續(xù)提交的任務(wù)被保存到阻塞隊列中。如果調(diào)用了線程池的prestartAllCoreThreads()或者prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程
maximumPoolSize
線程池最大線程數(shù),如果當(dāng)前阻塞隊列滿了,繼續(xù)提交任務(wù),若當(dāng)前線程數(shù)小于maximumPoolSize則創(chuàng)建新的線程執(zhí)行任務(wù)。注意如果使用了無界的阻塞隊列這個參數(shù)就沒什么效果
keepAliveTime
線程空閑時保持存活時間,即當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間。若當(dāng)前線程池的線程數(shù)超過corePoolSize,且線程空閑時間超過keepAliveTime,就將這些空閑線程銷毀,盡可能降低資源銷毀
unit
keepAliveTime的時間單位,可以是天、小時、分、毫秒、微秒和納秒
workQueue
用于保存等待執(zhí)行的任務(wù)的阻塞隊列
threadFactory
創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè) 置更有意義的名字
handler
線程池的飽和策略(或者叫拒絕策略),當(dāng)隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。Java線程池提供了以下4種策略:
①.AbortPolicy:直接拋出異常,默認(rèn)策略
②.CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)
③.DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)
④.DiscardPolicy:不處理,直接丟棄
也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略
調(diào)用Exectors中的靜態(tài)工廠方法也可以來創(chuàng)建線程池
newFixedThreadPool
? ? public static ExecutorService newFixedThreadPool(int nThreads) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue());
? ? }
復(fù)制代碼
創(chuàng)建一個固定長度的線程池,每當(dāng)提交一個任務(wù)時就創(chuàng)建一個線程,直到達到線程池的最大數(shù)量(corePoolSize == maximumPoolSize),這時線程池的規(guī)模將不再變化(若某個線程由于發(fā)生了未預(yù)期的Exception而結(jié)束,線程池會補充一個新線程),使用LinkedBlockingQuene作為阻塞隊列,適用于負(fù)載比較重的服務(wù)器
newCachedThreadPool
? ? public static ExecutorService newCachedThreadPool() {
? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue());
? ? }
創(chuàng)建一個可緩存線程的線程池,默認(rèn)緩存60s,使用SynchronousQueue作為阻塞隊列(沒有數(shù)據(jù)緩存空間的阻塞隊列,每一個put操作必須等待一個take操作,若任務(wù)提交的速度遠遠大于CachedThreadPool的處理速度,CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù),可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源)。適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者負(fù)載較輕的服務(wù)器,使用該線程池時,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題
newSingleThreadExecutor
? ? public static ExecutorService newSingleThreadExecutor() {
? ? ? ? return new FinalizableDelegatedExecutorService
? ? ? ? ? ? (new ThreadPoolExecutor(1, 1,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue()));
? ? }
單線程的Executor,線程池中只有一個線程,若線程異常結(jié)束,會創(chuàng)建另一個線程替代。newSingleThreadExecutor能確保依照任務(wù)在隊列中的順訊來串行執(zhí)行,內(nèi)部使用LinkedBlockingQueue作為阻塞隊列,適用于需要保證順序地執(zhí)行各個任務(wù);并且在任意時間點,不會有多個線程是活動的應(yīng)用場景
newScheduledThreadPool
? ? public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
? ? ? ? return new ScheduledThreadPoolExecutor(corePoolSize);
? ? }
public ScheduledThreadPoolExecutor(int corePoolSize) {
? ? super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
? ? ? ? ? new DelayedWorkQueue());
}
可以延遲或定時的方式執(zhí)行任務(wù),適用于周期任務(wù)實現(xiàn)原理
線程池狀態(tài)
? ? private static final int RUNNING? ? = -1 << COUNT_BITS;
? ? private static final int SHUTDOWN? =? 0 << COUNT_BITS;
? ? private static final int STOP? ? ? =? 1 << COUNT_BITS;
? ? private static final int TIDYING? ? =? 2 << COUNT_BITS;
? ? private static final int TERMINATED =? 3 << COUNT_BITS;
RUNNING:
線程池能夠接收新任務(wù),且能處理阻塞隊列中的任務(wù)
SHUTDOWN:
線程池不會接收新任務(wù),但會處理阻塞隊列中的任務(wù)(shutdown())
STOP:
線程池不會接收新任務(wù),不會處理已添加的任務(wù),并且會中斷正在處理的任務(wù)(shutdownNow())
TIDYING:
所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0
TERMINATED:
線程池徹底終止(terminated())
任務(wù)提交
有兩種方式向線程池提交任務(wù),分別為execute()和submit()方法。execute()方法提交的任務(wù)不能獲取返回值,而submit()方法提交的任務(wù)會返回一個future類型的對象,可以通過這個future對象判斷任務(wù)是否執(zhí)行成功
execute()
execute()方法執(zhí)行示意圖:
execute()源碼:
? ? public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? int c = ctl.get();
? ? ? ? // 若線程池當(dāng)前線程數(shù)小于核心線程數(shù)則創(chuàng)建新線程執(zhí)行任務(wù)
? ? ? ? if (workerCountOf(c) < corePoolSize) {
? ? ? ? ? ? if (addWorker(command, true))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? c = ctl.get();
? ? ? ? }
? ? ? ? // 若線程數(shù)大于等于核心線程數(shù)或線程創(chuàng)建失敗,則將當(dāng)前任務(wù)放到工作隊列中
? ? ? ? if (isRunning(c) && workQueue.offer(command)) {
? ? ? ? ? ? int recheck = ctl.get();
? ? ? ? ? ? if (! isRunning(recheck) && remove(command))
? ? ? ? ? ? ? ? reject(command);
? ? ? ? ? ? else if (workerCountOf(recheck) == 0)
? ? ? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? ? ? // 若當(dāng)前任務(wù)無法放進阻塞隊列中,則創(chuàng)建新的線程來執(zhí)行任務(wù)
? ? ? ? else if (!addWorker(command, false))
? ? ? ? ? ? // addWoker創(chuàng)建失敗,執(zhí)行reject方法運行相應(yīng)的拒絕策略
? ? ? ? ? ? reject(command);
? ? }
復(fù)制代碼
如果當(dāng)前運行的線程少于corePoolSize,則會調(diào)用addWorker()創(chuàng)建新的線程來執(zhí)行新的任務(wù)
? ? private boolean addWorker(Runnable firstTask, boolean core) {
? ? ? ? retry:
? ? ? ? for (;;) {
? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? // 獲取當(dāng)前線程池運行狀態(tài)
? ? ? ? ? ? int rs = runStateOf(c);
? ? ? ? ? ? // 狀態(tài)判斷,條件不符合添加線程失敗
? ? ? ? ? ? if (rs >= SHUTDOWN &&
? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&
? ? ? ? ? ? ? ? ? firstTask == null &&
? ? ? ? ? ? ? ? ? ! workQueue.isEmpty()))
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? // 獲取線程池當(dāng)前線程數(shù)
? ? ? ? ? ? ? ? int wc = workerCountOf(c);
? ? ? ? ? ? ? ? // 若線程數(shù)超過CAPACITY,返回false
? ? ? ? ? ? ? ? // 若是添加核心線程,超過核心線程數(shù)返回false;若不是超過最大線程數(shù)返回false
? ? ? ? ? ? ? ? if (wc >= CAPACITY ||
? ? ? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? // CAS線程數(shù)+1? ?
? ? ? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))
? ? ? ? ? ? ? ? ? ? break retry;
? ? ? ? ? ? ? ? c = ctl.get();? // Re-read ctl
? ? ? ? ? ? ? ? // 若狀態(tài)與之前不一樣,跳到最外層循環(huán)
? ? ? ? ? ? ? ? if (runStateOf(c) != rs)
? ? ? ? ? ? ? ? ? ? continue retry;
? ? ? ? ? ? ? ? // else CAS failed due to workerCount change; retry inner loop
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? boolean workerStarted = false;
? ? ? ? boolean workerAdded = false;
? ? ? ? Worker w = null;
? ? ? ? try {
? ? ? ? ? ? // 創(chuàng)建線程
? ? ? ? ? ? w = new Worker(firstTask);
? ? ? ? ? ? final Thread t = w.thread;
? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? ? ? // 獲取鎖
? ? ? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? // 再次校驗線程狀態(tài)是否符合添加線程條件
? ? ? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());
? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||
? ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {
? ? ? ? ? ? ? ? ? ? ? ? if (t.isAlive()) // precheck that t is startable
? ? ? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();
? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);
? ? ? ? ? ? ? ? ? ? ? ? int s = workers.size();
? ? ? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)
? ? ? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 添加成功后開啟線程
? ? ? ? ? ? ? ? if (workerAdded) {
? ? ? ? ? ? ? ? ? ? t.start();
? ? ? ? ? ? ? ? ? ? workerStarted = true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (! workerStarted)
? ? ? ? ? ? ? ? addWorkerFailed(w);
? ? ? ? }
? ? ? ? return workerStarted;
? ? }
復(fù)制代碼
addWorker()添加線程時判斷了兩次線程狀態(tài)是否符合添加線程的條件
第一次判斷返回false:
①.線程池狀態(tài)為STOP、TIDYING或TERMINATED狀態(tài)
②.線程池狀態(tài)為SHUTDOWN,任務(wù)不為null即線程處于SHUTDOWN狀態(tài),不允許添加任務(wù)
③.線程池狀態(tài)為SHUTDOWN,任務(wù)為null,但阻塞隊列為空,即添加空任務(wù)沒有意義
第二次判斷返回false:
①.線程池狀態(tài)為STOP、TIDYING或TERMINATED狀態(tài)
②.線程池狀態(tài)為SHUTDOWN且任務(wù)不為null
線程添加成功后,調(diào)用start()方法啟動線程,執(zhí)行Worker類(繼承AQS)的run()方法
? ? public void run() {
? ? ? ? runWorker(this);
? ? }
? ? final void runWorker(Worker w) {
? ? ? ? Thread wt = Thread.currentThread();
? ? ? ? Runnable task = w.firstTask;
? ? ? ? w.firstTask = null;
? ? ? ? // 釋放鎖,允許中斷
? ? ? ? w.unlock(); // allow interrupts
? ? ? ? boolean completedAbruptly = true;
? ? ? ? try {
? ? ? ? ? ? // 若當(dāng)前線程所需執(zhí)行的任務(wù)不為空或阻塞隊列中有任務(wù)
? ? ? ? ? ? while (task != null || (task = getTask()) != null) {
? ? ? ? ? ? ? ? w.lock();
// 若線程池處于STOP、TIDYING或TERMINATED狀態(tài)時,且線程沒有中斷標(biāo)記,則請求中斷線程
// 若線程池處于RUNNING或SHUTDOWN狀態(tài),且線程有中斷標(biāo)記,再次判斷線程池狀態(tài)是否>=STOP,若是請求中斷線程
? ? ? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||
? ? ? ? ? ? ? ? ? ? (Thread.interrupted() &&
? ? ? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&
? ? ? ? ? ? ? ? ? ? !wt.isInterrupted())
? ? ? ? ? ? ? ? ? ? wt.interrupt();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? // 根據(jù)業(yè)務(wù)場景自定義方法
? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);
? ? ? ? ? ? ? ? ? ? Throwable thrown = null;
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? // 執(zhí)行任務(wù)
? ? ? ? ? ? ? ? ? ? ? ? task.run();
? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Error x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {
? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? // 根據(jù)業(yè)務(wù)場景自定義方法
? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? task = null;
? ? ? ? ? ? ? ? ? ? w.completedTasks++;
? ? ? ? ? ? ? ? ? ? w.unlock();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? completedAbruptly = false;
? ? ? ? } finally {
? ? ? ? ? ? // 退出處理
? ? ? ? ? ? processWorkerExit(w, completedAbruptly);
? ? ? ? }
? ? }
若當(dāng)前線程的任務(wù)執(zhí)行完,還會調(diào)用getTask()找阻塞隊列中是否有任務(wù)
? ? private Runnable getTask() {
? ? ? ? boolean timedOut = false; // Did the last poll() time out?
? ? ? ? for (;;) {
? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? // 獲取線程池狀態(tài)
? ? ? ? ? ? int rs = runStateOf(c);
? ? ? ? ? ? // 若線程池狀態(tài)為SHUTDOWN且阻塞隊列為空,workerCount - 1,返回null
? ? ? ? ? ? // 若線程池狀態(tài)為STOP、TIDYING或TERMINATED狀態(tài),workerCount - 1,返回null
? ? ? ? ? ? if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
? ? ? ? ? ? ? ? decrementWorkerCount();
? ? ? ? ? ? ? ? return null;
? ? ? ? ? ? }
? ? ? ? ? ? int wc = workerCountOf(c);
? ? ? ? ? ? // Are workers subject to culling?
? ? ? ? ? ? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
? ? ? ? ? ? if ((wc > maximumPoolSize || (timed && timedOut))
? ? ? ? ? ? ? ? && (wc > 1 || workQueue.isEmpty())) {
? ? ? ? ? ? ? ? if (compareAndDecrementWorkerCount(c))
? ? ? ? ? ? ? ? ? ? return null;
? ? ? ? ? ? ? ? continue;
? ? ? ? ? ? }
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // 若需要超時控制,則調(diào)用poll(),否則調(diào)用take()從阻塞隊列中獲取任務(wù)
? ? ? ? ? ? ? ? Runnable r = timed ?
? ? ? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
? ? ? ? ? ? ? ? ? ? workQueue.take();
? ? ? ? ? ? ? ? if (r != null)
? ? ? ? ? ? ? ? ? ? return r;
? ? ? ? ? ? ? ? timedOut = true;
? ? ? ? ? ? } catch (InterruptedException retry) {
? ? ? ? ? ? ? ? timedOut = false;
? ? ? ? ? ? }
? ? ? ? }
? ? }
從getTask()源碼可以知道線程池中的線程執(zhí)行完自身任務(wù)后會一直執(zhí)行阻塞隊列中的任務(wù)。當(dāng)線程處理完阻塞隊列的任務(wù)后或者處理任務(wù)時出現(xiàn)異常退出循環(huán),會執(zhí)行processWorkerExit()方法
? ? private void processWorkerExit(Worker w, boolean completedAbruptly) {
? ? ? ? // completedAbruptly:true,表明線程運行異常,workerCount-1
? ? ? ? // completedAbruptly:false,表明運行正常getTask()方法中已減少線程數(shù)量
? ? ? ? if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
? ? ? ? ? ? decrementWorkerCount();
? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? mainLock.lock();
? ? ? ? try {
? ? ? ? ? ? completedTaskCount += w.completedTasks;
? ? ? ? ? ? // 從workers移除,從線程池移除至多一個線程
? ? ? ? ? ? workers.remove(w);
? ? ? ? } finally {
? ? ? ? ? ? mainLock.unlock();
? ? ? ? }
? ? ? ? // 嘗試終止線程池
? ? ? ? tryTerminate();
? ? ? ? int c = ctl.get();
? ? ? ? // 若當(dāng)前線程池狀態(tài)為RUNNING或SHUTDOWN,
? ? ? ? if (runStateLessThan(c, STOP)) {
? ? ? ? ? ? // 線程運行正常
? ? ? ? ? ? if (!completedAbruptly) {
? ? ? ? ? ? ? ? // 若allowCoreThreadTimeOut為true,且等待隊列有任務(wù),至少保留一個線程
? ? ? ? ? ? ? ? // 若allowCoreThreadTimeOut為false,線程數(shù)不少于corePoolSize
? ? ? ? ? ? ? ? int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
? ? ? ? ? ? ? ? if (min == 0 && ! workQueue.isEmpty())
? ? ? ? ? ? ? ? ? ? min = 1;
? ? ? ? ? ? ? ? if (workerCountOf(c) >= min)
? ? ? ? ? ? ? ? ? ? return; // replacement not needed
? ? ? ? ? ? }
? ? ? ? ? ? // 線程運行異常,調(diào)用addWorker()添加線程
? ? ? ? ? ? addWorker(null, false);
? ? ? ? }
? ? }
方法先判斷線程運行是否順利,若運行出現(xiàn)異常將線程數(shù)減1。然后調(diào)用tryTerminate()嘗試終止線程池。若當(dāng)前線程池狀態(tài)為RUNNING或SHUTDOWN,視情況是否添加線程
tryTerminate()方法
? ? final void tryTerminate() {
? ? ? ? for (;;) {
? ? ? ? ? ? int c = ctl.get();
? ? ? ? ? ? // 若線程池當(dāng)前狀態(tài)為RUNNING直接返回不終止
? ? ? ? ? ? // 若狀態(tài)為TIDYING或TERMINATED,即已經(jīng)準(zhǔn)備終止
? ? ? ? ? ? // 若狀態(tài)為SHUTDOWN且阻塞隊列非空,需要執(zhí)行完任務(wù)
? ? ? ? ? ? if (isRunning(c) ||
? ? ? ? ? ? ? ? runStateAtLeast(c, TIDYING) ||
? ? ? ? ? ? ? ? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? //? 若線程數(shù)不等于0,適當(dāng)終止一個線程
? ? ? ? ? ? if (workerCountOf(c) != 0) { // Eligible to terminate
? ? ? ? ? ? ? ? interruptIdleWorkers(ONLY_ONE);
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }
? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
? ? ? ? ? ? mainLock.lock();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // // 嘗試終止線程池
? ? ? ? ? ? ? ? if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? // 子類實現(xiàn)
? ? ? ? ? ? ? ? ? ? ? ? terminated();
? ? ? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED, 0));
? ? ? ? ? ? ? ? ? ? ? ? termination.signalAll();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? mainLock.unlock();
? ? ? ? ? ? }
? ? ? ? ? ? // else retry on failed CAS
? ? ? ? }
? ? }
submit()
submit()返回future類型的對象,通過這個future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成。
? ? public? Future submit(Callable task) {
? ? ? ? if (task == null) throw new NullPointerException();
? ? ? ? RunnableFuture ftask = newTaskFor(task);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
在submit方法中調(diào)用newTaskFor()將Callable任務(wù)會被封裝成FutureTask對象
? ? protected? RunnableFuture newTaskFor(Callable callable) {
? ? ? ? return new FutureTask(callable);
? ? }
FutureTask狀態(tài):
? ? /** Possible state transitions:
? ? * NEW -> COMPLETING -> NORMAL
? ? * NEW -> COMPLETING -> EXCEPTIONAL
? ? * NEW -> CANCELLED
? ? * NEW -> INTERRUPTING -> INTERRUPTED
? ? */
? ? private volatile int state;
? ? private static final int NEW? ? ? ? ? = 0;
? ? private static final int COMPLETING? = 1;
? ? private static final int NORMAL? ? ? = 2;
? ? private static final int EXCEPTIONAL? = 3;
? ? private static final int CANCELLED? ? = 4;
? ? private static final int INTERRUPTING = 5;
? ? private static final int INTERRUPTED? = 6;
NEW:表示是個新的任務(wù)或者還沒被執(zhí)行完的任務(wù)。這是初始狀態(tài)。
COMPLETING:任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務(wù)執(zhí)行結(jié)果,如果發(fā)生異常,則用來保存異常原因)的時候,狀態(tài)會從NEW變更到COMPLETING。但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。
NORMAL:任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會從COMPLETING轉(zhuǎn)換到NORMAL。這是一個最終態(tài)。
EXCEPTIONAL:任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會從COMPLETING轉(zhuǎn)換到EXCEPTIONAL。這是一個最終態(tài)。
CANCELLED:任務(wù)還沒開始執(zhí)行或者已經(jīng)開始執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程,這個時候狀態(tài)會從NEW轉(zhuǎn)化為CANCELLED狀態(tài)。這是一個最終態(tài)。
INTERRUPTING:任務(wù)還沒開始執(zhí)行或者已經(jīng)執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒有中斷任務(wù)執(zhí)行線程之前,狀態(tài)會從NEW轉(zhuǎn)化為INTERRUPTING。這是一個中間狀態(tài)。
INTERRUPTED:調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會從INTERRUPTING轉(zhuǎn)換到INTERRUPTED,這是一個最終態(tài)。
所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成,任務(wù)執(zhí)行異常或者任務(wù)被取消)
FutureTask.get實現(xiàn)
? ? public V get() throws InterruptedException, ExecutionException {
? ? ? ? int s = state;
? ? ? ? if (s <= COMPLETING)
? ? ? ? ? ? s = awaitDone(false, 0L);
? ? ? ? return report(s);
? ? }
若狀態(tài)為NEW或者COMPLETING時調(diào)用awaitDone()對主線程進行阻塞
? ? private int awaitDone(boolean timed, long nanos)
? ? ? ? throws InterruptedException {
? ? ? ? final long deadline = timed ? System.nanoTime() + nanos : 0L;
? ? ? ? WaitNode q = null;
? ? ? ? boolean queued = false;
? ? ? ? for (;;) {
? ? ? ? ? ? // 若主線程被中斷,拋異常
? ? ? ? ? ? if (Thread.interrupted()) {
? ? ? ? ? ? ? ? // 去除鏈表中超時或被中斷節(jié)點
? ? ? ? ? ? ? ? removeWaiter(q);
? ? ? ? ? ? ? ? throw new InterruptedException();
? ? ? ? ? ? }
? ? ? ? ? ? int s = state;
? ? ? ? ? ? // 若狀態(tài)大于COMPLETING,表明任務(wù)已完成,直接返回
? ? ? ? ? ? if (s > COMPLETING) {
? ? ? ? ? ? ? ? if (q != null)
? ? ? ? ? ? ? ? ? ? q.thread = null;
? ? ? ? ? ? ? ? return s;
? ? ? ? ? ? }
? ? ? ? ? ? // 若狀態(tài)等于COMPLETING,讓出cpu資源
? ? ? ? ? ? else if (s == COMPLETING) // cannot time out yet
? ? ? ? ? ? ? ? Thread.yield();
? ? ? ? ? ? else if (q == null)
? ? ? ? ? ? ? ? q = new WaitNode();
? ? ? ? ? ? else if (!queued)
? ? ? ? ? ? ? ? // CAS設(shè)置鏈表(棧的邏輯結(jié)構(gòu))
? ? ? ? ? ? ? ? queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? q.next = waiters, q);
? ? ? ? ? ? else if (timed) {
? ? ? ? ? ? ? ? nanos = deadline - System.nanoTime();
? ? ? ? ? ? ? ? // 若超時,去除鏈表中超時或被中斷節(jié)點
? ? ? ? ? ? ? ? if (nanos <= 0L) {
? ? ? ? ? ? ? ? ? ? removeWaiter(q);
? ? ? ? ? ? ? ? ? ? return state;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 限時祖塞
? ? ? ? ? ? ? ? LockSupport.parkNanos(this, nanos);
? ? ? ? ? ? }
? ? ? ? ? ? else
? ? ? ? ? ? ? ? // 一直阻塞
? ? ? ? ? ? ? ? LockSupport.park(this);
? ? ? ? }
? ? }
awaitDone()方法目的是主線程阻塞直至futureTask完成。若狀態(tài)為COMPLETING,表明任務(wù)完成(無論成功或失敗),但其結(jié)果被保存在outcome字段中,讓出cpu資源;若狀態(tài)大于COMPLETING表明任務(wù)完成且結(jié)果已存,直接返回;否則維護基于鏈表的等待棧根據(jù)是否限時阻塞線程節(jié)點
futureTask.run實現(xiàn)
? ? public void run() {
? ? ? ? // 若任務(wù)完成或已有其他執(zhí)行此任務(wù)
? ? ? ? if (state != NEW ||
? ? ? ? ? ? !UNSAFE.compareAndSwapObject(this, runnerOffset,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? null, Thread.currentThread()))
? ? ? ? ? ? return;
? ? ? ? try {
? ? ? ? ? ? Callable c = callable;
? ? ? ? ? ? // 若任務(wù)不為空且狀態(tài)為new
? ? ? ? ? ? if (c != null && state == NEW) {
? ? ? ? ? ? ? ? V result;
? ? ? ? ? ? ? ? boolean ran;
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? // 執(zhí)行任務(wù)
? ? ? ? ? ? ? ? ? ? result = c.call();
? ? ? ? ? ? ? ? ? ? ran = true;
? ? ? ? ? ? ? ? } catch (Throwable ex) {
? ? ? ? ? ? ? ? ? ? result = null;
? ? ? ? ? ? ? ? ? ? ran = false;
? ? ? ? ? ? ? ? ? ? setException(ex);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (ran)
? ? ? ? ? ? ? ? ? ? set(result);
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? // runner must be non-null until state is settled to
? ? ? ? ? ? // 防止并發(fā)調(diào)用run
? ? ? ? ? ? runner = null;
? ? ? ? ? ? // state must be re-read after nulling runner to prevent
? ? ? ? ? ? // leaked interrupts
? ? ? ? ? ? int s = state;
? ? ? ? ? ? if (s >= INTERRUPTING)
? ? ? ? ? ? ? ? handlePossibleCancellationInterrupt(s);
? ? ? ? }
? ? }
run()方法邏輯很簡單,執(zhí)行成功set()方法保存結(jié)果;執(zhí)行異常setException()保存異常,最后runner置空防止并發(fā)調(diào)用,若任務(wù)被中斷,handlePossibleCancellationInterrupt處理由于cancel(true)而取消中斷的線程
set,setException方法:
? ? /**
? ? * 任務(wù)執(zhí)行成功? 狀態(tài)由NEW -> COMPLETING -> NORMAL
? ? */
? ? protected void set(V v) {
? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
? ? ? ? ? ? outcome = v;
? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
? ? ? ? ? ? finishCompletion();
? ? ? ? }
? ? }
? ? /**
? ? * 任務(wù)執(zhí)行異常? 狀態(tài)NEW -> COMPLETING -> EXCEPTIONAL
? ? */
? ? protected void setException(Throwable t) {
? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
? ? ? ? ? ? outcome = t;
? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
? ? ? ? ? ? finishCompletion();
? ? ? ? }
? ? }
兩個方法都會finishCompletion()通知主線程任務(wù)已經(jīng)執(zhí)行完成
? ? private void finishCompletion() {
? ? ? ? // assert state > COMPLETING;
? ? ? ? for (WaitNode q; (q = waiters) != null;) {
? ? ? ? ? ? if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
? ? ? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? ? ? Thread t = q.thread;
? ? ? ? ? ? ? ? ? ? if (t != null) {
? ? ? ? ? ? ? ? ? ? ? ? q.thread = null;
? ? ? ? ? ? ? ? ? ? ? ? LockSupport.unpark(t);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? WaitNode next = q.next;
? ? ? ? ? ? ? ? ? ? if (next == null)
? ? ? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? ? ? q.next = null; // unlink to help gc
? ? ? ? ? ? ? ? ? ? q = next;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? done();
? ? ? ? callable = null;? ? ? ? // to reduce footprint
? ? }
1、執(zhí)行FutureTask類的get方法時,會把主線程封裝成WaitNode節(jié)點并保存在waiters鏈表中;
2、FutureTask任務(wù)執(zhí)行完成后,通過UNSAFE設(shè)置waiters的值,并通過LockSupport類unpark方法喚醒主線程;
線程池關(guān)閉
線程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于關(guān)閉線程池
shutdown():按過去執(zhí)行已提交任務(wù)的順序發(fā)起一個有序的關(guān)閉,其中先前提交的任務(wù)將被執(zhí)行,但不會接受任何新任務(wù)
shutdownNow() :嘗試停止所有主動執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回正在等待執(zhí)行的任務(wù)列表
線程池配置
合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來分析
任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)
任務(wù)的優(yōu)先級:高、中和低
任務(wù)的執(zhí)行時間:長、中和短
任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。
CPU密集型任務(wù):應(yīng)配置盡可能小的線程,如配置Ncpu+1個線程的線程池
IO密集型任務(wù):其線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu
混合型的任務(wù):如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解
可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先執(zhí)行,但優(yōu)先級低的任務(wù)可能永遠不能執(zhí)行
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時間越長,則CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU
建議使用有界隊列,使用無界隊列的話,一旦任務(wù)積壓在阻塞隊列中的話就會占用過多的內(nèi)存資源,系統(tǒng)可能會崩潰