一:類官方注釋預(yù)覽
/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
* 一種ExecutorService,它使用可能的幾個(gè)池線程之一執(zhí)行每個(gè)提交的任務(wù),
* 這些線程池通常使用Executors工廠方法進(jìn)行配置。
*
* <p>Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*
* 線程池解決兩個(gè)不同的問題:它們通常在執(zhí)行大量異步任務(wù)時(shí)提供更好的性能,這是因?yàn)闇p少了每個(gè)任務(wù)的調(diào)用開銷,
* 它們還提供了一種方法來限制和管理執(zhí)行任務(wù)集合時(shí)所消耗的資源,包括線程。
* 每個(gè) ThreadPoolExecutor 還維護(hù)一些基本統(tǒng)計(jì)信息,例如已完成任務(wù)的數(shù)量。
*
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios. Otherwise, use the following guide when manually
* configuring and tuning this class:
*
* 為了在廣泛的上下文中有用,這個(gè)類提供了許多可調(diào)整的參數(shù)和可擴(kuò)展性掛鉤。
* 但是,建議程序員使用更方便的Executors工廠方法
* Executors.newCachedThreadPool(具有自動(dòng)線程回收功能的無界線程池)、
* Executors.newFixedThreadPool(固定大小的線程池)、
* Executors.newSingleThreadExecutor(單后臺(tái)線程),最常見的使用場景的預(yù)配置設(shè)置。
* 否則,請(qǐng)?jiān)谑謩?dòng)配置和優(yōu)化此類時(shí)使用以下指南:
*
* <dl>
*
* <dt>Core and maximum pool sizes</dt>
*
* <dd>A {@code ThreadPoolExecutor} will automatically adjust the
* pool size (see {@link #getPoolSize})
* according to the bounds set by
* corePoolSize (see {@link #getCorePoolSize}) and
* maximumPoolSize (see {@link #getMaximumPoolSize}).
*
* ThreadPoolExecutor將根據(jù)corePoolSize(請(qǐng)參閱getCorePoolSize)
* 和maximumPoolSize(請(qǐng)參閱getMaximumPoolSize)設(shè)置的界限自動(dòng)調(diào)整池大?。ㄕ?qǐng)參閱getPoolSize)。
*
* When a new task is submitted in method {@link #execute(Runnable)},
* and fewer than corePoolSize threads are running, a new thread is
* created to handle the request, even if other worker threads are
* idle. If there are more than corePoolSize but less than
* maximumPoolSize threads running, a new thread will be created only
* if the queue is full. By setting corePoolSize and maximumPoolSize
* the same, you create a fixed-size thread pool. By setting
* maximumPoolSize to an essentially unbounded value such as {@code
* Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
* number of concurrent tasks. Most typically, core and maximum pool
* sizes are set only upon construction, but they may also be changed
* dynamically using {@link #setCorePoolSize} and {@link
* #setMaximumPoolSize}. </dd>
*
* 當(dāng)execute(Runnable)方法提交任務(wù)時(shí),
* 如果正在運(yùn)行的線程數(shù)小于corePoolSize,就會(huì)新建一個(gè)線程來處理這個(gè)請(qǐng)求,即使其它工作線程空閑。
* 如果正在運(yùn)行的線程數(shù)大于corePoolSize 小于maximumPoolSize,只有當(dāng)queue滿了的時(shí)候,才會(huì)創(chuàng)建一個(gè)新線程。
* 通過設(shè)置相同的corePoolSize和maximumPoolSize,可以創(chuàng)建一個(gè)固定大小的線程池。
* 通過將maximumPoolSize設(shè)置為一個(gè)基本無界的值(例如Integer.MAX_VALUE),將允許池子容納任意數(shù)量的并發(fā)任務(wù)。
* 大多數(shù)情況下,corePoolSize和maximumPoolSize僅在構(gòu)建時(shí)設(shè)置,
* 但是他們也可以通過setCorePoolSize和setMaximumPoolSize動(dòng)態(tài)修改。
*
* <dt>On-demand construction</dt>
*
* 按需構(gòu)建
*
* <dd>By default, even core threads are initially created and
* started only when new tasks arrive, but this can be overridden
* dynamically using method {@link #prestartCoreThread} or {@link
* #prestartAllCoreThreads}. You probably want to prestart threads if
* you construct the pool with a non-empty queue. </dd>
*
* 默認(rèn)情況下,只有在新任務(wù)到達(dá)時(shí)才初始創(chuàng)建和啟動(dòng)核心線程,但這可以使用方法 prestartCoreThread 或 prestartAllCoreThreads 動(dòng)態(tài)覆蓋。
* 如果您使用非空隊(duì)列構(gòu)造池,您可能想要預(yù)啟動(dòng)線程
*
* <dt>Creating new threads</dt>
*
* 創(chuàng)建新線程
*
* <dd>New threads are created using a {@link ThreadFactory}. If not
* otherwise specified, a {@link Executors#defaultThreadFactory} is
* used, that creates threads to all be in the same {@link
* ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
* non-daemon status. By supplying a different ThreadFactory, you can
* alter the thread's name, thread group, priority, daemon status,
* etc. If a {@code ThreadFactory} fails to create a thread when asked
* by returning null from {@code newThread}, the executor will
* continue, but might not be able to execute any tasks. Threads
* should possess the "modifyThread" {@code RuntimePermission}. If
* worker threads or other threads using the pool do not possess this
* permission, service may be degraded: configuration changes may not
* take effect in a timely manner, and a shutdown pool may remain in a
* state in which termination is possible but not completed.</dd>
*
* 使用一個(gè)ThreadFactory來創(chuàng)建新線程。如果沒有特別指定,會(huì)使用Executors.defaultThreadFactory,
* 創(chuàng)建的線程都在同一個(gè) ThreadGroup 中,并具有相同的 NORM_PRIORITY 優(yōu)先級(jí)和非守護(hù)進(jìn)程狀態(tài)。
* 通過提供一個(gè)不同的ThreadFactory,你可以改變線程的名字、線程組、優(yōu)先級(jí)、守護(hù)狀態(tài)等。
* 當(dāng)請(qǐng)求創(chuàng)建線程,但是ThreadFactory通過newThread方法創(chuàng)建失敗,executor將繼續(xù)執(zhí)行,但是可能不會(huì)處理任何任務(wù)。
* 線程應(yīng)該擁有RuntimePermission的"modifyThread"權(quán)限。如果工作線程或其它使用池的線程不擁有此權(quán)限,
* 服務(wù)可能會(huì)降級(jí):配置更改可能無法及時(shí)生效,關(guān)閉池可能會(huì)保持可以終止但未完成的狀態(tài)。
*
* <dt>Keep-alive times</dt>
*
* <dd>If the pool currently has more than corePoolSize threads,
* excess threads will be terminated if they have been idle for more
* than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
* This provides a means of reducing resource consumption when the
* pool is not being actively used. If the pool becomes more active
* later, new threads will be constructed. This parameter can also be
* changed dynamically using method {@link #setKeepAliveTime(long,
* TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
* TimeUnit#NANOSECONDS} effectively disables idle threads from ever
* terminating prior to shut down. By default, the keep-alive policy
* applies only when there are more than corePoolSize threads. But
* method {@link #allowCoreThreadTimeOut(boolean)} can be used to
* apply this time-out policy to core threads as well, so long as the
* keepAliveTime value is non-zero. </dd>
*
* 如果池中當(dāng)前的線程數(shù)量大于corePoolSize,如果空閑時(shí)間超過keepAliveTime,過量的將被終止。
* 這提供了一種在未積極使用池時(shí)減少資源消耗的方法。如果池稍后變得更加活躍,則將構(gòu)建新線程。
* 該參數(shù)也可以通過setKeepAliveTime來動(dòng)態(tài)修改。使用Long.MAX_VALUE這個(gè)值可以在關(guān)閉前禁止終止線程。
* 默認(rèn)情況下,僅當(dāng)有超過 corePoolSize 的線程時(shí),keep-alive 策略才適用。
* 但是方法 allowCoreThreadTimeOut(boolean) 也可將此超時(shí)策略應(yīng)用于核心線程,只要 keepAliveTime 值不為零。
*
* <dt>Queuing</dt>
*
* <dd>Any {@link BlockingQueue} may be used to transfer and hold
* submitted tasks. The use of this queue interacts with pool sizing:
*
* 任何 BlockingQueue 都可用于傳輸和保持提交的任務(wù)。此隊(duì)列的使用與池大小相互影響:
*
* <ul>
*
* <li> If fewer than corePoolSize threads are running, the Executor
* always prefers adding a new thread
* rather than queuing.</li>
* 如果正在運(yùn)行的線程數(shù)少于corePoolSize,Executor總是添加一個(gè)新線程 而不是排隊(duì)。
*
* <li> If corePoolSize or more threads are running, the Executor
* always prefers queuing a request rather than adding a new
* thread.</li>
* 如果正在運(yùn)行的線程數(shù)大于等于corePoolSize,Executor總是將請(qǐng)求進(jìn)行排隊(duì),而不是添加一個(gè)新的線程。
*
* <li> If a request cannot be queued, a new thread is created unless
* this would exceed maximumPoolSize, in which case, the task will be
* rejected.</li>
* 如果一個(gè)請(qǐng)求不能進(jìn)行排隊(duì),將創(chuàng)建一個(gè)新的線程,除非這會(huì)超過maximumPoolSize,這種情況下任務(wù)將被拒絕。
*
* </ul>
*
* There are three general strategies for queuing:
* 排隊(duì)的三種一般性策略:
* <ol>
*
* <li> <em> Direct handoffs.</em> A good default choice for a work
* queue is a {@link SynchronousQueue} that hands off tasks to threads
* without otherwise holding them. Here, an attempt to queue a task
* will fail if no threads are immediately available to run it, so a
* new thread will be constructed. This policy avoids lockups when
* handling sets of requests that might have internal dependencies.
* Direct handoffs generally require unbounded maximumPoolSizes to
* avoid rejection of new submitted tasks. This in turn admits the
* possibility of unbounded thread growth when commands continue to
* arrive on average faster than they can be processed. </li>
* 直接傳遞。工作隊(duì)列的一個(gè)很好的默認(rèn)選擇是 SynchronousQueue ,它將任務(wù)交給線程而不用其他方式保留它們。
* 在這里,如果沒有線程可以立即運(yùn)行任務(wù),則嘗試將任務(wù)排隊(duì)會(huì)失敗,因此將構(gòu)造一個(gè)新線程。
* 此策略在處理可能具有內(nèi)部依賴關(guān)系的請(qǐng)求集時(shí)避免鎖定。
* 直接傳遞通常需要無限的MaximumPoolSize,以避免拒絕新提交的任務(wù)。
* 這繼而允許線程的無限增長(當(dāng)任務(wù)繼續(xù)以平均快于其處理速度的速度到達(dá)時(shí))。
*
* <li><em> Unbounded queues.</em> Using an unbounded queue (for
* example a {@link LinkedBlockingQueue} without a predefined
* capacity) will cause new tasks to wait in the queue when all
* corePoolSize threads are busy. Thus, no more than corePoolSize
* threads will ever be created. (And the value of the maximumPoolSize
* therefore doesn't have any effect.) This may be appropriate when
* each task is completely independent of others, so tasks cannot
* affect each others execution; for example, in a web page server.
* While this style of queuing can be useful in smoothing out
* transient bursts of requests, it admits the possibility of
* unbounded work queue growth when commands continue to arrive on
* average faster than they can be processed. </li>
* 無界隊(duì)列。使用一個(gè)無界隊(duì)列(例如一個(gè)未預(yù)定義容量的LinkedBlockingQueue),
* 將導(dǎo)致新任務(wù)在所有corePoolSize線程忙時(shí)在隊(duì)列中等待。
* 因此,創(chuàng)建的線程不會(huì)超過corePoolSize。(因此,maximumPoolSize的值沒有任何影響。)
* 當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù)時(shí),這可能是合適的,因此任務(wù)不會(huì)影響其他任務(wù)的執(zhí)行;
* 例如,在網(wǎng)頁服務(wù)器中。雖然這種排隊(duì)方式有助于消除瞬時(shí)的請(qǐng)求突發(fā),
* 但當(dāng)命令繼續(xù)以平均比處理速度更快的速度到達(dá)時(shí),工作隊(duì)列可能會(huì)無限增長。
*
* <li><em>Bounded queues.</em> A bounded queue (for example, an
* {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
* used with finite maximumPoolSizes, but can be more difficult to
* tune and control. Queue sizes and maximum pool sizes may be traded
* off for each other: Using large queues and small pools minimizes
* CPU usage, OS resources, and context-switching overhead, but can
* lead to artificially low throughput. If tasks frequently block (for
* example if they are I/O bound), a system may be able to schedule
* time for more threads than you otherwise allow. Use of small queues
* generally requires larger pool sizes, which keeps CPUs busier but
* may encounter unacceptable scheduling overhead, which also
* decreases throughput. </li>
* 有界隊(duì)列。有界隊(duì)列(例如,ArrayBlockingQueue)在與有限的maximumPoolSizes
* 一起使用時(shí)有助于防止資源耗盡,但可能更難優(yōu)化和控制。
* 隊(duì)列大小和最大池大小可以相互權(quán)衡:使用大型隊(duì)列和小型池可以最大限度地
* 減少CPU使用、操作系統(tǒng)資源和上下文切換開銷,但也可能導(dǎo)致人為的低吞吐量。
* 如果任務(wù)經(jīng)常阻塞(例如,如果它們是I/O綁定的),系統(tǒng)可能能夠?yàn)楦嗟木€程安排時(shí)間,而不是您允許的線程。
* 使用小隊(duì)列通常需要更大的池大小,這使CPU更繁忙,但可能會(huì)遇到不可接受的調(diào)度開銷,這也會(huì)降低吞吐量。
*
* </ol>
*
* </dd>
*
* <dt>Rejected tasks</dt>
* 拒絕的任務(wù)
*
* <dd>New tasks submitted in method {@link #execute(Runnable)} will be
* <em>rejected</em> when the Executor has been shut down, and also when
* the Executor uses finite bounds for both maximum threads and work queue
* capacity, and is saturated. In either case, the {@code execute} method
* invokes the {@link
* RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
* method of its {@link RejectedExecutionHandler}. Four predefined handler
* policies are provided:
* 當(dāng)執(zhí)行器已關(guān)閉時(shí),以及當(dāng)執(zhí)行器對(duì)最大線程和工作隊(duì)列容量使用有限邊界且已飽和時(shí),
* 在方法execute(Runnable)中提交的新任務(wù)將被拒絕。
* 在這兩種情況下,execute方法調(diào)用其RejectedExecutionHandler的rejectedExecution()方法。
* 提供四個(gè)預(yù)定義的策略處理器:
*
* <ol>
*
* <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
* handler throws a runtime {@link RejectedExecutionException} upon
* rejection. </li>
* 在默認(rèn)的AbortPolicy中,處理程序在拒絕時(shí)拋出運(yùn)行時(shí)RejectedExecutionException。
*
* <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
* that invokes {@code execute} itself runs the task. This provides a
* simple feedback control mechanism that will slow down the rate that
* new tasks are submitted. </li>
* 在CallerRunsPolicy中,調(diào)用execute本身的線程運(yùn)行任務(wù)。
* 這提供了一種簡單的反饋控制機(jī)制,可以降低提交新任務(wù)的速度。
*
* <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
* cannot be executed is simply dropped. </li>
* 在DiscardPolicy中,無法執(zhí)行的任務(wù)被簡單地丟棄。
*
* <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
* executor is not shut down, the task at the head of the work queue
* is dropped, and then execution is retried (which can fail again,
* causing this to be repeated.) </li>
* 在DiscardOldestPolicy中,如果未關(guān)閉執(zhí)行器,則會(huì)丟棄工作隊(duì)列頭部的任務(wù),
* 然后重試執(zhí)行(這可能再次失敗,導(dǎo)致重復(fù)執(zhí)行)
*
* </ol>
*
* It is possible to define and use other kinds of {@link
* RejectedExecutionHandler} classes. Doing so requires some care
* especially when policies are designed to work only under particular
* capacity or queuing policies. </dd>
* 可以定義和使用其他類型的RejectedExecutionHandler類。
* 這樣做需要一定的謹(jǐn)慎,特別是當(dāng)策略設(shè)計(jì)為僅在特定容量或排隊(duì)策略下工作時(shí)。
*
* <dt>Hook methods</dt>
* 鉤子方法
*
* <dd>This class provides {@code protected} overridable
* {@link #beforeExecute(Thread, Runnable)} and
* {@link #afterExecute(Runnable, Throwable)} methods that are called
* before and after execution of each task. These can be used to
* manipulate the execution environment; for example, reinitializing
* ThreadLocals, gathering statistics, or adding log entries.
* Additionally, method {@link #terminated} can be overridden to perform
* any special processing that needs to be done once the Executor has
* fully terminated.
* 此類提供了protected的可重寫的
* beforeExecute(Thread, Runnable)和
* afterExecute(Runnable, Throwable)方法,這些方法在每個(gè)任務(wù)執(zhí)行之前和之后都會(huì)被調(diào)用。
*
* <p>If hook or callback methods throw exceptions, internal worker
* threads may in turn fail and abruptly terminate.</dd>
* 如果鉤子或回調(diào)方法拋出異常,內(nèi)部工作線程可能會(huì)失敗并突然終止。
*
* <dt>Queue maintenance</dt>
* 隊(duì)列維護(hù)
*
* <dd>Method {@link #getQueue()} allows access to the work queue
* for purposes of monitoring and debugging. Use of this method for
* any other purpose is strongly discouraged. Two supplied methods,
* {@link #remove(Runnable)} and {@link #purge} are available to
* assist in storage reclamation when large numbers of queued tasks
* become cancelled.</dd>
* 方法getQueue()允許訪問工作隊(duì)列以進(jìn)行監(jiān)視和調(diào)試。
* 強(qiáng)烈反對(duì)將此方法用于任何其他目的。
* 提供的兩種方法,remove(Runnable)和purge可用于在取消大量排隊(duì)的任務(wù)時(shí)幫助進(jìn)行存儲(chǔ)回收。
*
* <dt>Finalization</dt>
*
* <dd>A pool that is no longer referenced in a program <em>AND</em>
* has no remaining threads will be {@code shutdown} automatically. If
* you would like to ensure that unreferenced pools are reclaimed even
* if users forget to call {@link #shutdown}, then you must arrange
* that unused threads eventually die, by setting appropriate
* keep-alive times, using a lower bound of zero core threads and/or
* setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
* 程序中不再引用且沒有剩余線程的池將自動(dòng)shutdown。
* 如果您希望確保即使用戶忘記調(diào)用shutdown也能回收未引用的池,
* 那么您必須通過設(shè)置適當(dāng)?shù)膋eep-alive時(shí)間、使用零核心線程的下限
* 和/或設(shè)置allowCoreThreadTimeOut(boolean)。來安排未使用的線程最終死亡。
*
* </dl>
*
* <p><b>Extension example</b>.
* 擴(kuò)展示例
*
* Most extensions of this class
* override one or more of the protected hook methods. For example,
* here is a subclass that adds a simple pause/resume feature:
* 此類的大多數(shù)擴(kuò)展重寫一個(gè)或多個(gè)受保護(hù)的鉤子方法。
* 例如,下面是一個(gè)子類,它添加了一個(gè)簡單的暫停/恢復(fù)功能:
*
* <pre> {@code
* class PausableThreadPoolExecutor extends ThreadPoolExecutor {
* private boolean isPaused;//表示是否暫停
* private ReentrantLock pauseLock = new ReentrantLock();
* private Condition unpaused = pauseLock.newCondition();
*
* public PausableThreadPoolExecutor(...) { super(...); }
*
* protected void beforeExecute(Thread t, Runnable r) {
* super.beforeExecute(t, r);
* pauseLock.lock();
* try {
* while (isPaused) unpaused.await(); //如果已暫停,等在Condition上
* } catch (InterruptedException ie) {
* t.interrupt();
* } finally {
* pauseLock.unlock();
* }
* }
*
* public void pause() {
* pauseLock.lock();
* try {
* isPaused = true;//設(shè)置暫停標(biāo)志
* } finally {
* pauseLock.unlock();
* }
* }
*
* public void resume() {
* pauseLock.lock();
* try {
* isPaused = false;
* unpaused.signalAll();//恢復(fù),則喚醒所有等待在Condition上的線程
* } finally {
* pauseLock.unlock();
* }
* }
* }}</pre>
*
* @since 1.5
* @author Doug Lea
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
}
總結(jié):類注釋和變量注釋都描述的很清楚了(過一遍注釋就都清楚了)。
- 是一個(gè)用線程池實(shí)現(xiàn)的ExecutorService。
- 通過Executors可以方便的構(gòu)造不同類型的線程池(無界隊(duì)列的)。
- 新任務(wù)提交時(shí),根據(jù)當(dāng)前線程數(shù)來決定是創(chuàng)建新線程還是加入隊(duì)列。
- 無法創(chuàng)建線程隊(duì)列也滿了,則執(zhí)行拒絕策略。
- 提供了一些鉤子方法供子類使用。
二:核心內(nèi)部變量
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* 主池控制狀態(tài),ctl,是一個(gè)包含兩個(gè)概念字段的原子整數(shù):
* workerCount, 指示有效線程數(shù)
* runState, 只是是否正在運(yùn)行、關(guān)閉等。
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
* 為了將他們打包進(jìn)一個(gè)int,我們限制workerCount為(2^29)-1 (大約5億)個(gè)線程而不是(2^31)-1(20億)個(gè)。
* 如果這在將來成為問題,可以將變量更改為AtomicLong,并調(diào)整下面的移位/掩碼常數(shù)。
* 但是,在需要之前,使用int代碼會(huì)更快、更簡單一些。
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
* workCount是允許啟動(dòng)和不允許停止的worker數(shù)量。
* 該值可能瞬間的不同于活動(dòng)線程的實(shí)際數(shù)量,例如,當(dāng)ThreadFactory在被請(qǐng)求時(shí)無法創(chuàng)建線程,
* 并且退出的線程在終止之前仍在執(zhí)行簿記時(shí)。用戶可見池大小將報(bào)告為worker集合的當(dāng)前大小。
*
* The runState provides the main lifecycle control, taking on values:
* runState提供主要的生命周期控制,具有以下值:
*
* RUNNING: Accept new tasks and process queued tasks
* 接受新任務(wù)并處理排隊(duì)的任務(wù)
*
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* 不接受新任務(wù),但是執(zhí)行排隊(duì)的任務(wù)
*
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* 不接受新任務(wù),不執(zhí)行排隊(duì)的任務(wù),并且中斷正在執(zhí)行的任務(wù)。
*
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* 所有任務(wù)已經(jīng)結(jié)束,workCount為0,轉(zhuǎn)換到狀態(tài)TIDYING的線程將運(yùn)行terminated()鉤子方法
*
* TERMINATED: terminated() has completed
* terminated()已完成
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
* 這些值之間的數(shù)字順序很重要,以便進(jìn)行有序比較。
* 運(yùn)行狀態(tài)隨時(shí)間單調(diào)增加,但不需要達(dá)到每個(gè)狀態(tài)。這些轉(zhuǎn)變是:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* 調(diào)用shutdown()時(shí),可能隱式地在finalize()中
*
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* 調(diào)用shutdownNow()時(shí)
*
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* 當(dāng)隊(duì)列和池都是空的時(shí)候
*
* STOP -> TIDYING
* When pool is empty
* 當(dāng)池是空的時(shí)候
*
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
* 當(dāng)terminated()鉤子方法執(zhí)行完時(shí)
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
* 在awaitTermination()中等待的線程將在狀態(tài)達(dá)到TERMINATED時(shí)返回。
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
* 檢測從SHUTDOWN到TIDYING的轉(zhuǎn)換并不像您希望的那樣簡單,
* 因?yàn)殛?duì)列在非空之后可能變?yōu)榭?,而在SHUTDOWN狀態(tài)期間,隊(duì)列也可能變?yōu)榭眨? * 我們看到workerCount為0(有時(shí)需要重新檢查——見下文)。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//實(shí)際最大線程數(shù),用戶指定的最大線程數(shù)不會(huì)超過此值。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 五個(gè)線程池狀態(tài),留意一下大小關(guān)系
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取當(dāng)前線程池狀態(tài),從ctl中解析出來。
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取當(dāng)前worker數(shù)量,即線程數(shù)量,也是通過ctl解析出來。
private static int workerCountOf(int c) { return c & CAPACITY; }
//將狀態(tài)和worker數(shù)量兩個(gè)屬性組成一個(gè)ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//用于判斷當(dāng)前線程池是否是運(yùn)行狀態(tài),只有RUNNING的值小于SHUTDOWN
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
* 用于保存任務(wù)并將其傳遞給工作線程的隊(duì)列。
* 我們不要求workQueue.poll()返回null必然意味著workQueue.isEmpty()為空,
* 因此,僅依靠isEmpty來查看隊(duì)列是否為空(例如,在決定是否從關(guān)閉轉(zhuǎn)換為清理時(shí),我們必須這樣做)。
* 這適用于特殊用途的隊(duì)列,例如允許poll()返回null的DelayQueues,
* 即使延遲過期后poll()可能返回非null。
*
* 注意:這里是一個(gè)BlockingQueue,所以一個(gè)線程沒有任務(wù)執(zhí)行的時(shí)候,是通過poll、take阻塞在該queue上的。
*
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
* 鎖定workers集合和相關(guān)簿記的訪問權(quán)限。
* 雖然我們可以使用某種類型的并發(fā)集,但事實(shí)證明,通常最好使用鎖。
* 其中一個(gè)原因是它序列化了中斷的workers???,從而避免了不必要的中斷風(fēng)暴,尤其是在關(guān)閉期間。
* 否則,退出的線程將同時(shí)中斷那些尚未中斷的線程。
* 它還簡化了一些與largestPoolSize等相關(guān)的統(tǒng)計(jì)簿記。
* 我們還在shutdown和shutdownNow時(shí)保持mainLock,以確保workers設(shè)置穩(wěn)定,
* 同時(shí)分別檢查是否允許中斷和實(shí)際中斷。
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
* 包含池中所有worker線程的集合。僅在持有mainLock時(shí)訪問。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
* 支持awaitTermination方法的一個(gè)Condition
* 就是你想在線程池終止結(jié)束后被通知,可以通過awaitTermination方法await到該Condition上,
* 等線程池終止完成后,會(huì)進(jìn)行terminal.signalAll()進(jìn)行喚醒。
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
* 跟蹤達(dá)到的最大池大小。僅在mainLock下訪問。
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
* 已完成任務(wù)的計(jì)數(shù)器。僅在工作線程終止時(shí)更新。僅在mainLock下訪問。
*/
private long completedTaskCount;
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
* 所有用戶控制參數(shù)都聲明為volatile,因此正在進(jìn)行的操作基于最新的值,
* 但不需要鎖定,因?yàn)闆]有內(nèi)部不變數(shù)據(jù)依賴于它們相對(duì)于其他操作同步更改。
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
* 新線程的工廠。所有線程通過這個(gè)工廠來創(chuàng)建(通過addWorker方法)。
* 所有調(diào)用者都必須為addWorker失敗做好準(zhǔn)備,這可能反映了系統(tǒng)或用戶限制線程數(shù)量的策略。
* 即使未將其視為錯(cuò)誤,創(chuàng)建線程的失敗也可能會(huì)導(dǎo)致新任務(wù)被拒絕,或現(xiàn)有任務(wù)仍卡在隊(duì)列中。
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
* 我們更進(jìn)一步,保留池不變量,即使在遇到錯(cuò)誤(例如OutOfMemoryError)時(shí)也是如此,
* 這些錯(cuò)誤可能在嘗試創(chuàng)建線程時(shí)拋出。由于需要在Thread.start中分配本機(jī)堆棧,
* 因此此類錯(cuò)誤非常常見,用戶需要執(zhí)行清理池關(guān)閉以進(jìn)行清理。
* 可能會(huì)有足夠的內(nèi)存來完成清理代碼,而不會(huì)遇到另一個(gè)OutOfMemoryError。
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
* execute中在飽和或關(guān)閉時(shí)調(diào)用的處理程序。
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
* 空閑線程等待工作的超時(shí)時(shí)間(納秒)。
* 當(dāng)目前存在線程多于corePoolSize或allowCoreThreadTimeOut(允許核心線程超時(shí))時(shí),線程使用此超時(shí)。
* 否則,他們將永遠(yuǎn)等待新的工作。
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
* 如果是false(默認(rèn)值),核心線程即使在空閑時(shí)也保持活著。
* 如果是true,核心線程在等待工作時(shí)使用keepAliveTime來進(jìn)行超時(shí)。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
* 核心線程數(shù)是保持存活狀態(tài)(不允許超時(shí)等)的最小工作線程數(shù),
* 除非設(shè)置了allowCoreThreadTimeOut,在這種情況下,最小值為零。
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
* 最大池大小。請(qǐng)注意,實(shí)際最大值在內(nèi)部由CAPACITY限制。
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
* 默認(rèn)被拒絕的執(zhí)行程序。
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
總結(jié):有以下核心變量
- ctl(AtomicInteger):線程池的主要狀態(tài)變量,封裝了運(yùn)行狀態(tài)和當(dāng)前線程數(shù)兩個(gè)值。
通過CAS操作更改。 - workQueue(BlockingQueue<Runnable>):阻塞的任務(wù)隊(duì)列,用于保存未執(zhí)行的任務(wù),
也用來hold住空閑的worker線程,通過take、poll方法。 - workers(HashSet<Worker>):Worker線程集合,存著所有創(chuàng)建出來的未終止的worker。
僅在持有mainLock時(shí)訪問。 - mainLock(ReentrantLock):各種狀態(tài)變更、workers集合變更時(shí)控制多線程并發(fā)導(dǎo)致錯(cuò)誤。
- termination(Condition): 一個(gè)Condition,用于終止時(shí)通過termination.signalAll()通知其它線程。
- largestPoolSize(ing):用于記錄達(dá)到的最大池大小。僅在mainLock下訪問。
- completedTaskCount(long):用于記錄已完成的任務(wù)數(shù)量。僅在工作線程終止時(shí)更新。僅在mainLock下訪問。
- threadFactory(ThreadFactory): 用于再創(chuàng)建worker時(shí)創(chuàng)建新的線程。
- handler(RejectedExecutionHandler):用于處理拒絕邏輯。
- keepAliveTime(long):空閑線程存活時(shí)間。
- allowCoreThreadTimeOut(boolean):表示核心線程是否允許超時(shí)終止。
- corePoolSize:核心線程數(shù)的最大值。
- maximumPoolSize:線程池的最大值,實(shí)際最大值不會(huì)超過(1 << 29) - 1
- 狀態(tài)的變化:
- RUNNING -> SHUTDOWN 調(diào)用shutdown時(shí)
- (RUNNING or SHUTDOWN) -> STOP 調(diào)用shutdownNow時(shí)
- SHUTDOWN -> TIDYING 調(diào)用shutdown后,當(dāng)隊(duì)列和池都為空的時(shí)候
- STOP -> TIDYING 調(diào)用shutdownNow后,當(dāng)池為空的時(shí)候
- TIDYING -> TERMINATED 當(dāng)terminated()鉤子方法執(zhí)行完時(shí)
三:內(nèi)部類Worker
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
* Worker類主要維護(hù)運(yùn)行任務(wù)的線程的中斷控制狀態(tài),以及其他次要的簿記。
* 此類適時(shí)地?cái)U(kuò)展AbstractQueuedSynchronizer,以簡化獲取和釋放圍繞每個(gè)任務(wù)執(zhí)行的鎖的過程。
* 這可以防止旨在喚醒等待任務(wù)的工作線程的中斷,而不是中斷正在運(yùn)行的任務(wù)。
* 我們實(shí)現(xiàn)了一個(gè)簡單的不可重入互斥鎖,而不是使用可重入鎖(ReentrantLock),
* 因?yàn)槲覀儾幌Mぷ魅蝿?wù)在調(diào)用諸如setCorePoolSize之類的池控制方法時(shí)能夠重新獲取鎖。
* 此外,為了在線程實(shí)際開始運(yùn)行任務(wù)之前抑制中斷,我們將鎖定狀態(tài)初始化為負(fù)值,
* 并在啟動(dòng)時(shí)清除它(在runWorker中)。
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
// 用來運(yùn)行此worker的線程,如果工廠失敗則為null。
final Thread thread;
/** Initial task to run. Possibly null. */
//要運(yùn)行的初始任務(wù)??赡転閚ull。
Runnable firstTask;
/** Per-thread task counter */
//每個(gè)線程完成的任務(wù)計(jì)數(shù)器
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// inhibit interrupts until runWorker
// 在運(yùn)行worker之前阻止中斷,咋阻止呢?
// 先簡單說下就是中斷前會(huì)先來lock,lock成功才會(huì)中斷,這時(shí)state時(shí)-1,所以lock不會(huì)成功。
// start之后,lock會(huì)變?yōu)?,這時(shí)候就可以中斷了。
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//將主運(yùn)行循環(huán)委托給外部runWorker方法
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// 0表示未鎖定狀態(tài)
// The value 1 represents the locked state.
// 1表示鎖定狀態(tài)
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
//如果已經(jīng)啟動(dòng)了,則中斷,state>=0,已經(jīng)不是初始狀態(tài)-1,表示已經(jīng)啟動(dòng)
//該方法是在shutdownNow是被調(diào)用的
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
總結(jié):
- Worker是線程池中一個(gè)線程的封裝類,一個(gè)worker就代表一個(gè)線程。
- worker是AQS的實(shí)現(xiàn)類,主要是用來表示是否正在執(zhí)行任務(wù),鎖定狀態(tài)表示正在執(zhí)行任務(wù)。
- worker的初始狀態(tài)為-1,表示還未啟動(dòng),所以lock(執(zhí)行任務(wù))之前,需要現(xiàn)unlock再lock。
- 任務(wù)的執(zhí)行邏輯不再worker中,被委托到了runWorker方法。
四:用于擴(kuò)展的鉤子方法
public class ThreadPoolExecutor extends AbstractExecutorService {
/* Extension hooks */
//擴(kuò)展的鉤子
/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread {@code t} that
* will execute task {@code r}, and may be used to re-initialize
* ThreadLocals, or to perform logging.
* 在給定線程中執(zhí)行給定Runnable之前調(diào)用的方法。
* 此方法由執(zhí)行任務(wù)r的線程t調(diào)用,可用于重新初始化ThreadLocals或執(zhí)行日志記錄
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.beforeExecute} at the end of
* this method.
* 此實(shí)現(xiàn)不做任何事情,但可以在子類中進(jìn)行自定義。
* 注意:為了正確嵌套多個(gè)重寫,子類通常應(yīng)該在這個(gè)方法的末尾調(diào)用super.beforeExecute。
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
* 在完成給定Runnable的執(zhí)行時(shí)調(diào)用的方法。此方法由執(zhí)行任務(wù)的線程調(diào)用。
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
* 此實(shí)現(xiàn)不做任何事情,但可以在子類中進(jìn)行自定義。
* 注意:為了正確嵌套多個(gè)重寫,子類通常應(yīng)該在這個(gè)方法的開始處調(diào)用super.afterExecute
*
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
* 當(dāng)動(dòng)作顯式或通過submit等方法包含在任務(wù)(如FutureTask)中時(shí),
* 這些任務(wù)對(duì)象捕獲并維護(hù)計(jì)算異常,因此它們不會(huì)導(dǎo)致突然終止,
* 并且內(nèi)部異常不會(huì)傳遞給此方法。
* 如果您想在這個(gè)方法中捕獲這兩種失敗,您可以進(jìn)一步探測這種情況,
* 例如在這個(gè)示例子類中,如果任務(wù)已中止,則打印直接原因或底層異常:
*
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null && r instanceof Future<?>) {
* try {
* Object result = ((Future<?>) r).get();
* } catch (CancellationException ce) {
* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* Thread.currentThread().interrupt(); // ignore/reset
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
* 當(dāng)執(zhí)行器終止時(shí)調(diào)用的方法。默認(rèn)實(shí)現(xiàn)什么都不做。
* 注意:為了正確嵌套多個(gè)重寫,子類通常應(yīng)該在這個(gè)方法中調(diào)用super.terminated。
*/
protected void terminated() { }
}
總結(jié):提供了三個(gè)子類可以擴(kuò)展的鉤子方法
- beforeExecute 執(zhí)行任務(wù)之前調(diào)用
- afterExecute 執(zhí)行任務(wù)之后調(diào)用
- terminated 終止時(shí)調(diào)用(TIDYING和TERMINATED狀態(tài)之間調(diào)用)