構(gòu)建服務(wù)器應(yīng)用程序的有效方法 — 線程池
為什么使用線程池?
每次通過(guò) new Thread 創(chuàng)建線程并不是一種好的方式,每次 new Thread 新建和銷毀對(duì)象性能較差,線程缺乏統(tǒng)一管理,可能無(wú)限制新建線程,相互之間競(jìng)爭(zhēng)、占用過(guò)多資源導(dǎo)致死鎖,并且缺乏定時(shí)執(zhí)行、定期執(zhí)行、線程中斷等功能。
Java 提供了 4 種線程池,能夠有效地管理、調(diào)度線程,避免過(guò)多的資源損耗。它的優(yōu)點(diǎn)如下:
1. 重用存在的線程,減少對(duì)象創(chuàng)建、銷毀的開(kāi)銷;
2. 可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率,同時(shí)避免過(guò)多資源競(jìng)爭(zhēng),避免堵塞;
3. 提供定時(shí)執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
線程池原理簡(jiǎn)單地解釋就是:會(huì)創(chuàng)建多個(gè)線程并且進(jìn)行管理,提交給線程的任務(wù)會(huì)被線程池指派給其中的線程進(jìn)行執(zhí)行,通過(guò)線程池的統(tǒng)一調(diào)度、管理使得多線程的使用更簡(jiǎn)單、高效。

線程池都實(shí)現(xiàn)了 ExecutorService 接口,該接口定義了線程池需要實(shí)現(xiàn)的接口,如 submit、execute、shutdown 等。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable command);
}
兩種常用的線程池實(shí)現(xiàn):
ThreadPoolExecutor,也就是我們用最多的線程池實(shí)現(xiàn);
ScheduledThreadPoolExecutor,則用于周期性地執(zhí)行任務(wù)。
我們一般都不會(huì)通過(guò) new 的形式來(lái)創(chuàng)建線程池,因?yàn)閯?chuàng)建參數(shù)過(guò)程相對(duì)復(fù)雜,所以,JDK 提供了一個(gè) Executors 工廠類來(lái)簡(jiǎn)化這個(gè)過(guò)程。下面分別介紹 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的使用。
1. 啟動(dòng)指定數(shù)量的線程 — ThreadPoolExecutor
ThreadPoolExecutor 是線程池的實(shí)現(xiàn)之一,它的功能是啟動(dòng)指定數(shù)量的線程以及將任務(wù)添加到一個(gè)隊(duì)列中,并且將任務(wù)分發(fā)給空閑的線程。
ExecutorService 的生命周期包括 3 種狀態(tài):運(yùn)行、關(guān)閉、終止。創(chuàng)建后便進(jìn)入運(yùn)行狀態(tài),當(dāng)調(diào)用 shutdown() 方法時(shí)便進(jìn)入關(guān)閉狀態(tài),此時(shí) ExecutorService 不再接受新的任務(wù),但它還在執(zhí)行已經(jīng)提交了的任務(wù)。當(dāng)所有已經(jīng)提交了的任務(wù)執(zhí)行完后,就變成終止?fàn)顟B(tài)。
ThreadPoolExecutor 構(gòu)造函數(shù)如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
| 參數(shù)名 | 作用 |
|---|---|
| corePoolSize | 線程池中所保存的核心線程數(shù)。 |
| maximumPoolSize | 線程池允許創(chuàng)建的最大線程數(shù)。 |
| keepAliveTime | 當(dāng)前線程池線程總數(shù)大于核心線程數(shù)時(shí),終止多余的空閑線程的時(shí)間。 |
| unit | keepAliveTime 參數(shù)的時(shí)間單位,可選值有毫秒、秒、分等。 |
| workQueue | 任務(wù)隊(duì)列,如果當(dāng)前線程池達(dá)到核心線程數(shù) corePoolSize,且當(dāng)前所有線程都處于活動(dòng)狀態(tài)時(shí),則將新加入的任務(wù)放到此隊(duì)列中。 |
| threadFactory | 線程工廠,讓用戶可以定制線程的創(chuàng)建過(guò)程,通常不需設(shè)置。 |
| Handler | 拒絕策略,當(dāng)線程池與 workQueue 隊(duì)列都滿了的情況下,對(duì)新加任務(wù)采取的處理策略。 |
線程池啟動(dòng)后默認(rèn)是空的,只有任務(wù)來(lái)臨才會(huì)創(chuàng)建線程以處理請(qǐng)求。
prestartAllCoreThreads()方法可以讓線程池啟動(dòng)后立即啟動(dòng)所有核心線程以等待任務(wù)。任務(wù)數(shù)量小于 corePoolSize,則立即創(chuàng)建新線程來(lái)處理任務(wù);
任務(wù)數(shù)量大于 corePoolSize,但小于 maximumPoolSize,則將任務(wù)放進(jìn) workQueue,當(dāng)阻塞隊(duì)列滿時(shí)才創(chuàng)建新線程;
如果 corePoolSize 與 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。
workQueue 有下列幾個(gè)常用實(shí)現(xiàn):
ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界隊(duì)列,此隊(duì)列按 FIFO 原則對(duì)任務(wù)進(jìn)行排序。如果隊(duì)列滿了還有任務(wù)進(jìn)來(lái),則調(diào)用拒絕策略。
LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的無(wú)界隊(duì)列,此隊(duì)列按 FIFO 原則對(duì)任務(wù)進(jìn)行排序。因?yàn)樗菬o(wú)界的,所以不會(huì)滿,采用此隊(duì)列后線程池將忽略拒絕策略(handler)參數(shù),同時(shí)忽略最大線程數(shù) maximumPoolSize 等參數(shù)。
SynchronousQueue:直接將任務(wù)提交給線程而不是將它加入到隊(duì)列,實(shí)際上此隊(duì)列是空的。如果新任務(wù)來(lái)了線程池沒(méi)有任何可用線程處理的話,則調(diào)用拒絕策略。其實(shí)要是把 maximumPoolSize 設(shè)置成無(wú)界(integer.MAX_VALUE),加上 SynchronousQueue 隊(duì)列,就等同于 Executors.newCachedThreadPool()。
PriorityBlockingQueue:具有優(yōu)先級(jí)的隊(duì)列的有界隊(duì)列,可以自定義優(yōu)先級(jí),默認(rèn)是按自然排序,可能很多場(chǎng)合并不適合。
當(dāng)線程池與 workQueue 隊(duì)列都滿了的情況下,對(duì)新加任務(wù)采取的處理策略有如下四個(gè)默認(rèn)實(shí)現(xiàn):
AbortPolicy:拒絕任務(wù),拋出 RejectedExecutionException 異常。線程池的默認(rèn)策略。
CallerRunsPolicy:用于被拒絕任務(wù)的處理程序,它直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。
DiscardOldestPolicy:如果線程池尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過(guò)程)。這樣的結(jié)果是最后加入的任務(wù)反而有可能被執(zhí)行,先前加入的都被拋棄了。
DiscardPolicy:加不進(jìn)的任務(wù)都被拋棄了,同時(shí)沒(méi)有異常拋出。
1.1 newFixedThreadPool(int size)
對(duì)與 Android 平臺(tái)來(lái)說(shuō),由于資源有限,最常使用的就是通過(guò) Executors.newFixedThreadPool(int size) 函數(shù)來(lái)啟動(dòng)固定數(shù)量的線程池:
public class ExecutorDemo {
// 任務(wù)數(shù)量
private static final int MAX = 10;
public static void fixedThreadPool(int size) throws ExecutionException, InterruptedException {
// 創(chuàng)建固定數(shù)量的線程池
ExecutorService executorService = Executors.newFixedThreadPool(size);
for (int i = 1; i <= MAX; i++) {
// 提交任務(wù)
Future<Integer> task = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("執(zhí)行線程:" + Thread.currentThread().getName());
return fibc(40);
}
});
System.out.println("第 " + i + " 次計(jì)算,結(jié)果:" + task.get());
}
}
private static int fibc(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fibc(n - 1) + fibc(n - 2);
}
}
newFixedThreadPool(int nThreads) 的實(shí)現(xiàn):
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
在該函數(shù)中,會(huì)調(diào)用 ThreadPoolExecutor 的構(gòu)造函數(shù),設(shè)置它的 corePoolSize 和 maximumPoolSize 值都是 nThreads,并且設(shè)置 keepAliveTime 參數(shù)為 0 毫秒,最后設(shè)置無(wú)界任務(wù)隊(duì)列。這樣該線程池就含有固定個(gè)數(shù)的線程,并且能容納無(wú)限個(gè)任務(wù)。
輸出結(jié)果如下:
執(zhí)行線程:pool-3-thread-1
第 1 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 2 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-3
第 3 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-1
第 4 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 5 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-3
第 6 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-1
第 7 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 8 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-3
第 9 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-1
第 10 次計(jì)算,結(jié)果:102334155
1.2 newCachedThreadPool()
線程越多,并發(fā)量越大,然而占用的內(nèi)存也就越大,指定過(guò)大的線程數(shù)量并不可取。因此,我們可能需要一種場(chǎng)景,如果來(lái)了一個(gè)新的任務(wù),并且沒(méi)有空閑線程可用,此時(shí)必須馬上創(chuàng)建一個(gè)線程來(lái)立即執(zhí)行任務(wù)。這時(shí)就可以通過(guò) Executors 的 newCachedThreadPool 函數(shù)來(lái)實(shí)現(xiàn)。
// 創(chuàng)建線程池
public static void cachedThreadPool() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 1; i <= MAX; i++) {
// 提交任務(wù)
Future<Integer> task = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("執(zhí)行線程:" + Thread.currentThread().getName());
return fibc(40);
}
});
System.out.println("第 " + i + " 次計(jì)算,結(jié)果:" + task.get());
}
}
newCachedThreadPool() 函數(shù)實(shí)現(xiàn):
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,newCachedThreadPool 函數(shù)不需傳入線程的數(shù)量。它會(huì)調(diào)用 ThreadPoolExecutor 的構(gòu)造函數(shù),設(shè)置它的
maximumPoolSize 值為無(wú)界值(Integer.MAX_VALUE),并且設(shè)置 keepAliveTime 參數(shù)為 60 秒,最后設(shè)置 SynchronousQueue
任務(wù)隊(duì)列。這樣就可以適應(yīng)任意數(shù)量的并發(fā)任務(wù)。線程池為每個(gè)任務(wù)都創(chuàng)建了 1 個(gè)線程,當(dāng)然這是在沒(méi)有線程空閑的情況下才會(huì)創(chuàng)建新的線程。若一個(gè)線程中的任務(wù)已經(jīng)做完了,那么這個(gè)線程可以為未被執(zhí)行的任務(wù)提供執(zhí)行。
輸出結(jié)果如下:
執(zhí)行線程:pool-3-thread-1
第 1 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 2 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 3 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 4 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 5 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 6 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 7 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 8 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 9 次計(jì)算,結(jié)果:102334155
執(zhí)行線程:pool-3-thread-2
第 10 次計(jì)算,結(jié)果:102334155
2. 定時(shí)執(zhí)行一些任務(wù) — ScheduledThreadPoolExecutor
當(dāng)我們需要定時(shí)執(zhí)行一些任務(wù),可以通過(guò) ScheduledThreadPoolExecutor 來(lái)實(shí)現(xiàn)。通過(guò) Executors 的 newScheduledThreadPool 函數(shù)可以很方便地創(chuàng)建定時(shí)執(zhí)行任務(wù)的線程池。
下面是一個(gè)例子:
public class ScheduledThreadPoolDemo {
public static void scheduledThreadPool() {
// 創(chuàng)建定時(shí)執(zhí)行的線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
// 參數(shù) 2 為第一次執(zhí)行任務(wù)延遲的時(shí)間,
// 意思就是第一次調(diào)度開(kāi)始時(shí)間點(diǎn)=當(dāng)前時(shí)間 + initialDelay
// 參數(shù) 3 為執(zhí)行周期
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName() + ",定時(shí)計(jì)算1:");
System.out.println("結(jié)果:" + fibc(30));
}
}, 1, 2, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("Thread: " + Thread.currentThread().getName() + ",定時(shí)計(jì)算2:");
System.out.println("結(jié)果:" + fibc(40));
}
}, 1, 2, TimeUnit.SECONDS);
}
private static int fibc(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fibc(n - 1) + fibc(n - 2);
}
}
public class Main {
public static void main(String[] args) {
// write your code here
ScheduledThreadPoolDemo.scheduledThreadPool();
}
該線程池有 3 個(gè)線程,我們指定了 2 個(gè)定時(shí)任務(wù),因此,該線程池有兩個(gè)線程來(lái)定時(shí)完成任務(wù)。
scheduleAtFixedRate() 函數(shù)的實(shí)現(xiàn):
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
scheduleAtFixedRate() 函數(shù)就是設(shè)置定時(shí)任務(wù)的方法,參數(shù) 1 是要執(zhí)行的任務(wù),參數(shù) 2 是第一次運(yùn)行任務(wù)時(shí)延遲時(shí)間(第一次調(diào)度開(kāi)始時(shí)間點(diǎn) = 當(dāng)前時(shí)間 + initialDelay ),參數(shù) 3 是定時(shí)任務(wù)的周期(兩次任務(wù)調(diào)度的間隔時(shí)間),參數(shù) 4 是時(shí)間單元,這里設(shè)置為秒。
部分輸出結(jié)果:
Thread: pool-2-thread-1,定時(shí)計(jì)算1:
Thread: pool-2-thread-2,定時(shí)計(jì)算2:
結(jié)果:832040
結(jié)果:102334155
Thread: pool-2-thread-1,定時(shí)計(jì)算1:
Thread: pool-2-thread-3,定時(shí)計(jì)算2:
結(jié)果:832040
結(jié)果:102334155