為什么要使用線程池?
創(chuàng)建線程和銷毀線程的花銷是比較大的,這些時間有可能比處理業(yè)務(wù)的時間還要長。這樣頻繁的創(chuàng)建線程和銷毀線程,再加上業(yè)務(wù)工作線程,消耗系統(tǒng)資源的時間,可能導(dǎo)致系統(tǒng)資源不足。(我們可以把創(chuàng)建和銷毀的線程的過程去掉)
線程池有什么作用?
1、提高效率 創(chuàng)建好一定數(shù)量的線程放在池中,等需要使用的時候就從池中拿一個,這要比需要的時候創(chuàng)建一個線程對象要快的多。
2、方便管理 可以編寫線程池管理代碼對池中的線程同一進(jìn)行管理,比如說啟動時有該程序創(chuàng)建100個線程,每當(dāng)有請求的時候,就分配一個線程去工作,如果剛好并發(fā)有101個請求,那多出的這一個請求可以排隊(duì)等候,避免因無休止的創(chuàng)建線程導(dǎo)致系統(tǒng)崩潰。
說說幾種常見的線程池及使用場景
1、newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2、newFixedThreadPool
創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊(duì)列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、newCachedThreadPool
創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4、newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險。 說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
??主要問題是堆積的請求處理隊(duì)列可能會耗費(fèi)非常大的內(nèi)存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
??主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。
Positive example 1:
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService =
new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()
);
Positive example 2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown
Positive example 3:
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);
個人在項(xiàng)目中用到的是第三種,業(yè)務(wù)需求,每天會有調(diào)度服務(wù)器會通過http協(xié)議請求
<bean id="xxDataThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心線程數(shù) -->
<property name="corePoolSize" value="50"/>
<!-- 最大線程數(shù) -->
<property name="maxPoolSize" value="500"/>
<!-- 隊(duì)列最大長度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="10"/>
<!-- 線程池維護(hù)線程所允許的空閑時間 -->
<property name="keepAliveSeconds" value="1"/>
<!-- 線程池對拒絕任務(wù)(無線程可用)的處理策略 如果已經(jīng)超過了限制丟棄消息,不進(jìn)行處理 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$DiscardPolicy"/>
</property>
</bean>
@Controller
@RequestMapping("/windData")
public class WindDataListener {
private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");
@Autowired
private ThreadPoolTaskExecutor controlerThreadPool;
@Autowired
private ThreadPoolTaskExecutor windDataThreadPool;
@Autowired
private WindDataRuntimeService runtimeService;
@Autowired
private MaintainAlarmSender maintainAlarmSender;
/**
* 啟動調(diào)度
*/
@RequestMapping(value = "/receiveMsg", method = RequestMethod.GET)
@ResponseBody
public void receiveMsg() {
final String paramLog = LogConst.BUSSINESS_NAME + LogConst.HTTP_API;
logger.info("[{}][接收到調(diào)度消息]", paramLog);
//定時調(diào)度,可能有多個http請求,把請求都放在controlerThreadPool里面
controlerThreadPool.execute(new WindDataDispatcher(windDataThreadPool, runtimeService, maintainAlarmSender,
MDC.getCopyOfContextMap()));
logger.info("[{}][響應(yīng)給調(diào)度系統(tǒng)]", paramLog);
}
}
public class WindDataDispatcher implements Runnable {
private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");
private ThreadPoolTaskExecutor taskThreadPool;
private WindDataRuntimeService runtimeService;
private MaintainAlarmSender maintainAlarmSender;
private Map<Object, Object> mdcMap;
public WindDataDispatcher(ThreadPoolTaskExecutor taskThreadPool, WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, Map<Object, Object> mdcMap) {
this.taskThreadPool = taskThreadPool;
this.runtimeService = runtimeService;
this.maintainAlarmSender = maintainAlarmSender;
this.mdcMap = mdcMap;
}
@Override
public void run() {
if (null != mdcMap) {
MDC.setContextMap(mdcMap);
}
final String paramLog = LogConst.BUSSINESS_NAME + LogConst.DISPATCHER;
logger.info("[{}啟動]", paramLog);
taskThreadPool.execute(new WindDataExecutor(runtimeService, maintainAlarmSender, mdcMap));
logger.info("[{}結(jié)束]", paramLog);
}
}
public class WindDataExecutor implements Runnable {
private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");
private WindDataRuntimeService runtimeService;
private MaintainAlarmSender maintainAlarmSender;
private Map<Object, Object> mdcMap;
public WindDataExecutor(WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, Map<Object, Object> mdcMap) {
this.runtimeService = runtimeService;
this.maintainAlarmSender = maintainAlarmSender;
this.mdcMap = mdcMap;
}
@Override
public void run() {
if (null != mdcMap) {
MDC.setContextMap(mdcMap);
}
final String paramLog = LogConst.BUSSINESS_NAME + LogConst.EXECUTOR;
logger.info("[{}啟動]", paramLog);
try {
runtimeService.groundWindData();
} catch (Exception e) {
logger.error("[{}異常]{}", new Object[]{paramLog, e});
maintainAlarmSender.sendMail(MaintainAlarmSender.DEFAULT_MAIL_SUB, paramLog + "異常:" + e);
}
logger.info("[{}結(jié)束]", paramLog);
}
}
線程池都有哪幾種工作隊(duì)列
1、ArrayBlockingQueue
是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
2、LinkedBlockingQueue
一個基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊(duì)列
3、SynchronousQueue
一個不存儲元素的阻塞隊(duì)列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊(duì)列。
4、PriorityBlockingQueue
一個具有優(yōu)先級的無限阻塞隊(duì)列。
線程池中的幾種重要的參數(shù)及流程說明
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:核心池的大小,這個參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù),這個參數(shù)也是一個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個線程;
keepAliveTime:表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達(dá)到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;
unit:參數(shù)keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue:一個阻塞隊(duì)列,用來存儲等待執(zhí)行的任務(wù),這個參數(shù)的選擇也很重要,會對線程池的運(yùn)行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
PriorityBlockingQueue
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和SynchronousQueue。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
threadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程做些更有意義的事情,比如設(shè)置daemon和優(yōu)先級等等
handler:表示當(dāng)拒絕處理任務(wù)時的策略,有以下四種取值:
1、AbortPolicy:直接拋出異常。
2、CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
3、DiscardOldestPolicy:丟棄隊(duì)列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)。
4、DiscardPolicy:不處理,丟棄掉。
5、也可以根據(jù)應(yīng)用場景需要來實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
ThreadPoolExecutor 源碼理解 https://www.cnblogs.com/dolphin0520/p/3932921.html
public static void test(int size) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 20, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
for (int i = 0; i < size; i++) {
poolExecutor.execute(new DemoTask(i));
Console.log("poolSize:" + poolExecutor.getPoolSize());
Console.log("corePoolSize:" + poolExecutor.getCorePoolSize());
Console.log("maximumPoolSize:" + poolExecutor.getMaximumPoolSize());
Console.log("queue:" + poolExecutor.getQueue().size());
Console.log("completedTaskCount:" + poolExecutor.getCompletedTaskCount());
Console.log("largestPoolSize:" + poolExecutor.getLargestPoolSize());
Console.log("keepAliveTime:" + poolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
}
poolExecutor.shutdown();
}
class DemoTask implements Runnable {
private int taskNum;
public DemoTask(int taskNum) {
this.taskNum = taskNum;
}
@Override
public void run() {
Console.log(StringUtils.center("正在執(zhí)行" + taskNum, 20, "="));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Console.log(StringUtils.center("執(zhí)行完畢" + taskNum, 20, "="));
}
}
=======正在執(zhí)行0========
poolSize:1
corePoolSize:5
maximumPoolSize:20
queue:0
completedTaskCount:0
largestPoolSize:1
keepAliveTime:2
poolSize:2
corePoolSize:5
maximumPoolSize:20
queue:0
completedTaskCount:0
=======正在執(zhí)行1========
largestPoolSize:2
keepAliveTime:2
poolSize:3
corePoolSize:5
maximumPoolSize:20
=======正在執(zhí)行2========
queue:0
completedTaskCount:0
largestPoolSize:3
keepAliveTime:2
poolSize:4
corePoolSize:5
maximumPoolSize:20
queue:0
=======正在執(zhí)行3========
completedTaskCount:0
largestPoolSize:4
keepAliveTime:2
poolSize:5
corePoolSize:5
=======正在執(zhí)行4========
maximumPoolSize:20
queue:0
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:1
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:2
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:3
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:4
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:5
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:5
keepAliveTime:2
poolSize:6
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:6
keepAliveTime:2
poolSize:7
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:7
keepAliveTime:2
=======正在執(zhí)行11=======
poolSize:8
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
=======正在執(zhí)行12=======
=======正在執(zhí)行10=======
largestPoolSize:8
keepAliveTime:2
poolSize:9
corePoolSize:5
=======正在執(zhí)行13=======
maximumPoolSize:20
queue:5
completedTaskCount:0
largestPoolSize:9
keepAliveTime:2
poolSize:10
corePoolSize:5
maximumPoolSize:20
=======正在執(zhí)行14=======
queue:5
completedTaskCount:0
largestPoolSize:10
keepAliveTime:2
poolSize:11
corePoolSize:5
maximumPoolSize:20
queue:5
=======正在執(zhí)行15=======
completedTaskCount:0
largestPoolSize:11
keepAliveTime:2
poolSize:12
corePoolSize:5
maximumPoolSize:20
queue:5
completedTaskCount:0
=======正在執(zhí)行16=======
largestPoolSize:12
keepAliveTime:2
poolSize:13
corePoolSize:5
maximumPoolSize:20
=======正在執(zhí)行17=======
queue:5
completedTaskCount:0
largestPoolSize:13
keepAliveTime:2
poolSize:14
corePoolSize:5
maximumPoolSize:20
queue:5
=======正在執(zhí)行18=======
completedTaskCount:0
largestPoolSize:14
keepAliveTime:2
poolSize:15
corePoolSize:5
maximumPoolSize:20
=======正在執(zhí)行19=======
queue:5
completedTaskCount:0
largestPoolSize:15
keepAliveTime:2
=======執(zhí)行完畢0========
=======正在執(zhí)行5========
=======執(zhí)行完畢1========
=======執(zhí)行完畢2========
=======正在執(zhí)行6========
=======正在執(zhí)行7========
=======執(zhí)行完畢4========
=======正在執(zhí)行8========
=======執(zhí)行完畢3========
=======正在執(zhí)行9========
=======執(zhí)行完畢13=======
=======執(zhí)行完畢12=======
=======執(zhí)行完畢10=======
=======執(zhí)行完畢11=======
=======執(zhí)行完畢15=======
=======執(zhí)行完畢16=======
=======執(zhí)行完畢14=======
=======執(zhí)行完畢19=======
=======執(zhí)行完畢18=======
=======執(zhí)行完畢17=======
=======執(zhí)行完畢5========
=======執(zhí)行完畢7========
=======執(zhí)行完畢6========
=======執(zhí)行完畢8========
=======執(zhí)行完畢9========
怎么理解無界隊(duì)列和有界隊(duì)列
有界隊(duì)列
1.初始的poolSize < corePoolSize,提交的runnable任務(wù),會直接做為new一個Thread的參數(shù),立馬執(zhí)行 。
2.當(dāng)提交的任務(wù)數(shù)超過了corePoolSize,會將當(dāng)前的runable提交到一個block queue中。
3.有界隊(duì)列滿了之后,如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進(jìn)行救急處理,立馬執(zhí)行對應(yīng)的runnable任務(wù)。
4.如果3中也無法處理了,就會走到第四步執(zhí)行reject操作。
無界隊(duì)列
與有界隊(duì)列相比,除非系統(tǒng)資源耗盡,否則無界的任務(wù)隊(duì)列不存在任務(wù)入隊(duì)失敗的情況。當(dāng)有新的任務(wù)到來,系統(tǒng)的線程數(shù)小于corePoolSize時,則新建線程執(zhí)行任務(wù)。當(dāng)達(dá)到corePoolSize后,就不會繼續(xù)增加,若后續(xù)仍有新的任務(wù)加入,而沒有空閑的線程資源,則任務(wù)直接進(jìn)入隊(duì)列等待。若任務(wù)創(chuàng)建和處理的速度差異很大,無界隊(duì)列會保持快速增長,直到耗盡系統(tǒng)內(nèi)存。
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略。