前言
去年面試的時(shí)候,被問到過線程池如何實(shí)現(xiàn)復(fù)用以達(dá)到節(jié)約線程資源的目的。當(dāng)時(shí)回答比較簡單,當(dāng)時(shí)并不是很清楚線程池如何做到復(fù)用一個(gè)線程。今天我們就以 Executors.newCachedThreadPool() 方法創(chuàng)建的線程池為例,探究線程復(fù)用的秘密。Here we go!## 線程池的參數(shù)我們先來看看,創(chuàng)建一個(gè)線程池需要哪些參數(shù)。> corePoolSize 核心線程數(shù)大小。當(dāng)提交一個(gè)任務(wù)時(shí),如果當(dāng)前線程數(shù)小于corePoolSize,就會(huì)創(chuàng)建一個(gè)線程。即使其他有可用的空閑線程。> runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。 可以選擇以下幾個(gè)阻塞隊(duì)列: - ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。 - LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)列。 - SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等上一個(gè)元素被移除之后,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列。 - PriorityBlockingQueue:一個(gè)具有優(yōu)先級的無限阻塞隊(duì)列。>不同的runnableTaskQueue對線程池運(yùn)行邏輯有很大影響> maximumPoolSize(線程池最大大?。壕€程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是如果使用了無界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒什么效果。> keepAliveTime 線程執(zhí)行結(jié)束后,保持存活的時(shí)間。> ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個(gè)創(chuàng)建出來的線程設(shè)置更有意義的名字。> RejectedExecutionHandler 線程池隊(duì)列飽和之后的執(zhí)行策略,默認(rèn)是采用AbortPolicy。JDK提供四種實(shí)現(xiàn)方式: - AbortPolicy:直接拋出異常 - CallerRunsPolicy :只用調(diào)用者所在線程來運(yùn)行任務(wù) - DiscardOldestPolicy 丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù) - DiscardPolicy : 不處理,丟棄掉> TimeUnit: keepalive的時(shí)間單位,可選的單位有天(DAYS),小時(shí)(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。我們來看看 Executors.newCachedThreadPool() 里面的構(gòu)造:java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } corePoolSize 為 0,意味著核心線程數(shù)是 0。 maximumPoolSize 是 Integer.MAX_VALUE ,意味這可以一直往線程池提交任務(wù),不會(huì)執(zhí)行 reject 策略。 keepAliveTime 和 unit 決定了線程的存活時(shí)間是 60s,意味著一個(gè)線程空閑60s后才會(huì)被回收。 reject 策略是默認(rèn)的 AbortPolicy,當(dāng)線程池超出最大限制時(shí)拋出異常。不過這里 CacheThreadPool 的沒有最大線程數(shù)限制,所以 reject 策略沒用。runnableTaskQueue 是 SynchronousQueue。該隊(duì)列的特點(diǎn)是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)。使用該隊(duì)列是實(shí)現(xiàn) CacheThreadPool 的關(guān)鍵之一。SynchronousQueue 的詳細(xì)原理參考這里:SynchronousQueue實(shí)現(xiàn)原理[https://blog.csdn.net/yanyan19880509/article/details/52562039 ]我們看看 CacheThreadPool 的注釋介紹,大意是說當(dāng)有任務(wù)提交進(jìn)來,會(huì)優(yōu)先使用線程池里可用的空閑線程來執(zhí)行任務(wù),但是如果沒有可用的線程會(huì)直接創(chuàng)建線程??臻e的線程會(huì)保留 60s,之后才會(huì)被回收。這些特性決定了,當(dāng)需要執(zhí)行很多短時(shí)間的任務(wù)時(shí),CacheThreadPool 的線程復(fù)用率比較高, 會(huì)顯著的提高性能。而且線程60s后會(huì)回收,意味著即使沒有任務(wù)進(jìn)來,CacheThreadPool 并不會(huì)占用很多資源。注釋簡單明了說明了 CacheThreadPool 的特性和適用場景,我們后面在閱讀代碼的過程中,會(huì)對注釋的說明有進(jìn)一步的理解。終于到了要進(jìn)入源碼的時(shí)候,天天看郭神博客讓我學(xué)到一個(gè)技巧,必須帶著問題去看閱讀,不管是看書還是看代碼,這樣才能事半功倍。那么問題來了:1.CacheThreadPool 如何實(shí)現(xiàn)線程保留60s。2.CacheThreadPool 如何實(shí)現(xiàn)線程復(fù)用。帶著這兩個(gè)問題,去源碼里尋找答案吧~首先我們向線程池提交任務(wù)一般用 execute() 方法,我們就從這里入手:javapublic void execute(Runnable command) { if (command == null) throw new NullPointerException(); //1.如果當(dāng)前存在的線程少于corePoolSize,會(huì)新建線程來執(zhí)行任務(wù)。然后各種檢查狀態(tài) int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2.如果task被成功加入隊(duì)列,還是要double-check if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3.如果task不能加入到隊(duì)列,會(huì)嘗試創(chuàng)建線程。如果創(chuàng)建失敗,走reject流程 else if (!addWorker(command, false)) reject(command);1. 第一步比較簡單,如果當(dāng)前運(yùn)行的線程少于核心線程,調(diào)用 addWorker(),創(chuàng)建一個(gè)線程。但是因?yàn)?CacheThreadPool 的 corePoolSize 是0,所以會(huì)跳過這步,并不會(huì)創(chuàng)建核心線程。2. 關(guān)鍵在第二步,首先判斷了線程池是否運(yùn)行狀態(tài),緊接著調(diào)用 workQueue.offer() 往對列添加 task 。 workQueue 是一個(gè) BlockingQueue ,我們知道 BlockingQueue.offer() 方法是向隊(duì)列插入元素,如果成功返回 true ,如果隊(duì)列沒有可用空間返回 false 。 CacheThreadPool 用的是 SynchronousQueue ,前面了解過 SynchronousQueue 的特性,添加到 SynchronousQueue 的元素必須被其他線程取出,才能塞入下一個(gè)元素。等會(huì)我們再來看看哪里是從 SynchronousQueue 取出元素。這里當(dāng)任務(wù)入隊(duì)列成功后,再次檢查了線程池狀態(tài),還是運(yùn)行狀態(tài)就繼續(xù)。然后檢查當(dāng)前運(yùn)行線程數(shù)量,如果當(dāng)前沒有運(yùn)行中的線程,調(diào)用 addWorker() ,第一個(gè)參數(shù)為 null 第二個(gè)參數(shù)是 false ,標(biāo)明了非核心線程。為什么這里 addWorker() 第一個(gè)方法要用null?帶著這個(gè)疑問,我們來看看 addWorker() 方法:Javaprivate boolean addWorker(Runnable firstTask, boolean core) { //...這里有一段cas代碼,通過雙重循環(huán)目的是通過cas增加線程池線程個(gè)數(shù) boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; //...省略部分代碼 workers.add(w); //...省略部分代碼 workerAdded=true; if (workerAdded) { t.start(); workerStarted = true; } }源代碼比較長,這里省略了一部分。過程主要分成兩步,第一步是一段 cas 代碼通過雙重循環(huán)檢查狀態(tài)并為當(dāng)前線程數(shù)擴(kuò)容 +1,第二部是將任務(wù)包裝成 worker 對象,用線程安全的方式添加到當(dāng)前工作 HashSet() 里,并開始執(zhí)行線程。終于讀到線程開始執(zhí)行的地方了,里程碑式的勝利啊同志們!但是我們注意到,task 為 null ,Worker 里面的 firstTask 是 null ,那么 wokrer thread 里面是怎么工作下去的呢?繼續(xù)跟蹤代碼,Worker 類繼承 Runnable 接口,因此 worker thread start 后,走的是 worker.run()方法:javapublic void run() { runWorker(this); }繼續(xù)進(jìn)入 runWorker() 方法:javafinal void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; //省略代碼 while (task != null || (task = getTask()) != null) { //..省略 try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (Exception x) { thrown = x; throw x; } //省略代碼 } //省略代碼 }可以看到這里判斷了 firstTask 如果為空,就調(diào)用 getTask() 方法。getTask() 方法是從 workQueue 拉取任務(wù)。所以到這里之前的疑問就解決了,調(diào)用 addWorker(null,false) 的目的是啟動(dòng)一個(gè)線程,然后再 workQueue 拉取任務(wù)執(zhí)行。繼續(xù)跟蹤 getTask() 方法:private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { //..省略 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //..省略 try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}終于看到從 workQueue 拉取元素了。 CacheThreadPool 構(gòu)造的時(shí)候 corePoolSize 是 0,allowCoreThreadTimeOut 默認(rèn)是 false ,因此 timed 一直為 true ,會(huì)調(diào)用 workQueue.poll() 從隊(duì)列拉取一個(gè)任務(wù),等待 60s, 60s后超時(shí),線程就會(huì)會(huì)被回收。如果 60s 內(nèi),進(jìn)來一個(gè)任務(wù),會(huì)發(fā)生什么情況?任務(wù)在 execute() 方法里,會(huì)被 offer() 進(jìn) workQueue ,因?yàn)槟壳瓣?duì)列是空的,所以 offer 進(jìn)來后,馬上會(huì)被阻塞的 worker.poll() 拉取出來,然后在 runWorker() 方法里執(zhí)行,因?yàn)榫€程沒有新建所以達(dá)到了線程的復(fù)用。至此,我們已經(jīng)明白了線程復(fù)用的秘密,以及線程保留 60s 的實(shí)現(xiàn)方法。回到 execute() 方法,還有剩下一個(gè)邏輯Java//3.如果task不能加入到隊(duì)列,會(huì)嘗試創(chuàng)建線程。如果創(chuàng)建失敗,走reject流程else if (!addWorker(command, false)) reject(command);因?yàn)?CacheThreadPool 用的 SynchronousQueue ,所以沒有空閑線程, SynchronousQueue 有一個(gè)元素正在被阻塞,那么就不能加入到隊(duì)列里。會(huì)走到 addWorker(commond,false) 這里,這個(gè)時(shí)候因?yàn)榫蜁?huì)新建線程來執(zhí)行任務(wù)。如果 addWorker() 返回 false 才會(huì)走 reject 策略。那么什么時(shí)候 addWorker() 什么時(shí)候會(huì)返回false呢?我們看代碼:javaprivate boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); 1.線程池已經(jīng)shutdown,或者提交進(jìn)來task為ull且隊(duì)列也是空,返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); 2.如果需要?jiǎng)?chuàng)建核心線程但是當(dāng)前線程已經(jīng)大于corePoolSize 返回false,如果是非核心線程但是已經(jīng)超出maximumPoolSize,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; //省略代碼。。。 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //省略代碼。。。 } } } //省略代碼。。。 }addWorker() 有以下情況會(huì)返回 false :1. 線程池已經(jīng) shutdown,或者提交進(jìn)來 task 為ull且同時(shí)任務(wù)隊(duì)列也是空,返回 false。2. 如果需要?jiǎng)?chuàng)建核心線程但是當(dāng)前線程已經(jīng)大于 corePoolSize 返回 false,如果是非核心線程但是已經(jīng)超出 maximumPoolSize ,返回 false。3. 創(chuàng)建線程后,檢查是否已經(jīng)啟動(dòng)。我們逐條檢查。第一點(diǎn)只有線程池被 shutDown() 才會(huì)出現(xiàn)。第二點(diǎn)由于 CacheThreadPool 的 corePoolSize 是 0 , maximumPoolSize 是 Intger.MAX_VALUE ,所以也不會(huì)出現(xiàn)。第三點(diǎn)是保護(hù)性錯(cuò)誤,我猜因?yàn)榫€程允許通過外部的 ThreadFactory 創(chuàng)建,所以檢查了一下是否外部已經(jīng) start,如果開發(fā)者編碼規(guī)范,一般這種情況也不會(huì)出現(xiàn)。綜上,在線程池沒有 shutDown 的情況下,addWorker() 不會(huì)返回 false ,不會(huì)走reject流程,所以理論上 CacheThreadPool 可以一直提交任務(wù),符合CacheThreadPool注釋里的描述。## 總結(jié)CacheThreadPool 的運(yùn)行流程如下:1. 提交任務(wù)進(jìn)線程池。2. 因?yàn)?corePoolSize 為0的關(guān)系,不創(chuàng)建核心線程。3. 嘗試將任務(wù)添加到 SynchronousQueue 隊(duì)列。4. 如果SynchronousQueue 入列成功,等待被當(dāng)前運(yùn)行的線程空閑后拉取執(zhí)行。如果當(dāng)前運(yùn)行線程為0,調(diào)用addWorker( null , false )創(chuàng)建一個(gè)非核心線程出來,然后從 SynchronousQueue 拉取任務(wù)并在當(dāng)前線程執(zhí)行,實(shí)現(xiàn)線程的復(fù)用。5. 如果 SynchronousQueue 已有任務(wù)在等待,入列失敗。因?yàn)?maximumPoolSize 無上限的原因,創(chuàng)建新的非核心線程來執(zhí)行任務(wù)??v觀整個(gè)流程,通過設(shè)置 ThreadPoolExecutor 的幾個(gè)參數(shù),并加上應(yīng)用 SynchronousQueue 的特性,然后在 ThreadPoolExecutor 的運(yùn)行框架下,構(gòu)建出了一個(gè)可以線程復(fù)用的線程池。ThreadPoolExecutor 還有很強(qiáng)的擴(kuò)展性,可以通過自定義參數(shù)來實(shí)現(xiàn)不同的線程池。這么牛X的代碼,這輩子寫是不可能寫得出來了,爭取能完全讀懂吧。。謝謝閱讀,相信看完這篇文章的你,下次被問到線程池相關(guān)的問題,再也不會(huì)答不上來了吧~### 引申Executors 還提供了這么一個(gè)方法 Executors.newFixedThreadPool(4) 來創(chuàng)建一個(gè)有固定線程數(shù)量的線程池,我們看看創(chuàng)建的參數(shù):```Javapublic static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
參數(shù)中核心線程和最大線程一樣,線程保留時(shí)間 0 ,使用 LinkedBlockingQueue 作為任務(wù)隊(duì)列,這樣的線程池有什么樣的特性呢?我們看看注釋說明,大意是說這是一個(gè)有著固定線程數(shù)量且使用無界隊(duì)列作為線程隊(duì)列的線程池。如果有新的任務(wù)提交,但是沒有線程可用,這個(gè)任務(wù)會(huì)一直等待直到有可用的線程。如果一個(gè)線程因?yàn)楫惓=K止了,當(dāng)線程不夠用的時(shí)候會(huì)再創(chuàng)建一個(gè)出來。線程會(huì)一直保持,直到線程池 shutDown。
和 CacheThreadPool 相比,F(xiàn)ixedThreadPool 注釋里描述的特性有幾個(gè)不同的地方。
1. 因?yàn)?corePoolSize == maximumPoolSize ,所以FixedThreadPool只會(huì)創(chuàng)建核心線程。
2. 在 getTask() 方法,如果隊(duì)列里沒有任務(wù)可取,線程會(huì)一直阻塞在 LinkedBlockingQueue.take() ,線程不會(huì)被回收。
3. 由于線程不會(huì)被回收,會(huì)一直卡在阻塞,所以沒有任務(wù)的情況下, FixedThreadPool 占用資源更多。
FixedThreadPool 和 CacheThreadPool 也有相同點(diǎn),都使用無界隊(duì)列,意味著可用一直向線程池提交任務(wù),不會(huì)觸發(fā) reject 策略。
## 參考文章
聊聊并發(fā)(三)—— JAVA 線程池的分析和使
用[http://www.infoq.com/cn/articles/java-threadPool ]
SynchronousQueue 實(shí)現(xiàn)原理 [https://blog.csdn.net/yanyan19880509/article/details/52562039 ]
Java 中線程池 ThreadPoolExecutor 原理探究[http://ifeve.com/java%E4%B8%AD%E7%BA%BF%E7%A8%8B%E6%B1%A0threadpoolexecutor%E5%8E%9F%E7%90%86%E6%8E%A2%E7%A9%B6/ ]