Java線程池:ThreadPoolExecutor詳解

用過java線程的同學(xué)都應(yīng)該大致了解,在java中,為了合理使用線程以合理利用資源、提高吞吐率以及加快響應(yīng)時(shí)間,通常會(huì)使用java線程池,因?yàn)榫€程池架構(gòu)設(shè)計(jì)合理,比起自己創(chuàng)建線程可能花銷巨大來講,線程池是一個(gè)很好的選擇。

作為一只喜歡研究源碼的程序猿,就我所學(xué),來講講java線程池是如何鞏工作的。

一.4種線程池

首先,java線程池為我們量身定制了4中拿來即用的線程池:

1.newSingleThreadExecutor:

public static ExecutorServicenewSingleThreadExecutor() {

????????????return new FinalizableDelegatedExecutorService

? ? ? ?????????????(new ThreadPoolExecutor(1, 1,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue()));

}


2.newFixedThreadPool:

public static ExecutorServicenewFixedThreadPool(int nThreads) {

????????????return new ThreadPoolExecutor(nThreads, nThreads,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue());

}

3.newCachedThreadPool:

public static ExecutorServicenewCachedThreadPool() {

????????????return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue());

}

4. newScheduledThreadPool:

public static ScheduledExecutorServicenewSingleThreadScheduledExecutor() {

????????????return new DelegatedScheduledExecutorService

????????????????????????(new ScheduledThreadPoolExecutor(1));

}

可以發(fā)現(xiàn):除了newScheduledThreadPool,其他三個(gè)都是ThreadPoolExecutor的一種特殊實(shí)現(xiàn)。

二.ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,

? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,

? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue workQueue,

? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,

? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {

if (corePoolSize <0 ||

maximumPoolSize <=0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime <0)

throw new IllegalArgumentException();

? ? if (workQueue ==null || threadFactory ==null || handler ==null)

throw new NullPointerException();

? ? this.acc = System.getSecurityManager() ==null ?

null :

AccessController.getContext();

? ? this.corePoolSize = corePoolSize;

? ? this.maximumPoolSize = maximumPoolSize;

? ? this.workQueue = workQueue;

? ? this.keepAliveTime = unit.toNanos(keepAliveTime);

? ? this.threadFactory = threadFactory;

? ? this.handler = handler;

}

1.構(gòu)造函數(shù)參數(shù)說明:

1).corePoolSize:線程池中的核心線程數(shù)

2).maximumPoolSize:線程池中的最大線程數(shù)

3).keepAliveTime: 線程池中的線程的存活時(shí)間

4).unit: keepAliveTime的時(shí)間單位

5).workQueue: BlockingQueue的實(shí)現(xiàn),用于存儲(chǔ)任務(wù)

6).threadFactory:自定義線程的創(chuàng)建工廠

7).handler:線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒有空閑的線程,繼續(xù)提交任務(wù)時(shí),必須進(jìn)行處理,默認(rèn)的方式是拋出RejectedExecutionHandler異常

2.內(nèi)部狀態(tài)變量說明:

jdk1.8就是用一個(gè)int型來表示線程池的運(yùn)行狀態(tài)和運(yùn)行任務(wù)數(shù)量,一個(gè)int一共32位,前3位表示運(yùn)行狀態(tài),后29位表示運(yùn)行任務(wù)線程數(shù);運(yùn)行狀態(tài)和運(yùn)行任務(wù)線程數(shù)量分別通過runStateOf(int c)和workerCountOf(int c)來獲取,都是通過&操作來計(jì)算的。各個(gè)值的初始值分為如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl

private static int runStateOf(int c) { return c & ~CAPACITY; }

private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

?1).ctl:包裝任務(wù)狀態(tài)和線程數(shù)(高3位表示運(yùn)行狀態(tài),后29位表示運(yùn)行任務(wù)線程數(shù)),初始值:11100000000000000000000000000000,沒多一線程就加1,為原子操作;默認(rèn)AtomicInteger,如果數(shù)量不夠的話,可以自行改成AtomicLong

? 2).COUNT_BITS:29

? 3).CAPACITY:運(yùn)行任務(wù)線程數(shù),值為:000 11111111111111111111111111111

??4).RUNNING:運(yùn)行狀態(tài),? ? ? ?值為:111 00000000000000000000000000000

??5).SHUTDOWN:關(guān)閉狀態(tài),? 值為:0

? 6).STOP: 停止?fàn)顟B(tài),? ? ? ? ? ? ?值為:001 00000000000000000000000000000

? 7).TIDYING:整理狀態(tài),? ? ? ? ?值為:010 00000000000000000000000000000

? 8).TERMINATED:終止?fàn)顟B(tài),值為:011 00000000000000000000000000000

3.內(nèi)部公用函數(shù)說明:

1).runStateOf(int c):獲取運(yùn)行狀態(tài),

2).workerCountOf(int c):獲取運(yùn)行任務(wù)線程數(shù)

3).ctl(int rs,int wc): 包裝運(yùn)行狀態(tài)和任務(wù)線程數(shù)

4).runStateLessThan(int c,int s):是否小于某個(gè)狀態(tài)

5).runStateAtLesat(int c,int s):是否大于或者等于某個(gè)狀態(tài)

這些公共函數(shù)接下來都會(huì)用到,寫在這里是便于理解。

4.提交任務(wù)函數(shù):execute(Runnable command)

public void execute(Runnable command){

????if (command == null)

????????throw new NullPointerException();

????int c = ctl.get();

????if (workerCountOf(c) < corePoolSize) {

????????if (addWorker(command, true))

????????????return;

????????c = ctl.get();

????}

????if (isRunning(c) && workQueue.offer(command)) {

????????int recheck = ctl.get();

????????if (! isRunning(recheck) && remove(command))

????????????reject(command);

????????else if (workerCountOf(recheck) == 0)

????????????addWorker(null, false);

?????} else if (!addWorker(command, false))

????????????reject(command);

}

1).如果當(dāng)前運(yùn)行任務(wù)線程數(shù)小于corePoolSize,嘗試另起線程去運(yùn)行任務(wù)。調(diào)用addWorker函數(shù)會(huì)自動(dòng)檢查運(yùn)行狀態(tài)和運(yùn)行任務(wù)線程數(shù),防止添加線程時(shí)出現(xiàn)多線程操作錯(cuò)誤。

2).如果添加新的線程失敗,那么就將該任務(wù)添加到隊(duì)列中,同時(shí),還需要檢查運(yùn)行狀態(tài)和運(yùn)行任務(wù)線程數(shù);再次檢查狀態(tài),防止入列期間出現(xiàn)狀態(tài)改變情況,如果線程池處于非運(yùn)行狀態(tài),移除任務(wù);如果沒有運(yùn)行任務(wù)線程數(shù)量為0,則起一個(gè)新的線程。

3).如果任務(wù)不能進(jìn)入隊(duì)列(例如隊(duì)列滿了),再次嘗試另起一個(gè)線程運(yùn)行;如果失敗了,使用拒絕策略。

5.addWorker

在execute函數(shù)中,addWorker是另起線程去執(zhí)行任務(wù),它的具體實(shí)現(xiàn)如下:

private boolean addWorker(Runnable firstTask, boolean core) {

????retry:

????for (;;) {

????????int c = ctl.get();

????????int rs = runStateOf(c);

????????// Check if queue empty only if necessary.

????????if (rs >= SHUTDOWN &&

????????????! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))

????????????????return false;

????????for (;;) {

????????????int wc = workerCountOf(c);

????????????if (wc >= CAPACITY ||?

? ? ? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))

????????????????return false;

????????????if (compareAndIncrementWorkerCount(c))

????????????????break retry;

????????????????c = ctl.get();// Re-read ctl

????????????if (runStateOf(c) != rs)

????????????????continue retry; // else CAS failed due to workerCount change; retry inner loop? ? ? ? ? ?}

????}

????boolean workerStarted = false;

? ? boolean workerAdded = false;

????Worker w = null;

????try {

????????w = new Worker(firstTask);

????????final Thread t = w.thread;

????????if (t != null) {

????????????final ReentrantLock mainLock = this.mainLock;

????????????mainLock.lock();

????????????try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired.

????????????????????int rs = runStateOf(ctl.get());

????????????????????if (rs < SHUTDOWN

????????????????????????????|| (rs == SHUTDOWN && firstTask == null)) {

????????????????????????????if (t.isAlive()) // precheck that t is startable

????????????????????????????????throw new IllegalThreadStateException();

????????????????????????????workers.add(w);

????????????????????????????int s = workers.size();

????????????????????????????if (s > largestPoolSize)

????????????????????????????largestPoolSize = s;

????????????????????????????workerAdded = true;

????????????????????????}

????????????????} finally {

????????????????????mainLock.unlock();

????????????????}

????????????????if (workerAdded) {

????????????????????t.start();

????????????????????workerStarted = true;

????????????????}

????????????}

????????} finally {

????????????if (! workerStarted)

????????????addWorkerFailed(w);

????????????}

????????????return workerStarted;

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容