關(guān)注公眾號(hào) MageByte,設(shè)置星標(biāo)獲取最新干貨。公眾號(hào)后臺(tái)回復(fù) “加群” 進(jìn)入技術(shù)交流群獲更多技術(shù)成長。
——本文由 MageByte 團(tuán)隊(duì)的「青葉」編寫
==異步調(diào)用對(duì)應(yīng)的是同步調(diào)用,同步調(diào)用可以理解為按照定義的順序依次執(zhí)行,有序性;異步調(diào)用在執(zhí)行的時(shí)候不需要等待上一個(gè)指令調(diào)用結(jié)束就可以繼續(xù)執(zhí)行。==
我們將在創(chuàng)建一個(gè) Spring Boot 工程來說明。具體工程可以參考github代碼 https://github.com/UniqueDong/springboot-study async模塊
pom 依賴如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-boot-starter-logging</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
啟動(dòng)類如下:
@SpringBootApplication
public class AsyncApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncApplication.class, args);
}
}
定義線程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 異步線程池
*/
@Configuration
@EnableAsync
public class AsyncExecutorConfig {
/**
* Set the ThreadPoolExecutor's core pool size.
*/
private int corePoolSize = 8;
/**
* Set the ThreadPoolExecutor's maximum pool size.
*/
private int maxPoolSize = 16;
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
*/
private int queueCapacity = 200;
private String threadNamePrefix = "AsyncExecutor-";
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix(threadNamePrefix);
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
代碼中我們通過 ThreadPoolTaskExecutor 創(chuàng)建了一個(gè)線程池。參數(shù)含義如下所示:
- corePoolSize:線程池創(chuàng)建的核心線程數(shù)
- maxPoolSize:線程池最大線程池?cái)?shù)量,當(dāng)任務(wù)數(shù)超過corePoolSize以及緩沖隊(duì)列也滿了以后才會(huì)申請(qǐng)的線程數(shù)量。
- setKeepAliveSeconds: 允許線程空閑時(shí)間60秒,當(dāng)maxPoolSize的線程在空閑時(shí)間到達(dá)的時(shí)候銷毀。
- ThreadNamePrefix:線程的前綴任務(wù)名字。
- RejectedExecutionHandler:當(dāng)線程池沒有處理能力的時(shí)候,該策略會(huì)直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)
使用實(shí)戰(zhàn)
@Slf4j
@Service
public class OrderService {
public static Random random = new Random();
@Autowired
private AsyncTask asyncTask;
public void doShop() {
try {
createOrder();
// 調(diào)用有結(jié)果返回的異步任務(wù)
Future<String> pay = asyncTask.pay();
if (pay.isDone()) {
try {
String result = pay.get();
log.info("異步任務(wù)返回結(jié)果{}", result);
} catch (ExecutionException e) {
e.printStackTrace();
}
asyncTask.vip();
asyncTask.sendSms();
}
otherJob();
} catch (InterruptedException e) {
log.error("異常", e);
}
}
public void createOrder() {
log.info("開始做任務(wù)1:下單成功");
}
/**
* 錯(cuò)誤使用,不會(huì)異步執(zhí)行:調(diào)用方與被調(diào)方不能在同一個(gè)類。主要是使用了動(dòng)態(tài)代理,同一個(gè)類的時(shí)候直接調(diào)用,不是通過生成的動(dòng)態(tài)代理類調(diào)用
*/
@Async("taskExecutor")
public void otherJob() {
log.info("開始做任務(wù)4:物流");
long start = System.currentTimeMillis();
try {
Thread.sleep(random.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
log.info("完成任務(wù)4,耗時(shí):" + (end - start) + "毫秒");
}
}
異步任務(wù)服務(wù)類
mport lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import java.util.Random;
import java.util.concurrent.Future;
@Component
@Slf4j
public class AsyncTask {
public static Random random = new Random();
@Async("taskExecutor")
public void sendSms() throws InterruptedException {
log.info("開始做任務(wù)2:發(fā)送短信");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任務(wù)1,耗時(shí):" + (end - start) + "毫秒");
}
// 返回結(jié)果的異步調(diào)用
@Async("taskExecutor")
public Future<String> pay() throws InterruptedException {
log.info("開始做異步返回結(jié)果任務(wù)2:支付");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任務(wù)2,耗時(shí):" + (end - start) + "毫秒");
return new AsyncResult<>("會(huì)員服務(wù)完成");
}
/**
* 會(huì)員積分任務(wù)
* @throws InterruptedException
*/
@Async("taskExecutor")
public void vip() throws InterruptedException {
log.info("開始做任務(wù)5:會(huì)員");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("開始做異步返回結(jié)果任務(wù)5,耗時(shí):" + (end - start) + "毫秒");
}
}
單元測(cè)試
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AsyncApplication.class)
public class AsyncApplicationTests {
@Autowired
private OrderService orderService;
@Test
public void testAsync() {
orderService.doShop();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
結(jié)果展示
2019-05-16 20:25:06.577 [INFO ] [main] - zero.springboot.study.async.service.OrderService-52 開始做任務(wù)1:下單成功
2019-05-16 20:25:06.586 [INFO ] [main] - zero.springboot.study.async.service.OrderService-60 開始做任務(wù)4:物流
2019-05-16 20:25:06.599 [INFO ] [AsyncExecutor-1] - zero.springboot.study.async.service.AsyncTask-38 開始做異步返回結(jié)果任務(wù)2:支付
2019-05-16 20:25:13.382 [INFO ] [AsyncExecutor-1] - zero.springboot.study.async.service.AsyncTask-42 完成任務(wù)2,耗時(shí):6783毫秒
2019-05-16 20:25:14.771 [INFO ] [main] - zero.springboot.study.async.service.OrderService-68 完成任務(wù)4,耗時(shí):8184毫秒
可以看到有的線程的名字就是我們線程池定義的前綴,說明使用了線程池異步執(zhí)行。其中我們示范了一個(gè)錯(cuò)誤的使用案例 otherJob(),并沒有異步執(zhí)行。
原因:
spring 在掃描bean的時(shí)候會(huì)掃描方法上是否包含@Async注解,如果包含,spring會(huì)為這個(gè)bean動(dòng)態(tài)地生成一個(gè)子類(即代理類,proxy),代理類是繼承原來那個(gè)bean的。此時(shí),當(dāng)這個(gè)有注解的方法被調(diào)用的時(shí)候,實(shí)際上是由代理類來調(diào)用的,代理類在調(diào)用時(shí)增加異步作用。然而,如果這個(gè)有注解的方法是被同一個(gè)類中的其他方法調(diào)用的,那么該方法的調(diào)用并沒有通過代理類,而是直接通過原來的那個(gè) bean 也就是 this. method,所以就沒有增加異步作用,我們看到的現(xiàn)象就是@Async注解無效。
歡迎加群與我們分享,我們第一時(shí)間反饋。關(guān)注公眾號(hào),后臺(tái)回復(fù) 加群即可獲取個(gè)人微信。