我們常用ThreadPoolExecutor提供的線程池服務,springboot框架提供了@Async注解,幫助我們更方便的將業(yè)務邏輯提交到線程池中異步執(zhí)行,今天我們就來實戰(zhàn)體驗這個線程池服務;
本文地址:http://blog.csdn.net/boling_cavalry/article/details/79120268
實戰(zhàn)環(huán)境
- windowns10;
- jdk1.8;
- springboot 1.5.9.RELEASE;
- 開發(fā)工具:IntelliJ IDEA;
實戰(zhàn)源碼
本次實戰(zhàn)的源碼可以在我的GitHub下載,地址:git@github.com:zq2599/blog_demos.git,項目主頁:https://github.com/zq2599/blog_demos
這里面有多個工程,本次用到的工程為threadpooldemoserver,如下圖紅框所示:
實戰(zhàn)步驟梳理
本次實戰(zhàn)的步驟如下:
1. 創(chuàng)建springboot工程;
2. 創(chuàng)建Service層的接口和實現(xiàn);
3. 創(chuàng)建controller,開發(fā)一個http服務接口,里面會調用service層的服務;
4. 創(chuàng)建線程池的配置;
5. 將Service層的服務異步化,這樣每次調用都會都被提交到線程池異步執(zhí)行;
6. 擴展ThreadPoolTaskExecutor,在提交任務到線程池的時候可以觀察到當前線程池的情況;
創(chuàng)建springboot工程
用IntelliJ IDEA創(chuàng)建一個springboot的web工程threadpooldemoserver,pom.xml內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId>
<artifactId>threadpooldemoserver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>threadpooldemoserver</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
創(chuàng)建Service層的接口和實現(xiàn)
創(chuàng)建一個service層的接口AsyncService,如下:
public interface AsyncService {
/**
* 執(zhí)行異步任務
*/
void executeAsync();
}
對應的AsyncServiceImpl,實現(xiàn)如下:
@Service
public class AsyncServiceImpl implements AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
@Override
public void executeAsync() {
logger.info("start executeAsync");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
logger.info("end executeAsync");
}
}
這個方法做的事情很簡單:sleep了一秒鐘;
創(chuàng)建controller
創(chuàng)建一個controller為Hello,里面定義一個http接口,做的事情是調用Service層的服務,如下:
@RestController
public class Hello {
private static final Logger logger = LoggerFactory.getLogger(Hello.class);
@Autowired
private AsyncService asyncService;
@RequestMapping("/")
public String submit(){
logger.info("start submit");
//調用service層的任務
asyncService.executeAsync();
logger.info("end submit");
return "success";
}
}
至此,我們已經(jīng)做好了一個http請求的服務,里面做的事情其實是同步的,接下來我們就開始配置springboot的線程池服務,將service層做的事情都提交到線程池中去處理;
springboot的線程池配置
創(chuàng)建一個配置類ExecutorConfig,用來定義如何創(chuàng)建一個ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync這兩個注解,表示這是個配置類,并且是線程池的配置類,如下所示:
@Configuration
@EnableAsync
public class ExecutorConfig {
private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
@Bean
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(5);
//配置最大線程數(shù)
executor.setMaxPoolSize(5);
//配置隊列大小
executor.setQueueCapacity(99999);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("async-service-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
}
注意,上面的方法名稱為asyncServiceExecutor,稍后馬上用到;
將Service層的服務異步化
打開AsyncServiceImpl.java,在executeAsync方法上增加注解@Async(“asyncServiceExecutor”),asyncServiceExecutor是前面ExecutorConfig.java中的方法名,表明executeAsync方法進入的線程池是asyncServiceExecutor方法創(chuàng)建的,如下:
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
logger.info("start executeAsync");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
logger.info("end executeAsync");
}
驗證效果
- 將這個springboot運行起來(pom.xml所在文件夾下執(zhí)行mvn spring-boot:run);
- 在瀏覽器輸入:http://localhost:8080;
- 在瀏覽器用F5按鈕快速多刷新幾次;
- 在springboot的控制臺看見日志如下:
2018-01-21 22:43:18.630 INFO 14824 --- [nio-8080-exec-8] c.b.t.controller.Hello : start submit
2018-01-21 22:43:18.630 INFO 14824 --- [nio-8080-exec-8] c.b.t.controller.Hello : end submit
2018-01-21 22:43:18.929 INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:18.930 INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : start executeAsync
2018-01-21 22:43:19.005 INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:19.006 INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : start executeAsync
2018-01-21 22:43:19.175 INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:19.175 INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : start executeAsync
2018-01-21 22:43:19.326 INFO 14824 --- [async-service-4] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:19.495 INFO 14824 --- [async-service-5] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:19.930 INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:20.006 INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 22:43:20.191 INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
如上日志所示,我們可以看到controller的執(zhí)行線程是”nio-8080-exec-8”,這是tomcat的執(zhí)行線程,而service層的日志顯示線程名為“async-service-1”,顯然已經(jīng)在我們配置的線程池中執(zhí)行了,并且每次請求中,controller的起始和結束日志都是連續(xù)打印的,表明每次請求都快速響應了,而耗時的操作都留給線程池中的線程去異步執(zhí)行;
擴展ThreadPoolTaskExecutor
雖然我們已經(jīng)用上了線程池,但是還不清楚線程池當時的情況,有多少線程在執(zhí)行,多少在隊列中等待呢?這里我創(chuàng)建了一個ThreadPoolTaskExecutor的子類,在每次提交線程的時候都會將當前線程池的運行狀況打印出來,代碼如下:
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null==threadPoolExecutor){
return;
}
logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1\. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2\. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1\. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2\. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1\. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2\. do submitListenable");
return super.submitListenable(task);
}
}
如上所示,showThreadPoolInfo方法中將任務總數(shù)、已完成數(shù)、活躍線程數(shù),隊列大小都打印出來了,然后Override了父類的execute、submit等方法,在里面調用showThreadPoolInfo方法,這樣每次有任務被提交到線程池的時候,都會將當前線程池的基本情況打印到日志中;
修改ExecutorConfig.java的asyncServiceExecutor方法,將ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改為ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(),如下所示:
@Bean
public Executor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
//使用VisiableThreadPoolTaskExecutor
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(5);
//配置最大線程數(shù)
executor.setMaxPoolSize(5);
//配置隊列大小
executor.setQueueCapacity(99999);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("async-service-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//執(zhí)行初始化
executor.initialize();
return executor;
}
- 1
再次啟動該工程,再瀏覽器反復刷新http://localhost:8080,看到的日志如下:
2018-01-21 23:04:56.113 INFO 15580 --- [nio-8080-exec-1] c.b.t.e.VisiableThreadPoolTaskExecutor : async-service-, 2\. do submit,taskCount [99], completedTaskCount [85], activeCount [5], queueSize [9]
2018-01-21 23:04:56.113 INFO 15580 --- [nio-8080-exec-1] c.b.t.controller.Hello : end submit
2018-01-21 23:04:56.225 INFO 15580 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 23:04:56.225 INFO 15580 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : start executeAsync
2018-01-21 23:04:56.240 INFO 15580 --- [nio-8080-exec-2] c.b.t.controller.Hello : start submit
2018-01-21 23:04:56.240 INFO 15580 --- [nio-8080-exec-2] c.b.t.e.VisiableThreadPoolTaskExecutor : async-service-, 2\. do submit,taskCount [100], completedTaskCount [86], activeCount [5], queueSize [9]
2018-01-21 23:04:56.240 INFO 15580 --- [nio-8080-exec-2] c.b.t.controller.Hello : end submit
2018-01-21 23:04:56.298 INFO 15580 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 23:04:56.298 INFO 15580 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : start executeAsync
2018-01-21 23:04:56.372 INFO 15580 --- [nio-8080-exec-3] c.b.t.controller.Hello : start submit
2018-01-21 23:04:56.373 INFO 15580 --- [nio-8080-exec-3] c.b.t.e.VisiableThreadPoolTaskExecutor : async-service-, 2\. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
2018-01-21 23:04:56.373 INFO 15580 --- [nio-8080-exec-3] c.b.t.controller.Hello : end submit
2018-01-21 23:04:56.444 INFO 15580 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : end executeAsync
2018-01-21 23:04:56.445 INFO 15580 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : start executeAsync
注意這一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
這說明提交任務到線程池的時候,調用的是submit(Callable task)這個方法,當前已經(jīng)提交了101個任務,完成了87個,當前有5個線程在處理任務,還剩9個任務在隊列中等待,線程池的基本情況一路了然;
至此,springboot線程池服務的實戰(zhàn)就完成了,希望能幫您在工程中快速實現(xiàn)異步服務;
spring-boot 方法異步調用,自定義線程池配置使用
1、在主類中添加@EnableAsync注解:
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class MySpringBootApplication {
private static Logger logger = LoggerFactory.getLogger(MySpringBootApplication.class);
public static void main(String[] args) {
SpringApplication.run(MySpringBootApplication.class, args);
logger.info("My Spring Boot Application Started");
}
2、創(chuàng)建一個AsyncTask類,在里面添加兩個用@Async注解的task:
@Component
public class AsyncTask {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Async
public Future<String> doTask1() throws InterruptedException{
logger.info("Task1 started.");
long start = System.currentTimeMillis();
Thread.sleep(5000);
long end = System.currentTimeMillis();
logger.info("Task1 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task1 accomplished!");
}
@Async
public Future<String> doTask2() throws InterruptedException{
logger.info("Task2 started.");
long start = System.currentTimeMillis();
Thread.sleep(3000);
long end = System.currentTimeMillis();
logger.info("Task2 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task2 accomplished!");
}
}
3、萬事俱備,開始測試:
public class TaskTests extends BasicUtClass{
@Autowired
private AsyncTask asyncTask;
@Test
public void AsyncTaskTest() throws InterruptedException, ExecutionException {
Future<String> task1 = asyncTask.doTask1();
Future<String> task2 = asyncTask.doTask2();
while(true) {
if(task1.isDone() && task2.isDone()) {
logger.info("Task1 result: {}", task1.get());
logger.info("Task2 result: {}", task2.get());
break;
}
Thread.sleep(1000);
}
logger.info("All tasks finished.");
}
}
測試結果:
2016-12-13 11:12:24,850:INFO main (AsyncExecutionAspectSupport.java:245) - No TaskExecutor bean found for async processing
2016-12-13 11:12:24,864:INFO SimpleAsyncTaskExecutor-1 (AsyncTask.java:22) - Task1 started.
2016-12-13 11:12:24,865:INFO SimpleAsyncTaskExecutor-2 (AsyncTask.java:34) - Task2 started.
2016-12-13 11:12:27,869:INFO SimpleAsyncTaskExecutor-2 (AsyncTask.java:39) - Task2 finished, time elapsed: 3001 ms.
2016-12-13 11:12:29,866:INFO SimpleAsyncTaskExecutor-1 (AsyncTask.java:27) - Task1 finished, time elapsed: 5001 ms.
2016-12-13 11:12:30,853:INFO main (TaskTests.java:23) - Task1 result: Task1 accomplished!
2016-12-13 11:12:30,853:INFO main (TaskTests.java:24) - Task2 result: Task2 accomplished!
2016-12-13 11:12:30,854:INFO main (TaskTests.java:30) - All tasks finished.
可以看到,沒有自定義的Executor,所以使用缺省的TaskExecutor 。
前面是最簡單的使用方法。如果想使用自定義的Executor,可以按照如下幾步來:
1、新建一個Executor配置類,順便把@EnableAsync注解搬到這里來:
@Configuration
@EnableAsync
public class ExecutorConfig {
/** Set the ThreadPoolExecutor's core pool size. */
private int corePoolSize = 10;
/** Set the ThreadPoolExecutor's maximum pool size. */
private int maxPoolSize = 200;
/** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
private int queueCapacity = 10;
@Bean
public Executor mySimpleAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("MySimpleExecutor-");
executor.initialize();
return executor;
}
@Bean
public Executor myAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("MyExecutor-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
這里定義了兩個不同的Executor,第二個重新設置了pool已經(jīng)達到max size時候的處理方法;同時指定了線程名字的前綴。
2、自定義Executor的使用:
@Component
public class AsyncTask {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Async("mySimpleAsync")
public Future<String> doTask1() throws InterruptedException{
logger.info("Task1 started.");
long start = System.currentTimeMillis();
Thread.sleep(5000);
long end = System.currentTimeMillis();
logger.info("Task1 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task1 accomplished!");
}
@Async("myAsync")
public Future<String> doTask2() throws InterruptedException{
logger.info("Task2 started.");
long start = System.currentTimeMillis();
Thread.sleep(3000);
long end = System.currentTimeMillis();
logger.info("Task2 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task2 accomplished!");
}
}
就是把上面自定義Executor的類名,放進@Async注解中。
3、(測試用例不變)測試結果:
2016-12-13 10:57:11,998:INFO MySimpleExecutor-1 (AsyncTask.java:22) - Task1 started.
2016-12-13 10:57:12,001:INFO MyExecutor-1 (AsyncTask.java:34) - Task2 started.
2016-12-13 10:57:15,007:INFO MyExecutor-1 (AsyncTask.java:39) - Task2 finished, time elapsed: 3000 ms.
2016-12-13 10:57:16,999:INFO MySimpleExecutor-1 (AsyncTask.java:27) - Task1 finished, time elapsed: 5001 ms.
2016-12-13 10:57:17,994:INFO main (TaskTests.java:23) - Task1 result: Task1 accomplished!
2016-12-13 10:57:17,994:INFO main (TaskTests.java:24) - Task2 result: Task2 accomplished!
2016-12-13 10:57:17,994:INFO main (TaskTests.java:30) - All tasks finished.
2016-12-13 10:57:18,064 Thread-3 WARN Unable to register Log4j shutdown hook because JVM is shutting down. Using SimpleLogger
可見,線程名字的前綴變了,兩個task使用了不同的線程池了。
參考博客:http://blog.csdn.net/clementad/article/details/53607311