擴(kuò)展ThreadPoolExecutor的一種辦法

概述


在JAVA的世界里,如果想并行的執(zhí)行一些任務(wù),可以使用ThreadPoolExecutor。
大部分情況下直接使用ThreadPoolExecutor就可以滿足要求了,但是在某些場(chǎng)景下,比如瞬時(shí)大流量的,為了提高響應(yīng)和吞吐量,最好還是擴(kuò)展一下ThreadPoolExecutor。

全宇宙的JAVA IT人士應(yīng)該都知道ThreadPoolExecutor的執(zhí)行流程:

  • core線程還能應(yīng)付的,則不斷的<font color="red">創(chuàng)建</font>新的線程;
  • core線程無(wú)法應(yīng)付,則將任務(wù)扔到隊(duì)列里面;
  • 隊(duì)列滿了(<font color="red">意味著插入任務(wù)失敗</font>),則開始創(chuàng)建MAX線程,線程數(shù)達(dá)到MAX后,隊(duì)列還一直是滿的,則拋出RejectedExecutionException.

這個(gè)執(zhí)行流程有個(gè)小問(wèn)題,就是當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,會(huì)<font color="red">立刻</font>將任務(wù)添加到隊(duì)列中,如果隊(duì)列非常長(zhǎng),而任務(wù)又非常多,那么將會(huì)有頻繁的<font color="red">任務(wù)入隊(duì)列</font>和<font color="red">任務(wù)出隊(duì)列</font>的操作。

根據(jù)實(shí)際的壓測(cè)發(fā)現(xiàn),這種操作也是有一定消耗的。其實(shí)JAVA提供的SynchronousQueue隊(duì)列是一個(gè)零長(zhǎng)度的隊(duì)列,任務(wù)都是直接由生產(chǎn)者遞交給消費(fèi)者,中間沒(méi)有入隊(duì)列的過(guò)程,可見JAVA API的設(shè)計(jì)者也是有考慮過(guò)入隊(duì)列這種操作的開銷。

另外,任務(wù)一多,立刻扔到隊(duì)列里,而MAX線程又不干活,如果隊(duì)列里面太多任務(wù)了,只有可憐的core線程在忙,也是會(huì)影響性能的。

當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,能不能延后<font color="red">入隊(duì)列</font>這個(gè)操作呢? 讓MAX線程盡快啟動(dòng)起來(lái),幫忙處理任務(wù)。

也即是說(shuō),當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,如果當(dāng)前線程池中的線程數(shù)量還小于MAX線程數(shù)的時(shí)候,繼續(xù)創(chuàng)建新的線程處理任務(wù),一直到線程數(shù)量到達(dá)MAX后,才將任務(wù)插入到隊(duì)列里

我們通過(guò)覆蓋隊(duì)列的offer方法來(lái)實(shí)現(xiàn)這個(gè)目標(biāo)。

  @Override
public  boolean offer(Runnable o) {
    int currentPoolThreadSize = executor.getPoolSize();
    //如果線程池里的線程數(shù)量已經(jīng)到達(dá)最大,將任務(wù)添加到隊(duì)列中
    if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
        return super.offer(o);
    }
    //說(shuō)明有空閑的線程,這個(gè)時(shí)候無(wú)需創(chuàng)建core線程之外的線程,而是把任務(wù)直接丟到隊(duì)列里即可
    if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(o);
    }

    //如果線程池里的線程數(shù)量還沒(méi)有到達(dá)最大,直接創(chuàng)建線程,而不是把任務(wù)丟到隊(duì)列里面
    if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
        return false;
    }

    return super.offer(o);
}

注意其中的

if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(o);
}

是表示core線程仍然能處理的來(lái),同時(shí)又有空閑線程的情況,將任務(wù)插入到隊(duì)列中。 如何判斷線程池中有空閑線程呢? 可以使用一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn),每當(dāng)execute方法被執(zhí)行的時(shí)候,計(jì)算器加1,當(dāng)afterExecute被執(zhí)行后,計(jì)數(shù)器減1.

    @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        //代碼未完整,待補(bǔ)充。。。。。
    }
 @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

這樣,當(dāng)

executor.getSubmittedTaskCount() < currentPoolThreadSize

的時(shí)候,說(shuō)明有空閑線程。


完整代碼


EnhancedThreadPoolExecutor類


package executer;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

    /**
     * 計(jì)數(shù)器,用于表示已經(jīng)提交到隊(duì)列里面的task的數(shù)量,這里task特指還未完成的task。
     * 當(dāng)task執(zhí)行完后,submittedTaskCount會(huì)減1的。
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
        workQueue.setExecutor(this);
    }

    /**
     * 覆蓋父類的afterExecute方法,當(dāng)task執(zhí)行完成后,將計(jì)數(shù)器減1
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }


    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }


    /**
     * 覆蓋父類的execute方法,在任務(wù)開始執(zhí)行之前,計(jì)數(shù)器加1。
     */
    @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            //當(dāng)發(fā)生RejectedExecutionException,嘗試再次將task丟到隊(duì)列里面,如果還是發(fā)生RejectedExecutionException,則直接拋出異常。
            BlockingQueue<Runnable> taskQueue = super.getQueue();
            if (taskQueue instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)taskQueue;
                if (!queue.forceTaskIntoQueue(command)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("隊(duì)列已滿");
                }
            } else {
                submittedTaskCount.decrementAndGet();
                throw rx;
            }
        }
    }
}

TaskQueue

package executer;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private EnhancedThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EnhancedThreadPoolExecutor exec) {
        executor = exec;
    }

    public boolean forceTaskIntoQueue(Runnable o) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor已經(jīng)關(guān)閉了,不能將task添加到隊(duì)列里面");
        }
        return super.offer(o);
    }

    @Override
    public  boolean offer(Runnable o) {
        int currentPoolThreadSize = executor.getPoolSize();
        //如果線程池里的線程數(shù)量已經(jīng)到達(dá)最大,將任務(wù)添加到隊(duì)列中
        if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
            return super.offer(o);
        }
        //說(shuō)明有空閑的線程,這個(gè)時(shí)候無(wú)需創(chuàng)建core線程之外的線程,而是把任務(wù)直接丟到隊(duì)列里即可
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(o);
        }

        //如果線程池里的線程數(shù)量還沒(méi)有到達(dá)最大,直接創(chuàng)建線程,而不是把任務(wù)丟到隊(duì)列里面
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        return super.offer(o);
    }
}

TestExecuter

package executer;

import java.util.concurrent.TimeUnit;

public class TestExecuter {
    private static final int CORE_SIZE = 5;

    private static final int MAX_SIZE = 10;

    private static final long KEEP_ALIVE_TIME = 30;

    private static final int QUEUE_SIZE = 5;

    static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));

    public static void main(String[] args){
        for (int i = 0; i < 15; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.currentThread().sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            System.out.println("線程池中現(xiàn)在的線程數(shù)目是:"+executor.getPoolSize()+",  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:"+ executor.getQueue().size());
        }
    }
}

先運(yùn)行一下代碼,看看是否如何預(yù)期。直接執(zhí)行TestExecuter類中的main方法,運(yùn)行結(jié)果如下:

線程池中現(xiàn)在的線程數(shù)目是:1,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:2,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:3,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:4,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:6,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:7,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:8,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:9,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:1
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:2
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:3
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:4
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5

可以看到當(dāng)線程數(shù)增加到core數(shù)量的時(shí)候,隊(duì)列中是沒(méi)有任務(wù)的。一直到線程數(shù)量增加到MAX數(shù)量,也即是10的時(shí)候,隊(duì)列中才開始有任務(wù)。符合我們的預(yù)期。

如果我們注釋掉TaskQueue類中的offer方法,也即是不覆蓋隊(duì)列的offer方法,那么運(yùn)行結(jié)果如下:

線程池中現(xiàn)在的線程數(shù)目是:1,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:2,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:3,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:4,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:1
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:2
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:3
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:4
線程池中現(xiàn)在的線程數(shù)目是:5,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:6,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:7,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:8,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:9,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:10,  隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5

可以看到當(dāng)線程數(shù)增加到core數(shù)量的時(shí)候,隊(duì)列中已經(jīng)有任務(wù)了。


進(jìn)一步思考


在使用ThreadPoolExecutor的時(shí)候,如果發(fā)生了RejectedExecutionException,該如何處理?本文中的代碼是采用了重新將任務(wù)嘗試插入到隊(duì)列中,如果還是失敗則直接將reject異常拋出去。

 @Override
    public void execute(Runnable command) {
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            //當(dāng)發(fā)生RejectedExecutionException,嘗試再次將task丟到隊(duì)列里面,如果還是發(fā)生RejectedExecutionException,則直接拋出異常。
            BlockingQueue<Runnable> taskQueue = super.getQueue();
            if (taskQueue instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)taskQueue;
                if (!queue.forceTaskIntoQueue(command)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("隊(duì)列已滿");
                }
            } else {
                submittedTaskCount.decrementAndGet();
                throw rx;
            }
        }
    }

TaskQueue類提供了forceTaskIntoQueue方法,將任務(wù)插入到隊(duì)列中。

還有另一種解決方案,就是使用另外一個(gè)線程池來(lái)執(zhí)行任務(wù),當(dāng)?shù)谝粋€(gè)線程池拋出Reject異常時(shí),catch住它,并使用第二個(gè)線程池處理任務(wù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容