為什么用線程池
在我們進(jìn)行開(kāi)發(fā)的時(shí)候,為了充分利用系統(tǒng)資源,我們通常會(huì)進(jìn)行多線程開(kāi)發(fā),實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,需要使用線程的時(shí)候就去創(chuàng)建一個(gè)線程(繼承Thread類、實(shí)現(xiàn)Runnable接口、使用Callable和Future),但是這樣也有一點(diǎn)問(wèn)題,就是如果并發(fā)的線程數(shù)量很多,創(chuàng)建線程、銷毀線程都是需要消耗時(shí)間、資源,這個(gè)時(shí)候線程池就派上用場(chǎng)了
一、四種線程池的介紹
Java通過(guò)Executors提供了四種線程池,分別是
1.newSingleThreadExecutor()
創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)都是按照指定的順序(FIFO,LIFO,優(yōu)先級(jí))執(zhí)行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
2.newFixedThreadExecutor()
創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor
(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
3.newCachedThreadPool()
創(chuàng)建一個(gè)可緩存的線程池,如果當(dāng)前沒(méi)有可用線程,在執(zhí)行結(jié)束后緩存60s,如果不被調(diào)用則移除線程。調(diào)用execute()方法時(shí)可以重用緩存中的線程。適用于很多短期異步任務(wù)的環(huán)境,可以提高程序性能。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor
(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
4.newScheduledThreadPool()(在ScheduleThreadPoolExecutor類中,ThreadPoolExecutor的子類)
創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
線程池的使用
使用方式一(不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個(gè)靜態(tài)方法來(lái)創(chuàng)建線程池):
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
threadPoolExecutor.execute(new Runnable() {
@Override public void run() {
}
});
使用方式二:
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
ExecutorService executorService3 = Executors.newCachedThreadPool();
ExecutorService executorService4 = Executors.newScheduledThreadPool(3);
executorService1.execute(new Runnable() {
@Override public void run() {
}
});
ExecutorService是真正的線程池接口,所以我們?cè)谕ㄟ^(guò)Executors創(chuàng)建各種線程時(shí),都是采用上述代碼所示的方式
二、線程池的底層類與接口
在介紹線程池的實(shí)現(xiàn)機(jī)制之前,先了解一下線程池重要的類或接口
ExecutorService是真正的線程池接口
Executors是靜態(tài)工廠的功能,生產(chǎn)各種類型線程池
Executor是線程池的頂級(jí)接口,只是一個(gè)執(zhí)行線程的工具,只提供一個(gè)execute(Runnable command)的方法,真正的線程池接口是ExecutorService
AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,實(shí)現(xiàn)了其中大部分的方法(有沒(méi)有實(shí)現(xiàn)的方法,所以被聲明為Abstract)
ThreadPoolExecutor,繼承了AbstractExecutorService,是ExecutorService的默認(rèn)實(shí)現(xiàn)
1、ThreadPoolExecutor類
(1)構(gòu)造函數(shù)
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個(gè)類,因此如果要透徹地了解Java中的線程池,必須先了解這個(gè)類。
在ThreadPoolExecutor類中提供了四個(gè)構(gòu)造方法(前面三個(gè)構(gòu)造器都是調(diào)用第四個(gè)構(gòu)造器進(jìn)行的初始化工作):
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
構(gòu)造器中各個(gè)參數(shù)的含義:
corePoolSize:核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;
unit:參數(shù)keepAliveTime的時(shí)間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue:一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響,一般來(lái)說(shuō),這里的阻塞隊(duì)列有以下幾種選擇:ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程;
handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
(2)ThreadPoolExecutor方法
execute()
submit()
shutdown()
shutdownNow()
getQueue()
getPoolSize()
getActiveCount()
getCompletedTaskCount()
execute()方法實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法,通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒(méi)有對(duì)其進(jìn)行重寫(xiě),這個(gè)方法也是用來(lái)向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過(guò)它利用了Future來(lái)獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。
shutdown()和shutdownNow()是用來(lái)關(guān)閉線程池的。
getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法
還有在下文提到的runwork()、addwork()、processworkerExit() 方法等等
2、AbstractExecutorService類
下面我們看一下AbstractExecutorService的具體方法
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {...};
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {...};
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) {...};
public <T> Future<T> submit(Callable<T> task) {...};
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {...};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {...};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {...};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {...};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {...};
}
AbstractExecutorService是一個(gè)抽象類,它實(shí)現(xiàn)了ExecutorService接口。
3、ExecutorService接口
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實(shí)現(xiàn):
4、Executor接口
public interface Executor {
void execute(Runnable command);
}
上述類與接口之間的關(guān)系
Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來(lái)執(zhí)行傳進(jìn)去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
三、線程池的底層實(shí)現(xiàn)原理
1、線程池的狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中ctl這個(gè)AtomicInteger的功能很強(qiáng)大,其高3位用于維護(hù)線程池運(yùn)行狀態(tài),低29位維護(hù)線程池中線程數(shù)量
1、RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態(tài)的線程池會(huì)接收新任務(wù),也會(huì)處理在阻塞隊(duì)列中等待處理的任務(wù)
2、SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀態(tài)的線程池不會(huì)再接收新任務(wù),但還會(huì)處理已經(jīng)提交到阻塞隊(duì)列中等待處理的任務(wù)
3、STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態(tài)的線程池不會(huì)再接收新任務(wù),不會(huì)處理在阻塞隊(duì)列中等待的任務(wù),而且還會(huì)中斷正在運(yùn)行的任務(wù)
4、TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務(wù)都被終止了,workerCount為0,為此狀態(tài)時(shí)還將調(diào)用terminated()方法
5、TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法調(diào)用完成后變成此狀態(tài)
這些狀態(tài)均由int型表示,大小關(guān)系為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個(gè)順序基本上也是遵循線程池從 運(yùn)行 到 終止這個(gè)過(guò)程。
runStateOf(int c) 方法:c & 高3位為1,低29位為0的~CAPACITY,用于獲取高3位保存的線程池狀態(tài)
workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用于獲取低29位的線程數(shù)量
ctlOf(int rs, int wc)方法:參數(shù)rs表示runState,參數(shù)wc表示workerCount,即根據(jù)runState和workerCount打包合并成ctl
2、執(zhí)行過(guò)程
一個(gè)任務(wù)從提交到執(zhí)行完畢經(jīng)歷過(guò)程如下:
第一步:如果當(dāng)前線程池中的線程數(shù)目小于corePoolSize,則每來(lái)一個(gè)任務(wù),就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行這個(gè)任務(wù);
第二步:如果當(dāng)前線程池中的線程數(shù)目>=corePoolSize,則每來(lái)一個(gè)任務(wù),會(huì)嘗試將其添加到任務(wù)緩存隊(duì)列當(dāng)中,若添加成功,則該任務(wù)會(huì)等待空閑線程將其取出去執(zhí)行;若添加失?。ㄒ话銇?lái)說(shuō)是任務(wù)緩存隊(duì)列已滿),則會(huì)嘗試創(chuàng)建新的線程去執(zhí)行這個(gè)任務(wù);
第三步:如果線程池中的線程數(shù)量大于等于corePoolSize,且隊(duì)列workQueue已滿,但線程池中的線程數(shù)量小于maximumPoolSize,則會(huì)創(chuàng)建新的線程來(lái)處理被添加的任務(wù)
第四步:如果當(dāng)前線程池中的線程數(shù)目達(dá)到maximumPoolSize,則會(huì)采取任務(wù)拒絕策略進(jìn)行處理;
3、任務(wù)拒絕策略
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略,通常有以下四種策略:
- ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
4、在執(zhí)行過(guò)程中所涉及到的方法
(1)execute(Runnable command)(提交任務(wù))
在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute()方法,雖然通過(guò)submit也可以提交任務(wù),但是實(shí)際上submit方法里面最終調(diào)用的還是execute()方法,所以我們只需要研究execute()方法的實(shí)現(xiàn)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
源碼解析:
1、如果線程池當(dāng)前線程數(shù)量少于corePoolSize,則addWorker(command, true)創(chuàng)建新worker線程,如創(chuàng)建成功返回,如沒(méi)創(chuàng)建成功,則執(zhí)行后續(xù)步驟;
addWorker(command, true)失敗的原因可能是:
A、線程池已經(jīng)shutdown,shutdown的線程池不再接收新任務(wù)
B、workerCountOf(c) < corePoolSize 判斷后,由于并發(fā),別的線程先創(chuàng)建了worker線程,導(dǎo)致workerCount>=corePoolSize
2、如果線程池還在running狀態(tài),將task加入workQueue阻塞隊(duì)列中,如果加入成功,進(jìn)行double-check,如果加入失?。赡苁顷?duì)列已滿),則執(zhí)行后續(xù)步驟;
double-check主要目的是判斷剛加入workQueue阻塞隊(duì)列的task是否能被執(zhí)行
A、如果線程池已經(jīng)不是running狀態(tài)了,應(yīng)該拒絕添加新任務(wù),從workQueue中刪除任務(wù)
B、如果線程池是運(yùn)行狀態(tài),或者從workQueue中刪除任務(wù)失敗(剛好有一個(gè)線程執(zhí)行完畢,并消耗了這個(gè)任務(wù)),確保還有線程執(zhí)行任務(wù)(只要有一個(gè)就夠了)
3、如果線程池不是running狀態(tài) 或者 無(wú)法入隊(duì)列,嘗試開(kāi)啟新線程,擴(kuò)容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當(dāng)前command
(2)addwork(Runnable firstTask,boolean core)(添加任務(wù))
參數(shù)說(shuō)明:
firstTask:worker線程的初始任務(wù),可以為空
core:true(將corePoolSize作為上限),false(將maximumPoolSize作為上限)
addWorker方法有4種傳參的方式:
1、addWorker(command, true)
2、addWorker(command, false)
3、addWorker(null, false)
4、addWorker(null, true)
在execute方法中就使用了前3種,結(jié)合這個(gè)核心方法進(jìn)行以下分析
第一個(gè):線程數(shù)小于corePoolSize時(shí),放一個(gè)需要處理的task進(jìn)Workers Set。如果Workers Set長(zhǎng)度超過(guò)corePoolSize,就返回false
第二個(gè):當(dāng)隊(duì)列被放滿時(shí),就嘗試將這個(gè)新來(lái)的task直接放入Workers Set,而此時(shí)Workers Set的長(zhǎng)度限制是maximumPoolSize。如果線程池也滿了的話就返回false
第三個(gè):放入一個(gè)空的task進(jìn)workers Set,長(zhǎng)度限制是maximumPoolSize。這樣一個(gè)task為空的worker在線程執(zhí)行的時(shí)候會(huì)去任務(wù)隊(duì)列里拿任務(wù),這樣就相當(dāng)于創(chuàng)建了一個(gè)新的線程,只是沒(méi)有馬上分配任務(wù)
第四個(gè):這個(gè)方法就是放一個(gè)null的task進(jìn)Workers Set,而且是在小于corePoolSize時(shí),如果此時(shí)Set中的數(shù)量已經(jīng)達(dá)到corePoolSize那就返回false,什么也不干。實(shí)際使用中是在prestartAllCoreThreads()方法,這個(gè)方法用來(lái)為線程池預(yù)先啟動(dòng)corePoolSize個(gè)worker等待從workQueue中獲取任務(wù)執(zhí)行
源碼解析:
1、判斷線程池當(dāng)前是否為可以添加worker線程的狀態(tài),可以則繼續(xù)下一步,不可以return false:
A、線程池狀態(tài)>shutdown,可能為stop、tidying、terminated,不能添加worker線程
B、線程池狀態(tài)==shutdown,firstTask不為空,不能添加worker線程,因?yàn)閟hutdown狀態(tài)的線程池不接收新任務(wù)
C、線程池狀態(tài)==shutdown,firstTask==null,workQueue為空,不能添加worker線程,因?yàn)閒irstTask為空是為了添加一個(gè)沒(méi)有任務(wù)的線程再?gòu)膚orkQueue獲取task,而workQueue為空,說(shuō)明添加無(wú)任務(wù)線程已經(jīng)沒(méi)有意義
2、線程池當(dāng)前線程數(shù)量是否超過(guò)上限(corePoolSize 或 maximumPoolSize),超過(guò)了return false,沒(méi)超過(guò)則對(duì)workerCount+1,繼續(xù)下一步
3、在線程池的ReentrantLock保證下,向Workers Set中添加新創(chuàng)建的worker實(shí)例,添加完成后解鎖,并啟動(dòng)worker線程,如果這一切都成功了,return true,如果添加worker入Set失敗或啟動(dòng)失敗,調(diào)用addWorkerFailed()邏輯
其中,線程池會(huì)把每個(gè)線程封裝成一個(gè)Worker對(duì)象,由addWorker(Runnable firstTask, boolean core)方法控制,firstTask代表線程池首要執(zhí)行的任務(wù),core代表是否使用corePoolSize參數(shù)作為線程池最大標(biāo)記。
(3)內(nèi)部類worker
Worker類本身既實(shí)現(xiàn)了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡(jiǎn)稱AQS),所以其既是一個(gè)可執(zhí)行的任務(wù),又可以達(dá)到鎖的效果
new Worker()
1、將AQS的state置為-1,在runWoker()前不允許中斷
2、待執(zhí)行的任務(wù)會(huì)以參數(shù)傳入,并賦予firstTask
3、用Worker這個(gè)Runnable創(chuàng)建Thread
之所以Worker自己實(shí)現(xiàn)Runnable,并創(chuàng)建Thread,在firstTask外包一層,是因?yàn)橐ㄟ^(guò)Worker控制中斷,而firstTask這個(gè)工作任務(wù)只是負(fù)責(zé)執(zhí)行業(yè)務(wù)
Worker控制中斷主要有以下幾方面:
1、初始AQS狀態(tài)為-1,此時(shí)不允許中斷interrupt(),只有在worker線程啟動(dòng)了,執(zhí)行了runWoker(),將state置為0,才能中斷
不允許中斷體現(xiàn)在:
A、shutdown()線程池時(shí),會(huì)對(duì)每個(gè)worker tryLock()上鎖,而Worker類這個(gè)AQS的tryAcquire()方法是固定將state從0->1,故初始狀態(tài)state==-1時(shí)tryLock()失敗,沒(méi)發(fā)interrupt()
B、shutdownNow()線程池時(shí),不用tryLock()上鎖,但調(diào)用worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯
2、為了防止某種情況下,在運(yùn)行中的worker被中斷,runWorker()每次運(yùn)行任務(wù)時(shí)都會(huì)lock()上鎖,而shutdown()這類可能會(huì)終止worker的操作需要先獲取worker的鎖,這樣就防止了中斷正在運(yùn)行的線程
Worker實(shí)現(xiàn)的AQS為不可重入鎖,為了是在獲得worker鎖的情況下再進(jìn)入其它一些需要加鎖的方法
Worker和Task的區(qū)別:
Worker是線程池中的線程,而Task雖然是runnable,但是并沒(méi)有真正執(zhí)行,只是被Worker調(diào)用了run方法,后面會(huì)看到這部分的實(shí)現(xiàn)。
(4)runwork(Worker w)(執(zhí)行任務(wù))
1、Worker線程啟動(dòng)后,通過(guò)Worker類的run()方法調(diào)用runWorker(this)
2、執(zhí)行任務(wù)之前,首先worker.unlock(),將AQS的state置為0,允許中斷當(dāng)前worker線程
3、開(kāi)始執(zhí)行firstTask,調(diào)用task.run(),在執(zhí)行任務(wù)前會(huì)上鎖wroker.lock(),在執(zhí)行完任務(wù)后會(huì)解鎖,為了防止在任務(wù)運(yùn)行時(shí)被線程池一些中斷操作中斷
4、在任務(wù)執(zhí)行前后,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute() 和 afterExecute()方法
5、無(wú)論在beforeExecute()、task.run()、afterExecute()發(fā)生異常上拋,都會(huì)導(dǎo)致worker線程終止,進(jìn)入processWorkerExit()處理worker退出的流程
6、如正常執(zhí)行完當(dāng)前task后,會(huì)通過(guò)getTask()從阻塞隊(duì)列中獲取新任務(wù),當(dāng)隊(duì)列中沒(méi)有任務(wù),且獲取任務(wù)超時(shí),那么當(dāng)前worker也會(huì)進(jìn)入退出流程
(5)getTask()(獲取任務(wù))
1、首先判斷是否可以滿足從workQueue中獲取任務(wù)的條件,不滿足return null
A、線程池狀態(tài)是否滿足:
?。╝)shutdown狀態(tài) + workQueue為空 或 stop狀態(tài),都不滿足,因?yàn)楸籹hutdown后還是要執(zhí)行workQueue剩余的任務(wù),但workQueue也為空,就可以退出了
(b)stop狀態(tài),shutdownNow()操作會(huì)使線程池進(jìn)入stop,此時(shí)不接受新任務(wù),中斷正在執(zhí)行的任務(wù),workQueue中的任務(wù)也不執(zhí)行了,故return null返回
B、線程數(shù)量是否超過(guò)maximumPoolSize 或 獲取任務(wù)是否超時(shí)
?。╝)線程數(shù)量超過(guò)maximumPoolSize可能是線程池在運(yùn)行時(shí)被調(diào)用了setMaximumPoolSize()被改變了大小,否則已經(jīng)addWorker()成功不會(huì)超過(guò)maximumPoolSize
(b)如果 當(dāng)前線程數(shù)量>corePoolSize,才會(huì)檢查是否獲取任務(wù)超時(shí),這也體現(xiàn)了當(dāng)線程數(shù)量達(dá)到maximumPoolSize后,如果一直沒(méi)有新任務(wù),會(huì)逐漸終止worker線程直到corePoolSize
2、如果滿足獲取任務(wù)條件,根據(jù)是否需要定時(shí)獲取調(diào)用不同方法:
A、workQueue.poll():如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒(méi)有任務(wù),返回null
B、workQueue.take():如果阻塞隊(duì)列為空,當(dāng)前線程會(huì)被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線程被喚醒,take方法返回任務(wù)
3、在阻塞從workQueue中獲取任務(wù)時(shí),可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執(zhí)行第1步中的判斷,滿足就繼續(xù)獲取任務(wù),不滿足return null,會(huì)進(jìn)入worker退出的流程
(6)processWorkerExit(Worker w,boolean completedAbruptly)(worker線程退出)
參數(shù):
worker: 要結(jié)束的worker
completedAbruptly: 是否突然完成(是否因?yàn)楫惓M顺觯?br>
執(zhí)行流程:
1、worker數(shù)量-1
A、如果是突然終止,說(shuō)明是task執(zhí)行時(shí)異常情況導(dǎo)致,即run()方法執(zhí)行時(shí)發(fā)生了異常,那么正在工作的worker線程數(shù)量需要-1
B、如果不是突然終止,說(shuō)明是worker線程沒(méi)有task可執(zhí)行了,不用-1,因?yàn)橐呀?jīng)在getTask()方法中-1了
2、從Workers Set中移除worker,刪除時(shí)需要上鎖mainlock
3、tryTerminate():在對(duì)線程池有負(fù)效益的操作時(shí),都需要“嘗試終止”線程池,大概邏輯:
判斷線程池是否滿足終止的狀態(tài)
A、如果狀態(tài)滿足,但還有線程池還有線程,嘗試對(duì)其發(fā)出中斷響應(yīng),使其能進(jìn)入退出流程
B、沒(méi)有線程了,更新?tīng)顟B(tài)為tidying->terminated
4、是否需要增加worker線程,如果線程池還沒(méi)有完全終止,仍需要保持一定數(shù)量的線程
線程池狀態(tài)是running 或 shutdown
A、如果當(dāng)前線程是突然終止的,addWorker()
B、如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量,addWorker()
故如果調(diào)用線程池shutdown(),直到workQueue為空前,線程池都會(huì)維持corePoolSize個(gè)線程,然后再逐漸銷毀這corePoolSize個(gè)線程