
Java線程 一般采用new Thread().start();的方式開啟一個(gè)新的線程池實(shí)例。但是這樣的會(huì)無(wú)節(jié)制的創(chuàng)建線程、回收線程,造成頻繁的GC。
線程池的由來(lái)就是為了解決此問(wèn)題的。線程池旨在線程的復(fù)用,這就可以節(jié)約我們用以往的方式創(chuàng)建線程和銷毀所消耗的時(shí)間,減少線程頻繁調(diào)度的開銷,從而節(jié)約系統(tǒng)資源,提高系統(tǒng)吞吐量。
在Java線程池概念中,ExecutorService,它是一個(gè)接口,其實(shí)如果要從真正意義上來(lái)說(shuō),它可以叫做線程池的服務(wù),因?yàn)樗峁┝吮姸嘟涌赼pi來(lái)控制線程池中的線程,而真正意義上的線程池就是:ThreadPoolExecutor,它實(shí)現(xiàn)了ExecutorService接口,并封裝了一系列的api使得它具有線程池的特性,其中包括工作隊(duì)列、核心線程數(shù)、最大線程數(shù)等。
ThreadPoolExecutor
ThreadPoolExecutor參數(shù)
- int corePoolSize ——線程池中核心線程的數(shù)量。
- int maximumPoolSize ——線程池中最大線程數(shù)量。
- long keepAliveTime——非核心線程的超時(shí)時(shí)長(zhǎng),當(dāng)系統(tǒng)中非核心線程閑置時(shí)間超過(guò)keepAliveTime之后,則會(huì)被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut屬性設(shè)置為true,則該參數(shù)也表示核心線程的超時(shí)時(shí)長(zhǎng)。
- TimeUnit unit ——時(shí)間單位,有納秒、微秒、毫秒、秒、分、時(shí)、天等。
- BlockingQueue< Runnable > workQueue ——線程池中的任務(wù)隊(duì)列,該隊(duì)列主要用來(lái)存儲(chǔ)已經(jīng)被提交但是尚未執(zhí)行的任務(wù)。
- ThreadFactory threadFactory —— 線程工廠,為了給線程池提供創(chuàng)建線程的功能。
- RejectedExecutionHandler handler——拒絕策略,當(dāng)線程無(wú)法執(zhí)行新任務(wù)時(shí)(一般是由于線程池中的線程數(shù)量已經(jīng)達(dá)到最大數(shù)或者線程池關(guān)閉導(dǎo)致的),默認(rèn)情況下,當(dāng)線程池?zé)o法處理新線程時(shí),會(huì)拋出一個(gè)RejectedExecutionException。
任務(wù)隊(duì)列
用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,可以選擇以下幾個(gè):
- ArrayBlockingQueue:基于數(shù)組的阻塞隊(duì)列,按照FIFO原則進(jìn)行排序。
- LinkedBlockingQueue:基于鏈表的阻塞隊(duì)列,按照FIFO原則對(duì)元素進(jìn)行排序。
- SynchronousQueue:一個(gè)不儲(chǔ)存元素的阻塞隊(duì)列,每一個(gè)插入操作必須等到另外一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。
- PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列。
任務(wù)拒絕策略
- ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
- ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
ThreadPoolExecutor工作規(guī)則
- 如果線程池中的線程數(shù)未達(dá)到核心線程數(shù),則會(huì)立馬啟用一個(gè)核心線程去執(zhí)行。
- 如果線程池中的線程數(shù)已經(jīng)達(dá)到核心線程數(shù),且workQueue未滿,則將新線程放入workQueue中等待執(zhí)行。
- 如果線程池中的線程數(shù)已經(jīng)達(dá)到核心線程數(shù)且workQueue已滿,但未超過(guò)非核心線程數(shù),則開啟一個(gè)非核心線程來(lái)執(zhí)行任務(wù)。
- 如果線程池中的線程數(shù)已經(jīng)超過(guò)非核心線程數(shù),則拒絕執(zhí)行該任務(wù)。
注:核心線程、非核心線程和線程隊(duì)列,是三個(gè)概念哦,別混為一談。
ThreadPoolExecutor封裝案例
/**
* 類描述:線程池管理器
*
* @author jinzifu
* @Email jinzifu123@163.com
* @date 2017/12/7 1150
*/
public class ThreadPoolManager {
/**
* 核心線程的數(shù)量。當(dāng)前設(shè)備可用處理器核心數(shù)*2 + 1,能夠讓cpu的效率得到最大程度執(zhí)行。
*/
private int corePoolSize;
/**
* 最大線程數(shù)量,表示當(dāng)緩沖隊(duì)列滿的時(shí)候能繼續(xù)容納的等待任務(wù)的數(shù)量。
*/
private int maximumPoolSize;
/**
* 非核心線程的超時(shí)時(shí)長(zhǎng),當(dāng)系統(tǒng)中非核心線程閑置時(shí)間超過(guò)keepAliveTime之后,則會(huì)被回收。
* 如果ThreadPoolExecutor的allowCoreThreadTimeOut屬性設(shè)置為true,則該參數(shù)也表示核心線程的超時(shí)時(shí)長(zhǎng)。
*/
private long keepAliveTime = 1;
/**
* 時(shí)間單位,有納秒、微秒、毫秒、秒、分、時(shí)、天等。
*/
private TimeUnit timeUnit = TimeUnit.HOURS;
private ThreadPoolExecutor threadPoolExecutor;
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private ThreadPoolManager() {
init();
}
private static class SingleClass {
private static final ThreadPoolManager THREAD_POOL_MANAGER = new ThreadPoolManager();
}
/**
* 靜態(tài)內(nèi)部類單例模式
*
* @return
*/
public static ThreadPoolManager getInstance() {
return SingleClass.THREAD_POOL_MANAGER;
}
/**
* 配置線程池屬性
* 部分參考AsyncTask的配置設(shè)計(jì)
*/
private void init() {
corePoolSize = CPU_COUNT + 1;
maximumPoolSize = CPU_COUNT * 2 + 1;
threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
timeUnit,
new LinkedBlockingDeque<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
/**
* 執(zhí)行任務(wù)
*
* @param runnable
*/
public void execute(Runnable runnable) {
if (runnable != null) {
threadPoolExecutor.execute(runnable);
}
}
/**
* 從線程池移除任務(wù)
*
* @param runnable
*/
public void remove(Runnable runnable) {
if (runnable != null) {
threadPoolExecutor.remove(runnable);
}
}
}
FixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
從方法注釋和參數(shù)配置來(lái)看,核心線程數(shù)和最大線程數(shù)相同(意味著沒有非核心線程,且線程池處于?;顮顟B(tài)以等待任務(wù),除非調(diào)用shutdown shutdown關(guān)閉線程隊(duì)列中的任務(wù)),超過(guò)核心線程數(shù)的任務(wù)都要進(jìn)入線程隊(duì)列中等待,當(dāng)核心線程處于閑置狀態(tài)時(shí)就繼續(xù)執(zhí)行隊(duì)列里的任務(wù)。線程池隊(duì)列是沒有參數(shù),說(shuō)明隊(duì)列長(zhǎng)度是默認(rèn)的Integer.MAX_VALUE(2的31次方減1)。
特點(diǎn):固定設(shè)置線程數(shù),快速響應(yīng)。
外部調(diào)用示例:
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(1000);
Log.d(TAG, "" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
SingleThreadExecutor
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
從方法注釋上看,和FixedThreadPool幾乎一樣,只不過(guò)該線程池的固定線程個(gè)數(shù)是1。這樣的好處是避免了線程間的同步問(wèn)題。
CachedThreadPool
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從方法注釋里看,該線程池沒有核心線程,所有非核心線程數(shù)就是最大線程數(shù)。由于最大線程數(shù)為無(wú)限大,所以每當(dāng)我們添加一個(gè)新任務(wù)進(jìn)來(lái)的時(shí)候,如果線程池中有空閑的線程,則由該空閑的線程執(zhí)行新任務(wù),如果沒有空閑線程,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)。根據(jù)CachedThreadPool的特點(diǎn),我們可以在有大量任務(wù)請(qǐng)求的時(shí)候使用CachedThreadPool,因?yàn)楫?dāng)CachedThreadPool中沒有新任務(wù)的時(shí)候,它里邊所有的線程都會(huì)因?yàn)槌瑫r(shí)(60秒)而被終止。
這里它使用了SynchronousQueue作為線程隊(duì)列。
特點(diǎn):CachedTreadPool一個(gè)最大的優(yōu)勢(shì)是它可以根據(jù)程序的運(yùn)行情況自動(dòng)來(lái)調(diào)整線程池中的線程數(shù)量。
ScheduledThreadPool
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
從方法注釋里可以看到,它的核心線程數(shù)量是固定的(我們?cè)跇?gòu)造的時(shí)候傳入的),但是非核心線程是無(wú)窮大,當(dāng)非核心線程閑置時(shí),則會(huì)被立即回收。
ScheduledExecutorService擴(kuò)展了ExecutorService接口,提供時(shí)間排程的功能。
public interface ScheduledExecutorService extends ExecutorService {
/**
* Creates and executes a one-shot action that becomes enabled
* after the given delay.
* 翻譯:創(chuàng)建并執(zhí)行在給定延遲后啟用的一次性操作。
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* Creates and executes a ScheduledFuture that becomes enabled after the
* given delay.
* 翻譯:創(chuàng)建并執(zhí)行在給定延遲后啟用的操作。
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the given
* period; that is, executions will commence after
* 參數(shù)initialDelay:表示第一次延時(shí)的時(shí)間
* 參數(shù)period:表示任務(wù)執(zhí)行開始后距下一個(gè)任務(wù)開始執(zhí)行的時(shí)間間隔。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* Creates and executes a periodic action that becomes enabled first
* after the given initial delay, and subsequently with the
* given delay between the termination of one execution and the
* commencement of the next.
* 參數(shù)initialDelay:表示第一次延時(shí)的時(shí)間
* 參數(shù)delay:表示任務(wù)執(zhí)行結(jié)束后距下一個(gè)任務(wù)開始執(zhí)行的時(shí)間間隔
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
情景代碼測(cè)試1:executorService.scheduleWithFixedDelay(new MyRunnable(5), 1, 3, TimeUnit.SECONDS);
class MyRunnable implements Runnable {
private int count;
final int[] i = {0};
public MyRunnable(int i) {
this.count = i;
}
@Override
public void run() {
try {
i[0] = i[0] + 1;
if (i[0] < count) {
Log.d(TAG, "開始第" + i[0] + "次打印");
Thread.sleep(1000);
Log.d(TAG, "結(jié)束第" + i[0] + "次打印");
} else {
Log.d(TAG, "停止延時(shí)執(zhí)行...");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Log日志如下:
12-07 14:24:41.556 9263-9263 D/ThreadFragment: 開始延時(shí)執(zhí)行...
12-07 14:24:42.557 9263-9353 D/ThreadFragment: 開始第1次打印
12-07 14:24:43.559 9263-9353 D/ThreadFragment: 結(jié)束第1次打印
12-07 14:24:46.561 9263-9353 D/ThreadFragment: 開始第2次打印
12-07 14:24:47.562 9263-9353 D/ThreadFragment: 結(jié)束第2次打印
12-07 14:24:50.564 9263-9353 D/ThreadFragment: 開始第3次打印
12-07 14:24:51.567 9263-9353 D/ThreadFragment: 結(jié)束第3次打印
12-07 14:24:54.569 9263-9353 D/ThreadFragment: 開始第4次打印
12-07 14:24:55.570 9263-9353 D/ThreadFragment: 結(jié)束第4次打印
12-07 14:24:58.572 9263-9353 D/ThreadFragment: 停止延時(shí)執(zhí)行...
從log日志可以看出:
- 14:24:41到14:24:42是延遲1秒啟動(dòng)的。
- 結(jié)束第1次打印時(shí)間與開始第2次打印時(shí)間是相隔3秒,驗(yàn)證了每一次執(zhí)行終止和下一次執(zhí)行開始之間都存在給定的延遲delay。
情景代碼測(cè)試2:executorService.scheduleAtFixedRate(new MyRunnable(5), 1, 3, TimeUnit.SECONDS);
Log日志如下(MyRunnable內(nèi)代碼一樣):
12-07 14:23:21.371 8012-8012 D/ThreadFragment: 開始延時(shí)執(zhí)行...
12-07 14:23:22.373 8012-8115 D/ThreadFragment: 開始第1次打印
12-07 14:23:23.374 8012-8115 D/ThreadFragment: 結(jié)束第1次打印
12-07 14:23:25.373 8012-8115 D/ThreadFragment: 開始第2次打印
12-07 14:23:26.375 8012-8115 D/ThreadFragment: 結(jié)束第2次打印
12-07 14:23:28.374 8012-8115 D/ThreadFragment: 開始第3次打印
12-07 14:23:29.375 8012-8115 D/ThreadFragment: 結(jié)束第3次打印
12-07 14:23:31.374 8012-8115 D/ThreadFragment: 開始第4次打印
12-07 14:23:32.376 8012-8115 D/ThreadFragment: 結(jié)束第4次打印
12-07 14:23:34.372 8012-8115 D/ThreadFragment: 停止延時(shí)執(zhí)行...
從log日志可以看出:
- 14:23:21到14:23:22是延遲1秒啟動(dòng)的。
- 開始第1次打印時(shí)間與開始第2次打印時(shí)間是period = 3秒。
注意:通過(guò)ScheduledExecutorService執(zhí)行的周期任務(wù),如果任務(wù)執(zhí)行過(guò)程中拋出了異常,那么過(guò)ScheduledExecutorService就會(huì)停止執(zhí)行任務(wù),且也不會(huì)再周期地執(zhí)行該任務(wù)了。所以你如果想保住任務(wù)都一直被周期執(zhí)行,那么catch一切可能的異常。