Spring Boot使用@Async實(shí)現(xiàn)異步調(diào)用

關(guān)注公眾號(hào) MageByte,設(shè)置星標(biāo)獲取最新干貨。公眾號(hào)后臺(tái)回復(fù) “加群” 進(jìn)入技術(shù)交流群獲更多技術(shù)成長。
——本文由 MageByte 團(tuán)隊(duì)的「青葉」編寫

==異步調(diào)用對(duì)應(yīng)的是同步調(diào)用,同步調(diào)用可以理解為按照定義的順序依次執(zhí)行,有序性;異步調(diào)用在執(zhí)行的時(shí)候不需要等待上一個(gè)指令調(diào)用結(jié)束就可以繼續(xù)執(zhí)行。==

我們將在創(chuàng)建一個(gè) Spring Boot 工程來說明。具體工程可以參考github代碼 https://github.com/UniqueDong/springboot-study async模塊

pom 依賴如下:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-logging</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

啟動(dòng)類如下:

@SpringBootApplication
public class AsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }

}

定義線程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 異步線程池
 */
@Configuration
@EnableAsync
public class AsyncExecutorConfig {

    /**
     * Set the ThreadPoolExecutor's core pool size.
     */
    private int corePoolSize = 8;
    /**
     * Set the ThreadPoolExecutor's maximum pool size.
     */
    private int maxPoolSize = 16;
    /**
     * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
     */
    private int queueCapacity = 200;

    private String threadNamePrefix = "AsyncExecutor-";

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix(threadNamePrefix);

        // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)  
        // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行  
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

}  

代碼中我們通過 ThreadPoolTaskExecutor 創(chuàng)建了一個(gè)線程池。參數(shù)含義如下所示:

  • corePoolSize:線程池創(chuàng)建的核心線程數(shù)
  • maxPoolSize:線程池最大線程池?cái)?shù)量,當(dāng)任務(wù)數(shù)超過corePoolSize以及緩沖隊(duì)列也滿了以后才會(huì)申請(qǐng)的線程數(shù)量。
  • setKeepAliveSeconds: 允許線程空閑時(shí)間60秒,當(dāng)maxPoolSize的線程在空閑時(shí)間到達(dá)的時(shí)候銷毀。
  • ThreadNamePrefix:線程的前綴任務(wù)名字。
  • RejectedExecutionHandler:當(dāng)線程池沒有處理能力的時(shí)候,該策略會(huì)直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)

使用實(shí)戰(zhàn)

@Slf4j
@Service
public class OrderService {
    public static Random random = new Random();


    @Autowired
    private AsyncTask asyncTask;

    public void doShop() {
        try {
            createOrder();
            // 調(diào)用有結(jié)果返回的異步任務(wù)
            Future<String> pay = asyncTask.pay();
            if (pay.isDone()) {
                try {
                    String result = pay.get();
                    log.info("異步任務(wù)返回結(jié)果{}", result);
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                asyncTask.vip();
                asyncTask.sendSms();
            }
            otherJob();
        } catch (InterruptedException e) {
            log.error("異常", e);
        }
    }

    public void createOrder() {
        log.info("開始做任務(wù)1:下單成功");
    }

    /**
     * 錯(cuò)誤使用,不會(huì)異步執(zhí)行:調(diào)用方與被調(diào)方不能在同一個(gè)類。主要是使用了動(dòng)態(tài)代理,同一個(gè)類的時(shí)候直接調(diào)用,不是通過生成的動(dòng)態(tài)代理類調(diào)用
     */
    @Async("taskExecutor")
    public void otherJob() {
        log.info("開始做任務(wù)4:物流");
        long start = System.currentTimeMillis();
        try {
            Thread.sleep(random.nextInt(10000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)4,耗時(shí):" + (end - start) + "毫秒");
    }

}

異步任務(wù)服務(wù)類

mport lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.Future;

@Component
@Slf4j
public class AsyncTask {
    public static Random random = new Random();


    @Async("taskExecutor")
    public void sendSms() throws InterruptedException {
        log.info("開始做任務(wù)2:發(fā)送短信");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)1,耗時(shí):" + (end - start) + "毫秒");
    }
 
// 返回結(jié)果的異步調(diào)用
    @Async("taskExecutor")
    public Future<String> pay() throws InterruptedException {
        log.info("開始做異步返回結(jié)果任務(wù)2:支付");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)2,耗時(shí):" + (end - start) + "毫秒");
        return new AsyncResult<>("會(huì)員服務(wù)完成");
    }

    /**
     * 會(huì)員積分任務(wù)
     * @throws InterruptedException
     */
    @Async("taskExecutor")
    public void vip() throws InterruptedException {
        log.info("開始做任務(wù)5:會(huì)員");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("開始做異步返回結(jié)果任務(wù)5,耗時(shí):" + (end - start) + "毫秒");
    }
}

單元測(cè)試

@RunWith(SpringRunner.class)
@SpringBootTest(classes = AsyncApplication.class)
public class AsyncApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    public void testAsync() {
        orderService.doShop();
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

結(jié)果展示

2019-05-16 20:25:06.577 [INFO ] [main] - zero.springboot.study.async.service.OrderService-52 開始做任務(wù)1:下單成功 
2019-05-16 20:25:06.586 [INFO ] [main] - zero.springboot.study.async.service.OrderService-60 開始做任務(wù)4:物流 
2019-05-16 20:25:06.599 [INFO ] [AsyncExecutor-1] - zero.springboot.study.async.service.AsyncTask-38 開始做異步返回結(jié)果任務(wù)2:支付 
2019-05-16 20:25:13.382 [INFO ] [AsyncExecutor-1] - zero.springboot.study.async.service.AsyncTask-42 完成任務(wù)2,耗時(shí):6783毫秒 
2019-05-16 20:25:14.771 [INFO ] [main] - zero.springboot.study.async.service.OrderService-68 完成任務(wù)4,耗時(shí):8184毫秒 

可以看到有的線程的名字就是我們線程池定義的前綴,說明使用了線程池異步執(zhí)行。其中我們示范了一個(gè)錯(cuò)誤的使用案例 otherJob(),并沒有異步執(zhí)行。

原因:

spring 在掃描bean的時(shí)候會(huì)掃描方法上是否包含@Async注解,如果包含,spring會(huì)為這個(gè)bean動(dòng)態(tài)地生成一個(gè)子類(即代理類,proxy),代理類是繼承原來那個(gè)bean的。此時(shí),當(dāng)這個(gè)有注解的方法被調(diào)用的時(shí)候,實(shí)際上是由代理類來調(diào)用的,代理類在調(diào)用時(shí)增加異步作用。然而,如果這個(gè)有注解的方法是被同一個(gè)類中的其他方法調(diào)用的,那么該方法的調(diào)用并沒有通過代理類,而是直接通過原來的那個(gè) bean 也就是 this. method,所以就沒有增加異步作用,我們看到的現(xiàn)象就是@Async注解無效。

歡迎加群與我們分享,我們第一時(shí)間反饋。關(guān)注公眾號(hào),后臺(tái)回復(fù) 加群即可獲取個(gè)人微信。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容