- 問(wèn)題
- 1. 為什么要使用線程池,線程池用什么用
- 2. 說(shuō)說(shuō)幾種常見(jiàn)的線程池及使用場(chǎng)景
- 3. 線程池有哪幾種工作隊(duì)列
- 4. 怎么理解無(wú)界隊(duì)列和有界隊(duì)列
- 5. 線程池中的幾種重要的參數(shù)及流程
- 6. 單機(jī)上一個(gè)線程正在處理服務(wù),如果忽然斷電了怎么辦(正在處理和阻塞隊(duì)列里的請(qǐng)求怎么處理)
- 參考
問(wèn)題
問(wèn)題:
- 單機(jī)上一個(gè)線程正在處理服務(wù),如果忽然斷電了怎么辦(正在處理和阻塞隊(duì)列里的請(qǐng)求怎么處理)
- 為什么要使用線程池,線程池用什么用
- 說(shuō)說(shuō)幾種常見(jiàn)的線程池及使用場(chǎng)景
- 線程池有哪幾種工作隊(duì)列
- 怎么理解無(wú)界隊(duì)列和有界隊(duì)列
- 線程池中的幾種重要的參數(shù)及流程
1. 為什么要使用線程池,線程池用什么用
- 降低資源消耗:通過(guò)重用已經(jīng)創(chuàng)建的線程來(lái)降低線程創(chuàng)建和銷毀的消耗
- 提高響應(yīng)速度:任務(wù)到達(dá)時(shí)不需要等待線程創(chuàng)建就可以立即執(zhí)行
- 提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調(diào)優(yōu)和監(jiān)控
2. 說(shuō)說(shuō)幾種常見(jiàn)的線程池及使用場(chǎng)景
- newFixedThreadPool(固定大小的線程池)
- newSingleThreadExecutor(單線程線程池)
- newCachedThreadPool(可緩存線程的線程池)
- newScheduledThreadPool
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
線程池特點(diǎn):
- 核心線程數(shù)和最大線程數(shù)大小一樣
-
keepAliveTime為0 - 阻塞隊(duì)列是
LinkedBlockingQueue
它是固定大小的線程池,其核心線程數(shù)和最大線程數(shù)大小一樣。并且阻塞隊(duì)列用的是LinkedBlockingQueue,也就是說(shuō)線程最大數(shù)這個(gè)參數(shù)失效了基本,所以不會(huì)出現(xiàn)外包線程的存在,所以也可以認(rèn)為keepAliveTime參數(shù)是一個(gè)擺設(shè)。除非allowCoreThreadTimeOut方法的調(diào)用。
該線程池的工作機(jī)制是:
- 線程數(shù)少于核心線程數(shù),也就是設(shè)置的線程數(shù)時(shí),新建線程執(zhí)行任務(wù)
- 線程數(shù)等于核心線程數(shù)后,將任務(wù)加入阻塞隊(duì)列
- 由于隊(duì)列容量非常大(
Integer.MAX_VALUE),可以一直加加加。(當(dāng)線程池中的任務(wù)比較特殊時(shí),比如關(guān)于數(shù)據(jù)庫(kù)的長(zhǎng)時(shí)間的IO操作,可能導(dǎo)致OOM)
- 由于隊(duì)列容量非常大(
- 執(zhí)行完任務(wù)的線程反復(fù)去隊(duì)列中取任務(wù)執(zhí)行
適用場(chǎng)景:
FixedThreadPool 適用于處理CPU密集型的任務(wù),確保CPU在長(zhǎng)期被工作線程使用的情況下,盡可能的少的分配線程即可。一般Ncpu+1
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
線程池特點(diǎn):
- 核心線程數(shù)和最大線程數(shù)大小一樣且都是1
-
keepAliveTime為0 - 阻塞隊(duì)列是
LinkedBlockingQueue
該線程池的工作機(jī)制是:
- 線程池中沒(méi)有線程時(shí),新建一個(gè)線程執(zhí)行任務(wù)
- 有一個(gè)線程以后,將任務(wù)加入阻塞隊(duì)列,不停加加加
- 唯一的這一個(gè)線程不停地去隊(duì)列里取任務(wù)執(zhí)行
適用場(chǎng)景:
SingleThreadExecutor適用于串行執(zhí)行任務(wù)的場(chǎng)景,每個(gè)任務(wù)必須按順序執(zhí)行,不需要并發(fā)執(zhí)行。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
線程池特點(diǎn):
- 核心線程數(shù)為0,且最大線程數(shù)為
Integer.MAX_VALUE - 阻塞隊(duì)列是
SynchronousQueue
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue
鎖當(dāng)提交任務(wù)的速度大于處理任務(wù)的速度時(shí),每次提交一個(gè)任務(wù),就必然會(huì)創(chuàng)建一個(gè)線程。極端情況下會(huì)創(chuàng)建過(guò)多的線程,耗盡 CPU 和內(nèi)存資源。由于空閑 60 秒的線程會(huì)被終止,長(zhǎng)時(shí)間保持空閑的 CachedThreadPool 不會(huì)占用任何資源。
該線程池的工作機(jī)制是:
- 沒(méi)有核心線程,直接向
SynchronousQueue中提交任務(wù) - 如果有空閑線程,就去取出任務(wù)執(zhí)行;如果沒(méi)有空閑線程,就新建一個(gè)
- 執(zhí)行完任務(wù)的線程有60秒生存時(shí)間,如果在這個(gè)時(shí)間內(nèi)可以接到新任務(wù),就可以繼續(xù)活下去,否則就拜拜
適用場(chǎng)景:
CachedThreadPool 用于并發(fā)執(zhí)行大量短期的小任務(wù)。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
線程池特點(diǎn):
- 最大線程數(shù)為
Integer.MAX_VALUE - 阻塞隊(duì)列是
DelayedWorkQueue
ScheduledThreadPoolExecutor 添加任務(wù)提供了另外兩個(gè)方法:
- scheduleAtFixedRate() :按某種速率周期執(zhí)行
- scheduleWithFixedDelay():在某個(gè)延遲后執(zhí)行
兩種方法的內(nèi)部實(shí)現(xiàn)都是創(chuàng)建了一個(gè)ScheduledFutureTask對(duì)象封裝了任務(wù)的延遲執(zhí)行時(shí)間及執(zhí)行周期,并調(diào)用decorateTask()方法轉(zhuǎn)成RunnableScheduledFuture對(duì)象,然后添加到延遲隊(duì)列中。
DelayQueue:中封裝了一個(gè)優(yōu)先級(jí)隊(duì)列,這個(gè)隊(duì)列會(huì)對(duì)隊(duì)列中的ScheduledFutureTask 進(jìn)行排序,兩個(gè)任務(wù)的執(zhí)行 time 不同時(shí),time 小的先執(zhí)行;否則比較添加到隊(duì)列中的ScheduledFutureTask的順序號(hào) sequenceNumber ,先提交的先執(zhí)行。
該線程池的工作機(jī)制是:
- 調(diào)用上面兩個(gè)方法添加一個(gè)任務(wù)
- 線程池中的線程從 DelayQueue 中取任務(wù)
- 然后執(zhí)行任務(wù)
具體執(zhí)行步驟:
- 線程從 DelayQueue 中獲取 time 大于等于當(dāng)前時(shí)間的
ScheduledFutureTaskDelayQueue.take()
- 執(zhí)行完后修改這個(gè) task 的 time 為下次被執(zhí)行的時(shí)間
- 然后再把這個(gè) task 放回隊(duì)列中
DelayQueue.add()
適用場(chǎng)景:
ScheduledThreadPoolExecutor用于需要多個(gè)后臺(tái)線程執(zhí)行周期任務(wù),同時(shí)需要限制線程數(shù)量的場(chǎng)景。
3. 線程池有哪幾種工作隊(duì)列
- ArrayBlockingQueue (有界隊(duì)列):是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。
- LinkedBlockingQueue (無(wú)界隊(duì)列):一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列。
- SynchronousQueue(同步隊(duì)列): 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列。
- DelayQueue(延遲隊(duì)列):一個(gè)任務(wù)定時(shí)周期的延遲執(zhí)行的隊(duì)列。根據(jù)指定的執(zhí)行時(shí)間從小到大排序,否則根據(jù)插入到隊(duì)列的先后排序。
- PriorityBlockingQueue(優(yōu)先級(jí)隊(duì)列): 一個(gè)具有優(yōu)先級(jí)得無(wú)限阻塞隊(duì)列。
4. 怎么理解無(wú)界隊(duì)列和有界隊(duì)列
- 有界隊(duì)列即長(zhǎng)度有限,滿了以后ArrayBlockingQueue會(huì)插入阻塞。
- 無(wú)界隊(duì)列就是里面能放無(wú)數(shù)的東西而不會(huì)因?yàn)殛?duì)列長(zhǎng)度限制被阻塞,但是可能會(huì)出現(xiàn)OOM異常。
5. 線程池中的幾種重要的參數(shù)及流程
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:核心池的大小,在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中 -
maximumPoolSize:線程池最大線程數(shù)最大線程數(shù) -
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止 -
unit:參數(shù)keepAliveTime的時(shí)間單位TimeUtil類的枚舉類(DAYS、HOURS、MINUTES、SECONDS 等) -
workQueue:阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù) -
threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程 -
handler:拒絕處理任務(wù)的策略-
AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。(默認(rèn)這種) -
DiscardPolicy:也是丟棄任務(wù),但是不拋出異常 -
DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程) -
CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
-

執(zhí)行流程
- 當(dāng)有任務(wù)進(jìn)入時(shí),線程池創(chuàng)建線程去執(zhí)行任務(wù),直到核心線程數(shù)滿為止
- 核心線程數(shù)量滿了之后,任務(wù)就會(huì)進(jìn)入一個(gè)緩沖的任務(wù)隊(duì)列中
- 當(dāng)任務(wù)隊(duì)列為無(wú)界隊(duì)列時(shí),任務(wù)就會(huì)一直放入緩沖的任務(wù)隊(duì)列中,不會(huì)和最大線程數(shù)量進(jìn)行比較
- 當(dāng)任務(wù)隊(duì)列為有界隊(duì)列時(shí),任務(wù)先放入緩沖的任務(wù)隊(duì)列中,當(dāng)任務(wù)隊(duì)列滿了之后,才會(huì)將任務(wù)放入線程池,此時(shí)會(huì)拿當(dāng)前線程數(shù)與線程池允許的最大線程數(shù)進(jìn)行比較,如果超出了,則默認(rèn)會(huì)拋出異常。如果沒(méi)超出,然后線程池才會(huì)創(chuàng)建線程并執(zhí)行任務(wù),當(dāng)任務(wù)執(zhí)行完,又會(huì)將緩沖隊(duì)列中的任務(wù)放入線程池中,然后重復(fù)此操作。
6. 單機(jī)上一個(gè)線程正在處理服務(wù),如果忽然斷電了怎么辦(正在處理和阻塞隊(duì)列里的請(qǐng)求怎么處理)
經(jīng)過(guò)網(wǎng)上查閱,發(fā)現(xiàn)基本是沒(méi)有一個(gè)明確的回答的。不過(guò)思考過(guò)后一番,我感覺(jué)實(shí)現(xiàn)思路和MySQL的redo,undo功能很相似,我們可以對(duì)正在處理和阻塞隊(duì)列的任務(wù)做事物管理或者對(duì)阻塞隊(duì)列中的任務(wù)持久化處理,并且當(dāng)斷電或者系統(tǒng)崩潰,操作無(wú)法繼續(xù)下去的時(shí)候,可以通過(guò)回溯日志的方式來(lái)撤銷正在處理的已經(jīng)執(zhí)行成功的操作。然后重新執(zhí)行整個(gè)阻塞隊(duì)列。
即:
阻塞隊(duì)列持久化,正在處理事物控制。斷電之后正在處理的回滾,日志恢復(fù)該次操作。服務(wù)器重啟后阻塞隊(duì)列中的數(shù)據(jù)再加載