本篇文章通過服務(wù)器通信和頁面渲染兩個功能的實(shí)現(xiàn)來加深多線程中Future和Executor的理解。
服務(wù)器通信
串行執(zhí)行任務(wù)
任務(wù)執(zhí)行最簡單的策略就是在單線程中串行執(zhí)行各項(xiàng)任務(wù),并不會涉及多線程。
以創(chuàng)建通訊服務(wù)為例,我們可以這樣實(shí)現(xiàn)(很low)
@Test
public void singleThread() throws IOException {
ServerSocket serverSocket= new ServerSocket(8088);
while (true){
Socket conn = serverSocket.accept();
handleRequest(conn);
}
}
代碼很簡單,理論上沒什么毛病,但是實(shí)際使用中只能處理一個請求。但是當(dāng)處理任務(wù)很耗時(shí)并且在多次請求時(shí)會阻塞無法及時(shí)響應(yīng)。
由此可見串行處理機(jī)制通常都無法提供高吞吐率或快速響應(yīng)性。
顯式的為任務(wù)創(chuàng)建線程
串行執(zhí)行任務(wù)這么 low,我們來通過多線程來處理請求吧:當(dāng)接收到請求后創(chuàng)建新的線程去執(zhí)行任務(wù)。new Thread()應(yīng)該就能實(shí)現(xiàn)。
初級版本:
@Test
public void perThreadTask() throws IOException {
ServerSocket serverSocket = new ServerSocket(8088);
while (true) {
Socket conn = serverSocket.accept();
Runnable r = new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
};
new Thread(r).start();
}
}
微弱的優(yōu)點(diǎn)
- 對于每個請求,都創(chuàng)建了一個線程來處理,達(dá)到多線程并行效果
- 任務(wù)處理從主線程分離出來,使得主循環(huán)能更快的處理下一個請求
為每個任務(wù)分配一個線程存在一些缺陷,尤其當(dāng)需要創(chuàng)建大量的線程時(shí)
- 線程生命周期的開銷非常高。根據(jù)平臺的不同,實(shí)際的開銷也不同。但是線程的創(chuàng)建過程都會需要時(shí)間,并且需要 JVM 和操作系統(tǒng)提供一些輔助操作。
- 資源消耗?;钴S的線程會消耗系統(tǒng)資源,尤其是內(nèi)存。如果可運(yùn)行的線程數(shù)量多余可用處理器的數(shù)量,那么有些線程將閑置。大量閑置的線程會占用許多內(nèi)存,給垃圾回收器帶來壓力。如果你已經(jīng)擁有足夠多的線程使所有 CPU 保持忙碌狀態(tài),那么多余的線程反而會降低性能。
- 穩(wěn)定性。隨著平臺的不同,可創(chuàng)建線程數(shù)量的限制是不同的,并受多個因素制約,包括 JVM 的啟動參數(shù)、Thread 構(gòu)造函數(shù)中請求的棧大小,以及底層操作系統(tǒng)對線程的限制等。如果破壞了這些限制,很可能拋出 OOM 異常。
<h5>
上面兩種方式都存在一些問題:單線程串行的問題在于其糟糕的響應(yīng)性和吞吐量;而為每個任務(wù)分配線程的問題在于資源消耗和管理的復(fù)雜性。</h5>
<h5>
在 Java 類庫中,任務(wù)執(zhí)行的主要抽象不是 Thread,而是 Executor
</h5>
public interface Executor {
void execute(Runnable command);
}
Executor 框架
Executor 基于生產(chǎn)者-消費(fèi)者模式,提交任務(wù)的操作相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)者。

通訊優(yōu)化
對于以前的通訊服務(wù)我們可以用 Executor 進(jìn)一步優(yōu)化一下
@Test
public void limitExecutorTask() throws IOException {
final int nThreads = 100;
ExecutorService exec = Executors.newFixedThreadPool(nThreads);
ServerSocket serverSocket = new ServerSocket(8088);
while (true) {
Socket conn = serverSocket.accept();
Runnable r = new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
};
exec.execute(r);
}
}
線程池
線程池從字面來看時(shí)指管理一組同構(gòu)工作線程的資源池。它與工作隊(duì)列密切相關(guān),它在工作隊(duì)列中保存了所有等待執(zhí)行的任務(wù)。
線程池通過重用現(xiàn)有的線程而不是創(chuàng)建新線程,可以在處理多個請求時(shí)分?jǐn)傇诰€程創(chuàng)建和銷毀過程中產(chǎn)生的巨大開銷。另一個額外的好處是,當(dāng)請求到達(dá)時(shí),工作線程已經(jīng)存在,因此不會由于等待創(chuàng)建線程而延遲任務(wù)的執(zhí)行,挺高響應(yīng)性。
JAVA 類庫中提供了一個靈活的線程池以及一些有用的默認(rèn)配置??梢酝ㄟ^ Executors 中的靜態(tài)工廠方法來創(chuàng)建。
newFixedThreadPool將創(chuàng)建一個固定長度的線程池,每當(dāng)提交一個任務(wù)時(shí)就創(chuàng)建一個線程,直到達(dá)到線程的最大數(shù)量。newCacheedThreadPool將創(chuàng)建一個可緩存的線程池,如果線程池的當(dāng)前規(guī)模超過了處理需求時(shí),那么將回收空閑的線程,而當(dāng)需求增加時(shí),則可以添加新的線程,線程池的規(guī)模則不存在限制。newSingleThreadPool是一個單線程的 Executor,它創(chuàng)建單個工作者線程來執(zhí)行任務(wù),如果這個線程異常結(jié)束,會創(chuàng)建另一個線程來替代。newSingleThreadPool能確保依照任務(wù)在隊(duì)列中的順序來串行執(zhí)行。newScheduledThreadPool創(chuàng)建一個固定長度的線程池,而且以延遲或定時(shí)的方式來執(zhí)行任務(wù),類似于 Timer。
Executor 生命周期
為了解決執(zhí)行服務(wù)的聲明周期問題,Executor 擴(kuò)展了 ExecutorService接口,添加了一些用于管理生命周期的方法shutdown(),shutdownNow(),isShutdown(),isTerminated(),awaitTermination()。
ExecutorService的生命周期有3中狀態(tài):運(yùn)行、關(guān)閉和已終止。初始創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。
-
shutdown()方法將執(zhí)行平緩的關(guān)閉過程:不再接受新的任務(wù),同時(shí)等待已經(jīng)提交的任務(wù)執(zhí)行完成,包括那些還未開始執(zhí)行的任務(wù)。 -
shutdownNow()方法將執(zhí)行粗暴的關(guān)閉過程:它將嘗試取消所有運(yùn)行中任務(wù),并且不再啟動隊(duì)列中尚未開始執(zhí)行的任務(wù)。
等待所有任務(wù)完成后,ExecutorService將轉(zhuǎn)入終止?fàn)顟B(tài)??梢哉{(diào)用awaitTermination來等待到達(dá)終止?fàn)顟B(tài),或者通過isTerminated來輪詢是否已終止。
服務(wù)器通訊初步牛批版本
class LifecycleWebServer {
private ExecutorService exec;
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
Socket conn = socket.accept();
exec.execute(new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
});
}catch (RejectedExecutionException e){
if (!exec.isShutdown()){
System.out.println("task submission reject::"+e);
}
}
}
}
public void stop(){
exec.shutdown();
}
void handleRequest(Socket conn) {
Request req = readRequest(conn);
if(isShutdownRequest(req)){
stop();
}else {
dispatchRequest(req);
}
}
private void dispatchRequest(Request req) {
//......分發(fā)請求
}
private boolean isShutdownRequest(Request req) {
//......判斷是否是 shutdown 請求
}
private Request readRequest(Socket conn) {
//......解析請求
}
}
通過 ExecutorService 增加對任務(wù)生命周期的管理。
延遲任務(wù)與生命周期
Timer是作者使用較多的任務(wù)類,主要用來管理延遲任務(wù)以及周期任務(wù)。因?yàn)?Timer 本身還是存在一些缺陷:
-
Timer在執(zhí)行所有定時(shí)任務(wù)時(shí)只會創(chuàng)建一個線程。如果某個任務(wù)的執(zhí)行時(shí)間過長,那么將破壞其他TimerTask的定時(shí)精確性。public void timerTest() { Timer timer = new Timer(); System.out.println("Timer Test Start " +new Date()); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("001 working current " +new Date()); try { Thread.sleep(4*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("001 working current " +new Date()); } },1000); timer.schedule(new TimerTask() { @Override public void run() { try { Thread.sleep(1000); System.out.println("002 working current " +new Date()); Thread.sleep(1000); System.out.println("002 working current " +new Date()); Thread.sleep(1000); System.out.println("002 working current " +new Date()); Thread.sleep(1000); System.out.println("002 working current " +new Date()); } catch (InterruptedException e) { e.printStackTrace(); } } },2000); }
打印 log:
Timer Test Start Tue Dec 10 11:52:44 CST 2019
001 working current Tue Dec 10 11:52:45 CST 2019
001 working current Tue Dec 10 11:52:49 CST 2019
002 working current Tue Dec 10 11:52:50 CST 2019
002 working current Tue Dec 10 11:52:51 CST 2019
002 working current Tue Dec 10 11:52:52 CST 2019
002 working current Tue Dec 10 11:52:53 CST 2019
從時(shí)間戳上可以看出兩個 TimerTask 是串行執(zhí)行的。時(shí)間調(diào)度出現(xiàn)了問題
- 另一個是線程泄露問題:當(dāng) TimerTask 拋出一個未檢查的異常,那么 Timer 將表現(xiàn)出糟糕的行為。Timer 線程并不捕獲異常,因此當(dāng) TimerTask 拋出未檢查的異常時(shí)將終止定時(shí)線程,并且不會恢復(fù)線程的執(zhí)行。
請盡量減少或者停止 Timer 的使用,ScheduledThreadPoolExecutor能夠正確處理這些表現(xiàn)出錯誤行為的任務(wù)。
public void testScheduled(){
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);
System.out.println("scheduled test " + new Date());
ScheduledFuture<?> work1 = executor.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("001 Worker " + new Date());
return "work1 finish";
}
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> work2 = executor.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
Thread.sleep(1000);
System.out.println("002 Worker " + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "work2 Finish";
}
}, 2, TimeUnit.SECONDS);
}
輸出 log:
scheduled test Tue Dec 10 15:54:10 CST 2019
002 Worker Tue Dec 10 15:54:13 CST 2019
002 Worker Tue Dec 10 15:54:14 CST 2019
001 Worker Tue Dec 10 15:54:15 CST 2019
002 Worker Tue Dec 10 15:54:15 CST 2019
002 Worker Tue Dec 10 15:54:16 CST 2019
從 log 來看,時(shí)間調(diào)度上符合我們的預(yù)期,棒棒噠。
頁面渲染
來自面試官的提問:瀏覽器是怎樣加載網(wǎng)頁的?
方法一:使用簡單串行
最簡單的方法是對HTML文檔進(jìn)行串行處理。當(dāng)遇到文本標(biāo)簽時(shí),將其繪制到圖像緩存中。當(dāng)遇到圖像引用時(shí),先通過網(wǎng)絡(luò)獲取,然后再將其繪制到圖像緩存中。這種方式算是一種思路,但是可能會令使用者感到方案,他們必須等待很長時(shí)間,直到顯示所有的文本。
@Test
public void singleThreadRender() {
CharSequence source = "";
renderText(source);
List<ImageData> imageDatas = new ArrayList<>();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageDatas.add(imageInfo.downloadImage());
}
for (ImageData imageData : imageDatas) {
renderImage(imageData);
}
}
了解 Callable 和 Future
Executor框架使用 Runnable作為其基本的任務(wù)表示形式。Runnable是一種有很大局限的抽象,雖然能夠異步執(zhí)行任務(wù),但是它不能返回一個值或者拋出受檢查的異常。
許多任務(wù)實(shí)際上都是存在延遲的計(jì)算(像執(zhí)行數(shù)據(jù)庫查詢、從網(wǎng)絡(luò)上獲取資源、或者計(jì)算某個復(fù)雜的功能)。對于這些任務(wù),Callable是一種更好的抽象:它認(rèn)為主入口點(diǎn)應(yīng)該返回一個值,并可能拋出一個異常。
Runnable和Callable描述的都是抽象的計(jì)算任務(wù)。這些任務(wù)通常都應(yīng)該有一個明確的起始點(diǎn),并且最終會結(jié)束。Executor執(zhí)行任務(wù)有4個生命周期階段:創(chuàng)建、提交、開始和完成。由于有些任務(wù)可能需要很長的時(shí)間,因此通常希望能夠及時(shí)取消。再 Executor框架中,已提交但尚未開始的任務(wù)可以取消,但是對于那些已經(jīng)開始的任務(wù),只有當(dāng)它們能響應(yīng)中斷時(shí),才能取消。
Future表示一個任務(wù)的生命周期,并提供了相應(yīng)的方法來判斷任務(wù)是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。在 Future 規(guī)范中包含的隱含意義是,任務(wù)的聲明周期只能前進(jìn),不能后腿,就像ExcutorService 的生命周期一樣。當(dāng)某個任務(wù)完成后,它就永遠(yuǎn)停留在完成狀態(tài)上。
Future 包含如下方法:
interface Future{
boolean cancel()
boolean get()
boolean isCancelled()
boolean isDone()
}
get()方法的行為取決于任務(wù)的狀態(tài)(尚未開始、正在運(yùn)行、已完成)。如果任務(wù)已完成,方法會立即返回或者拋出一個異常;如果任務(wù)沒有完成,方法 將阻塞直到任務(wù)完成。
可以通過多種方法創(chuàng)建一個Future來描述任務(wù)。ExecutorService中的所有的 submit 方法都將返回一個Future,從而將一個Runnable或者Callable提交給 Executor,并得到一個 Future用來獲取任務(wù)的執(zhí)行結(jié)果或者取消任務(wù)。
方法二:使用Future實(shí)現(xiàn)渲染
為了使頁面渲染具有更高的并發(fā)性,我們分解成兩個任務(wù):一個是渲染所有的文本(
CPU 密集型);另一個是下載所有的圖像(I/O 密集型)。
Callable 和 Future有助于協(xié)同任務(wù)之間的交互。
@Test
public void futureRender() {
CharSequence source = "";
ExecutorService executor = Executors.newFixedThreadPool(10);
List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
@Override
public List<ImageData> call() throws Exception {
List<ImageData> result = new ArrayList<>();
for (ImageInfo imageInfo : imageInfos) {
result.add(imageInfo.downloadImage());
}
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageDatas = future.get();
for (ImageData imageData : imageDatas) {
renderImage(imageData);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
future.cancel(true);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
futureRender使得渲染文本與下載圖像數(shù)據(jù)的任務(wù)并發(fā)執(zhí)行,當(dāng)所有圖像下載完成后,會顯示到頁面上。對比串行版本已經(jīng)提高了效率和用戶體驗(yàn)。但我們還可以做得更好,我們不必等到所有的圖像都下載完成,而是希望沒下載完一副圖像就顯示出來。
了解CompletionService
CompletionService的實(shí)現(xiàn)類是ExecutorCompletionService,它將Executor和BlockingQueue的功能融合在一起。
如果想及時(shí)獲取任計(jì)算的結(jié)果,按照前面的思路我們可以先保留任務(wù)提交Executor后返回的 Future,然后不斷的調(diào)用get()方法來獲取。這種方式雖然可行,但是不夠優(yōu)雅。幸運(yùn)的是有CompletionService。
請仔細(xì)閱讀take()方法說明:
/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
Future<V> take() throws InterruptedException;
take() 會取出并從隊(duì)列移除已完成的任務(wù)。so,我們可以這樣實(shí)現(xiàn):
使用CompletionService 實(shí)現(xiàn)頁面渲染
@Test
public void completionServiceRender(ExecutorService executor, CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
for (ImageInfo imageInfo : info) {
completionService.submit(new Callable<ImageData>() {
@Override
public ImageData call() throws Exception {
return imageInfo.downloadImage();
}
});
}
renderText(source);
try {
int taskSize = info.size();
for (int i = 0; i < taskSize; i++) {
Future<ImageData> f = completionService.take();
ImageData data = f.get();
renderImage(data);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
為任務(wù)設(shè)置時(shí)限
新需求:對于耗時(shí)任務(wù),等待特定時(shí)間后仍未完成,則取消任務(wù)。
需求合情合理。這種情況下,我們可以使用Future的get()方法,官方描述如下:
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
- 兩個參數(shù):等待的時(shí)間、時(shí)間單位。
- 請注意拋出的異常,我們可以通過捕獲
TimeoutException來處理超時(shí)情況。