一、線程池
背景
- 線程屬于一個(gè)系統(tǒng)資源,在高并發(fā)場景下,每一個(gè)任務(wù)都創(chuàng)建一個(gè)線程的話,對于內(nèi)存的占用是相當(dāng)大的。
- 線程不一定越多越好,一般情況下CPU核心并不多,如果線程過多,CPU只能讓大部分線程陷入阻塞,引起線程上下文頻繁的切換。上下文切換越頻繁,對CPU性能影響就越大
解決方案:線程池
利用享元模式的思想,充分利用已有線程的資源,來處理不同的任務(wù)。從而減少線程的數(shù)量,避免頻繁的上下文切換
二、自定義線程池實(shí)現(xiàn)
該實(shí)現(xiàn)是跟著黑馬程序員的《全面深入學(xué)習(xí)java并發(fā)編程,java基礎(chǔ)進(jìn)階中級必會(huì)教程》實(shí)現(xiàn)的。自己簡單做了一個(gè)總結(jié),看官們?nèi)绻惺裁唇ㄗh,也可以盡情提出。

- 按照上面的圖來實(shí)現(xiàn)自定義線程池
- 根據(jù)生產(chǎn)者消費(fèi)者模式的思想創(chuàng)建一個(gè)
BlockingQueue阻塞隊(duì)列來平衡生產(chǎn)者創(chuàng)建任務(wù)和消費(fèi)者處理任務(wù)速度之間的差異,生產(chǎn)者不斷的創(chuàng)建新的任務(wù),消費(fèi)者(線程池)不斷來消費(fèi)。 - 如果生產(chǎn)者沒有創(chuàng)建新的任務(wù),那么任務(wù)隊(duì)列就是空的,消費(fèi)者就需要等待生產(chǎn)者創(chuàng)建任務(wù)。
- 如果消費(fèi)者來不及消費(fèi)任務(wù),那么任務(wù)隊(duì)列可能就滿了,生產(chǎn)者就需要等待任務(wù)的消費(fèi)。
實(shí)現(xiàn)
1. 任務(wù)隊(duì)列BlockingQueue
Tips:
- 任務(wù)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)使用雙向鏈表
ArrayDeque實(shí)現(xiàn),ArrayDeque和LinkedList都是雙向鏈表的實(shí)現(xiàn),大多情況下ArrayDeque性能優(yōu)于LinkedList。 - 為了通用給
BlockingQueue加了一個(gè)范型 - 運(yùn)行過程中可能存在多個(gè)線程來隊(duì)列中消費(fèi)任務(wù)或者生產(chǎn)任務(wù),所以必須用鎖保護(hù)任務(wù)隊(duì)列隊(duì)頭和隊(duì)尾,鎖可以使用較為靈活的
ReentrantLock - 任務(wù)隊(duì)列是有容量限制的,不可能無限制的向任務(wù)隊(duì)列中推送任務(wù),那么任務(wù)隊(duì)列滿了的時(shí)候,生產(chǎn)者就必須等待。同時(shí)消費(fèi)者把任務(wù)隊(duì)列消費(fèi)完了以后,消費(fèi)者就必須等待生產(chǎn)者推送任務(wù),可以使用
ReentrantLock的條件變量。
/**
* 阻塞隊(duì)列
* @param <T>
*/
@Slf4j(topic = "c.BlockingQueue") // logback日志
class BlockingQueue<T> {
// 任務(wù)隊(duì)列(雙向鏈表)
// ArrayDeque性能優(yōu)于LinkedList
private Deque<T> queue = new ArrayDeque<>();
// 隊(duì)列容量
private int capacity;
// 鎖
private ReentrantLock lock = new ReentrantLock();
// 鎖的條件變量(生產(chǎn)者)
// 任務(wù)隊(duì)列滿了的時(shí)候 生產(chǎn)者要等待
private Condition prodWaitSet = lock.newCondition();
// 鎖的條件變量(消費(fèi)者)
// 任務(wù)隊(duì)列空時(shí),消費(fèi)者要等待
private Condition consWaitSet = lock.newCondition();
public BlockingQueue(int capacity) { this.capacity = capacity; }
/**
* 生產(chǎn)者向任務(wù)隊(duì)列推送任務(wù)
* 隊(duì)列滿時(shí)等待推送任務(wù)無超時(shí)時(shí)間限制
* @param t 任務(wù)
*/
public void put(T t) {
lock.lock();
// 加鎖
try {
// 判斷隊(duì)列是否滿了
// log.debug("queue.size(): {},capacity: {}", queue.size(), capacity);
while (queue.size() == capacity) {
// 隊(duì)列滿了需要讓線程進(jìn)入等待
// 任務(wù)數(shù)超過了任務(wù)隊(duì)列容量,生產(chǎn)者生產(chǎn)的任務(wù)就會(huì)一直處于阻塞狀態(tài)
log.debug("任務(wù)隊(duì)列滿了,生產(chǎn)者還不能向任務(wù)隊(duì)列里推送任務(wù)!");
prodWaitSet.await();
}
// 隊(duì)列沒有滿,推送任務(wù)
log.debug("任務(wù)隊(duì)列沒有滿,生產(chǎn)者向任務(wù)隊(duì)列推送任務(wù): {}", t);
queue.addLast(t);
// 喚醒消費(fèi)者等待中的線程
consWaitSet.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 消費(fèi)者消費(fèi)任務(wù)
* 隊(duì)列為空時(shí),無限制等待
*/
public T take() {
lock.lock();
try {
// 判斷生產(chǎn)者隊(duì)列是否為空
while (queue.isEmpty()) {
// 消費(fèi)者線程進(jìn)入等待
try {
consWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 喚醒生產(chǎn)者等待的線程
prodWaitSet.signalAll();
// 隊(duì)列不為空,移除隊(duì)列第一個(gè)元素
return queue.removeFirst();
} finally {
lock.unlock();
}
}
/**
* 獲取任務(wù)隊(duì)列容量
*/
public int size() {
lock.lock();
try {
return this.capacity;
} finally {
lock.unlock();
}
}
}
改進(jìn)BlockingQueue的take方法
消費(fèi)者消費(fèi)任務(wù)時(shí),如果任務(wù)隊(duì)列為空,那么take方法將無限制的等待下去,可以設(shè)計(jì)poll方法,該方法設(shè)置超時(shí)時(shí)間,讓消費(fèi)者不會(huì)在任務(wù)隊(duì)列為空的情況下無限等待。
/**
* 消費(fèi)者消費(fèi)任務(wù)(有超時(shí)時(shí)間的等待)
*
* @return
*/
public T poll(long timeout, TimeUnit timeUnit) {
lock.lock();
try {
// 判斷生產(chǎn)者隊(duì)列是否為空
long nanos = timeUnit.toNanos(timeout); // 將超時(shí)時(shí)間單位轉(zhuǎn)換為納秒
while (queue.isEmpty()) {
try {
// awaitNanos方法在等待過程中被喚醒,返回值是等待的剩余時(shí)間
if (nanos <= 0) {
// 避免虛假喚醒的超時(shí)時(shí)間重置
// 如果等待的剩余時(shí)間小于等于0,說明等待超時(shí)了,直接返回null
log.debug("消費(fèi)超時(shí)了!");
return null;
}
// 消費(fèi)者線程進(jìn)入等待
nanos = consWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 喚醒生產(chǎn)者等待的線程
prodWaitSet.signalAll();
// 隊(duì)列不為空,移除隊(duì)頭元素
log.debug("消費(fèi)完成!從任務(wù)隊(duì)列中移除該任務(wù)");
return queue.removeFirst();
} finally {
lock.unlock();
}
}
2. 線程池ThreadPool
tips
- 線程池集合
HashSet范型不使用Thread而是Worker類,方便任務(wù)的處理 - 線程池處理任務(wù)時(shí),要判斷線程集合的容量有沒有超過核心線程數(shù)
- 線程集合容量超過核心線程數(shù),需要將任務(wù)放入任務(wù)隊(duì)列中等待消費(fèi)者消費(fèi)
- 線程集合容量未超過核心線程數(shù),將當(dāng)前任務(wù)交給新建的Worker執(zhí)行即可
/**
* 線程池
*/
@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
/**
* 任務(wù)隊(duì)列
*/
private BlockingQueue<Runnable> queue;
/**
* 線程集合
*/
private HashSet<Worker> workers = new HashSet<>();
/**
* 核心線程數(shù)
* 線程池中的線程數(shù)量
*/
private int coreSize;
/**
* 消費(fèi)等待的超時(shí)時(shí)間
*/
private long timeout;
/**
* 超時(shí)時(shí)間單位
*/
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.queue = new BlockingQueue<>(queueSize); // 構(gòu)建任務(wù)隊(duì)列
}
/**
* 執(zhí)行任務(wù)
*
* @param task 任務(wù)
*/
public void executeTask(Runnable task) {
if (workers.size() < coreSize) {
// 核心線程數(shù)未超過線程集合的容量
// 將當(dāng)前任務(wù)交給Worker執(zhí)行
Worker worker = new Worker(task);
// 將worker加入線集合
log.debug("核心線程數(shù)未滿,剩余:{},新增Worker執(zhí)行任務(wù)", coreSize - workers.size());
workers.add(worker);
// 啟動(dòng)線程
worker.start();
} else {
// 核心線程數(shù)超過線程集合的容量,將當(dāng)前任務(wù)加入任務(wù)隊(duì)列等待消費(fèi)
queue.put(task);
}
}
/**
* 線程包裝類
*/
class Worker extends Thread {
private Runnable task; // 任務(wù)
public Worker(Runnable task) {
this.task = task;
}
/**
* 執(zhí)行任務(wù)
*/
@Override
public void run() {
// 執(zhí)行任務(wù)
// 1)當(dāng)前task不為空,直接執(zhí)行任務(wù)即可 (該情況出現(xiàn)于核心線程數(shù)還沒有超過線程集合的容量,新建的線程的task肯定不為null)
// 2)當(dāng)前task為空,去任務(wù)隊(duì)列中看有沒有待執(zhí)行的任務(wù)(該情況出現(xiàn)于核心線程數(shù)已超過線程集合的容量)
while (task != null || (task = queue.take()) != null) {
try {
log.debug("worker執(zhí)行任務(wù)");
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
// 退出循環(huán)意味著沒有任務(wù)處理,從workers中移除該Worker
synchronized (workers) {
log.debug("任務(wù)處理完畢,移除該worker");
workers.remove(this);
}
}
}
}
測試
任務(wù)數(shù)未超過任務(wù)隊(duì)列的最大容量
@Slf4j(topic = "c.TestThreadPool")
public class TestThreadPool {
public static void main(String[] args) throws InterruptedException {
// 測試:
// 創(chuàng)建一個(gè)線程池
// 核心線程數(shù) 3, 超時(shí)時(shí)間 5000ms,時(shí)間單位 ms,任務(wù)隊(duì)列容量 10
ThreadPool threadPool = new ThreadPool(3, 5000, TimeUnit.MILLISECONDS, 10);
// 任務(wù)數(shù)沒有超過任務(wù)隊(duì)列容量的情況
log.debug("--------------------任務(wù)數(shù):5---------------------");
for (int i = 0; i < 5; i++) {
int tmpI = i;
threadPool.executeTask(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("-------執(zhí)行任務(wù)了!-----,{}", tmpI);
});
}
}
}
結(jié)果輸出:
12:09:17 [main] c.TestThreadPool - --------------------任務(wù)數(shù):5---------------------
12:09:17 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:3,新增Worker執(zhí)行任務(wù)
12:09:17 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:2,新增Worker執(zhí)行任務(wù)
12:09:17 [Thread-0] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:17 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:1,新增Worker執(zhí)行任務(wù)
12:09:17 [Thread-1] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:17 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@56cbfb61
12:09:17 [Thread-2] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:17 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@1134affc
12:09:18 [Thread-0] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,0
12:09:18 [Thread-0] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:18 [Thread-2] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,2
12:09:18 [Thread-1] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,1
12:09:18 [Thread-2] c.ThreadPool - worker執(zhí)行任務(wù)
12:09:19 [Thread-0] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,3
12:09:19 [Thread-2] c.TestThreadPool - -------執(zhí)行任務(wù)了!-----,4
核心線程數(shù) 3,任務(wù)數(shù) 5,前三個(gè)任務(wù)不會(huì)放入任務(wù)隊(duì)列,而是交給線程池里的線程立即執(zhí)行了,剩余的兩個(gè)任務(wù)放入任務(wù)隊(duì)列等待線程池中的線程空閑后再消費(fèi)。
所以可以看見日志:09:18時(shí)執(zhí)行了任務(wù)0,1,2,09:19時(shí)執(zhí)行了任務(wù)3,4
同時(shí)我們可以注意到:Worker調(diào)用的是BlockingQueue的take方法,take方法在任務(wù)隊(duì)列為空時(shí)仍然在無限等待。所以如果想要優(yōu)化,我們可以改成poll方法,即有超時(shí)時(shí)間的等待。
任務(wù)隊(duì)列空閑5s后,退出循環(huán),并移除線程池中的worker
任務(wù)數(shù)超過任務(wù)隊(duì)列的最大容量
@Slf4j(topic = "c.TestThreadPool")
public class TestThreadPool {
public static void main(String[] args) throws InterruptedException {
// 測試:
// 創(chuàng)建一個(gè)線程池
// 核心線程數(shù) 3, 超時(shí)時(shí)間 5000ms,時(shí)間單位 ms,任務(wù)隊(duì)列容量 5
ThreadPool threadPool = new ThreadPool(3, 5000, TimeUnit.MILLISECONDS, 5);
// 任務(wù)數(shù)超過任務(wù)隊(duì)列容量的情況
// Thread.sleep(1000);
log.debug("--------------------任務(wù)數(shù):9---------------------");
for (int i = 0; i < 9; i++) {
int tmpI = i;
threadPool.executeTask(() -> {
try {
// 讓任務(wù)執(zhí)行時(shí)間延長
Thread.sleep(200000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("-------執(zhí)行任務(wù)了!-----,{}", tmpI);
});
}
}
}
測試中,我們讓任務(wù)執(zhí)行的時(shí)間延長足夠長時(shí)間
結(jié)果:
13:45:33 [main] c.TestThreadPool - --------------------任務(wù)數(shù):9---------------------
13:45:33 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:3,新增Worker執(zhí)行任務(wù)
13:45:33 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:2,新增Worker執(zhí)行任務(wù)
13:45:33 [Thread-0] c.ThreadPool - worker執(zhí)行任務(wù)
13:45:33 [main] c.ThreadPool - 核心線程數(shù)未滿,剩余:1,新增Worker執(zhí)行任務(wù)
13:45:33 [Thread-1] c.ThreadPool - worker執(zhí)行任務(wù)
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@56cbfb61
13:45:33 [Thread-2] c.ThreadPool - worker執(zhí)行任務(wù)
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@1134affc
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@d041cf
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@129a8472
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@1b0375b3
13:45:33 [main] c.BlockingQueue - queue.size(): 5,capacity: 5
13:45:33 [main] c.BlockingQueue - 任務(wù)隊(duì)列滿了,生產(chǎn)者還不能向任務(wù)隊(duì)列里推送任務(wù)!
此時(shí),由于消費(fèi)者遲遲沒有將任務(wù)執(zhí)行完,所以生產(chǎn)者一直向任務(wù)隊(duì)列中添加任務(wù),導(dǎo)致任務(wù)隊(duì)列被填滿,其余的任務(wù)無法再添加到任務(wù)隊(duì)列中,導(dǎo)致生產(chǎn)者線程也阻塞住了。這樣對生產(chǎn)者線程不夠友好。應(yīng)該給生產(chǎn)者線程提供選擇,讓它選擇是繼續(xù)等待下去呢,還是選擇做其他操作。
優(yōu)化
1. 給生產(chǎn)者添加一個(gè)等待超時(shí)的選擇
類似于poll()
/**
* 帶超時(shí)時(shí)間的阻塞添加
*
* @param t 任務(wù)
* @param timeout 超時(shí)時(shí)間
* @param timeUnit 時(shí)間單位
* @return false 添加超時(shí) true 添加成功
*/
public boolean offer(T t, long timeout, TimeUnit timeUnit) {
lock.lock();
// 加鎖
try {
// 判斷隊(duì)列是否滿了
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
// 隊(duì)列滿了需要讓線程進(jìn)入等待
// 任務(wù)數(shù)超過了任務(wù)隊(duì)列容量,生產(chǎn)者生產(chǎn)的任務(wù)就會(huì)一直處于阻塞狀態(tài)
try {
log.debug("任務(wù)隊(duì)列滿了,生產(chǎn)者還不能向任務(wù)隊(duì)列里推送任務(wù)!");
if (nanos <= 0) {
// 生產(chǎn)者添加任務(wù)超時(shí)了
return false;
}
nanos = prodWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 隊(duì)列沒有滿,推送任務(wù)
log.debug("任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): {}", t);
queue.addLast(t);
// 喚醒消費(fèi)者等待中的線程
consWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}
2. 生產(chǎn)者還有其他選擇
- 死等
- 超時(shí)放棄
- 直接放棄
- 拋出異常
- 生產(chǎn)者自行處理
- 其他
我們不可能有需求是就往線程池的executeTask()方法中添加邏輯分支,這樣我們的代碼可擴(kuò)展性就非常差。我們可以利用策略模式把選擇的權(quán)利下放給生產(chǎn)者。
實(shí)現(xiàn)
將處理方法抽象為一個(gè)接口的方法
/**
* 利用策略模式,讓生產(chǎn)者自行決定如果任務(wù)隊(duì)列滿了的時(shí)候 該選擇何種策略來處理該情況
* 1)死等
* 2)超時(shí)拒絕
* 3)直接拒絕
* 4)拋出異常
* 5)自行執(zhí)行
* 6)...其他
* 方便擴(kuò)展
*/
@FunctionalInterface
interface RejectPolicy<T> {
/**
* 拒絕策略
*
* @param queue 任務(wù)隊(duì)列
* @param task 任務(wù)
*/
void reject(BlockingQueue<T> queue, T task);
}
修改任務(wù)隊(duì)列類
- 添加
tryPut()方法,在任務(wù)隊(duì)列容量等于最大容量, 將執(zhí)行策略下放給調(diào)用方
public void tryPut(RejectPolicy rejectPolicy, T t) {
lock.lock();
try {
if (capacity == queue.size()) {
// 任務(wù)隊(duì)列容量等于最大容量, 執(zhí)行策略下放給調(diào)用方
rejectPolicy.reject(this, t);
} else {
// 隊(duì)列沒有滿,推送任務(wù)
log.debug("任務(wù)隊(duì)列沒有滿,生產(chǎn)者開始推送任務(wù): {}", t);
queue.addLast(t);
// 喚醒消費(fèi)者等待中的線程
consWaitSet.signalAll();
}
} finally {
lock.unlock();
}
}
修改線程池類
- 添加拒絕策略的成員變量
private RejectPolicy<Runnable> rejectPolicy;
- 構(gòu)造方法
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueSize, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.queue = new BlockingQueue<>(queueSize); // 構(gòu)建任務(wù)隊(duì)列
this.rejectPolicy = rejectPolicy; // 拒絕策略
}
-
executeTask()在核心線程數(shù)超過線程集合的容量時(shí),調(diào)用BlockingQueue的tryPut()
public void executeTask(Runnable task) {
if (workers.size() < coreSize) {
// 核心線程數(shù)未超過線程集合的容量
// 將當(dāng)前任務(wù)交給Worker執(zhí)行
Worker worker = new Worker(task);
// 將worker加入線集合
log.debug("核心線程數(shù)未滿,剩余:{},新增Worker執(zhí)行任務(wù)", coreSize - workers.size());
workers.add(worker);
// 啟動(dòng)線程
worker.start();
} else {
// 核心線程數(shù)超過線程集合的容量
// 選擇策略模式,讓調(diào)用者自行選擇超時(shí)策略
queue.tryPut(rejectPolicy, task);
}
}
再來測一哈
public static void main(String[] args) throws InterruptedException {
// 測試:
// 創(chuàng)建一個(gè)線程池
// 核心線程數(shù) 3, 超時(shí)時(shí)間 5000ms,時(shí)間單位 ms,任務(wù)隊(duì)列容量 5
ThreadPool threadPool = new ThreadPool(3,
5000,
TimeUnit.MILLISECONDS,
5,
(queue, task) -> {
// 1) 死等
// queue.put(task);
// 2) 超時(shí)放棄
// boolean timeoutFlag = queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// if (!timeoutFlag) {
// log.debug("生產(chǎn)者推送任務(wù)超時(shí)!");
// }
// 3) 直接放棄
// log.debug("放棄:{}", task);
// 4) 拋出異常
// throw new RuntimeException("任務(wù)推送失敗! 任務(wù): " + task);
});
// 任務(wù)數(shù)超過任務(wù)隊(duì)列容量的情況
log.debug("--------------------任務(wù)數(shù):9---------------------");
for (int i = 0; i < 10; i++) {
int tmpI = i;
threadPool.executeTask(() -> {
try {
Thread.sleep(200000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("-------執(zhí)行任務(wù)了!-----,{}", tmpI);
});
}
}
死等
測試結(jié)果見上面的日志輸出
超時(shí)放棄
14:17:46 [main] c.TestThreadPool - 生產(chǎn)者推送任務(wù)超時(shí)!
直接放棄
14:19:15 [main] c.TestThreadPool - 放棄:com.example.pool.TestThreadPool$$Lambda$2/1537358694@2f7c7260
拋出異常
Exception in thread "main" java.lang.RuntimeException: 任務(wù)推送失敗! 任務(wù): com.example.pool.TestThreadPool$$Lambda$2/1537358694@2f7c7260
at com.example.pool.TestThreadPool.lambda$main$0(TestThreadPool.java:39)
at com.example.pool.BlockingQueue.tryPut(TestThreadPool.java:386)
at com.example.pool.ThreadPool.executeTask(TestThreadPool.java:163)
at com.example.pool.TestThreadPool.main(TestThreadPool.java:60)
【注意】如果后面還有任務(wù)要推送,也會(huì)推送失敗
三、總結(jié)
- 利用生產(chǎn)者消費(fèi)者模式平衡了線程池消費(fèi)任務(wù)和任務(wù)生產(chǎn)速度不一致的問題。
- 利用享元模式充分利用已有線程的資源,來處理不同的任務(wù)。從而減少線程的數(shù)量,避免頻繁的發(fā)生上下文切換
- 利用策略模式讓生產(chǎn)者自行決定如果任務(wù)隊(duì)列滿了的時(shí)候 該選擇何種策略處理該情況。增加了線程池的可擴(kuò)展性
- 總而言之,言而總之,收獲頗豐。如果實(shí)現(xiàn)過程中有明顯的硬傷,希望各位看官指出,粉腸感謝!