概述

在SpringBoot的日常開發(fā)中,一般都是同步調(diào)用的。但經(jīng)常有特殊業(yè)務(wù)需要做異步來處理,例如:注冊(cè)新用戶,送100個(gè)積分,或下單成功,發(fā)送push消息等等。
就拿注冊(cè)新用戶為什么要異步處理?
- 第一個(gè)原因:容錯(cuò)性、健壯性,如果送積分出現(xiàn)異常,不能因?yàn)樗头e分而導(dǎo)致用戶注冊(cè)失?。?br> 因?yàn)橛脩糇?cè)是主要功能,送積分是次要功能,即使送積分異常也要提示用戶注冊(cè)成功,然后后面在針對(duì)積分異常做補(bǔ)償處理。
- 第二個(gè)原因:提升性能,例如注冊(cè)用戶花了20毫秒,送積分花費(fèi)50毫秒,如果用同步的話,總耗時(shí)70毫秒,用異步的話,無需等待積分,故耗時(shí)20毫秒。
因此,異步能解決2個(gè)問題,性能和容錯(cuò)性。
SpringBoot異步調(diào)用
在SpringBoot中使用異步調(diào)用是很簡單的,只需要使用@Async注解即可實(shí)現(xiàn)方法的異步調(diào)用。
1、@Async異步調(diào)用例子
步驟1:開啟異步任務(wù)
采用@EnableAsync來開啟異步任務(wù)支持,另外需要加入@Configuration來把當(dāng)前類加入springIOC容器中。
@Configuration
@EnableAsync
public class SyncConfiguration {
}
步驟2:在方法上標(biāo)記異步調(diào)用
增加一個(gè)service類,用來做積分處理。
@Async添加在方法上,代表該方法為異步處理。
public class ScoreService {
private static final Logger logger = LoggerFactory.getLogger(ScoreService.class);
@Async
public void addScore(){
//TODO 模擬睡5秒,用于贈(zèng)送積分處理
try {
Thread.sleep(1000*5);
logger.info("--------------處理積分--------------------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2、為什么要給@Async自定義線程池?
@Async注解,在默認(rèn)情況下用的是SimpleAsyncTaskExecutor線程池,該線程池不是真正意義上的線程池,因?yàn)榫€程不重用,每次調(diào)用都會(huì)新建一條線程。
可以通過控制臺(tái)日志輸出查看,每次打印的線程名都是[task-1]、[task-2]、[task-3]、[task-4].....遞增的。
@Async注解異步框架提供多種線程
SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會(huì)創(chuàng)建一個(gè)新的線程。
SyncTaskExecutor:這個(gè)類沒有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地方。
ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類。
ThreadPoolTaskScheduler:可以使用cron表達(dá)式。
ThreadPoolTaskExecutor :最常使用,推薦。 其實(shí)質(zhì)是對(duì)java.util.concurrent.ThreadPoolExecutor的包裝。
3、為@Async實(shí)現(xiàn)一個(gè)自定義線程池
步驟1:配置線程池
@Configuration
@EnableAsync
public class SyncConfiguration {
@Bean(name = "scorePoolTaskExecutor")
public ThreadPoolTaskExecutor getScorePoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心線程數(shù)
taskExecutor.setCorePoolSize(10);
//線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊(duì)列滿了之后才會(huì)申請(qǐng)超過核心線程數(shù)的線程
taskExecutor.setMaxPoolSize(100);
//緩存隊(duì)列
taskExecutor.setQueueCapacity(50);
//許的空閑時(shí)間,當(dāng)超過了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會(huì)被銷毀
taskExecutor.setKeepAliveSeconds(200);
//異步方法內(nèi)部線程名稱
taskExecutor.setThreadNamePrefix("score-");
/**
* 當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會(huì)采取任務(wù)拒絕策略
* 通常有以下四種策略:
* ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
* ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
* ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
* ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動(dòng)重復(fù)調(diào)用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
步驟2: 為@Async指定線程池名字
@Async("scorePoolTaskExecutor")
public void addScore2(){
//TODO 模擬睡5秒,用于贈(zèng)送積分處理
try {
Thread.sleep(1000*5);
log.info("--------------處理積分--------------------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}