Java線程池的優(yōu)點(diǎn):
1、降低資源消耗 通過(guò)重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀的消耗。
2、提高響應(yīng)速度 當(dāng)任務(wù)到達(dá)的時(shí)候,不需要等到線程創(chuàng)建就能立即執(zhí)行。
3、提高線程的可管理性 線程時(shí)稀缺資源,使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。
ThreadPoolExecutor 工作原理:
1、如果當(dāng)前運(yùn)行的線程少于 corePoolSize,則直接創(chuàng)建新線程來(lái)執(zhí)行任務(wù)。
2、如果運(yùn)行的線程等于或多于 corePoolSize,則將任務(wù)加入 BlockingQueue
3、如果無(wú)法加入到 BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)。
4、如果新創(chuàng)建的線程將使當(dāng)前運(yùn)行的線程超出 maximumPoolSize,任務(wù)將被拒絕,調(diào)用飽和策略處理(RejectedExceptionHandler.rejectedException()方法),默認(rèn)是AbortPolicy,會(huì)拋出Exception。
當(dāng)前運(yùn)行的線程的意思是:只要不是
TERMINATED都算活著。
ThreadPoolExecutor 的創(chuàng)建:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize(線程池的基本大小):當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使其它空閑的基本線程能夠執(zhí)行新任務(wù),也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不在創(chuàng)建。
workQueue(任務(wù)隊(duì)列): 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。
- ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列。FIFO
- LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列。FIFO
maximumPoolSize(線程池最大數(shù)量):線程池允許的最大線程數(shù)。注意,如果使用了無(wú)界的任務(wù)隊(duì)列,這個(gè)參數(shù)就沒(méi)什么效果。
ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠
RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說(shuō)明線程池處于飽和狀態(tài),必須采用一種策略處理提交的新任務(wù)。默認(rèn)策略是 AbortPolicy,表示無(wú)法處理新任務(wù)時(shí)拋出異常。
- AbortPolicy:直接拋出異常
- CallerRunsPolicy:用調(diào)用者所在的線程運(yùn)行任務(wù)。
- DiscardPolicy:不處理,丟棄掉
- DiscardOldestPolicy:丟棄隊(duì)列最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
keepAliveTime(線程活動(dòng)保持時(shí)間):線程池的工作線程空閑后,保持存活的事件。
TimeUnit:保持時(shí)間單位
。
示例代碼演示:
先定義一個(gè)任務(wù),供以后使用:
class MyRun(val name: String) : Runnable {
override fun run() {
println("$name is running, ${Thread.currentThread().name}")
TimeUnit.SECONDS.sleep(3)
println("$name is finished")
}
}
調(diào)用線程池執(zhí)行任務(wù)一:
val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
TimeUnit.SECONDS.sleep(20)
executor.execute(MyRun("T2"))
結(jié)果:
T1 is running, pool-1-thread-1
T1 is finished
T2 is running, pool-1-thread-2
T2 is finished
分析:當(dāng) T1 執(zhí)行完成之后,此時(shí) pool-1-thread-1的線程所處的狀態(tài) WAITING:
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1c2083000 nid=0x4403 waiting on condition [0x0000700010f37000]
java.lang.Thread.State: WAITING (parking)
因?yàn)?corePoolSize 是 2,所以面對(duì)新的任務(wù)還是會(huì)用 新的線程 pool-1-thread-2 來(lái)執(zhí)行,程序結(jié)束后線程池中線程狀態(tài):
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1c2083000 nid=0x4403 waiting on condition [0x0000700010f37000]
java.lang.Thread.State: WAITING (parking)
"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fa1c287e800 nid=0x320b waiting on condition [0x000070001113d000]
java.lang.Thread.State: WAITING (parking)
調(diào)用線程池執(zhí)行任務(wù)二:
val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
executor.execute(MyRun("T2"))
TimeUnit.SECONDS.sleep(5)
executor.execute(MyRun("T3"))
executor.execute(MyRun("T4"))
執(zhí)行結(jié)果:
T1 is running, pool-1-thread-1
T2 is running, pool-1-thread-2
T2 is finished
T1 is finished
T3 is running, pool-1-thread-2
T4 is running, pool-1-thread-3
T3 is finished
T4 is finished
分析: T1、T2執(zhí)行完成后,線程池中線程有:pool-1-thread-1 和 pool-1-thread-2,此時(shí)當(dāng)T3提交給線程池時(shí),會(huì)把任務(wù)存到 BlockingQueue中等待執(zhí)行,再提交T4,此時(shí)有可能 BlockingQueue 是滿的狀態(tài),所以又因?yàn)?maximumPoolSize 是 10,所以會(huì)新創(chuàng)建線程執(zhí)行任務(wù):pool-1-thread-3。
當(dāng)T4剛剛執(zhí)行完成,我們來(lái)看一下線程池中線程的情況:
"pool-1-thread-3" #13 prio=5 os_prio=31 tid=0x00007f90bb893800 nid=0x4e0b waiting on condition [0x000070000175b000]
java.lang.Thread.State: TIMED_WAITING (parking)
"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007f90bb00f000 nid=0x3f03 waiting on condition [0x0000700001555000]
java.lang.Thread.State: TIMED_WAITING (parking)
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f90bb892800 nid=0x4103 waiting on condition [0x0000700001452000]
java.lang.Thread.State: WAITING (parking)
因?yàn)?,此時(shí)線程數(shù)為3,超過(guò)了corePoolSize的個(gè)數(shù),這時(shí)候線程就會(huì)進(jìn)入 TIMED_WAITING狀態(tài),直到線程數(shù)達(dá)到 corePoolSize 的個(gè)數(shù)為止。
我們?cè)俚却欢螘r(shí)間(超過(guò)30秒),這時(shí)候我們來(lái)看一下線程池中線程的情況:
"pool-1-thread-3" #13 prio=5 os_prio=31 tid=0x00007f90bb893800 nid=0x4e0b waiting on condition [0x000070000175b000]
java.lang.Thread.State: WAITING (parking)
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f90bb892800 nid=0x4103 waiting on condition [0x0000700001452000]
java.lang.Thread.State: WAITING (parking)
pool-1-thread-2 經(jīng)過(guò)超時(shí)時(shí)間之后,會(huì)銷毀。 此時(shí)線程池只剩下 pool-1-thread-3 和 pool-1-thread-1 兩條線程,維持核心線程數(shù)。
調(diào)用線程池執(zhí)行任務(wù)三:
fun main() {
val executor = ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, ArrayBlockingQueue(1))
executor.execute(MyRun("T1"))
executor.execute(MyRun("T2"))
TimeUnit.SECONDS.sleep(5)
executor.execute(MyRun("T3"))
TimeUnit.SECONDS.sleep(1) //稍微等待一下
executor.execute(MyRun("T4"))
}
結(jié)果:
T1 is running, pool-1-thread-1
T2 is running, pool-1-thread-2
T2 is finished
T1 is finished
T3 is running, pool-1-thread-2
T4 is running, pool-1-thread-1
T3 is finished
T4 is finished
分析: 基本和上一個(gè)示例差不多,只是在T3 和 T4提交之間,增加了一個(gè)延遲,這時(shí)候再提交T4的時(shí)候,BlockingQueue 就不為空了,所以T4提交是會(huì)放到 BlockingQueue 中等待執(zhí)行。
合理配置線程池:
CPU密集型任務(wù):應(yīng)盡可能盡可能小的配置線程(否則老是切換線程上下文的話,反而總時(shí)間會(huì)邊長(zhǎng))。例如配置 N(CPU) + 1
IO密集型任務(wù):IO密集型任務(wù)并不是一直再執(zhí)行任務(wù),則應(yīng)配置盡可能多線程(因?yàn)镮O就會(huì)阻塞,線程數(shù)多的話正好可以利用CPU空閑時(shí)間,提高利用率)。例如:2 * N(CPU)。
Executors 中幾種常見(jiàn)的線程池:
ThreadPoolExecutor 是線程池的核心 實(shí)現(xiàn)類,用來(lái)執(zhí)行提交的任務(wù)。
ScheduledThreadPoolExecutor 是一個(gè)線程池實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行任務(wù)?;蛘叨ㄆ趫?zhí)行任務(wù)。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 適用于為了滿足資源管理的需求,而需要限制當(dāng)前線程數(shù)量的應(yīng)用場(chǎng)景,比較適用于負(fù)載較重的服務(wù)器。 任務(wù)隊(duì)列使用的是 無(wú)界的 LinkedBlockingQueue,線程池大小有界。 創(chuàng)建用于容納固定線程數(shù)量的池子,每個(gè)線程存活時(shí)間是無(wú)限的,當(dāng)池子滿了不再添加線程,如果池子中線程都在繁忙,則將新任務(wù)放入阻塞隊(duì)列中。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
是大小無(wú)界限的線程池,適用于執(zhí)行很多短期異步任務(wù)的小程序,或者負(fù)載較輕的服務(wù)器。當(dāng)新任務(wù)到來(lái),則插入到SynchronousQueue中,由于SynchronousQueue 是同步隊(duì)列,因此會(huì)在池中尋找可用線程來(lái)執(zhí)行,若有可用線程則執(zhí)行,若沒(méi)有則創(chuàng)建一個(gè)新的線程來(lái)執(zhí)行該任務(wù)。若空閑線程大于指定超時(shí)時(shí)間,則線程會(huì)被銷毀。 SynchronousQueue是BlockingQueue的一種,SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存儲(chǔ)任何元素。也就是說(shuō)SynchronousQueue的每一次insert操作,必須等待其他線性的remove操作。而每一個(gè)remove操作也必須等待其他線程的insert操作。
也就是說(shuō)SynchronousQueue的每一次insert操作,必須等待其他線性的remove操作。而每一個(gè)remove操作也必須等待其他線程的insert操作。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 適用于需要保證順序地執(zhí)行各個(gè)任務(wù),并且在任意時(shí)候,不會(huì)有多個(gè)線程是活動(dòng)的。創(chuàng)建了只有一個(gè)線程的線程池,是無(wú)限存活的,繁忙是會(huì)有任務(wù)塞進(jìn)任務(wù)隊(duì)列。
ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
適用于需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù)。同時(shí)為了滿足資源管理的需求而需要限制后臺(tái)線程的數(shù)量的應(yīng)用場(chǎng)景。