Java并發(fā)——線程池ThreadPoolExecutor

線程池作用

相對于為每個請求都創(chuàng)建一個線程,線程池通過重用現(xiàn)有的線程而不是創(chuàng)建新線程,可以在處理多個請求時分?jǐn)傇诰€程創(chuàng)建和銷毀過程中產(chǎn)生的巨大開銷,當(dāng)請求到達時,工作線程通過已經(jīng)存在,不會由于等待創(chuàng)建線程而延遲任務(wù)的執(zhí)行,從而提高響應(yīng)性。通過適當(dāng)調(diào)整線程池的大小,可以創(chuàng)建足夠多的線程以便使處理器保持忙碌狀態(tài),同時還可以防止過多線程相互競爭資源而使應(yīng)用程序耗盡內(nèi)存或失敗

線程池處理流程

1)判斷核心線程池里的線程是否都在執(zhí)行任務(wù)。如果不是,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進入下個流程

2)判斷工作隊列是否已經(jīng)滿。如果工作隊列沒有滿,則將新提交的任務(wù)存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程

3)判斷線程池的線程是否都處于工作狀態(tài)。如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來處理這個任務(wù)

示意圖:

創(chuàng)建線程池

ThreadPoolExecutor構(gòu)造方法:

? ? public ThreadPoolExecutor(int corePoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {

? ? ? ? ...? ? //代碼省略

? ? }

一共七個參數(shù):

corePoolSize

線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使有其他空閑的核心線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,直到線程數(shù)等于corePoolSize就不再創(chuàng)建,繼續(xù)提交的任務(wù)被保存到阻塞隊列中。如果調(diào)用了線程池的prestartAllCoreThreads()或者prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程

maximumPoolSize

線程池最大線程數(shù),如果當(dāng)前阻塞隊列滿了,繼續(xù)提交任務(wù),若當(dāng)前線程數(shù)小于maximumPoolSize則創(chuàng)建新的線程執(zhí)行任務(wù)。注意如果使用了無界的阻塞隊列這個參數(shù)就沒什么效果

keepAliveTime

線程空閑時保持存活時間,即當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間。若當(dāng)前線程池的線程數(shù)超過corePoolSize,且線程空閑時間超過keepAliveTime,就將這些空閑線程銷毀,盡可能降低資源銷毀

unit

keepAliveTime的時間單位,可以是天、小時、分、毫秒、微秒和納秒

workQueue

用于保存等待執(zhí)行的任務(wù)的阻塞隊列

threadFactory

創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè) 置更有意義的名字

handler

線程池的飽和策略(或者叫拒絕策略),當(dāng)隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。Java線程池提供了以下4種策略:

①.AbortPolicy:直接拋出異常,默認(rèn)策略

②.CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)

③.DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)

④.DiscardPolicy:不處理,直接丟棄

也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略

調(diào)用Exectors中的靜態(tài)工廠方法也可以來創(chuàng)建線程池

newFixedThreadPool

? ? public static ExecutorService newFixedThreadPool(int nThreads) {

? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue());

? ? }

復(fù)制代碼

創(chuàng)建一個固定長度的線程池,每當(dāng)提交一個任務(wù)時就創(chuàng)建一個線程,直到達到線程池的最大數(shù)量(corePoolSize == maximumPoolSize),這時線程池的規(guī)模將不再變化(若某個線程由于發(fā)生了未預(yù)期的Exception而結(jié)束,線程池會補充一個新線程),使用LinkedBlockingQuene作為阻塞隊列,適用于負(fù)載比較重的服務(wù)器

newCachedThreadPool

? ? public static ExecutorService newCachedThreadPool() {

? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue());

? ? }

創(chuàng)建一個可緩存線程的線程池,默認(rèn)緩存60s,使用SynchronousQueue作為阻塞隊列(沒有數(shù)據(jù)緩存空間的阻塞隊列,每一個put操作必須等待一個take操作,若任務(wù)提交的速度遠遠大于CachedThreadPool的處理速度,CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù),可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源)。適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者負(fù)載較輕的服務(wù)器,使用該線程池時,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題

newSingleThreadExecutor

? ? public static ExecutorService newSingleThreadExecutor() {

? ? ? ? return new FinalizableDelegatedExecutorService

? ? ? ? ? ? (new ThreadPoolExecutor(1, 1,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue()));

? ? }

單線程的Executor,線程池中只有一個線程,若線程異常結(jié)束,會創(chuàng)建另一個線程替代。newSingleThreadExecutor能確保依照任務(wù)在隊列中的順訊來串行執(zhí)行,內(nèi)部使用LinkedBlockingQueue作為阻塞隊列,適用于需要保證順序地執(zhí)行各個任務(wù);并且在任意時間點,不會有多個線程是活動的應(yīng)用場景

newScheduledThreadPool

? ? public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {

? ? ? ? return new ScheduledThreadPoolExecutor(corePoolSize);

? ? }

public ScheduledThreadPoolExecutor(int corePoolSize) {

? ? super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

? ? ? ? ? new DelayedWorkQueue());

}

可以延遲或定時的方式執(zhí)行任務(wù),適用于周期任務(wù)實現(xiàn)原理

線程池狀態(tài)

? ? private static final int RUNNING? ? = -1 << COUNT_BITS;

? ? private static final int SHUTDOWN? =? 0 << COUNT_BITS;

? ? private static final int STOP? ? ? =? 1 << COUNT_BITS;

? ? private static final int TIDYING? ? =? 2 << COUNT_BITS;

? ? private static final int TERMINATED =? 3 << COUNT_BITS;

RUNNING:

線程池能夠接收新任務(wù),且能處理阻塞隊列中的任務(wù)

SHUTDOWN:

線程池不會接收新任務(wù),但會處理阻塞隊列中的任務(wù)(shutdown())

STOP:

線程池不會接收新任務(wù),不會處理已添加的任務(wù),并且會中斷正在處理的任務(wù)(shutdownNow())

TIDYING:

所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0

TERMINATED:

線程池徹底終止(terminated())

任務(wù)提交

有兩種方式向線程池提交任務(wù),分別為execute()和submit()方法。execute()方法提交的任務(wù)不能獲取返回值,而submit()方法提交的任務(wù)會返回一個future類型的對象,可以通過這個future對象判斷任務(wù)是否執(zhí)行成功

execute()

execute()方法執(zhí)行示意圖:

execute()源碼:

? ? public void execute(Runnable command) {

? ? ? ? if (command == null)

? ? ? ? ? ? throw new NullPointerException();

? ? ? ? int c = ctl.get();

? ? ? ? // 若線程池當(dāng)前線程數(shù)小于核心線程數(shù)則創(chuàng)建新線程執(zhí)行任務(wù)

? ? ? ? if (workerCountOf(c) < corePoolSize) {

? ? ? ? ? ? if (addWorker(command, true))

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? c = ctl.get();

? ? ? ? }

? ? ? ? // 若線程數(shù)大于等于核心線程數(shù)或線程創(chuàng)建失敗,則將當(dāng)前任務(wù)放到工作隊列中

? ? ? ? if (isRunning(c) && workQueue.offer(command)) {

? ? ? ? ? ? int recheck = ctl.get();

? ? ? ? ? ? if (! isRunning(recheck) && remove(command))

? ? ? ? ? ? ? ? reject(command);

? ? ? ? ? ? else if (workerCountOf(recheck) == 0)

? ? ? ? ? ? ? ? addWorker(null, false);

? ? ? ? }

? ? ? ? // 若當(dāng)前任務(wù)無法放進阻塞隊列中,則創(chuàng)建新的線程來執(zhí)行任務(wù)

? ? ? ? else if (!addWorker(command, false))

? ? ? ? ? ? // addWoker創(chuàng)建失敗,執(zhí)行reject方法運行相應(yīng)的拒絕策略

? ? ? ? ? ? reject(command);

? ? }

復(fù)制代碼

如果當(dāng)前運行的線程少于corePoolSize,則會調(diào)用addWorker()創(chuàng)建新的線程來執(zhí)行新的任務(wù)

? ? private boolean addWorker(Runnable firstTask, boolean core) {

? ? ? ? retry:

? ? ? ? for (;;) {

? ? ? ? ? ? int c = ctl.get();

? ? ? ? ? ? // 獲取當(dāng)前線程池運行狀態(tài)

? ? ? ? ? ? int rs = runStateOf(c);

? ? ? ? ? ? // 狀態(tài)判斷,條件不符合添加線程失敗

? ? ? ? ? ? if (rs >= SHUTDOWN &&

? ? ? ? ? ? ? ? ! (rs == SHUTDOWN &&

? ? ? ? ? ? ? ? ? firstTask == null &&

? ? ? ? ? ? ? ? ? ! workQueue.isEmpty()))

? ? ? ? ? ? ? ? return false;

? ? ? ? ? ? for (;;) {

? ? ? ? ? ? ? ? // 獲取線程池當(dāng)前線程數(shù)

? ? ? ? ? ? ? ? int wc = workerCountOf(c);

? ? ? ? ? ? ? ? // 若線程數(shù)超過CAPACITY,返回false

? ? ? ? ? ? ? ? // 若是添加核心線程,超過核心線程數(shù)返回false;若不是超過最大線程數(shù)返回false

? ? ? ? ? ? ? ? if (wc >= CAPACITY ||

? ? ? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))

? ? ? ? ? ? ? ? ? ? return false;

? ? ? ? ? ? ? ? // CAS線程數(shù)+1? ?

? ? ? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))

? ? ? ? ? ? ? ? ? ? break retry;

? ? ? ? ? ? ? ? c = ctl.get();? // Re-read ctl

? ? ? ? ? ? ? ? // 若狀態(tài)與之前不一樣,跳到最外層循環(huán)

? ? ? ? ? ? ? ? if (runStateOf(c) != rs)

? ? ? ? ? ? ? ? ? ? continue retry;

? ? ? ? ? ? ? ? // else CAS failed due to workerCount change; retry inner loop

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? boolean workerStarted = false;

? ? ? ? boolean workerAdded = false;

? ? ? ? Worker w = null;

? ? ? ? try {

? ? ? ? ? ? // 創(chuàng)建線程

? ? ? ? ? ? w = new Worker(firstTask);

? ? ? ? ? ? final Thread t = w.thread;

? ? ? ? ? ? if (t != null) {

? ? ? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;

? ? ? ? ? ? ? ? // 獲取鎖

? ? ? ? ? ? ? ? mainLock.lock();

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? // 再次校驗線程狀態(tài)是否符合添加線程條件

? ? ? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());

? ? ? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||

? ? ? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {

? ? ? ? ? ? ? ? ? ? ? ? if (t.isAlive()) // precheck that t is startable

? ? ? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();

? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);

? ? ? ? ? ? ? ? ? ? ? ? int s = workers.size();

? ? ? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)

? ? ? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;

? ? ? ? ? ? ? ? ? ? ? ? workerAdded = true;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? mainLock.unlock();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? // 添加成功后開啟線程

? ? ? ? ? ? ? ? if (workerAdded) {

? ? ? ? ? ? ? ? ? ? t.start();

? ? ? ? ? ? ? ? ? ? workerStarted = true;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? } finally {

? ? ? ? ? ? if (! workerStarted)

? ? ? ? ? ? ? ? addWorkerFailed(w);

? ? ? ? }

? ? ? ? return workerStarted;

? ? }

復(fù)制代碼

addWorker()添加線程時判斷了兩次線程狀態(tài)是否符合添加線程的條件

第一次判斷返回false:

①.線程池狀態(tài)為STOP、TIDYING或TERMINATED狀態(tài)

②.線程池狀態(tài)為SHUTDOWN,任務(wù)不為null即線程處于SHUTDOWN狀態(tài),不允許添加任務(wù)

③.線程池狀態(tài)為SHUTDOWN,任務(wù)為null,但阻塞隊列為空,即添加空任務(wù)沒有意義

第二次判斷返回false:

①.線程池狀態(tài)為STOP、TIDYING或TERMINATED狀態(tài)

②.線程池狀態(tài)為SHUTDOWN且任務(wù)不為null

線程添加成功后,調(diào)用start()方法啟動線程,執(zhí)行Worker類(繼承AQS)的run()方法

? ? public void run() {

? ? ? ? runWorker(this);

? ? }


? ? final void runWorker(Worker w) {

? ? ? ? Thread wt = Thread.currentThread();

? ? ? ? Runnable task = w.firstTask;

? ? ? ? w.firstTask = null;

? ? ? ? // 釋放鎖,允許中斷

? ? ? ? w.unlock(); // allow interrupts

? ? ? ? boolean completedAbruptly = true;

? ? ? ? try {

? ? ? ? ? ? // 若當(dāng)前線程所需執(zhí)行的任務(wù)不為空或阻塞隊列中有任務(wù)

? ? ? ? ? ? while (task != null || (task = getTask()) != null) {

? ? ? ? ? ? ? ? w.lock();

// 若線程池處于STOP、TIDYING或TERMINATED狀態(tài)時,且線程沒有中斷標(biāo)記,則請求中斷線程

// 若線程池處于RUNNING或SHUTDOWN狀態(tài),且線程有中斷標(biāo)記,再次判斷線程池狀態(tài)是否>=STOP,若是請求中斷線程

? ? ? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||

? ? ? ? ? ? ? ? ? ? (Thread.interrupted() &&

? ? ? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&

? ? ? ? ? ? ? ? ? ? !wt.isInterrupted())

? ? ? ? ? ? ? ? ? ? wt.interrupt();

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? // 根據(jù)業(yè)務(wù)場景自定義方法

? ? ? ? ? ? ? ? ? ? beforeExecute(wt, task);

? ? ? ? ? ? ? ? ? ? Throwable thrown = null;

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? // 執(zhí)行任務(wù)

? ? ? ? ? ? ? ? ? ? ? ? task.run();

? ? ? ? ? ? ? ? ? ? } catch (RuntimeException x) {

? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

? ? ? ? ? ? ? ? ? ? } catch (Error x) {

? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

? ? ? ? ? ? ? ? ? ? } catch (Throwable x) {

? ? ? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);

? ? ? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? ? ? // 根據(jù)業(yè)務(wù)場景自定義方法

? ? ? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? task = null;

? ? ? ? ? ? ? ? ? ? w.completedTasks++;

? ? ? ? ? ? ? ? ? ? w.unlock();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? completedAbruptly = false;

? ? ? ? } finally {

? ? ? ? ? ? // 退出處理

? ? ? ? ? ? processWorkerExit(w, completedAbruptly);

? ? ? ? }

? ? }

若當(dāng)前線程的任務(wù)執(zhí)行完,還會調(diào)用getTask()找阻塞隊列中是否有任務(wù)

? ? private Runnable getTask() {

? ? ? ? boolean timedOut = false; // Did the last poll() time out?

? ? ? ? for (;;) {

? ? ? ? ? ? int c = ctl.get();

? ? ? ? ? ? // 獲取線程池狀態(tài)

? ? ? ? ? ? int rs = runStateOf(c);

? ? ? ? ? ? // 若線程池狀態(tài)為SHUTDOWN且阻塞隊列為空,workerCount - 1,返回null

? ? ? ? ? ? // 若線程池狀態(tài)為STOP、TIDYING或TERMINATED狀態(tài),workerCount - 1,返回null

? ? ? ? ? ? if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

? ? ? ? ? ? ? ? decrementWorkerCount();

? ? ? ? ? ? ? ? return null;

? ? ? ? ? ? }

? ? ? ? ? ? int wc = workerCountOf(c);

? ? ? ? ? ? // Are workers subject to culling?

? ? ? ? ? ? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

? ? ? ? ? ? if ((wc > maximumPoolSize || (timed && timedOut))

? ? ? ? ? ? ? ? && (wc > 1 || workQueue.isEmpty())) {

? ? ? ? ? ? ? ? if (compareAndDecrementWorkerCount(c))

? ? ? ? ? ? ? ? ? ? return null;

? ? ? ? ? ? ? ? continue;

? ? ? ? ? ? }

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? // 若需要超時控制,則調(diào)用poll(),否則調(diào)用take()從阻塞隊列中獲取任務(wù)

? ? ? ? ? ? ? ? Runnable r = timed ?

? ? ? ? ? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

? ? ? ? ? ? ? ? ? ? workQueue.take();

? ? ? ? ? ? ? ? if (r != null)

? ? ? ? ? ? ? ? ? ? return r;

? ? ? ? ? ? ? ? timedOut = true;

? ? ? ? ? ? } catch (InterruptedException retry) {

? ? ? ? ? ? ? ? timedOut = false;

? ? ? ? ? ? }

? ? ? ? }

? ? }

從getTask()源碼可以知道線程池中的線程執(zhí)行完自身任務(wù)后會一直執(zhí)行阻塞隊列中的任務(wù)。當(dāng)線程處理完阻塞隊列的任務(wù)后或者處理任務(wù)時出現(xiàn)異常退出循環(huán),會執(zhí)行processWorkerExit()方法

? ? private void processWorkerExit(Worker w, boolean completedAbruptly) {

? ? ? ? // completedAbruptly:true,表明線程運行異常,workerCount-1

? ? ? ? // completedAbruptly:false,表明運行正常getTask()方法中已減少線程數(shù)量

? ? ? ? if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

? ? ? ? ? ? decrementWorkerCount();

? ? ? ? final ReentrantLock mainLock = this.mainLock;

? ? ? ? mainLock.lock();

? ? ? ? try {

? ? ? ? ? ? completedTaskCount += w.completedTasks;

? ? ? ? ? ? // 從workers移除,從線程池移除至多一個線程

? ? ? ? ? ? workers.remove(w);

? ? ? ? } finally {

? ? ? ? ? ? mainLock.unlock();

? ? ? ? }

? ? ? ? // 嘗試終止線程池

? ? ? ? tryTerminate();

? ? ? ? int c = ctl.get();

? ? ? ? // 若當(dāng)前線程池狀態(tài)為RUNNING或SHUTDOWN,

? ? ? ? if (runStateLessThan(c, STOP)) {

? ? ? ? ? ? // 線程運行正常

? ? ? ? ? ? if (!completedAbruptly) {

? ? ? ? ? ? ? ? // 若allowCoreThreadTimeOut為true,且等待隊列有任務(wù),至少保留一個線程

? ? ? ? ? ? ? ? // 若allowCoreThreadTimeOut為false,線程數(shù)不少于corePoolSize

? ? ? ? ? ? ? ? int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

? ? ? ? ? ? ? ? if (min == 0 && ! workQueue.isEmpty())

? ? ? ? ? ? ? ? ? ? min = 1;

? ? ? ? ? ? ? ? if (workerCountOf(c) >= min)

? ? ? ? ? ? ? ? ? ? return; // replacement not needed

? ? ? ? ? ? }

? ? ? ? ? ? // 線程運行異常,調(diào)用addWorker()添加線程

? ? ? ? ? ? addWorker(null, false);

? ? ? ? }

? ? }

方法先判斷線程運行是否順利,若運行出現(xiàn)異常將線程數(shù)減1。然后調(diào)用tryTerminate()嘗試終止線程池。若當(dāng)前線程池狀態(tài)為RUNNING或SHUTDOWN,視情況是否添加線程

tryTerminate()方法

? ? final void tryTerminate() {

? ? ? ? for (;;) {

? ? ? ? ? ? int c = ctl.get();

? ? ? ? ? ? // 若線程池當(dāng)前狀態(tài)為RUNNING直接返回不終止

? ? ? ? ? ? // 若狀態(tài)為TIDYING或TERMINATED,即已經(jīng)準(zhǔn)備終止

? ? ? ? ? ? // 若狀態(tài)為SHUTDOWN且阻塞隊列非空,需要執(zhí)行完任務(wù)

? ? ? ? ? ? if (isRunning(c) ||

? ? ? ? ? ? ? ? runStateAtLeast(c, TIDYING) ||

? ? ? ? ? ? ? ? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? //? 若線程數(shù)不等于0,適當(dāng)終止一個線程

? ? ? ? ? ? if (workerCountOf(c) != 0) { // Eligible to terminate

? ? ? ? ? ? ? ? interruptIdleWorkers(ONLY_ONE);

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? }

? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;

? ? ? ? ? ? mainLock.lock();

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? // // 嘗試終止線程池

? ? ? ? ? ? ? ? if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? // 子類實現(xiàn)

? ? ? ? ? ? ? ? ? ? ? ? terminated();

? ? ? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED, 0));

? ? ? ? ? ? ? ? ? ? ? ? termination.signalAll();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? return;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? mainLock.unlock();

? ? ? ? ? ? }

? ? ? ? ? ? // else retry on failed CAS

? ? ? ? }

? ? }

submit()

submit()返回future類型的對象,通過這個future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成。

? ? public? Future submit(Callable task) {

? ? ? ? if (task == null) throw new NullPointerException();

? ? ? ? RunnableFuture ftask = newTaskFor(task);

? ? ? ? execute(ftask);

? ? ? ? return ftask;

? ? }

在submit方法中調(diào)用newTaskFor()將Callable任務(wù)會被封裝成FutureTask對象

? ? protected? RunnableFuture newTaskFor(Callable callable) {

? ? ? ? return new FutureTask(callable);

? ? }

FutureTask狀態(tài):

? ? /** Possible state transitions:

? ? * NEW -> COMPLETING -> NORMAL

? ? * NEW -> COMPLETING -> EXCEPTIONAL

? ? * NEW -> CANCELLED

? ? * NEW -> INTERRUPTING -> INTERRUPTED

? ? */

? ? private volatile int state;

? ? private static final int NEW? ? ? ? ? = 0;

? ? private static final int COMPLETING? = 1;

? ? private static final int NORMAL? ? ? = 2;

? ? private static final int EXCEPTIONAL? = 3;

? ? private static final int CANCELLED? ? = 4;

? ? private static final int INTERRUPTING = 5;

? ? private static final int INTERRUPTED? = 6;

NEW:表示是個新的任務(wù)或者還沒被執(zhí)行完的任務(wù)。這是初始狀態(tài)。

COMPLETING:任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務(wù)執(zhí)行結(jié)果,如果發(fā)生異常,則用來保存異常原因)的時候,狀態(tài)會從NEW變更到COMPLETING。但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。

NORMAL:任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會從COMPLETING轉(zhuǎn)換到NORMAL。這是一個最終態(tài)。

EXCEPTIONAL:任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會從COMPLETING轉(zhuǎn)換到EXCEPTIONAL。這是一個最終態(tài)。

CANCELLED:任務(wù)還沒開始執(zhí)行或者已經(jīng)開始執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程,這個時候狀態(tài)會從NEW轉(zhuǎn)化為CANCELLED狀態(tài)。這是一個最終態(tài)。

INTERRUPTING:任務(wù)還沒開始執(zhí)行或者已經(jīng)執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒有中斷任務(wù)執(zhí)行線程之前,狀態(tài)會從NEW轉(zhuǎn)化為INTERRUPTING。這是一個中間狀態(tài)。

INTERRUPTED:調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會從INTERRUPTING轉(zhuǎn)換到INTERRUPTED,這是一個最終態(tài)。

所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成,任務(wù)執(zhí)行異常或者任務(wù)被取消)

FutureTask.get實現(xiàn)

? ? public V get() throws InterruptedException, ExecutionException {

? ? ? ? int s = state;

? ? ? ? if (s <= COMPLETING)

? ? ? ? ? ? s = awaitDone(false, 0L);

? ? ? ? return report(s);

? ? }

若狀態(tài)為NEW或者COMPLETING時調(diào)用awaitDone()對主線程進行阻塞

? ? private int awaitDone(boolean timed, long nanos)

? ? ? ? throws InterruptedException {

? ? ? ? final long deadline = timed ? System.nanoTime() + nanos : 0L;

? ? ? ? WaitNode q = null;

? ? ? ? boolean queued = false;

? ? ? ? for (;;) {

? ? ? ? ? ? // 若主線程被中斷,拋異常

? ? ? ? ? ? if (Thread.interrupted()) {

? ? ? ? ? ? ? ? // 去除鏈表中超時或被中斷節(jié)點

? ? ? ? ? ? ? ? removeWaiter(q);

? ? ? ? ? ? ? ? throw new InterruptedException();

? ? ? ? ? ? }

? ? ? ? ? ? int s = state;

? ? ? ? ? ? // 若狀態(tài)大于COMPLETING,表明任務(wù)已完成,直接返回

? ? ? ? ? ? if (s > COMPLETING) {

? ? ? ? ? ? ? ? if (q != null)

? ? ? ? ? ? ? ? ? ? q.thread = null;

? ? ? ? ? ? ? ? return s;

? ? ? ? ? ? }

? ? ? ? ? ? // 若狀態(tài)等于COMPLETING,讓出cpu資源

? ? ? ? ? ? else if (s == COMPLETING) // cannot time out yet

? ? ? ? ? ? ? ? Thread.yield();

? ? ? ? ? ? else if (q == null)

? ? ? ? ? ? ? ? q = new WaitNode();

? ? ? ? ? ? else if (!queued)

? ? ? ? ? ? ? ? // CAS設(shè)置鏈表(棧的邏輯結(jié)構(gòu))

? ? ? ? ? ? ? ? queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? q.next = waiters, q);

? ? ? ? ? ? else if (timed) {

? ? ? ? ? ? ? ? nanos = deadline - System.nanoTime();

? ? ? ? ? ? ? ? // 若超時,去除鏈表中超時或被中斷節(jié)點

? ? ? ? ? ? ? ? if (nanos <= 0L) {

? ? ? ? ? ? ? ? ? ? removeWaiter(q);

? ? ? ? ? ? ? ? ? ? return state;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? // 限時祖塞

? ? ? ? ? ? ? ? LockSupport.parkNanos(this, nanos);

? ? ? ? ? ? }

? ? ? ? ? ? else

? ? ? ? ? ? ? ? // 一直阻塞

? ? ? ? ? ? ? ? LockSupport.park(this);

? ? ? ? }

? ? }

awaitDone()方法目的是主線程阻塞直至futureTask完成。若狀態(tài)為COMPLETING,表明任務(wù)完成(無論成功或失敗),但其結(jié)果被保存在outcome字段中,讓出cpu資源;若狀態(tài)大于COMPLETING表明任務(wù)完成且結(jié)果已存,直接返回;否則維護基于鏈表的等待棧根據(jù)是否限時阻塞線程節(jié)點

futureTask.run實現(xiàn)

? ? public void run() {

? ? ? ? // 若任務(wù)完成或已有其他執(zhí)行此任務(wù)

? ? ? ? if (state != NEW ||

? ? ? ? ? ? !UNSAFE.compareAndSwapObject(this, runnerOffset,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? null, Thread.currentThread()))

? ? ? ? ? ? return;

? ? ? ? try {

? ? ? ? ? ? Callable c = callable;

? ? ? ? ? ? // 若任務(wù)不為空且狀態(tài)為new

? ? ? ? ? ? if (c != null && state == NEW) {

? ? ? ? ? ? ? ? V result;

? ? ? ? ? ? ? ? boolean ran;

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? // 執(zhí)行任務(wù)

? ? ? ? ? ? ? ? ? ? result = c.call();

? ? ? ? ? ? ? ? ? ? ran = true;

? ? ? ? ? ? ? ? } catch (Throwable ex) {

? ? ? ? ? ? ? ? ? ? result = null;

? ? ? ? ? ? ? ? ? ? ran = false;

? ? ? ? ? ? ? ? ? ? setException(ex);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (ran)

? ? ? ? ? ? ? ? ? ? set(result);

? ? ? ? ? ? }

? ? ? ? } finally {

? ? ? ? ? ? // runner must be non-null until state is settled to

? ? ? ? ? ? // 防止并發(fā)調(diào)用run

? ? ? ? ? ? runner = null;

? ? ? ? ? ? // state must be re-read after nulling runner to prevent

? ? ? ? ? ? // leaked interrupts

? ? ? ? ? ? int s = state;

? ? ? ? ? ? if (s >= INTERRUPTING)

? ? ? ? ? ? ? ? handlePossibleCancellationInterrupt(s);

? ? ? ? }

? ? }

run()方法邏輯很簡單,執(zhí)行成功set()方法保存結(jié)果;執(zhí)行異常setException()保存異常,最后runner置空防止并發(fā)調(diào)用,若任務(wù)被中斷,handlePossibleCancellationInterrupt處理由于cancel(true)而取消中斷的線程

set,setException方法:

? ? /**

? ? * 任務(wù)執(zhí)行成功? 狀態(tài)由NEW -> COMPLETING -> NORMAL

? ? */

? ? protected void set(V v) {

? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

? ? ? ? ? ? outcome = v;

? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

? ? ? ? ? ? finishCompletion();

? ? ? ? }

? ? }


? ? /**

? ? * 任務(wù)執(zhí)行異常? 狀態(tài)NEW -> COMPLETING -> EXCEPTIONAL

? ? */

? ? protected void setException(Throwable t) {

? ? ? ? if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

? ? ? ? ? ? outcome = t;

? ? ? ? ? ? UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

? ? ? ? ? ? finishCompletion();

? ? ? ? }

? ? }

兩個方法都會finishCompletion()通知主線程任務(wù)已經(jīng)執(zhí)行完成

? ? private void finishCompletion() {

? ? ? ? // assert state > COMPLETING;

? ? ? ? for (WaitNode q; (q = waiters) != null;) {

? ? ? ? ? ? if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

? ? ? ? ? ? ? ? for (;;) {

? ? ? ? ? ? ? ? ? ? Thread t = q.thread;

? ? ? ? ? ? ? ? ? ? if (t != null) {

? ? ? ? ? ? ? ? ? ? ? ? q.thread = null;

? ? ? ? ? ? ? ? ? ? ? ? LockSupport.unpark(t);

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? WaitNode next = q.next;

? ? ? ? ? ? ? ? ? ? if (next == null)

? ? ? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? ? ? ? ? q.next = null; // unlink to help gc

? ? ? ? ? ? ? ? ? ? q = next;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? break;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? done();

? ? ? ? callable = null;? ? ? ? // to reduce footprint

? ? }

1、執(zhí)行FutureTask類的get方法時,會把主線程封裝成WaitNode節(jié)點并保存在waiters鏈表中;

2、FutureTask任務(wù)執(zhí)行完成后,通過UNSAFE設(shè)置waiters的值,并通過LockSupport類unpark方法喚醒主線程;

線程池關(guān)閉

線程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于關(guān)閉線程池

shutdown():按過去執(zhí)行已提交任務(wù)的順序發(fā)起一個有序的關(guān)閉,其中先前提交的任務(wù)將被執(zhí)行,但不會接受任何新任務(wù)

shutdownNow() :嘗試停止所有主動執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回正在等待執(zhí)行的任務(wù)列表

線程池配置

合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來分析

任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)

任務(wù)的優(yōu)先級:高、中和低

任務(wù)的執(zhí)行時間:長、中和短

任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接

性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。

CPU密集型任務(wù):應(yīng)配置盡可能小的線程,如配置Ncpu+1個線程的線程池

IO密集型任務(wù):其線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu

混合型的任務(wù):如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解

可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)

優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先執(zhí)行,但優(yōu)先級低的任務(wù)可能永遠不能執(zhí)行

執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行

依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時間越長,則CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU

建議使用有界隊列,使用無界隊列的話,一旦任務(wù)積壓在阻塞隊列中的話就會占用過多的內(nèi)存資源,系統(tǒng)可能會崩潰

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容