ExecutorService] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent /ExecutorService.html)這個(gè)接口從Java 5開始就已經(jīng)存在了。這得追溯到2004年了。這里小小地提醒一下,官方已經(jīng)不再支持Java 5, Java 6了,Java 7[在半年后也將停止支持?。我之所以會(huì)提起ExecutorService這么舊的一個(gè)接口是因?yàn)?,大多?shù)Java程序員并沒有搞清楚它的工作原理。關(guān)于它可以介紹的有很多,這里我只想分享它的一些較少為人所知的特性以及實(shí)踐技巧。本文主要是面向初級(jí)程序員的,并沒有過于高深的東西。
1. 線程命名
這點(diǎn)得反復(fù)強(qiáng)調(diào)。對(duì)正在運(yùn)行的JVM進(jìn)行線程轉(zhuǎn)儲(chǔ)(thread dump)或者調(diào)試時(shí),線程池默認(rèn)的命名機(jī)制是pool-N-thread-M,這里N是線程池的序號(hào)(每新創(chuàng)建一個(gè)線程池,這個(gè)N都會(huì)加一),而M是池 里線程的序號(hào)。比方說,pool-2-thread-3指的是JVM生命周期中第二個(gè)線程池里的第三個(gè)線程。參考這里?Executors.defaultThreadFactory()] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent /Executors.html#defaultThreadFactory--)。這樣的名字表述性不佳。由于JDK將命名機(jī)制都隱藏在 [ThreadFactory?里面,這使得要正確地命名線程得稍微費(fèi)點(diǎn)工夫。所幸的是Guava提供了這么一個(gè)工具類:

2. 根據(jù)上下文切換名字
這是我從?高效的jstack:如何對(duì)高速運(yùn)行的服務(wù)器進(jìn)行調(diào)試?一文中學(xué)到的一個(gè)技巧。線程名可以隨時(shí)進(jìn)行修改,只要你想這么做的話。這是有一定的意義的,因?yàn)榫€程轉(zhuǎn)儲(chǔ)只能看到類名和方法名,而沒有參數(shù)及本地變量。通 過調(diào)整線程名可以保留一些比較關(guān)鍵的上下文信息,這樣排查消息/記錄/查詢等變慢或者出現(xiàn)死鎖的問題時(shí)就容易多了。示例:

在try-finally塊中當(dāng)前線程的名字是Processing-某個(gè)消息ID。這對(duì)跟蹤系統(tǒng)內(nèi)的消息流會(huì)比較有用。
3. 顯式地安全地關(guān)閉線程
客戶端線程和線程池之間會(huì)有一個(gè)任務(wù)隊(duì)列。當(dāng)程序要關(guān)閉時(shí),你需要注意兩件事情:入隊(duì)的這些任務(wù)的情況怎么樣了以及正在運(yùn)行的這個(gè)任務(wù)執(zhí)行得如 何了。令人驚訝的是很多開發(fā)人員并沒能正確地或者有意識(shí)地去關(guān)閉線程池。正確的方法有兩種:一個(gè)是讓所有的入隊(duì)任務(wù)都執(zhí)行完畢(shutdown()), 再就是舍棄這些任務(wù)(shutdownNow())——這完全取決于你。比如說如果我們提交了N多任務(wù)并且希望等它們都執(zhí)行完后才返回的話,那么就使用 shutdown():

本例中我們發(fā)送了許多電子郵件,每一封郵件都對(duì)應(yīng)著線程池中的一個(gè)任務(wù)。提交完這些任務(wù)后我們會(huì)關(guān)閉線程池,這樣就不會(huì)再有新的任務(wù)進(jìn)來了。然 后我們會(huì)至少等待一分鐘,直到這些任務(wù)執(zhí)行完。如果1分鐘后還是有的任務(wù)沒執(zhí)行到的話,awaitTermination()便會(huì)返回false。但是剩 下的任務(wù)還會(huì)繼續(xù)執(zhí)行。我知道有些趕時(shí)髦的人會(huì)這么寫:
emails.parallelStream().forEach(this::sendEmail);
他們覺得我那樣很老套,不過我個(gè)人比較喜歡能控制并發(fā)線程的數(shù)量。還有一個(gè)優(yōu)雅地關(guān)閉掉線程池的方法就是shutdownNow():
final List rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
這么做的話隊(duì)列中的所有任務(wù)都會(huì)被舍棄并返回。已執(zhí)行的任務(wù)仍會(huì)繼續(xù)執(zhí)行。
4. 謹(jǐn)慎地處理中斷
Future的一個(gè)較少提及的特性便是cancelling。這里我就不重復(fù)多說了,可以看下我之前的一篇文章:?InterruptedException及線程中斷?。
5. 監(jiān)控隊(duì)列長(zhǎng)度,確保隊(duì)列有界
不當(dāng)?shù)木€程池大小會(huì)使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊(duì)列會(huì)持續(xù)變大,消耗過多內(nèi)存。而過多的線程又會(huì) 由于頻繁的上下文切換導(dǎo)致整個(gè)系統(tǒng)的速度變緩——殊途而同歸。隊(duì)列的長(zhǎng)度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負(fù)了它可以暫時(shí)拒絕掉新的請(qǐng) 求:
final BlockingQueue queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? queue);
上面的代碼等價(jià)于Executors.newFixedThreadPool(n),然而不同的是默認(rèn)的實(shí)現(xiàn)是一個(gè)無界的 LinkedBlockingQueue。這里我們用的是一個(gè)固定100大小的ArrayBlockingQueue。也就是說如果已經(jīng)有100個(gè)任務(wù)在 隊(duì)列中了(還有N個(gè)在執(zhí)行中),新的任務(wù)就會(huì)被拒絕掉,并拋出RejectedExecutionException異常。由于這里的隊(duì)列是在外部聲明 的,我們還可以時(shí)不時(shí)地調(diào)用下它的size()方法來將隊(duì)列大小記錄在到日志/JMX/或者你所使用的監(jiān)控系統(tǒng)中。
6. 別忘了異常處理
下面這段代碼執(zhí)行的結(jié)果是什么?
executorService.submit(() -> {? ? System.out.println(1 /0);
});
我被它坑過無數(shù)回了:它什么也不會(huì)輸出。沒有任何的java.lang.ArithmeticException: / by zero的征兆,啥也沒有。線程池會(huì)把這個(gè)異常吞掉,就像什么也沒發(fā)生過一樣。如果是你自己創(chuàng)建的java.lang.Thread還好,這樣?UncaughtExceptionHandler?還能起作用。不過如果是線程池的話你就得小心了。如果你提交的是Runnable對(duì)象的話(就像上面那個(gè)一樣,沒有返回值),你得將整個(gè)方法體用try- catch包起來,至少打印一下異常。如果你提交的是Callable的話,得確保你在用get()方法取值的時(shí)候重新拋 出異常:
final Future division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();
有趣的是Spring框架的@Async為此還弄出了個(gè)BUG,參見:?SPR-8995](https://jira.spring.io/browse/SPR-8995)以及 [SPR-12090?。
7. 監(jiān)控隊(duì)列中的等待時(shí)間
監(jiān)控工作隊(duì)列的長(zhǎng)度只是一個(gè)方面。然而排除故障時(shí)查看從提交任務(wù)到實(shí)際執(zhí)行之間的時(shí)間差就顯得非常重要了。這個(gè)時(shí)間差越接近0就越好(說明正好 線程池中有空閑的線程),否則任務(wù)要入隊(duì)的話這個(gè)時(shí)間就會(huì)增加了。再進(jìn)一步說,如果線程池不是固定線程數(shù)的話,執(zhí)行新的任務(wù)還得新創(chuàng)建一個(gè)線程,這個(gè)同樣 也會(huì)消耗一定的時(shí)間。為了能更好地監(jiān)控這項(xiàng)指標(biāo),可以對(duì)ExecutorService做一下封裝:

這個(gè)實(shí)現(xiàn)并不完整,不過也能說明大概的意思了。當(dāng)我們將任務(wù)提交給線程池的時(shí)候,便立即開始記錄它的時(shí)間。一旦這個(gè)任務(wù)被取出并開始執(zhí)行時(shí)便停 止計(jì)時(shí)。不要被代碼中的startTime和queueDuration這兩個(gè)變量搞混了。事實(shí)上它們是在兩個(gè)不同的線程中進(jìn)行求值的,通常都會(huì)差個(gè)毫秒 級(jí)或者秒級(jí):
Task com.nurkiewicz.MyTask@7c7f3894 spent9883msin queue
8. 保留客戶端的棧跟蹤信息
近來響應(yīng)式編程受到了不少關(guān)注。?Reactive manifesto](http://www.reactivemanifesto.org/), [reactive streams](http://www.reactive-streams.org/), [RxJava](https://github.com/ReactiveX/RxJava)(僅發(fā)布了1.0版本!),[Clojure agents](http://clojure.org/agents), [scala.rx?等等。它們都非常不錯(cuò),但棧跟蹤信息就完蛋了,它們幾乎是毫無價(jià)值的。假設(shè)提交到線程池中的一個(gè)任務(wù)出現(xiàn)了異常:
java.lang.NullPointerException:nullatcom.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]atcom.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]atjava.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]atjava.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
可以很容易發(fā)現(xiàn)NPE異常出現(xiàn)在MyTask的76行。但是我們并不知道是誰提交的這個(gè)任務(wù),因?yàn)闂P畔⒅荒芸吹絋hread以及 ThreadPoolExecutor。技術(shù)上來講我們當(dāng)然是可以看下代碼,看看是何處創(chuàng)建的MyTask。不過如果沒有線程在這中間的話,我們馬上便能 知道是誰提交的任務(wù)。那么如果我們可以保留客戶端代碼(提交任務(wù)的那段代碼)的棧信息呢?這個(gè)想法并非我首創(chuàng)的,?Hazelcast](http://hazelcast.com/)就將[異常從所有者節(jié)點(diǎn)傳播到了客戶端中?。下面是一個(gè)非常簡(jiǎn)單的將客戶端棧信息保留下來以便失敗時(shí)查看的例子:

這樣一旦失敗的話我們便可以取到完整的棧信息以及提交任務(wù)時(shí)所在的線程的名字。跟之前相比我們有了一些更有價(jià)值的信息:
Exceptionjava.lang.NullPointerExceptionintasksubmittedfromthradmainhere:java.lang.Exception:Clientstacktraceatcom.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]atcom.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]atcom.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]atsun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod) ~[na:1.8.0]atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]atjava.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]atcom.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9. 優(yōu)先使用CompletableFuture
Java 8中引入了更為強(qiáng)大的?CompletableFuture?。有可能的話盡量使用下它。ExecutorService并沒有擴(kuò)展以支持這個(gè)增強(qiáng)型的接口,因此你得自己動(dòng)手了。這么寫是不行的了:
final Future future =
? ? executorService.submit(this::calculate);
你得這樣:
final CompletableFuture future =
? ? CompletableFuture.supplyAsync(this::calculate, executorService);
CompletableFuture 繼承自Future,因此跟之前的用法一樣。但是使用你接口的人一定會(huì)感謝CompletableFuture所提供的這些額外的功能的。
10. 同步隊(duì)列
SynchronousQueue?是一個(gè)非常有意思的BlockingQueue。它本身甚至都算不上是一個(gè)數(shù)據(jù)結(jié)構(gòu)。最好的解釋就是它是一個(gè)容量為0的隊(duì)列。這里引用下Java文檔中的一段話:
每一個(gè)insert操作都需要等待另一個(gè)線程的一個(gè)對(duì)應(yīng)的remove操作,反之亦然。同步隊(duì)列內(nèi)部不會(huì)有 任何空間,甚至連一個(gè)位置也沒有。你無法對(duì)同步隊(duì)列執(zhí)行peek操作,因?yàn)閮H當(dāng)你要移除一個(gè)元素的時(shí)候才存在這么個(gè)元素;如果沒有別的線程在嘗試移除一個(gè) 元素你也無法往里面插入元素;你也無法對(duì)它進(jìn)行遍歷,因?yàn)樗裁炊紱]有。。。
同步隊(duì)列與CSP和Ada中所用到的集結(jié)管道(rendezvous channel)有異曲同工之妙。
它和線程池有什么關(guān)系?你可以試試在ThreadPoolExecutor中用下SynchronousQueue:
BlockingQueue queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? queue);
我們創(chuàng)建了一個(gè)擁有兩個(gè)線程的線程池,以及一個(gè)SynchronousQueue。由于SynchronousQueue本質(zhì)上是一個(gè)容量為0 的隊(duì)列,因此這個(gè)ExecutorService只有當(dāng)有空閑線程的時(shí)候才能接受新的任務(wù)。如果所有的線程都在忙,新的任務(wù)便會(huì)馬上被拒絕掉,不會(huì)進(jìn)行等 待。這在要么立即執(zhí)行,要么馬上丟棄的后臺(tái)執(zhí)行的場(chǎng)景中會(huì)非常有用。
終于講完了,希望你能找到一個(gè)自己感興趣的特性!