1 ThreadPoolExecutor
該線程池是比較常用的線程池。參數(shù)如下:
| 參數(shù)名 | 解釋 |
|---|---|
| corePoolSize | 核心線程池大小 |
| maximumPoolSize | 最大線程池大小 |
| keepAliveTime | 線程池中超過corePoolSize數(shù)目的空閑進(jìn)程的最大存活時(shí)間;可以allowCoreThreadTimeOut(true)使得核心線程有效時(shí)間 |
| TimeUnit | keepAliveTime時(shí)間單位 |
| workQueue | 阻塞任務(wù)隊(duì)列 |
| threadFactory | 新建線程工廠 |
| RejectedExecutionHandler | 當(dāng)提交任務(wù)數(shù)超過maxmumPoolSize+workQueue之和時(shí),任務(wù)會(huì)交給RejectedExecutionHandler來處理 |
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>
1.當(dāng)線程池小于corePoolSize時(shí),新提交任務(wù)將創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),即使此時(shí)線程池中存在空閑線程。
2.當(dāng)線程池達(dá)到corePoolSize時(shí),新提交任務(wù)將被放入workQueue中,等待線程池中任務(wù)調(diào)度執(zhí)行
3.當(dāng)workQueue已滿,且maximumPoolSize>corePoolSize時(shí),新提交任務(wù)會(huì)創(chuàng)建新線程執(zhí)行任務(wù)
4.當(dāng)提交任務(wù)數(shù)超過maximumPoolSize+workQueue時(shí),新提交任務(wù)由RejectedExecutionHandler處理
5.當(dāng)線程池中超過corePoolSize線程,空閑時(shí)間達(dá)到keepAliveTime時(shí),關(guān)閉空閑線程
6.當(dāng)設(shè)置allowCoreThreadTimeOut(true)時(shí),線程池中corePoolSize線程空閑時(shí)間達(dá)到keepAliveTime也將關(guān)閉
1.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
創(chuàng)建一個(gè)定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
創(chuàng)建線程池的時(shí)候默認(rèn)將corePoolSize和maximumPoolSize設(shè)置成相同值,表示不會(huì)創(chuàng)建出更多線程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
仔細(xì)看會(huì)發(fā)現(xiàn),在創(chuàng)建線程池的時(shí)候默認(rèn)將corePoolSize和maximumPoolSize設(shè)置成相同值,這是因?yàn)椴捎玫淖枞€程隊(duì)列采用的是LinkedBlockingQueue,該隊(duì)列是一個(gè)無邊界隊(duì)列,所以所有未在線程中運(yùn)行的任務(wù)都可以進(jìn)入到該隊(duì)列。實(shí)際上該線程池的大小為Integer.MAX_VALUE。
1.2 newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
這里就是同時(shí)將corePoolSize和maximumPoolSize設(shè)置成1。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
1.3 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
創(chuàng)建一個(gè)可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- 工作線程的創(chuàng)建數(shù)量幾乎沒有限制(其實(shí)也有限制的,數(shù)目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。
- 如果長時(shí)間沒有往線程池中提交任務(wù),即如果工作線程空閑了指定的時(shí)間(默認(rèn)為1分鐘),則該工作線程將自動(dòng)終止。終止后,如果你又提交了新的任務(wù),則線程池重新創(chuàng)建一個(gè)工作線程。
- 在使用CachedThreadPool時(shí),一定要注意控制任務(wù)的數(shù)量,否則,由于大量線程同時(shí)運(yùn)行,很有會(huì)造成系統(tǒng)癱瘓。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
這里將maximumPoolSize設(shè)置成了Integer.MAX_VALUE,是由于SynchronousQueue阻塞隊(duì)列的大小是比較小的,如果不將maximumPoolSize設(shè)置成比較大的數(shù)就容易拋出異常。
一般都不建議使用Integer.MAX_VALUE大小的線程池,容易堆積大量的請(qǐng)求和創(chuàng)建大量的線程。
2 ForkJoinPool
ForkJoinPool原理類似分治法的思想,先把大的任務(wù)分成若干個(gè)小任務(wù)并計(jì)算,最后把所有小任務(wù)的計(jì)算結(jié)果合并起來。
2.1 newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
工作竊取線程池,默認(rèn)所有線程都存在一個(gè)自己的任務(wù)隊(duì)列,當(dāng)自己線程所有任務(wù)執(zhí)行完畢時(shí),可以從別的線程的任務(wù)隊(duì)列中獲取到未執(zhí)行的任務(wù)放入本線程執(zhí)行,這就是工作竊取,使用該線程池可以有效提高CPU利用率。
每一個(gè)工作線程簡單的通過以下兩條原則進(jìn)行活動(dòng):
- 若隊(duì)列非空,則代表自己線程的Task還沒執(zhí)行完畢,取出Task并執(zhí)行。
- 若隊(duì)列為空,則隨機(jī)選取一個(gè)其他的工作線程的Task并執(zhí)行。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
3.ScheduledThreadPoolExecutor
該線程池繼承自ThreadPoolExecutor。
3.1 newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
初始化的線程池可以在指定的時(shí)間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實(shí)際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)。
4 排隊(duì)策略
4.1 Direct handoffs 直接提交
* <li> <em> Direct handoffs.</em> A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. Here, an attempt to queue a task
* will fail if no threads are immediately available to run it, so a
* new thread will be constructed. This policy avoids lockups when
* handling sets of requests that might have internal dependencies.
* Direct handoffs generally require unbounded maximumPoolSizes to
* avoid rejection of new submitted tasks. This in turn admits the
* possibility of unbounded thread growth when commands continue to
* arrive on average faster than they can be processed. </li>
在收到提交的任務(wù)時(shí),會(huì)將任務(wù)直接提交給線程,并會(huì)不自己持有,如果沒有線程立刻能立刻處理該任務(wù),將會(huì)讓該任務(wù)直接失敗。
4.2 Unbounded queues 無界隊(duì)列
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
* capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
* each task is completely independent of others, so tasks cannot
* affect each others execution; for example, in a web page server.
* While this style of queuing can be useful in smoothing out
* transient bursts of requests, it admits the possibility of
* unbounded work queue growth when commands continue to arrive on
* average faster than they can be processed. </li>
使用的是一種無界隊(duì)列,例如LinkedBlockingQueue隊(duì)列。當(dāng)線程池中線程數(shù)目達(dá)到corePoolSize時(shí),所有的任務(wù)都會(huì)放入隊(duì)列中進(jìn)行等待,而不會(huì)創(chuàng)建新線程。所以對(duì)于maximunPoolSize的值并不會(huì)有什么影響。無界隊(duì)列適合那些任務(wù)獨(dú)立的情況,因?yàn)槿蝿?wù)之間不會(huì)相互影響。
4.3 Bounded queues 有界隊(duì)列
* <li><em>Bounded queues.</em> A bounded queue (for example, an
* {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
* used with finite maximumPoolSizes, but can be more difficult to
* tune and control. Queue sizes and maximum pool sizes may be traded
* off for each other: Using large queues and small pools minimizes
* CPU usage, OS resources, and context-switching overhead, but can
* lead to artificially low throughput. If tasks frequently block (for
* example if they are I/O bound), a system may be able to schedule
* time for more threads than you otherwise allow. Use of small queues
* generally requires larger pool sizes, which keeps CPUs busier but
* may encounter unacceptable scheduling overhead, which also
* decreases throughput. </li>
有界隊(duì)列,例如ArrayBlockingQueue??梢杂脕碓谟邢薜木€程池中防止資源被耗盡,但是難以維護(hù)和控制。
5.保持存活機(jī)制
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. This parameter can also be
* changed dynamically using method {@link #setKeepAliveTime(long,
* TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
* TimeUnit#NANOSECONDS} effectively disables idle threads from ever
* terminating prior to shut down. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads. But
* method {@link #allowCoreThreadTimeOut(boolean)} can be used to
* apply this time-out policy to core threads as well, so long as the
* keepAliveTime value is non-zero. </dd>
如果超過corePoolSize部分的線程如果閑置了超過keepAliveTime的時(shí)間(可以通過getKeepAliveTime()方法來獲取存活時(shí)間),線程將會(huì)被終止。這可以減少線程池使用不活躍時(shí)資源的浪費(fèi)。可以通過setKeepAliveTime()方法來設(shè)置存活時(shí)間??梢酝ㄟ^設(shè)置一個(gè)Long.MAX_VALUE的值來有效的避免空閑線程在關(guān)閉之前終止。allowCoreThreadTimeOut(boolean)可以讓corePoolSize的線程也適用存活機(jī)制。
6.拒絕策略
* <dt>Rejected tasks</dt>
*
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
* <em>rejected</em> when the Executor has been shut down, and also when
* the Executor uses finite bounds for both maximum threads and work queue
* capacity, and is saturated. In either case, the {@code execute} method
* invokes the {@link
* RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
* method of its {@link RejectedExecutionHandler}. Four predefined handler
* policies are provided:
*
* <ol>
*
* <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
* handler throws a runtime {@link RejectedExecutionException} upon
* rejection. </li>
*
* <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
* that invokes {@code execute} itself runs the task. This provides a
* simple feedback control mechanism that will slow down the rate that
* new tasks are submitted. </li>
*
* <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
* cannot be executed is simply dropped. </li>
*
* <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
* executor is not shut down, the task at the head of the work queue
* is dropped, and then execution is retried (which can fail again,
* causing this to be repeated.) </li>
*
* </ol>
*
* It is possible to define and use other kinds of {@link
* RejectedExecutionHandler} classes. Doing so requires some care
* especially when policies are designed to work only under particular
* capacity or queuing policies. </dd>
- AbortPolicy:默認(rèn)直接拋出RejectedExecutionException異常
- CallerRunsPolicy:由提交任務(wù)線程負(fù)責(zé)執(zhí)行
- DiscardPolicy:直接拋棄
- DiscardOldestPolicy:將消息隊(duì)列中的第一個(gè)任務(wù)替換為當(dāng)前新進(jìn)來的任務(wù)執(zhí)行。
7.線程池如何執(zhí)行任務(wù)
public void execute(Runnable command) {
//如果Rnnable對(duì)象為空,直接拋出空指針異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 如果當(dāng)前線程數(shù)目少于corePoolSize的大小,會(huì)嘗試開啟一個(gè)新線程給任務(wù)。同時(shí)會(huì)自動(dòng)檢測當(dāng)前線程池狀態(tài)和工作線程數(shù)目(ctl),避免產(chǎn)生不應(yīng)該有的警告。
- 如果一個(gè)任務(wù)能夠成功入隊(duì),仍然需要進(jìn)行雙重檢查來判斷是否需要開啟一個(gè)線程。因?yàn)榭赡茉谏洗螜z查后有新的線程停止或者在進(jìn)入方法后線程池就停止了。如果線程不再運(yùn)行狀態(tài)就從隊(duì)列中移除并拒絕該任務(wù),否則當(dāng)沒有正在運(yùn)行的線程就創(chuàng)建一個(gè)新線程。
- 如果任務(wù)進(jìn)入隊(duì)列失敗,嘗試開啟一個(gè)新線程,如果新線程創(chuàng)建失?。赡芫€程池已關(guān)閉或者已達(dá)到最大值),就拒絕該任務(wù)。
8.addWorker()
方法的作用:
檢查一個(gè)新worker能否按照線程池的狀態(tài)以及線程數(shù)目被添加進(jìn)去。如果滿足條件,會(huì)創(chuàng)建一個(gè)新worker,并把第一個(gè)任務(wù)作為自己的第一個(gè)任務(wù)。如果線程池已經(jīng)被關(guān)閉或停止,該方法就會(huì)放回false。同樣,如果創(chuàng)建新線程失敗也會(huì)返回false。如果創(chuàng)建失敗,有可能線程工廠回返回一個(gè)null,有可能跑出一個(gè)OutOfMemoryError的錯(cuò)誤,同時(shí)我們還要進(jìn)行回滾。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
首先就是對(duì)各種狀態(tài)的判斷,然后通過CAS將workerCont的值加1,如果失敗通過自旋的方式去自增。后面,新創(chuàng)建一個(gè)worker,并通過ReetranLock進(jìn)行加鎖。檢查持有鎖后線程池的狀態(tài),如果此時(shí)線程已經(jīng)啟動(dòng)會(huì)拋出IllegalThreadStateException的異常,同時(shí)更新largestPoolSize的值,然后啟動(dòng)線程。如果添加失敗,就執(zhí)行addWorkerFailed()方法。
9.addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
先進(jìn)行加鎖,判斷Worker是否存在,如果存在,就從workers中移除它。同時(shí),對(duì)workCount進(jìn)行自減。
9.getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}