當(dāng)你需要同時限制應(yīng)用程序中運行的線程數(shù)時,線程池非常有用。 啟動新線程會產(chǎn)生性能開銷,每個線程也會為其堆棧等分配一些內(nèi)存。
可以將任務(wù)傳遞給線程池,而不是為每個任務(wù)啟動并發(fā)執(zhí)行的新線程。 只要線程池有任何空閑線程,任務(wù)就會分配給其中一個線程并執(zhí)行。 在內(nèi)部,任務(wù)被插入到阻塞隊列中,池中的線程從該阻塞隊列中出隊。 當(dāng)新任務(wù)插入隊列時,其中一個空閑線程將成功將其出列并執(zhí)行它。 線程池中的其余空閑線程將被阻塞,等待出列任務(wù)。
從上述所知,一個基本的線程池需要具有
- 一個存儲線程的容器(容器可以使用隊列,鏈表等數(shù)據(jù)結(jié)構(gòu)),當(dāng)有任務(wù)時,就從容器中拿出一個線程,來執(zhí)行任務(wù)。
- 一個存儲任務(wù)的阻塞隊列。(阻塞隊列可以控制任務(wù)提交的最大數(shù))
- 線程池對外暴露一個execute(Runnable task)方法,用以外界向線程池中提交任務(wù)。
自定義阻塞隊列
import java.util.LinkedList;
import java.util.List;
public class BlockingQueue<T> {
/**
* 使用鏈表實現(xiàn)一個阻塞隊列(數(shù)據(jù)結(jié)構(gòu)定義數(shù)據(jù)存儲和獲取方式,所以只要滿足這兩點,阻塞隊列可以用鏈表,也可以使用數(shù)組等來實現(xiàn))
*/
private List<T> queue = new LinkedList();
/**
* limit用來限制提交任務(wù)的最大數(shù),默認10
*/
private int limit = 10;
public BlockingQueue(int limit) {
this.limit = limit;
}
/**
*
* @param item
*
* enqueue是一個同步方法,當(dāng)任務(wù)到達上限,便會調(diào)用wait方法進行阻塞,否則將任務(wù)放入隊列中,并喚醒dequeue()任務(wù)線程
*/
public synchronized void enqueue(T item){
while (this.queue.size() == this.limit) {
this.wait();
}
if (this.queue.size() <= limit) {
this.notifyAll();
}
this.queue.add(item);
}
/**
*
* @return
*
* dequeue也是一個同步方法,當(dāng)隊列中沒有任務(wù)時便會調(diào)用wait方法進入阻塞,當(dāng)任務(wù)到達最大容量是喚醒其他dequeue()線程
* ,并出列一個任務(wù)。
*/
public synchronized T dequeue() {
while (this.queue.size() == 0) {
this.wait();
}
if (this.queue.size() == this.limit) {
this.notifyAll();
}
return this.queue.remove(0);
}
public synchronized int size(){
return queue.size();
}
}
新建一個線程池線程類,用來執(zhí)行提交的任務(wù)。結(jié)構(gòu)體中傳入任務(wù)隊列,run()方中發(fā)現(xiàn)taskQueue有任務(wù)時,獲取任務(wù)并執(zhí)行,沒有任務(wù)就阻塞。
public class PoolThread extends Thread {
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThread(BlockingQueue taskQueue) {
this.taskQueue = taskQueue;
}
public void run(){
while(!isStopped() && !Thread.currentThread().isInterrupted()){
try{
//從任務(wù)隊列獲取任務(wù)并執(zhí)行
Runnable runnable = (Runnable) taskQueue.dequeue();
runnable.run();
} catch(Exception e){
isStopped = true;
break;
}
}
}
public synchronized void doStop(){
isStopped = true;
this.interrupt();
}
public synchronized boolean isStopped(){
return isStopped;
}
}
新建線程池類
public interface Service {
//關(guān)閉線程池
void shutdown();
//查看線程池是否已經(jīng)被shutdown
boolean isShutdown();
//提交任務(wù)到線程池
void execute(Runnable runnable);
}
import java.util.ArrayDeque;
import java.util.Queue;
public class ThreadPool implements Service {
/**
* 任務(wù)隊列,用來存儲提交的任務(wù)
*/
private BlockingQueue<Runnable> taskQueue = null;
/**
* 線程池中存儲線程的容器。
*/
private Queue<PoolThread> threads = new ArrayDeque<PoolThread>();
private boolean isShutdown = false;
public ThreadPool(int initSize, int maxNoOfTasks){
taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);
//初始化線程池
for (int i = 0; i < initSize; i++) {
threads.add(new PoolThread(taskQueue));
}
//啟動線程池線程
threads.forEach(thread -> thread.start());
}
@Override
public synchronized void execute(Runnable task) {
if (this.isStopped){
throw new IllegalStateException("ThreadPool is stopped");
}
//任務(wù)入列
taskQueue.enqueue(task);
}
@Override
public synchronized void shutdown(){
this.isShutdown= true;
threads.forEach(thread -> thread.doStop());
}
@Override
public boolean isShutdown() {
return isShutdown;
}
}
至此,一個簡單的線程池便完成。新建一個線程池測試類
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
final ThreadPool threadPool = new ThreadPool(5 , 20);
//定義20個任務(wù)并且提交到線程池
for (int i = 0; i < 20; i++) {
threadPool.execute(() ->{
try {
TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getName() + " is running add done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
while (true){
System.out.println("---------------------------------");
TimeUnit.SECONDS.sleep(5);
}
}
}
打印每次輸出5條記錄,共輸出4次
---------------------------------
---------------------------------
Thread-3 is running add done
Thread-1 is running add done
Thread-0 is running add done
Thread-4 is running add done
Thread-2 is running add done
---------------------------------
---------------------------------
Thread-2 is running add done
Thread-4 is running add done
Thread-0 is running add done
Thread-1 is running add done
Thread-3 is running add done
---------------------------------
---------------------------------
Thread-0 is running add done
Thread-1 is running add done
Thread-3 is running add done
Thread-2 is running add done
Thread-4 is running add done
---------------------------------
---------------------------------
Thread-2 is running add done
Thread-1 is running add done
Thread-3 is running add done
Thread-0 is running add done
Thread-4 is running add done
---------------------------------
當(dāng)執(zhí)行完任務(wù)后,使用visualvm工具或jstack命令獲取線程快照,可以看到有5個線程池中的線程
"Thread-4" #16 prio=5 os_prio=0 tid=0x00000000207b0000 nid=0x2b7c in Object.wait() [0x000000002141e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at java.lang.Object.wait(Object.java:502)
at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
- locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at com.customthreadpool.PoolThread.run(PoolThread.java:18)
Locked ownable synchronizers:
- None
"Thread-3" #15 prio=5 os_prio=0 tid=0x00000000207ad000 nid=0x56d0 in Object.wait() [0x000000002131f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at java.lang.Object.wait(Object.java:502)
at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
- locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at com.customthreadpool.PoolThread.run(PoolThread.java:18)
Locked ownable synchronizers:
- None
"Thread-2" #14 prio=5 os_prio=0 tid=0x00000000207ab800 nid=0x4cbc in Object.wait() [0x000000002121f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at java.lang.Object.wait(Object.java:502)
at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
- locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at com.customthreadpool.PoolThread.run(PoolThread.java:18)
Locked ownable synchronizers:
- None
"Thread-1" #13 prio=5 os_prio=0 tid=0x00000000207a9800 nid=0x3670 in Object.wait() [0x000000002111f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at java.lang.Object.wait(Object.java:502)
at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
- locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at com.customthreadpool.PoolThread.run(PoolThread.java:18)
Locked ownable synchronizers:
- None
"Thread-0" #12 prio=5 os_prio=0 tid=0x00000000207a9000 nid=0x4d84 in Object.wait() [0x000000002101f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at java.lang.Object.wait(Object.java:502)
at com.customthreadpool.BlockingQueue.dequeue(BlockingQueue.java:51)
- locked <0x000000076b888558> (a com.customthreadpool.BlockingQueue)
at com.customthreadpool.PoolThread.run(PoolThread.java:18)
從線程快照可以看到,線程池的線程名稱使用系統(tǒng)默認名稱,但在實際編碼中通常都會按我們規(guī)范定義系統(tǒng)名稱,所以我們使用工廠模式對線程的創(chuàng)建進行重構(gòu)。
使用工廠模式有一下好處
- 對象的創(chuàng)建如果比較復(fù)雜,需要經(jīng)過一系列的初始化。使用工廠模式,可以屏蔽這過程。
- 把同一類事物歸于一個框架之下。比如A和B,他們需要自己定義線程池線程創(chuàng)建,但規(guī)定他們都要實現(xiàn)工廠接口,便可以把他們控制在同一框架之下。
- 解耦。(只要是不直接創(chuàng)建目標(biāo)對象,基本上都可以叫解耦或者對修改關(guān)閉對擴展開放)
新建線程工廠接口
@FunctionalInterface
public interface ThreadFactory {
Thread createThread(Runnable runnable);
}
重構(gòu)后的線程池類如下:
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPool implements Service {
/**
* 任務(wù)隊列,用來存儲提交的任務(wù)
*/
private BlockingQueue<Runnable> taskQueue = null;
/**
* 線程池中存儲線程的容器。
*/
private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();
/**
* 默認線程工廠
*/
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
private boolean isShutdown = false;
public ThreadPool(int initSize, int maxNoOfTasks){
taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks);
//初始化線程池
for (int i = 0; i < initSize; i++) {
newThread();
}
}
private void newThread(){
PoolThread poolThread = new PoolThread(taskQueue);
Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
ThreadTask threadTask = new ThreadTask(thread , poolThread);
threads.add(threadTask);
thread.start();
}
/**
* 工廠模式屏蔽對象創(chuàng)建的過程
*/
private static class DefaultThreadFactory implements ThreadFactory{
private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());
private static final AtomicInteger COUNTER = new AtomicInteger(0);
@Override
public Thread createThread(Runnable runnable) {
return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
}
}
/**
* ThreadTask 只是PoolThread和Thread的組合,因為后面關(guān)閉線程還需要用到poolThread的doStop方法
*/
private static class ThreadTask{
Thread thread;
PoolThread poolThread;
public ThreadTask(Thread thread , PoolThread poolThread){
this.thread = thread;
this.poolThread = poolThread;
}
}
@Override
public synchronized void execute(Runnable task) {
if (this.isShutdown){
throw new IllegalStateException("ThreadPool is stopped");
}
//任務(wù)入列
taskQueue.enqueue(task);
}
@Override
public synchronized void shutdown(){
this.isShutdown = true;
threads.forEach(threadTask -> threadTask.poolThread.doStop());
}
@Override
public boolean isShutdown() {
return isShutdown;
}
}
運行測試類,結(jié)果如下圖所示

dump文件如下所示

到目前為如果線程任務(wù)隊列到達上限,便會調(diào)用wait方法進行阻塞,我們可以自定義拒接策略,使處理更靈活。
public interface DenyPolicy<T> {
void reject(T runnable, ThreadPool threadPool);
//該拒接策略會直接將任務(wù)丟棄
class DiscardDenyPolicy implements DenyPolicy<Runnable>{
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
System.out.println(runnable + "do nothing");
}
}
//該拒絕策略會向任務(wù)提交者拋出異常
class AbortDenyPolicy implements DenyPolicy<Runnable>{
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
throw new RunnbaleDenyException("The runnbale " + runnable + " will be abort.");
}
}
//該拒絕策略會使用任務(wù)在提交者所在的線程中執(zhí)行任務(wù)
class RunnerDenyPolicy implements DenyPolicy<Runnable>{
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
if (!threadPool.isShutdown()){
runnable.run();
}
}
}
}
public class RunnbaleDenyException extends RuntimeException {
public RunnbaleDenyException(String message) {
super(message);
}
}
- reject 為拒接方法
- DiscardDenyPolicy 策略會直接丟棄掉Runnable任務(wù)。
- AbortDenyPolicy 策略會拋出RunnbaleDenyException異常。
- RunnerDenyPolicy 策略,交給調(diào)用者的線程直接運行runnable,而不會被加入到線程池中。
重構(gòu)阻塞隊列,當(dāng)隊列中的值超出最大容量時使用拒接策略。
重構(gòu)后的阻塞隊列
import java.util.LinkedList;
import java.util.List;
public class BlockingQueue<T> {
/**
* 使用鏈表實現(xiàn)一個阻塞隊列(數(shù)據(jù)結(jié)構(gòu)定義數(shù)據(jù)存儲和獲取方式,所以只要滿足這兩點,阻塞隊列可以用鏈表,也可以使用數(shù)組等來實現(xiàn))
*/
private List<T> queue = new LinkedList();
/**
* limit用來限制提交任務(wù)的最大數(shù),默認10
*/
private int limit = 10;
/**
* 拒接策略
*/
private DenyPolicy denyPolicy;
private ThreadPool threadPool;
public BlockingQueue(int limit , DenyPolicy denyPolicy , ThreadPool threadPool) {
this.limit = limit;
this.denyPolicy = denyPolicy;
this.threadPool = threadPool;
}
/**
*
* @param item
* enqueue是一個同步方法,當(dāng)任務(wù)到達上限,便會調(diào)用wait方法進行阻塞,否則將任務(wù)放入隊列中,并喚醒dequeue()任務(wù)線程
*/
public synchronized void enqueue(T item) {
//若果隊列到達最大容量,調(diào)用拒接策略
if (this.queue.size() >= this.limit) {
denyPolicy.reject(item , threadPool);
}
if (this.queue.size() <= limit) {
this.notifyAll();
}
this.queue.add(item);
}
/**
*
* @return
*
* dequeue也是一個同步方法,當(dāng)隊列中沒有任務(wù)時便會調(diào)用wait方法進入阻塞,當(dāng)任務(wù)到達最大容量是喚醒其他dequeue()線程
* ,并出列一個任務(wù)。
*/
public synchronized T dequeue(){
while (this.queue.size() == 0) {
this.wait();
}
if (this.queue.size() == this.limit) {
this.notifyAll();
}
return this.queue.remove(0);
}
public synchronized int size(){
return queue.size();
}
}
線程池類修改如下兩點ThreadPool.class
...
public class ThreadPool implements Service{
/**
* 默認使用丟棄策略
*/
private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
public ThreadPool(int noOfThreads , int maxNoOfTasks){
taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
//初始化線程池
for (int i = 0; i < noOfThreads; i++) {
newThread();
}
}
}
...
運行測試類測試類,可以看到當(dāng)任務(wù)到達最大容量時,就會有任務(wù)被拋棄

目前初始化線程池時,只指定了初始線程數(shù)init,并不能很好的管理線程池線程數(shù)量。繼續(xù)對線程池進行擴展。
- 新增兩個控制線程池線程數(shù)量的參數(shù)。線程池自動擴充時最大的線程池數(shù)量max,線程池空閑時需要釋放線程但是也要維護一定數(shù)量的活躍線程數(shù)量或者核心數(shù)量core。有了這init , max , core三個參數(shù)就能很好的控制線程池中線程數(shù)量,三者之間的關(guān)系init <= core <= max。
- 新增參數(shù)Keepedalive時間,該時間主要決定線程各個重要參數(shù)自動維護的時間間隔。
重構(gòu)后的線程池類
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPool implements Service{
/**
* 初始化線程數(shù)量
*/
private final int initSize;
/**
* 線程池最大線程數(shù)量
*/
private final int maxSzie;
/**
* 線程池核心線程數(shù)量
*/
private final int coreSize;
/**
* 當(dāng)前活躍的線程數(shù)量
*/
private int activeCount;
private final long keepAliveTime;
private final TimeUnit timeUnit;
private InternalTask internalTask;
/**
* 創(chuàng)建線程所需的工廠
*/
private final ThreadFactory threadFactory;
/**
* 任務(wù)隊列,用來存儲提交的任務(wù)
*/
private BlockingQueue<Runnable> taskQueue = null;
/**
* 線程池中存儲線程的容器。
*/
private Queue<ThreadTask> threads = new ArrayDeque<ThreadTask>();
/**
* 默認線程工廠
*/
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
private boolean isShutdown = false;
/**
* 默認使用丟棄策略
*/
private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
}
public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
, DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){
this.initSize = initSize;
this.maxSzie = maxSize;
this.coreSize = coreSize;
this.threadFactory = threadFactory;
this.taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
init();
}
private void init(){
//初始化線程池
for (int i = 0; i < initSize; i++) {
newThread();
}
//啟動內(nèi)部維護線程
internalTask = new InternalTask();
internalTask.start();
}
private void newThread(){
PoolThread poolThread = new PoolThread(taskQueue);
Thread thread = DEFAULT_THREAD_FACTORY.createThread(poolThread);
ThreadTask threadTask = new ThreadTask(thread , poolThread);
activeCount++;
threads.add(threadTask);
thread.start();
}
private void removeThread(){
//從線程池中移除某個線程
ThreadTask threadTask = threads.remove();
threadTask.poolThread.stop();
this.activeCount--;
}
/**
* 工廠模式屏蔽對象創(chuàng)建的過程
*/
private static class DefaultThreadFactory implements ThreadFactory{
private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
private static final ThreadGroup group = new ThreadGroup("customThreadPool-" + GROUP_COUNTER.getAndDecrement());
private static final AtomicInteger COUNTER = new AtomicInteger(0);
@Override
public Thread createThread(Runnable runnable) {
return new Thread(group , runnable , "thread-pool-" + COUNTER.getAndDecrement());
}
}
/**
* ThreadTask 只是PoolThread和Thread的組合,因為后面關(guān)閉線程還需要用到poolThread的doStop方法
*/
private static class ThreadTask{
Thread thread;
PoolThread poolThread;
public ThreadTask(Thread thread , PoolThread poolThread){
this.thread = thread;
this.poolThread = poolThread;
}
}
@Override
public synchronized void execute(Runnable task) {
if (this.isShutdown){
throw new IllegalStateException("ThreadPool is stopped");
}
//任務(wù)入列
taskQueue.enqueue(task);
}
@Override
public int getInitSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return this.initSize;
}
@Override
public int getMaxSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return this.maxSzie;
}
@Override
public int getCoreSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return this.coreSize;
}
@Override
public int getQueueSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return taskQueue.size();
}
@Override
public int getActiveCount() {
synchronized (this){
return this.activeCount;
}
}
@Override
public synchronized void shutdown(){
this.isShutdown = true;
threads.forEach(threadTask -> threadTask.poolThread.doStop());
internalTask.interrupt();
}
@Override
public boolean isShutdown() {
return isShutdown;
}
class InternalTask extends Thread{
@Override
public void run() {
//run方法繼承自Thread,主要用于維護線程數(shù)量,比如擴容,回收等工作
while (!isShutdown&&!isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
isShutdown = true;
break;
}
synchronized (ThreadPool.this){
if (isShutdown){
break;
}
//當(dāng)前隊列中任務(wù)尚未處理,并且activeCount< coreSize則繼續(xù)擴容
if (taskQueue.size() > 0 && activeCount <coreSize){
for (int i = initSize; i < coreSize ; i++){
newThread();
}
//continue的目的在于不想讓線程的擴容直接打到maxsize
continue;
}
//當(dāng)前的隊列中有任務(wù)尚未處理,并且activeCount < maxSize則繼續(xù)擴容
if (taskQueue.size() > 0 && activeCount < maxSzie){
for (int i = coreSize; i < maxSzie ; i++){
newThread();
}
}
//如果任務(wù)隊列中沒有任務(wù),則需要回收,回收至coreSize即可
if (taskQueue.size() == 0 && activeCount > coreSize ){
for (int i = coreSize ; i < activeCount ; i++){
removeThread();
}
}
}
}
}
}
}
線程池類中主要新增了如下參數(shù)
/**
* 初始化線程數(shù)量
*/
private final int initSize;
/**
* 線程池最大線程數(shù)量
*/
private final int maxSzie;
/**
* 線程池核心線程數(shù)量
*/
private final int coreSize;
/**
* 當(dāng)前活躍的線程數(shù)量
*/
private int activeCount;
private final long keepAliveTime;
private final TimeUnit timeUnit;
/**
* 創(chuàng)建線程所需的工廠
*/
private final ThreadFactory threadFactory;
private InternalTask internalTask;
重寫了兩個構(gòu)造函數(shù)
public ThreadPool(int initSize , int maxSize , int coreSize , int maxNoOfTasks){
this(initSize , maxSize , coreSize , DEFAULT_THREAD_FACTORY , maxNoOfTasks , DEFAULT_DENY_POLICY , 10 , TimeUnit.SECONDS);
}
public ThreadPool(int initSize , int maxSize , int coreSize , ThreadFactory threadFactory , int maxNoOfTasks
, DenyPolicy<Runnable> denyPolicy , long keepAliveTime , TimeUnit timeUnit){
this.initSize = initSize;
this.maxSzie = maxSize;
this.coreSize = coreSize;
this.threadFactory = threadFactory;
this.taskQueue = new BlockingQueue<Runnable>(maxNoOfTasks , DEFAULT_DENY_POLICY , this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
init();
}
新增一個線程類,用于維護內(nèi)部狀態(tài)
class InternalTask extends Thread{
@Override
public void run() {
//run方法繼承自Thread,主要用于維護線程數(shù)量,比如擴容,回收等工作
while (!isShutdown&&!isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
isShutdown = true;
break;
}
synchronized (ThreadPool.this){
if (isShutdown){
break;
}
//當(dāng)前隊列中任務(wù)尚未處理,并且activeCount< coreSize則繼續(xù)擴容
if (taskQueue.size() > 0 && activeCount <coreSize){
for (int i = initSize; i < coreSize ; i++){
newThread();
}
//continue的目的在于不想讓線程的擴容直接打到maxsize
continue;
}
//當(dāng)前的隊列中有任務(wù)尚未處理,并且activeCount < maxSize則繼續(xù)擴容
if (taskQueue.size() > 0 && activeCount < maxSzie){
for (int i = coreSize; i < maxSzie ; i++){
newThread();
}
}
//如果任務(wù)隊列中沒有任務(wù),則需要回收,回收至coreSize即可
if (taskQueue.size() == 0 && activeCount > coreSize ){
for (int i = coreSize ; i < activeCount ; i++){
removeThread();
}
}
}
}
}
}
以及一系列輔助方法
public interface Service {
.....
//獲取線程池的初始化大小
int getInitSize();
//獲取線程池最大的線程數(shù)
int getMaxSize();
//獲取線程池核心線程梳理
int getCoreSize();
//獲取線程池中活躍線程的數(shù)量大小
int getQueueSize();
//獲取線程池中用于緩存任務(wù)隊列的大小
int getActiveCount();
.....
}
@Override
public int getInitSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return this.initSize;
}
@Override
public int getMaxSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return this.maxSzie;
}
@Override
public int getCoreSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return this.coreSize;
}
@Override
public int getQueueSize() {
if (isShutdown){
throw new IllegalStateException("The thread pool is destory");
}
return taskQueue.size();
}
@Override
public int getActiveCount() {
synchronized (this){
return this.activeCount;
}
}
執(zhí)行測試類
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
final ThreadPool threadPool = new ThreadPool(2 , 6 , 4 , 1000);
//定義20個任務(wù)并且提交到線程池
for (int i = 0; i < 20; i++) {
threadPool.execute(() ->{
try {
TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getName() + " is running add done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
while (true){
System.out.println("getActiveCount: " + threadPool.getActiveCount());
System.out.println("getQueueSize: " + threadPool.getQueueSize());
System.out.println("getCoreSize: " + threadPool.getCoreSize());
System.out.println("getMaxSize: "+ threadPool.getMaxSize());
System.out.println("======================================");
TimeUnit.SECONDS.sleep(5);
}
}
}
會有如下輸出,activeCount數(shù)量會增長到與maxSize一直,最后會保持與coreSize相等
getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 2
getQueueSize: 18
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--4 is running add done
thread-pool--3 is running add done
thread-pool--5 is running add done
thread-pool--1 is running add done
thread-pool-0 is running add done
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 2
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
thread-pool--4 is running add done
thread-pool--5 is running add done
thread-pool-0 is running add done
thread-pool--1 is running add done
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 6
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
thread-pool--2 is running add done
thread-pool--3 is running add done
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 5
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
getActiveCount: 4
getQueueSize: 0
getCoreSize: 4
getMaxSize: 6
======================================
到這里,一個功能比較完善的線程池就已經(jīng)完成了
代碼地址: github
參考
- 《Java高并發(fā)編程詳解》
- http://tutorials.jenkov.com/java-concurrency/thread-pools.html