Java 線程池的正確使用姿勢(shì)
線程池 ThreadPool
1. 線程池的定義:
(摘自職Q)在面向?qū)ο缶幊讨?,?chuàng)建和銷(xiāo)毀對(duì)象是很費(fèi)時(shí)間的,因?yàn)閯?chuàng)建一個(gè)對(duì)象要獲取內(nèi)存資源或者其它更多資源。在Java中更是如此,虛擬機(jī)將試圖跟蹤每一個(gè)對(duì)象,以便能夠在對(duì)象銷(xiāo)毀后進(jìn)行垃圾回收。所以提高服務(wù)程序效率的一個(gè)手段就是盡可能減少創(chuàng)建和銷(xiāo)毀對(duì)象的次數(shù),特別是一些很耗資源的對(duì)象創(chuàng)建和銷(xiāo)毀,這就是"池化資源"技術(shù)產(chǎn)生的原因。線程池顧名思義就是事先創(chuàng)建若干個(gè)可執(zhí)行的線程放入一個(gè)池(容器)中,需要的時(shí)候從池中獲取線程不用自行創(chuàng)建,使用完畢不需要銷(xiāo)毀線程而是放回池中,從而減少創(chuàng)建和銷(xiāo)毀線程對(duì)象的開(kāi)銷(xiāo)。
2. 如何創(chuàng)建線程池:
-
使用 ThreadPoolExecutor:
ThreadPoolExecutor是一個(gè)靈活的、穩(wěn)定的線程池,允許進(jìn)行定制。 -
使用 Executors:
Executors中的靜態(tài)工廠方法之一來(lái)創(chuàng)建線程池:
newSingleThreadExecutor: 是一個(gè)單線程的Executor,它創(chuàng)建單個(gè)工作者線程來(lái)執(zhí)行任務(wù),如果這個(gè)線程異常結(jié)束,會(huì)創(chuàng)建另一個(gè)線程來(lái)替代。newSingleThreadExecutor能確保依照任務(wù)在隊(duì)列中的順序來(lái)串行執(zhí)行(例如 FIFO、LIFO、優(yōu)先級(jí))。
newFixedThreadPool: 將創(chuàng)建一個(gè)固定長(zhǎng)度的線程池,每當(dāng)提交一個(gè)任務(wù)時(shí)就創(chuàng)建一個(gè)線程,直到達(dá)到線程池的最大數(shù)量,這時(shí)線程池的規(guī)模將不再發(fā)生變化(如果某個(gè)線程由于發(fā)生了未預(yù)期的Exception而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新的線程)。
newCachedThreadPool: 將創(chuàng)建一個(gè)可緩存的線程池,如果線程池的當(dāng)前規(guī)模超過(guò)了處理需求時(shí),那么將回收空閑的線程,而當(dāng)需求增加時(shí),則可以添加新的線程,線程池規(guī)模不存在任何限制。
newScheduledThreadExecutor: 創(chuàng)建了一個(gè)固定長(zhǎng)度的線程池,而且以延遲或定時(shí)的方式來(lái)執(zhí)行任務(wù),類(lèi)似于Timer。
配置 ThreadPoolExecutor
public class ThreadPoolExecutor {
// 線程池維護(hù)的最小線程數(shù)
private volatile int corePoolSize;
// 線程池可容納線程數(shù)的最大值
private volatile int maximumPoolSize;
// 線程池達(dá)到閾值后,新的線程需要等待的時(shí)間
private volatile long keepAliveTime;
// 以工廠模式創(chuàng)建新的線程
private volatile ThreadFactory threadFactory;
// 上下文
private final AccessControlContext acc;
// 阻塞隊(duì)列
private final BlockingQueue<Runnable> workQueue;
// 拒絕策略
private volatile RejectedExecutionHandler handler;
/**
* ThreadPoolExecutor的核心構(gòu)造器
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
管理任務(wù)隊(duì)列 BlockingQueue
ThreadPoolExecutor允許提供一個(gè)BlockingQueue來(lái)保存等待執(zhí)行的任務(wù)?;镜娜蝿?wù)排隊(duì)方法有3種:無(wú)界隊(duì)列、有界隊(duì)列和同步移交(Synchronous Handoff)。
-
無(wú)界隊(duì)列: 隊(duì)列大小無(wú)限制,常用的為無(wú)界的LinkedBlockingQueue,使用該隊(duì)列做為阻塞隊(duì)列時(shí)要尤其當(dāng)心,當(dāng)任務(wù)耗時(shí)較長(zhǎng)時(shí)可能會(huì)導(dǎo)致大量新任務(wù)在隊(duì)列中堆積最終導(dǎo)致OOM。
閱讀代碼發(fā)現(xiàn),Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而樓主踩到的就是這個(gè)坑,當(dāng)QPS很高,發(fā)送數(shù)據(jù)很大,大量的任務(wù)被添加到這個(gè)無(wú)界LinkedBlockingQueue 中,導(dǎo)致cpu和內(nèi)存飆升服務(wù)器掛掉。 -
有界隊(duì)列: 常用的有兩類(lèi),
一類(lèi)是遵循FIFO原則的隊(duì)列如ArrayBlockingQueue與有界的LinkedBlockingQueue,
另一類(lèi)是優(yōu)先級(jí)隊(duì)列如PriorityBlockingQueue。PriorityBlockingQueue中的優(yōu)先級(jí)由任務(wù)的Comparator決定。
使用有界隊(duì)列時(shí)隊(duì)列大小需和線程池大小互相配合,線程池較小有界隊(duì)列較大時(shí)可減少內(nèi)存消耗,降低cpu使用率和上下文切換,但是可能會(huì)限制系統(tǒng)吞吐量。在我們的修復(fù)方案中,選擇的就是這個(gè)類(lèi)型的隊(duì)列,雖然會(huì)有部分任務(wù)被丟失,但是我們線上是排序日志搜集任務(wù),所以對(duì)部分對(duì)丟失是可以容忍的。 - 同步移交隊(duì)列: 如果不希望任務(wù)在隊(duì)列中等待而是希望將任務(wù)直接移交給工作線程,可使用SynchronousQueue作為等待隊(duì)列。SynchronousQueue不是一個(gè)真正的隊(duì)列,而是一種線程之間移交的機(jī)制。要將一個(gè)元素放入SynchronousQueue中,必須有另一個(gè)線程正在等待接收這個(gè)元素。只有在使用無(wú)界線程池或者有飽和策略時(shí)才建議使用該隊(duì)列。
飽和策略 RejectedExecutionHandler
ThreadPoolExecutor提供如下4種飽和策略:
- CallerRunsPolicy 由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)
- AbortPolicy 丟棄任務(wù)并直接拋出RejectedExecutionException異常(默認(rèn)的線程池拒絕策略)
- DiscardPolicy 僅丟棄任務(wù)并不拋出異常
- DiscardOldestPolicy 丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
自定義飽和策略,只需實(shí)現(xiàn)RejectedExecutionHandler接口并重寫(xiě)void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法
public class ThreadPoolExecutor{
/**
* 默認(rèn)的線程池拒絕策略 AbortPolicy
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/* ThreadPoolExecutor提供如下4中拒絕策略: */
/**
* 由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {}
/**
* 丟棄任務(wù)并直接拋出RejectedExecutionException異常
*/
public static class AbortPolicy implements RejectedExecutionHandler {}
/**
* 僅丟棄任務(wù)并不拋出異常
*/
public static class DiscardPolicy implements RejectedExecutionHandler {}
/**
* 丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {}
}
Executors(不推薦)
在阿里巴巴Java開(kāi)發(fā)手冊(cè)中提到,使用Executors創(chuàng)建線程池可能會(huì)導(dǎo)致OOM(OutOfMemory ,內(nèi)存溢出)

ExecutorService
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// .......其他用于任務(wù)提交的方法
}
為了解決執(zhí)行服務(wù)的生命周期問(wèn)題,
ExecutorService拓展了Executor接口,添加了一些用于生命周期管理的方法。
ExecutorService的生命周期有3種狀態(tài):運(yùn)行、關(guān)閉和已終止。
ExecutorService在初始創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。
shutdown方法將執(zhí)行平緩的關(guān)閉過(guò)程:不再接受新的任務(wù),同時(shí)等待已經(jīng)提交的任務(wù)執(zhí)行完成——包括那些還未開(kāi)始執(zhí)行的任務(wù)。
shutdownNow方法將執(zhí)行粗暴的關(guān)閉過(guò)程:它將嘗試取消所有運(yùn)行中的任務(wù),并且不再啟動(dòng)隊(duì)列中尚未開(kāi)始執(zhí)行的任務(wù)。
ThreadFactory
DefaultThreadFactory
/** * The default thread factory */
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
PrivilegedThreadFactory
/**
* 權(quán)限訪問(wèn)與類(lèi)加載
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
使用guava的 ThreadFactoryBuilder
public class ThreadFactoryBuilder{
private static ThreadFactory doBuild(ThreadFactoryBuilder builder) {
final String nameFormat = builder.nameFormat;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
final ThreadFactory backingThreadFactory =
(builder.backingThreadFactory != null)
? builder.backingThreadFactory
: Executors.defaultThreadFactory();
final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = backingThreadFactory.newThread(runnable);
if (nameFormat != null) {
thread.setName(format(nameFormat, count.getAndIncrement()));
}
if (daemon != null) {// 守護(hù)線程
thread.setDaemon(daemon);
}
if (priority != null) {// 優(yōu)先級(jí)
thread.setPriority(priority);
}
if (uncaughtExceptionHandler != null) {
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
return thread;
}
};
}
}
創(chuàng)建線程池的正確姿勢(shì)
/**
* @Auther: Noseparte * @Date: 2019/11/27 10:35
* @Description:
*
* <p>定制協(xié)議網(wǎng)關(guān)線程池</p>
*/
public class ThreadPool {
protected final static Logger _LOG = LogManager.getLogger(ThreadPool.class);
private List<ExecutorService> workers = new ArrayList<>();
private int threadCount;
private ThreadFactory threadFactory;
public ThreadPool(int threadCount) {
this(threadCount, new UserThreadFactory("網(wǎng)關(guān)游戲邏輯協(xié)議線程池"));
}
public ThreadPool(int threadCount, ThreadFactory threadFactory) {
this.threadCount = threadCount;
this.threadFactory = threadFactory;
if (threadCount <= 0 || null == threadFactory)
throw new IllegalArgumentException();
for (int i = 0; i < threadCount; i++) {
workers.add(new ThreadPoolExecutor(threadCount, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024),
threadFactory,
new ThreadPoolExecutor.AbortPolicy()));
}
}
public Future execute(Runnable task, int mold) {
int index = Math.abs(mold) % threadCount;
ExecutorService executor = workers.get(index);
if (null == executor) {
_LOG.error("sid=" + mold + ", tid=" + index);
return null;
}
return executor.submit(task);
}
public void shutdown() {
int count = 0;
for (ExecutorService worker : workers) {
_LOG.error("close thread{}.", ++count);
worker.shutdown();
}
}
static class UserThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
UserThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
總結(jié)
創(chuàng)建線程池的注意事項(xiàng):
- 根據(jù)業(yè)務(wù)場(chǎng)景定制ThreadFactory、飽和策略、任務(wù)隊(duì)列、ThreadPoolExecutor
- 注意BlockingQueue中任務(wù)阻塞數(shù)量越來(lái)越多會(huì)導(dǎo)致內(nèi)存耗盡(OOM), 要設(shè)置隊(duì)列的上限值
相關(guān)博文:友情鏈接
一次Java線程池誤用引發(fā)的血案和總結(jié)
Java中線程池,你真的會(huì)用嗎?