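I. Preface
Recently, while refactoring a Dubbo service, I tuned its thread pool and switched the worker threads to the CachedThreadPool strategy. After the release, the thread pool size climbed steadily and nearly caused a production incident.

This article gets to the bottom of what happened.
II. Dubbo thread pools
Source code of CachedThreadPool in Dubbo: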
package org.apache.dubbo.common.threadpool.support.cached;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.ALIVE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ALIVE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CORE_THREADS;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_QUEUES;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREAD_NAME;
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
/**
 * This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for
 * the upcoming request.
 *
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 */
public class CachedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        // 1. Thread name prefix; defaults to "Dubbo" if not set
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // 2. Core pool size
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        // 3. Maximum pool size; defaults to Integer.MAX_VALUE
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        // 4. Queue capacity
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // 5. How long an idle thread lives before it is reclaimed, in milliseconds
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        // 6. Create the pool with ThreadPoolExecutor from the JUC package
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
As the code shows, Dubbo ultimately creates the pool with ThreadPoolExecutor from the JUC package (java.util.concurrent). The constructor's source is:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
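The argument checks at the top of the constructor are easy to try out. The following is a minimal sketch; the class name ConstructorChecks and the helper tryCreate are made up for illustration, and it uses the shorter five-argument constructor, which delegates to the one shown above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConstructorChecks {
    /** Returns "ok", or the simple name of the exception the constructor threw. */
    static String tryCreate(int core, int max, long keepAliveMs, boolean nullQueue) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, keepAliveMs,
                    TimeUnit.MILLISECONDS,
                    nullQueue ? null : new LinkedBlockingQueue<Runnable>(10));
            pool.shutdown(); // no threads were started; release the pool anyway
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate(150, 800, 60_000, false)); // valid arguments
        System.out.println(tryCreate(800, 150, 60_000, false)); // max < core
        System.out.println(tryCreate(0, 0, 60_000, false));     // max <= 0
        System.out.println(tryCreate(1, 1, 60_000, true));      // null work queue
    }
}
```

The first call prints ok, the next two print IllegalArgumentException, and the last prints NullPointerException.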
The overall flow is roughly as follows:

1. While the pool has fewer than corePoolSize threads, each new task creates a new thread, even if idle threads already exist.
2. Once the pool has reached corePoolSize, newly submitted tasks are placed in workQueue, where they wait for a worker thread to pick them up.
3. When workQueue is full and the pool has fewer than maximumPoolSize threads, a new task creates a new thread to run it.
4. When workQueue is full and the pool has already reached maximumPoolSize, newly submitted tasks are handed to the RejectedExecutionHandler.
5. When the pool has more than corePoolSize threads, threads that stay idle for keepAliveTime are shut down.
In addition, with allowCoreThreadTimeOut(true), core threads that stay idle for keepAliveTime are shut down as well.
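The steps above can be observed with a deliberately tiny pool. This is a sketch with illustrative values (core 1, max 2, queue capacity 1) and a made-up class name, PoolFlowDemo; every task blocks on a latch so that the pool state is stable when it is recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolFlowDemo {
    /** Submits four blocking tasks and records what happened to each one. */
    static List<String> run() {
        // Illustrative values: core = 1, max = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.add("task" + i + ": poolSize=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 is queued, task 3 finds the queue full and starts a second thread, and task 4 is rejected because both the queue and the pool are full.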
ThreadPoolExecutor ships with four built-in RejectedExecutionHandler policies:
1. AbortPolicy: throws a RejectedExecutionException directly, so the submitter sees the failure. This is the default policy.
2. CallerRunsPolicy: runs the rejected task in the thread that called execute(), which slows the submitter down instead of dropping work.
3. DiscardOldestPolicy: discards the oldest task in the queue, that is, the one that was added first and would have run next, and then tries to submit the new task again.
4. DiscardPolicy: silently discards the rejected task. Use it only when the business scenario can tolerate lost tasks.
Note that Dubbo's rejection policy, AbortPolicyWithReport, extends ThreadPoolExecutor.AbortPolicy; it mainly logs additional key information and dumps thread stacks.
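To illustrate the idea of "AbortPolicy plus a report", here is a minimal sketch. LoggingAbortPolicy is a hypothetical class written for this example, not Dubbo's implementation: it records the pool statistics and then delegates to AbortPolicy, which throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LoggingAbortPolicyDemo {
    /** Hypothetical handler: records pool statistics, then rejects like AbortPolicy. */
    static class LoggingAbortPolicy extends ThreadPoolExecutor.AbortPolicy {
        final AtomicReference<String> lastReport = new AtomicReference<>();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            lastReport.set(String.format(
                    "pool size: %d (active: %d, core: %d, max: %d, largest: %d)",
                    e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(),
                    e.getMaximumPoolSize(), e.getLargestPoolSize()));
            super.rejectedExecution(r, e); // throws RejectedExecutionException
        }
    }

    /** Forces one rejection on a single-thread pool and returns the recorded report. */
    static String rejectOnce() {
        LoggingAbortPolicy policy = new LoggingAbortPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), policy);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();         // the only worker is now busy
            pool.execute(() -> { }); // no idle worker and no queue slot: rejected
            return "not rejected";
        } catch (RejectedExecutionException ex) {
            return policy.lastReport.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectOnce());
    }
}
```

A real handler would write the report to a log instead of keeping it in a field; the field is used here only to keep the example self-contained.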
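III. Thread pool configuration
Thread pool configuration is important but easy to overlook. If the pool is misconfigured or its threads are rarely reused, threads will still be created and destroyed frequently.
- How do you estimate a reasonable number of core threads?
Work it out from the interface's average response time (RT) and the QPS the service must sustain. For example, with an average RT of 0.005 s, one worker thread can handle 200 tasks per second. If a single machine must sustain 30,000 QPS, it needs 30,000 / 200 = 150 core threads.
As a formula: core threads = QPS / (1 / average RT) = QPS * average RT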
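The formula can be written as a one-line helper. This is a sketch with a made-up class name, PoolSizing; it takes RT in microseconds so the arithmetic stays in integers, and it rounds up. The result is a steady-state estimate only and includes no headroom for bursts.

```java
public class PoolSizing {
    /** Threads needed = QPS * RT, rounded up. RT is given in microseconds. */
    static long requiredThreads(long qps, long avgRtMicros) {
        return (qps * avgRtMicros + 999_999) / 1_000_000;
    }

    public static void main(String[] args) {
        // 30,000 QPS at an average RT of 5 ms (5,000 microseconds)
        System.out.println(requiredThreads(30_000, 5_000)); // prints 150
    }
}
```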
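- The easily overlooked @Async annotation
When no executor is configured, Spring's @Async falls back to SimpleAsyncTaskExecutor. That is effectively the same as not using a thread pool at all, because by default it creates a new thread for every task and never reuses one.
So remember: if you use @Async, always configure an executor.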
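import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@Configuration
@Slf4j
public class ThreadPoolConfig {
    private static final int corePoolSize = 100; // core (default) number of threads
    private static final int maxPoolSize = 400; // maximum number of threads
    private static final int keepAliveTime = 60; // idle time allowed per thread, in seconds
    private static final int queueCapacity = 0; // queue capacity; 0 makes the executor use a SynchronousQueue
    private static final String threadNamePrefix = "Async-Service-"; // thread name prefix
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Initialize the underlying ThreadPoolExecutor
        executor.initialize();
        return executor;
    }
}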
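To see why thread reuse matters, the sketch below compares a thread-per-task approach with a small pool. It is plain Java, not Spring code: the thread-per-task method only stands in for the default behavior described for SimpleAsyncTaskExecutor, and the class name ThreadReuseDemo is made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {
    /** Runs each task on a brand-new thread; returns how many distinct threads ran tasks. */
    static int threadPerTask(int tasks) {
        Set<Thread> used = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            Thread t = new Thread(() -> used.add(Thread.currentThread()));
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return used.size();
    }

    /** Runs the same tasks on a fixed pool; returns how many distinct threads ran tasks. */
    static int pooled(int tasks, int poolSize) {
        Set<Thread> used = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> used.add(Thread.currentThread()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return used.size();
    }

    public static void main(String[] args) {
        System.out.println("thread-per-task: " + threadPerTask(5) + " threads");
        System.out.println("pool of 2:       " + pooled(5, 2) + " threads");
    }
}
```

Five tasks cost five threads in the first case, while the pool never uses more than two.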
IV. What caused the thread pool to spike?
Our Dubbo provider's worker thread pool was configured as follows:
corethreads: 150
threads: 800
threadpool: cached
queues: 10
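This looks reasonable: the small queue was meant to absorb brief thread shortages caused by jitter. Nothing seemed wrong, and for daytime traffic the core threads were more than enough (RT < 5 ms, QPS < 10,000). After the release, however, the pool size kept climbing until it hit the configured maximum of 800. The alert read: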
org.apache.dubbo.remoting.RemotingException("Server side(IP,20880) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-IP:20880, Pool Size: 800 (active: 4, core: 300, max: 800, largest: 800), Task: 4101304 (completed: 4101301), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://IP:20880!"
As the message shows, very few threads were active when the pool reached its maximum size, which is not what we expected at all.
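A large pool size with a small active count is possible because idle threads still count toward the pool size until they time out. The following sketch shows this with illustrative values (core 0, max 4, keep-alive 60 s); the class name IdleThreadsDemo is made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IdleThreadsDemo {
    /** Runs four overlapping tasks, waits until all are done, then reports pool statistics. */
    static String snapshot() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 4, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch go = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(4);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        go.await(); // hold all four tasks so they overlap
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    done.countDown();
                });
            }
            go.countDown();
            done.await();
            while (pool.getActiveCount() > 0) {
                Thread.sleep(1); // wait for the workers to return to the idle state
            }
            return String.format("pool size: %d, active: %d, largest: %d",
                    pool.getPoolSize(), pool.getActiveCount(), pool.getLargestPoolSize());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshot()); // prints "pool size: 4, active: 0, largest: 4"
    }
}
```

All four threads stay in the pool with nothing to do until the keep-alive time expires, so pool size alone says little about load.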
V. Reproducing the scenario
From the source code
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues))
we can see that:
When queues is 0, the blocking queue is a SynchronousQueue; when queues is less than 0, it is an unbounded LinkedBlockingQueue; when queues is greater than 0, it is a bounded LinkedBlockingQueue.
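The three cases can be checked directly by copying the selection expression into a small helper. The class and method names (QueueSelection, select, describe) are made up for this sketch; the ternary itself is the one from the Dubbo source quoted above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueSelection {
    /** Same selection logic as the expression in CachedThreadPool. */
    static BlockingQueue<Runnable> select(int queues) {
        return queues == 0 ? new SynchronousQueue<Runnable>() :
                (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                        : new LinkedBlockingQueue<Runnable>(queues));
    }

    /** Describes the queue chosen for a given "queues" setting. */
    static String describe(int queues) {
        BlockingQueue<Runnable> q = select(queues);
        return q.getClass().getSimpleName() + ", remainingCapacity=" + q.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(describe(0));  // SynchronousQueue, remainingCapacity=0
        System.out.println(describe(-1)); // LinkedBlockingQueue, remainingCapacity=2147483647
        System.out.println(describe(10)); // LinkedBlockingQueue, remainingCapacity=10
    }
}
```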
The core and maximum thread counts were unlikely to be the problem, so I suspected the queue size setting.
To reproduce the issue, I wrote a simple simulation:
package com.bytearch.fast.cloud;
import java.util.concurrent.*;
public class TestThreadPool {
    public final static int queueSize = 10;
    public static void main(String[] args) {
        ExecutorService executorService = getThreadPool(queueSize);
        for (int i = 0; i < 100000; i++) {
            int finalI = i;
            try {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        doSomething(finalI);
                    }
                });
            } catch (Exception e) {
                // A rejected task surfaces here as a RejectedExecutionException
                System.out.println("emsg:" + e.getMessage());
            }
            // Pause for 1 ms after every 20 submissions, so tasks arrive in bursts
            if (i % 20 == 0) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println("all done!");
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // Same shape as the production config: core 150, max 800, keep-alive 60 s
    public static ExecutorService getThreadPool(int queues) {
        int cores = 150;
        int threads = 800;
        int alive = 60 * 1000;
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)));
    }
    public static void doSomething(final int i) {
        try {
            // Simulate a 5 ms request
            Thread.sleep(5);
            // Thread.activeCount() estimates live threads in the current thread group, not busy pool workers
            System.out.println("thread:" + Thread.currentThread().getName() + ", active:" + Thread.activeCount() + ", do:" + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Simulation results:
queueSize    Result
0            no exception
10           rejection exception
100          no exception
The exception looks like this:
emsg:Task com.bytearch.fast.cloud.TestThreadPool$1@733aa9d8 rejected from java.util.concurrent.ThreadPoolExecutor@6615435c[Running, pool size = 800, active threads = 32, queued tasks = 9, completed tasks = 89755]
all done!
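Clearly, under high concurrency, a bounded LinkedBlockingQueue with a relatively small capacity causes trouble for the thread pool.
After changing queues to 0 and redeploying, everything returned to normal.
As for the deeper cause, interested readers are welcome to dig into it, or to reach me through the official account's backend.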
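One plausible explanation, offered here as an assumption and not as the author's analysis: execute() never looks for idle workers. Once the pool is at corePoolSize it only tries workQueue.offer(), and if the queue is momentarily full it starts another thread. A burst can fill a small queue faster than idle workers wake up and drain it, so each burst may add threads, and threads above corePoolSize retire only after sitting idle for the full keep-alive time. The sketch below uses scaled-down, illustrative values (core 4, max 64, bursts of 6 one-millisecond tasks) and a made-up class name, BurstGrowthDemo. It reports the largest pool size reached; results depend on timing and hardware, so no expected numbers are given.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BurstGrowthDemo {
    static void sleepOneMilli() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits bursts of short tasks and returns the largest pool size reached. */
    static int largestPoolSize(int queueCapacity) {
        int core = 4, max = 64; // illustrative values, scaled down from 150 / 800
        BlockingQueue<Runnable> queue = queueCapacity == 0
                ? new SynchronousQueue<Runnable>()
                : new LinkedBlockingQueue<Runnable>(queueCapacity);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, queue);
        pool.prestartAllCoreThreads(); // start with all core threads idle
        try {
            for (int burst = 0; burst < 200; burst++) {
                for (int i = 0; i < 6; i++) {
                    try {
                        pool.execute(BurstGrowthDemo::sleepOneMilli);
                    } catch (RejectedExecutionException ignored) {
                        // queue and pool are both full; this demo only tracks pool growth
                    }
                }
                Thread.sleep(2); // gap between bursts
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        for (int capacity : new int[] {0, 2, 100}) {
            System.out.println("queue capacity " + capacity
                    + " -> largest pool size " + largestPoolSize(capacity));
        }
    }
}
```

Comparing the three capacities on your own machine is a cheap way to test this explanation before drawing conclusions about a production pool.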
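VI. Summary
This article covered the basic principles of ThreadPoolExecutor, a way to calculate thread pool settings, and the easily overlooked configuration issue with the @Async annotation.
It also described a strange problem we hit with our thread pool: a single parameter can lead to unexpected consequences.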