SpringBoot線程池ThreadPoolTaskExecutor

SpringBoot線程池ThreadPoolTaskExecutor

SpringBoot框架@Async注解文章:SpringBoot異步調(diào)用@Async
SpringBoot線程池ThreadPoolExecutor文章:SpringBoot線程池ThreadPoolExecutor

ThreadPoolTaskExecutor是一個(gè)spring的線程池技術(shù),其實(shí),它的實(shí)現(xiàn)方式完全是使用ThreadPoolExecutor進(jìn)行實(shí)現(xiàn)。

SpringBoot線程池ThreadPoolTaskExecutor代碼實(shí)現(xiàn)

service層
  1. 創(chuàng)建一個(gè)service層的接口AsyncService,如下:
public interface AsyncService {
    /**
     * 執(zhí)行異步任務(wù)
     * */
    void executeAsync();
}
  1. 對(duì)應(yīng)的AsyncServiceImpl,實(shí)現(xiàn)如下:
import com.ceair.service.AsyncService;
import lombok.extern.java.Log;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 異步線程service
 * @author jeffrey_hjf
 */
@Service
@Log
public class AsyncServiceImpl implements AsyncService {

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        log.info("start executeAsync");
        try{
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        }
        log.info("end executeAsync");
    }
}
線程池配置

創(chuàng)建一個(gè)配置類ThreadPoolExecutorConfig,用來定義如何創(chuàng)建一個(gè)ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync這兩個(gè)注解,表示這是個(gè)配置類,并且是線程池的配置類,如下所示:

@Configuration
//@EnableAsync //在啟動(dòng)類里面加了@EnableAsync標(biāo)識(shí),這里可以不加
@Log
public class ThreadPoolExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize = 5;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize = 5;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity = 99999;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix = "async-service-";

    /**
     * 異步線程池
     */
    @Bean("asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數(shù)
        executor.setCorePoolSize(corePoolSize);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(maxPoolSize);
        //配置隊(duì)列大小
        executor.setQueueCapacity(queueCapacity);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執(zhí)行初始化
        executor.initialize();
        return executor;
    }
}
controller層

創(chuàng)建一個(gè)controller為Hello,里面定義一個(gè)http接口,做的事情是調(diào)用Service層的服務(wù),如下:

/**
 * @ClassName UserController
 * @Author jeffrey_hjf
 * @Description User
 **/

@RestController
@RequestMapping("/api")
@Log
public class UserController {

    @Autowired
    private AsyncService asyncService;

    /**
     * 線程池測(cè)試入口
     */
    @GetMapping("executeAsyncPool")
    public String executeAsyncPool() {
        log.info("start submit");

        //調(diào)用service層的任務(wù)
        asyncService.executeAsync();

        log.info("end submit");

        return "success";
    }
}
執(zhí)行效果

控制臺(tái)看見日志如下:

2019-08-26 17:28:12.552  INFO 9024 --- [nio-9090-exec-1] com.ceair.controller.UserController      : start submit
2019-08-26 17:28:12.558  INFO 9024 --- [nio-9090-exec-1] com.ceair.controller.UserController      : end submit
2019-08-26 17:28:12.559  INFO 9024 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl  : start executeAsync
2019-08-26 17:28:13.560  INFO 9024 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl  : end executeAsync

如上日志所示,我們可以看到controller的執(zhí)行線程是”nio-8080-exec-1”,這是tomcat的執(zhí)行線程,而service層的日志顯示線程名為“async-service-1”,顯然已經(jīng)在我們配置的線程池中執(zhí)行了,并且每次請(qǐng)求中,controller的起始和結(jié)束日志都是連續(xù)打印的,表明每次請(qǐng)求都快速響應(yīng)了,而耗時(shí)的操作都留給線程池中的線程去異步執(zhí)行;

SpringBoot線程池?cái)U(kuò)展ThreadPoolTaskExecutor代碼實(shí)現(xiàn)

雖然我們已經(jīng)用上了線程池,但是還不清楚線程池當(dāng)時(shí)的情況,有多少線程在執(zhí)行,多少在隊(duì)列中等待呢?這里我創(chuàng)建了一個(gè)ThreadPoolTaskExecutor的子類,在每次提交線程的時(shí)候都會(huì)將當(dāng)前線程池的運(yùn)行狀況打印出來。

擴(kuò)展ThreadPoolTaskExecutor類
import lombok.extern.java.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Log
public class VisiableThreadPoolTaskExecutorConfig extends ThreadPoolTaskExecutor {

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(null==threadPoolExecutor){
            return;
        }
        log.info(String.format("%s, %s,taskCount [%s], completedTaskCount [%s], activeCount [%s], queueSize [%s]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size()));
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

如上所示,showThreadPoolInfo方法中將任務(wù)總數(shù)、已完成數(shù)、活躍線程數(shù),隊(duì)列大小都打印出來了,然后Override了父類的execute、submit等方法,在里面調(diào)用showThreadPoolInfo方法,這樣每次有任務(wù)被提交到線程池的時(shí)候,都會(huì)將當(dāng)前線程池的基本情況打印到日志中;

修改ThreadPoolExecutorConfig配置類

修改ThreadPoolExecutorConfig.java的asyncServiceExecutor方法,將ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(),如下所示:

/**
 * 線程池配置類
 * @author jeffrey_hjf
 */
@Configuration
//@EnableAsync
@Log
public class ThreadPoolExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize = 5;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize = 5;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity = 99999;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix = "async-service-";

    /**
     * 異步線程池
     */
    @Bean("asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutorConfig();
        //配置核心線程數(shù)
        executor.setCorePoolSize(corePoolSize);
        //配置最大線程數(shù)
        executor.setMaxPoolSize(maxPoolSize);
        //配置隊(duì)列大小
        executor.setQueueCapacity(queueCapacity);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執(zhí)行初始化
        executor.initialize();
        return executor;
    }
}
執(zhí)行效果

日志如下:

2019-08-26 17:55:22.091  INFO 15252 --- [nio-9090-exec-8] com.ceair.controller.UserController      : start submit
2019-08-26 17:55:22.091  INFO 15252 --- [nio-9090-exec-8] c.c.VisiableThreadPoolTaskExecutorConfig : async-service-, 2. do submit,taskCount [5], completedTaskCount [2], activeCount [2], queueSize [1]
2019-08-26 17:55:22.092  INFO 15252 --- [nio-9090-exec-8] com.ceair.controller.UserController      : end submit
2019-08-26 17:55:22.092  INFO 15252 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl  : start executeAsync
2019-08-26 17:55:23.094  INFO 15252 --- [async-service-1] com.ceair.service.impl.AsyncServiceImpl  : end executeAsync

這說明提交任務(wù)到線程池的時(shí)候,調(diào)用的是submit(Callable task)這個(gè)方法,當(dāng)前已經(jīng)提交了5個(gè)任務(wù),完成了2個(gè),當(dāng)前有2個(gè)線程在處理任務(wù),還剩1個(gè)任務(wù)在隊(duì)列中等待,線程池的基本情況一路了然;

SpringBoot框架@Async注解文章:SpringBoot異步調(diào)用@Async
SpringBoot線程池ThreadPoolExecutor文章:SpringBoot線程池ThreadPoolExecutor

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容