在線程池詳解一:線程池概念以及架構(gòu)中提到了四個主要的快捷創(chuàng)建線程池方法,但是盡管 Executors 的工廠方法使用方便,在生產(chǎn)場景被很多企業(yè)的開發(fā)規(guī)范所禁用。要求通過標(biāo)準(zhǔn)構(gòu)造器 ThreadPoolExecutor 去構(gòu)造工作線程池。
1. 核心數(shù)據(jù)結(jié)構(gòu)
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任務(wù)的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
// 對線程池內(nèi)部各種變量進行互斥訪問控制
private final ReentrantLock mainLock = new ReentrantLock();
// 線程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//...
}
每一個線程是一個Worker對象。Worker是ThreadPoolExector的內(nèi)部類,核心數(shù)據(jù)結(jié)構(gòu)如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
// Worker封裝的線程
final Thread thread;
// Worker接收到的第1個任務(wù)
Runnable firstTask;
// Worker執(zhí)行完畢的任務(wù)個數(shù)
volatile long completedTasks;
// ...
}
由定義會發(fā)現(xiàn),Worker繼承于AQS,也就是說Worker本身就是一把鎖。
這把鎖有什么用處呢?用于線程池的關(guān)閉、線程執(zhí)行任務(wù)的過程中。
2.核心配置參數(shù)解釋

上圖的各個參數(shù),解釋如下:
參數(shù)一:corePoolSize:線程池中的常駐核心線程數(shù),在創(chuàng)建了線程池后,當(dāng)有請求任務(wù)來之后,就會安排池中的線程去執(zhí)行請求任務(wù),近視理解為今日當(dāng)值線程,當(dāng)線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務(wù)放入到緩存隊列當(dāng)中.
參數(shù)二:maximumPoolSize:線程池能夠容納同時執(zhí)行的最大線程數(shù),此值大于等于1
參數(shù)三:keepAliveTime:多余的空閑線程存活時間,當(dāng)空間時間達到keepAliveTime值時,多余的線程會被銷毀直到只剩下corePoolSize個線程為止默認(rèn)情況下:只有當(dāng)線程池中的線程數(shù)大于corePoolSize時keepAliveTime才會起作用,知道線程中的線程數(shù)不大于corepoolSIze,
參數(shù)四:unit:keepAliveTime的單位
參數(shù)五:workQueue:任務(wù)隊列,被提交但尚未被執(zhí)行的任務(wù).
參數(shù)六:threadFactory:表示生成線程池中工作線程的線程工廠,用戶創(chuàng)建新線程,一般用默認(rèn)即可
參數(shù)七:handler:拒絕策略,表示當(dāng)線程隊列滿了并且工作線程大于等于線程池的最大顯示 數(shù)(maxnumPoolSize)時如何來拒絕.
3.線程池的拒絕策略
handler:拒絕處理任務(wù)的策略
AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。(默認(rèn)這種)
DiscardPolicy:也是丟棄任務(wù),但是不拋出異常
DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
測試拒絕策略代碼如下:
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy()
// new ThreadPoolExecutor.CallerRunsPolicy()
// new ThreadPoolExecutor.DiscardOldestPolicy()
//new ThreadPoolExecutor.DiscardPolicy()
);
for (int i = 0; i < 20; i++) {
int finalI = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- 開始");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- 結(jié)束");
});
try {Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
}
executor.shutdown();
boolean flag = true;
try {
do {
flag = !executor.awaitTermination(1, TimeUnit.SECONDS);
} while (flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程池關(guān)閉成功。。。");
System.out.println(Thread.currentThread().getId());
}
}
4. 任務(wù)阻塞隊列
Java 中的阻塞隊列(BlockingQueue)與普通隊列相比,有一個重要的特點:在阻塞隊列為空時,會阻塞當(dāng)前線程的元素獲取操作。具體來說,在一個線程從一個空的阻塞隊列中取元素時,線程會被阻塞,直到阻塞隊列中有了元素;當(dāng)隊列中有元素后,被阻塞的線程會自動被喚醒(喚醒過程不需要用戶程序干預(yù))。
Java 線程池使用 BlockingQueue 實例暫時接收到的異步任務(wù),BlockingQueue 是 JUC 包的一個超級接口,比較常用的實現(xiàn)類有:
(1)ArrayBlockingQueue:是一個數(shù)組實現(xiàn)的有界阻塞隊列 (有界隊列),隊列中元素按 FIFO排序;ArrayBlockingQueue 在創(chuàng)建時必須設(shè)置大小,接收的任務(wù)超出 corePoolSize 數(shù)量時,則任務(wù)被緩存到該阻塞隊列中,任務(wù)緩存的數(shù)量只能為創(chuàng)建時設(shè)置的大?。蝗粼撟枞犃袧M,則會為新的任務(wù)則創(chuàng)建線程,直到線程池中的線程總數(shù)> maximumPoolSize。
(2)LinkedBlockingQueue:是一個基于鏈表實現(xiàn)的阻塞隊列,按 FIFO 排序任務(wù),可以設(shè)置容量(有界隊列),不設(shè)置容量則默認(rèn)使用 Integer.Max_VALUE 作為容量 (無界隊列)。該隊列的吞吐量高于 ArrayBlockingQueue。如果不設(shè)置 LinkedBlockingQueue 的容量(無界隊列),當(dāng)接收的任務(wù)數(shù)量超出 corePoolSize數(shù)量時,則新任務(wù)可以被無限制地緩存到該阻塞隊列中,直到資源耗盡。有兩個快捷創(chuàng)建線程池的工廠方法 Executors.newSingleThreadExecutor、Executors.newFixedThreadPool,使用了這個隊列,并且都沒有設(shè)置容量(無界隊列)。
(3)PriorityBlockingQueue:是具有優(yōu)先級的無界隊列。
(4)DelayQueue:這是一個無界阻塞延遲隊列,底層基于 PriorityBlockingQueue 實現(xiàn)的,隊列中每個元素都有過期時間,當(dāng)從隊列獲取元素(元素出隊)時,只有已經(jīng)過期的元素才會出隊,而隊列頭部的元素是過期最快的元素??旖莨S方法 Executors.newScheduledThreadPool 所創(chuàng)建的線程池使用此隊列。
(5)SynchronousQueue:(同步隊列) 是一個不存儲元素的阻塞隊列,每個插入操作必須等到另 一 個 線 程 的 調(diào) 用 移 除 操 作 , 否 則 插 入 操 作 一 直 處 于 阻 塞 狀 態(tài) , 其 吞 吐 量 通 常 高 于LinkedBlockingQueue。 快捷工廠方法 Executors.newCachedThreadPool 所創(chuàng)建的線程池使用此隊列。與前面的隊列相比,這個隊列比較特殊,它不會保存提交的任務(wù),而是將直接新建一個線程來執(zhí)行新來的任務(wù)。
5. 線程池的運行原理

1.在創(chuàng)建了線程池之后,等待提交過來的任務(wù)請求
2.當(dāng)調(diào)用execute()方法添加一個請求任務(wù)的時候,線程池會做出如下判斷:
2.1 如果正在運行的線程數(shù)量小于corePoolSize,那么馬上創(chuàng)建線程運行這個程序
2.2 如果正在運行的線程數(shù)量大于或者等于corePoolSize,那么將這個任務(wù)放入隊列
2.3 如果這個時候隊列滿了并且正在運行的線程數(shù)量還小于maximumPoolSize,那么還是要創(chuàng)建非核心線程立刻執(zhí)行這個任務(wù)
2.4 如果隊列滿了并且正在運行的線程數(shù)量大于或等于maximumPoolSize,那么線程池會啟動飽和拒絕策略來執(zhí)行
3. 當(dāng)一個線程完成任務(wù)的時候,它會從隊列中取下一個任務(wù)來執(zhí)行
4. 當(dāng)一個線程無事可做超過一定時間(keepAliveTime)時:線程會判斷如果當(dāng)前運行的線程數(shù)大于corePoolSize,那么這個線程就會被停掉。