電腦的CPU資源是有限的,任務(wù)的處理速度與線程數(shù)量之間并不是正相關(guān)。當線程數(shù)量過多,CPU要頻繁的在不同線程切換,反而會引起處理性能的下降。線程池中最大的線程數(shù),是考慮多種因素來事先設(shè)定的,比如硬件的條件,業(yè)務(wù)的類型等等。
當我們向一個固定大小的的線程池中請求一個線程時,當線程池中沒有空閑資源了,這個時候線程池如何處理這個請求?是拒絕請求還是排隊請求?各種策略又是如何實現(xiàn)的呢?
實際上,這些問題的處理并不復雜,底層的數(shù)據(jù)結(jié)構(gòu),就是隊列(queue)。
一、Java線程池介紹
1,線程池的作用
限制系統(tǒng)中執(zhí)行線程的數(shù)量。
減少了創(chuàng)建和銷毀線程的次數(shù),重復利用線程。
2,主要的類
Executor:執(zhí)行線程的接口
ExecutorSerivce: 線程池接口
ThreadPoolExecutor :線程池類
Executors:常用線程池工廠
3,常用的線程池
配置線程池是比較復雜的過程,所有可以使用現(xiàn)有的線程池工廠生成常用的線程池:
- newFixedThreadPool 創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。為了合理利用資源,我們通常把定長池的長度設(shè)置為當前PC機獲取cpu核心數(shù):Runtime.getRuntime().availableProcessors():獲取當前CPU核心數(shù);
- newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程;
- newScheduledThreadPool 創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行;
- newSingleThreadExecutor 創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MyThreadPool {
public static void main(String [] args){
int num = Runtime.getRuntime().availableProcessors();
Executor executor = Executors.newFixedThreadPool(num);
for (int i = 0 ; i<num ; i++){
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("我是一個子線程!!");
}
});
}
}
}

我們再來看Executors.newFixedThreadPool(num),點進去,會發(fā)現(xiàn)就是new了一個LinkedBlockingQueue:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

二、線程池和隊列結(jié)合實現(xiàn)一個日志處理
JDK自己的線程池底層不光是用隊列實現(xiàn)的,我們也可以使用線程池和隊列相結(jié)合,來實現(xiàn)一些功能。
通常我們會把要執(zhí)行的任務(wù)放入一個隊列中,由線程池來執(zhí)行,比如爬蟲、日志。我們先來看一個線程池和隊列結(jié)合實現(xiàn)日志記錄的例子。
import com.swagger.demo.Entity.LogContentEntity;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@Configuration
@Aspect
@Component
public class AopLogConfig implements Runnable {
@Autowired
private HttpServletRequest request;
private LinkedBlockingQueue<LogContentEntity> logQueue;
public AopLogConfig() {
//Spring啟動后,該對象創(chuàng)建時。初始化隊列以及線程池。
logQueue = new LinkedBlockingQueue<LogContentEntity>(3000);
int num = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(num);
for (int i = 0 ;i<num ;i++){
executor.execute(this);
}
}
@Before("execution(public * com.swagger.demo.controller..*.*(..))")
public void doBefore(JoinPoint joinPoint) throws Exception{
//日志記錄的信息可自行修改
LogContentEntity Log = new LogContentEntity();
String method = request.getMethod();
Log.setHttpMethod(method);
String url = request.getRequestURL().toString();
Log.setUrl(url);
String ip = request.getRemoteAddr();
Log.setIp(ip);
Log.setContent("test Log Content");
//將需要記錄的日志對象放到隊列中等待線程異步執(zhí)行。
logQueue.put(Log);
}
@Override
public void run() {
try{
while(true){
//如果隊列里沒有,則會阻塞;
LogContentEntity take = logQueue.take();
//日志處理邏輯可自行修改;
System.out.println(take.toString());
}
}catch(Exception e){
e.printStackTrace();
}
}
}

三、線程池+隊列以優(yōu)先級方式執(zhí)行隊列任務(wù)
import java.util.concurrent.TimeUnit;
public class MyPriorityTask implements Runnable, Comparable<MyPriorityTask> {
private int priority;
private String name;
public MyPriorityTask(String name, int priority) {
this.name = name;
this.priority = priority;
}
public void run() {
System.out.printf("MyPriorityTask: %s Priority :%d\n", name, priority);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int compareTo(MyPriorityTask o) {
if (this.getPriority() < o.getPriority()) {
return 1;
}
if (this.getPriority() > o.getPriority()) {
return -1;
}
return 0;
}
public int getPriority() {
return priority;
}
}

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
for (int i = 0; i < 100; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, 0);
executor.execute(task);
System.out.println(executor.getTaskCount());
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 101; i < 8; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, 1);
executor.execute(task);
System.out.println(executor.getTaskCount());
}
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Main: End of the program.\n");
}
}

四、使用線程池的一些陷阱
盡管線程池對于構(gòu)建多線程應(yīng)用是個很強大的機制,但它也不是沒有缺點的。使用線程池構(gòu)建的應(yīng)用會面臨其他多線程應(yīng)用所面對的一樣的并發(fā)風險,比如同步錯誤和死鎖,此外線程池還有其他的一些特有缺陷,比如 線程池-關(guān)聯(lián) 死鎖,資源不足,還有線程泄漏。
1.死鎖
任何多線程應(yīng)用都會面臨死鎖的風險。彼此雙方都在等待一個事件,而這個事件只能有對方提供,這樣一對進程或者線程我們稱之為死鎖。死鎖最簡單的情況是線程 A 持有了對象 X 的獨占鎖,線程 A 在等待對象 Y 的鎖,而線程 B 恰恰持有了對象 Y 的獨占鎖,線程 B 在等待對象 X 的鎖。除非有某種辦法能夠打破這種鎖等待(Java 鎖機制不能支持這個),否則的話這一對死鎖線程將會永久地等待下去。
既然死鎖是所有多線程編程都將面臨的風險,線程池為我們引入了另一種死鎖:線程池中所有線程都在阻塞等待隊列中另一個任務(wù)的執(zhí)行結(jié)果,但是另一個任務(wù)無法得到執(zhí)行,因為池中壓根兒就沒用空閑的可用線程。這種情況在線程池用于一些相互影響對象的模擬實現(xiàn)中可能會出現(xiàn),這些模擬對象彼此發(fā)送查詢?nèi)缓笞鳛槿蝿?wù)隊列進行執(zhí)行,發(fā)起查詢的對象同步等待響應(yīng)。
2.資源不足
線程池的優(yōu)點之一是他們在大多數(shù)情況下比其他的調(diào)度機制具備更好的性能,比如我們上面所討論的那幾種。但這個取決于你有沒有恰當?shù)嘏渲昧司€程池大小。線程占用大量的資源,包括內(nèi)存和其他系統(tǒng)資源。除了線程對象所必須的內(nèi)存之外,每個線程還需要兩個執(zhí)行調(diào)用棧,這個??赡軙艽?。此外,JVM 可能還會為每個 Java 線程創(chuàng)建一個本地線程,這樣將會占用額外的系統(tǒng)資源。最后,雖然線程之間切換的調(diào)度開銷很小,大量的線程上下文切換也會影響到你的應(yīng)用性能。
如果線程池過大的話,這些眾多線程所消耗的資源將會明顯影響到系統(tǒng)性能。時間會浪費在線程之間的切換上,配置有比你實際需要更多的線程會引起資源不足的問題,因為池中線程所占用的資源如果用在其他任務(wù)上可能會更高效。除了這些線程本身所使用的資源之外,服務(wù)請求時所做的工作可能會需要額外資源,比如 JDBC 連接,套接字,或者文件。這些也是有限的資源,而且對它們進行過高并發(fā)請求的話可能會導致失效,比如無法分配一個 JDBC 連接。
3.并發(fā)錯誤
線程池以及其他隊列機制依賴于 wait() 和 notify() 方法的使用,這可能會變得很棘手。如果編碼不當?shù)脑?,很可能會導致通知丟失,結(jié)果就是池中的線程都處于一個空閑的狀態(tài),而實際上隊列中有任務(wù)需要處理。在使用這些工具的時候要打起十二萬分的精神;即便是專家在用它們的時候也經(jīng)常會失誤。幸運的是,可以使用一些現(xiàn)成的實現(xiàn),這些實現(xiàn)久經(jīng)考驗,比如下文將會討論到的 你無須自行編碼 實現(xiàn)的 java.util.concurrent 包。
4.線程泄漏
各種各樣的線程池中存在的一個重大的危險就是線程泄漏,當一個線程被從線程池中移除去執(zhí)行一個任務(wù),任務(wù)執(zhí)行結(jié)束之后卻沒有返還給線程池的時候,就會出現(xiàn)這種危險。出現(xiàn)這種情況的一種方式是當任務(wù)拋出一個 RuntimeException 或一個 Error 時。如果線程池類沒有捕捉到這些,該線程將會傻傻地存在于線程池之中,而線程池的線程數(shù)量則會被永久地減一。當這種情況發(fā)生的次數(shù)足夠多的時候,線程池最終將為空(無可用線程),而系統(tǒng)則會癱瘓,因為已經(jīng)沒有線程來處理任務(wù)了。
癱瘓的任務(wù),比如那些永久等待不保證可用資源或者等待已經(jīng)回家了的用戶輸入的任務(wù),也可以造成相等于線程泄漏一樣的后果。如果一個線程永久地被這樣一個任務(wù)所占用了的話,它造成的影響和從池中移除是一樣的。像這樣的任務(wù)應(yīng)該要么給它們一個線程池之外的線程,要么控制一下它們的等待時間。
5.請求過載
服務(wù)器很可能會被鋪天蓋地而來的請求所淹沒。這種情況下,我們可能并不想讓每個進來的請求都放進我們的工作隊列,因為等待執(zhí)行的任務(wù)隊列也可能會占用過多系統(tǒng)資源并導致資源不足。這時候要做什么就取決于你的決定了,比如你可以通過一個表示服務(wù)器暫時太忙的響應(yīng)來拒絕這些請求。
五、高效線程池使用指南
你只需要遵循一些簡單的指導方針,線程池就可以成為你構(gòu)建服務(wù)應(yīng)用的一個非常有效的方法:
- 不要把同步等待其他任務(wù)執(zhí)行結(jié)果的任務(wù)放進任務(wù)隊列。這將導致上文所描述那種死鎖,池中所有線程都在等待一個任務(wù)的執(zhí)行結(jié)果,而隊列中的這個任務(wù)無法得到執(zhí)行因為所有線程都在使用中。
- 可能長時間操作的任務(wù)放入線程池的時候要慎重。如果程序必須要等待一個資源,比如一個 I/O 的完成,定義一個最長等待時間,然后失敗或稍后重新執(zhí)行。這就保證了通過將一個線程從一個可能會完成的任務(wù)中釋放出來而最終一些其他任務(wù)得到成功執(zhí)行。
- 理解你的任務(wù)。想要有效地調(diào)整線程池大小,你需要理解隊列中那些任務(wù)要做的事情。它們是 CPU 密集型操作嗎?它們會長時間占用 I/O 嗎?你的答案會影響到你對你的應(yīng)用的配置。如果這些任務(wù)來自不同的類、有著截然不同的特征,為不同類型的任務(wù)定制不同的工作隊列也許更行得通,這樣每個池都能夠得到有據(jù)配置。
線程池大小配置
調(diào)整線程池的大小在很大程度上是一件避免兩個錯誤的事情:擁有過多或過少的線程。幸運的是,對于大多數(shù)應(yīng)用而言太多或太少之間的中間地帶還是很寬廣的。
回顧應(yīng)用中使用線程的兩個主要優(yōu)點:在等待一個諸如 I/O 之類的慢操作的時候進程能夠繼續(xù)進行,利用多個處理器的可用性。在一個 N 處理器主機上運行一個計算密集型的應(yīng)用,通過設(shè)置線程數(shù)量為 N 增加額外的線程可能會提高吞吐量,但添加的額外線程超過 N 的話就沒有什么好處了。確實,過多的線程甚至會降低性能因為會帶來額外的上下文切換開銷。
線程池最佳大小取決于可用處理器的數(shù)量和工作隊列中任務(wù)的性質(zhì)。對于在一個 N-處理器 系統(tǒng)中一個的將持有完全計算密集型任務(wù)的工作隊列,通常獲得 CPU 最大利用率的話是配置線程池大小為 N 或 N + 1 個線程。
對于可能要等待 I/O 完成的任務(wù),比如,一個從 socket 中讀取一個 HTTP 請求的任務(wù) - 你需要增加線程池的線程的數(shù)量超出可用處理器的數(shù)量,因為所有的線程都在同一時間工作。通過分析,你可以為一個典型的請求估算出等待時間(WT)和服務(wù)時間(ST)之間的比率。比如我們稱這個比率為 WT/ST,對于一個 N-處理器系統(tǒng),你需要大約 N * (1 + WT/ST) 個線程來保持處理器得到充分利用。
處理器利用率并非配置線程池大小的唯一依據(jù)。因為在線程池增長的時候,你可能會遇到調(diào)度器的局限性,內(nèi)存可用性,或者其他系統(tǒng)資源,比如 socket 的數(shù)量,打開文件的處理,或者數(shù)據(jù)庫連接等問題。
六、總結(jié)
- 使用JDK的方法創(chuàng)建會產(chǎn)生OOM情況,主要原因是用LinkedBlockingQueue隊列,該隊列可以導致OOM。
- 線程可以使用用阿里巴巴推薦的方法,但是因為定線程數(shù)量,并且隊列用的是ArrayBlockingQueue,所以效率較低,不過可以保證內(nèi)存不會OOM。
- 無需自行編碼。Doug Lea 寫了一個杰出的開源并發(fā)工具包,java.util.concurrent,包含了互斥,信合,能夠在并發(fā)訪問下性能表現(xiàn)良好的集合類諸如隊列和哈希表,以及一些工作隊列的實現(xiàn)。這個包里的 PooledExecutor 類是一個高效的、被廣泛使用的、基于工作隊列的一個線程池的正確實現(xiàn)。不用再嘗試著自己去寫代碼實現(xiàn)了,那樣很容易出錯,你可以考慮使用 java.util.concurrent 包里的一些工具。
- 線程池是構(gòu)建服務(wù)器應(yīng)用的很有用的一個工具。它的概念很簡單,但在實現(xiàn)或者使用的時候需要注意一些問題,比如死鎖,資源不足,以及 wait() 和 notify() 的復雜性。如果你發(fā)現(xiàn)自己的應(yīng)用需要一個線程池,考慮一下使用 java.util.concurrent 包里的某個 Executor 類,比如 PooledExecutor,不要去從頭寫一個。如果你發(fā)現(xiàn)你在創(chuàng)建一些要處理簡短任務(wù)的線程,你就應(yīng)該考慮使用線程池了。
我的微信公眾號:架構(gòu)真經(jīng)(id:gentoo666),分享Java干貨,高并發(fā)編程,熱門技術(shù)教程,微服務(wù)及分布式技術(shù),架構(gòu)設(shè)計,區(qū)塊鏈技術(shù),人工智能,大數(shù)據(jù),Java面試題,以及前沿熱門資訊等。每日更新哦!


參考資料:
- https://blog.csdn.net/weixin_39770927/article/details/81360511
- https://blog.csdn.net/zhangqinfu/article/details/52931530
- https://blog.csdn.net/every__day/article/details/83900109
- https://blog.csdn.net/wwp231/article/details/52504687
- https://blog.csdn.net/qq360694660/article/details/78296919
- https://blog.csdn.net/defonds/article/details/43796951