為什么要用線程池?
- 降低資源消耗
- 通過重復(fù)利用已創(chuàng)建的線程, 降低線程創(chuàng)建和銷毀的造成的消耗
- 提高相應(yīng)速度
- 任務(wù)到達(dá)時(shí),可以不需要等待線程創(chuàng)建就能立即執(zhí)行
- 提高線程的可管理性
- 線程池對(duì)線程進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)可監(jiān)控
線程池種類
- newSingleThreadExecutor
//只有一個(gè)線程的線程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newWorkStealingPool
//可以設(shè)置并行級(jí)別(數(shù)量) 并行級(jí)別決定了同一時(shí)刻最多有多少個(gè)線程在執(zhí)行 默認(rèn)是cpu個(gè)數(shù)
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
- newFixedThreadPool
//固定大小 可重用的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newCachedThreadPool
// 創(chuàng)建沒有數(shù)量上線的線程池 線程不夠就創(chuàng)建 不用的60s就銷毀
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newScheduledThreadPool
// 維持一定數(shù)量的線程 線程即使空閑 也不會(huì)被回收
//可以定時(shí)或者周期執(zhí)行任務(wù)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
Executor框架
image
- 接口
- Executor 一個(gè)運(yùn)行新任務(wù)的簡單接口
- ExecutorService 擴(kuò)展了Executor接口,添加了一些管理執(zhí)行器和任務(wù)生命周期的方法
- ScheduledExecutorService 擴(kuò)展了ExecutorService。支持Future和定時(shí)執(zhí)行任務(wù)
public interface Executor {
//只有一個(gè)方法
void execute(Runnable command);
}
- ThreadPoolExecutor分析
public class ThreadPoolExecutor extends AbstractExecutorService {
ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// ctl 控制線程池的運(yùn)行狀態(tài)(runState) 和線程池有效線程(workCount)的數(shù)量進(jìn)行控制的字段
//獲取運(yùn)行狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取運(yùn)行狀態(tài)
private static int workerCountOf(int c) { return c & CAPACITY; }
// 獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState 線程池的運(yùn)行狀態(tài) 看下圖
// 能接受新提交的任務(wù),也能處理阻塞隊(duì)列中的任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//關(guān)閉狀態(tài),不接收新任務(wù),卻能繼續(xù)處理阻塞隊(duì)列中的任務(wù)。調(diào)用shutdown()使線程進(jìn)入該狀態(tài)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接收新任務(wù),不處理隊(duì)列中任務(wù),中斷正在處理任務(wù)的線程。調(diào)用shutdownNow()使線程進(jìn)入該狀態(tài)
private static final int STOP = 1 << COUNT_BITS;
//所有任務(wù)都終止了,workerCount=0,可以進(jìn)入該狀態(tài)
private static final int TIDYING = 2 << COUNT_BITS;
//在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài)
private static final int TERMINATED = 3 << COUNT_BITS;
}
-
線程池運(yùn)行狀態(tài)圖
image
構(gòu)造函數(shù) 參數(shù)很重要
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
……
//核心線程數(shù)量
this.corePoolSize = corePoolSize;
//最大線程數(shù)量
this.maximumPoolSize = maximumPoolSize;
//等待隊(duì)列
this.workQueue = workQueue;
// 線程池維護(hù)線程所允許的空閑時(shí)間
this.keepAliveTime = unit.toNanos(keepAliveTime);
//用來創(chuàng)建新線程
this.threadFactory = threadFactory;
//拒絕策略
this.handler = handler;
}
-
執(zhí)行流程
image
或者這個(gè)圖
image
調(diào)用ThreadPoolExecutor的execute提交線程,首先檢查corePool
- 如果CorePool內(nèi)的線程<CorePoolSize,新創(chuàng)建線程執(zhí)行任務(wù),即使線程池中其他線程是空閑的。
- 如果>=CorePoolSize且<maximumPoolSize,
- 且Queue不滿,將線程加入workQueue
- 如果滿了,創(chuàng)建新線程處理任務(wù)
- 如果>=maximumPoolSize
- 如果Queue不滿,放入Queue
- Queue滿了,執(zhí)行拒絕策略。
所以判斷的順序?yàn)?corePoolSize --> workQueue --> maximumPoolSize。
- 拒絕策略
- AbortPolicy:直接拋出異常,這是默認(rèn)策略;
- CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
- DiscardPolicy:直接丟棄任務(wù);
阻塞隊(duì)列
- ArrayBlockingQueue
- 基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列
- 創(chuàng)建時(shí)必須指定容量大小
- 可以指定公平性或非公平性,默認(rèn)非公平,即不保證等待時(shí)間長的最優(yōu)先訪問隊(duì)列
- LinkedBlockingQueue
- 基于鏈表實(shí)現(xiàn)
- 如果不指定容量大小,默認(rèn)是Integer.MAX_VALUE
- PriorityBlockingQueue
- 無界阻塞隊(duì)列
- 按照元素優(yōu)先級(jí)進(jìn)行排序,優(yōu)先級(jí)高的先出隊(duì)
- SynchronousQueue
- 同步阻塞隊(duì)列
- 隊(duì)列大小為1,一個(gè)元素要放入該隊(duì)列中必須有一個(gè)線程在等待獲取元素
-
DelayQueue
- DelayedWorkQueue
- BlockingDeque
- 雙向阻塞隊(duì)列
- TransferQueue