重寫并優(yōu)化 ThreadPoolExecutor 線程池

import java.util.concurrent.*;

import java.util.concurrent.atomic.AtomicInteger;

/**

* Created by yehan .

*? 代碼和思路主要來自于:

*

* tomcat :

*? ? ? ? org.apache.catalina.core.StandardThreadExecutor

*

* java.util.concurrent

* threadPoolExecutor execute執(zhí)行策略:? ? ? ? 優(yōu)先offer到queue,queue滿后再擴充線程到maxThread,如果已經(jīng)到了maxThread就reject

*? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 比較適合于CPU密集型應(yīng)用(比如runnable內(nèi)部執(zhí)行的操作都在JVM內(nèi)部,memory copy, or compute等等)

*

* StandardThreadExecutor execute執(zhí)行策略: 優(yōu)先擴充線程到maxThread,再offer到queue,如果滿了就reject

*? ? ? ? ? ? ? ? ? ? ? ? ? 比較適合于業(yè)務(wù)處理需要遠程資源的場景

*

* @author yehan

*

*/

public class StandardThreadExecutorextends ThreadPoolExecutor {

public static final int DEFAULT_MIN_THREADS =20;

? ? public static final int DEFAULT_MAX_THREADS =200;

? ? /**1 minutes*/

? ? public static final int DEFAULT_MAX_IDLE_TIME =60 *1000;

? ? /**正在處理的任務(wù)數(shù)*/

? ? protected AtomicIntegersubmittedTasksCount;

? ? /**最大允許同時處理的任務(wù)數(shù)*/

? ? private int maxSubmittedTaskCount;

? ? public StandardThreadExecutor() {

this(DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS);

? ? }

public StandardThreadExecutor(int coreThread, int maxThreads) {

this(coreThread, maxThreads, maxThreads);

? ? }

public StandardThreadExecutor(int coreThread, int maxThreads, long keepAliveTime, TimeUnit unit) {

this(coreThread, maxThreads, keepAliveTime, unit, maxThreads);

? ? }

public StandardThreadExecutor(int coreThreads, int maxThreads, int queueCapacity) {

this(coreThreads, maxThreads, queueCapacity, Executors.defaultThreadFactory());

? ? }

public StandardThreadExecutor(int coreThreads, int maxThreads, int queueCapacity, ThreadFactory threadFactory) {

this(coreThreads, maxThreads, DEFAULT_MAX_IDLE_TIME, TimeUnit.MILLISECONDS, queueCapacity, threadFactory);

? ? }

public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit, int queueCapacity) {

this(coreThreads, maxThreads, keepAliveTime, unit, queueCapacity, Executors.defaultThreadFactory());

? ? }

public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int queueCapacity, ThreadFactory threadFactory) {

this(coreThreads, maxThreads, keepAliveTime, unit, queueCapacity, threadFactory, new AbortPolicy());

? ? }

public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory, handler);

? ? ? ? ((ExecutorQueue) getQueue()).setStandardThreadExecutor(this);

? ? ? ? submittedTasksCount =new AtomicInteger(0);

? ? ? ? // 最大并發(fā)任務(wù)限制: 隊列buffer數(shù) + 最大線程數(shù)

? ? ? ? maxSubmittedTaskCount = queueCapacity + maxThreads;

? ? }

@Override

? ? public void execute(Runnable command) {

int count =submittedTasksCount.incrementAndGet();

? ? ? ? // 超過最大的并發(fā)任務(wù)限制,進行 reject

// 依賴的LinkedTransferQueue沒有長度限制,因此這里進行控制

? ? ? ? if (count >maxSubmittedTaskCount) {

submittedTasksCount.decrementAndGet();

? ? ? ? ? ? getRejectedExecutionHandler().rejectedExecution(command, this);

? ? ? ? }

try {

super.execute(command);

? ? ? ? }catch (RejectedExecutionException rx) {

// there could have been contention around the queue

? ? ? ? ? ? if (!((ExecutorQueue) getQueue()).force(command)) {

submittedTasksCount.decrementAndGet();

? ? ? ? ? ? ? ? getRejectedExecutionHandler().rejectedExecution(command, this);

? ? ? ? ? ? }

}

}

public int getSubmittedTasksCount() {

return this.submittedTasksCount.get();

? ? }

public int getMaxSubmittedTaskCount() {

return maxSubmittedTaskCount;

? ? }

@Override

? ? protected void afterExecute(Runnable r, Throwable t) {

submittedTasksCount.decrementAndGet();

? ? }

}

/**

* LinkedTransferQueue 能保證更高性能,相比與LinkedBlockingQueue有明顯提升

*

*


*? ? ? ? 1) 不過LinkedTransferQueue的缺點是沒有隊列長度控制,需要在外層協(xié)助控制

*

*

* @author yehan

*

*/

class ExecutorQueueextends LinkedTransferQueue {

private static final long serialVersionUID = -265236426751004839L;

? ? StandardThreadExecutorthreadPoolExecutor;

? ? public ExecutorQueue() {

super();

? ? }

public void setStandardThreadExecutor(StandardThreadExecutor threadPoolExecutor) {

this.threadPoolExecutor = threadPoolExecutor;

? ? }

/** 注:代碼來源于 tomcat*/

? ? public boolean force(Runnable o) {

if (threadPoolExecutor.isShutdown()) {

throw new RejectedExecutionException("Executor not running, can't force a command into the queue");

? ? ? ? }

// forces the item onto the queue, to be used if the task is rejected

? ? ? ? return super.offer(o);

? ? }

/**

*注:tomcat的代碼進行一些小變更

*

*/

? ? @Override

? ? public boolean offer(Runnable o) {

int poolSize =threadPoolExecutor.getPoolSize();

? ? ? ? // we are maxed out on threads, simply queue the object

? ? ? ? if (poolSize ==threadPoolExecutor.getMaximumPoolSize()) {

return super.offer(o);

? ? ? ? }

// we have idle threads, just add it to the queue

// note that we don't use getActiveCount(), see BZ 49730

? ? ? ? if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) {

return super.offer(o);

? ? ? ? }

// if we have less threads than maximum force creation of a new

// thread

? ? ? ? if (poolSize

return false;

? ? ? ? }

// if we reached here, we need to add it to the queue

? ? ? ? return super.offer(o);

? ? }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容