spring任務(wù)執(zhí)行器與任務(wù)調(diào)度器(TaskExecutor And TaskScheduler)

對于多線程及周期性調(diào)度相關(guān)的操作,spring框架提供了TaskExecutor和TaskScheduler接口為異步執(zhí)行和任務(wù)調(diào)度。并提供了相關(guān)實(shí)現(xiàn)類給開發(fā)者使用。(只記錄采用注解的使用形式,對于XML的使用形式不做筆記。)
Spring官方對TaskExecutor的相關(guān)解釋:

Spring的TaskExecutor接口與java.util.concurrent.Executor接口相同。該接口具有單個(gè)方法(execute(Runnable task)),該方法根據(jù)線程池的語義和配置接受要執(zhí)行的任務(wù)。

二話(寫)不說,自說(寫)自話(字)直接拐上了

TaskExecutor接口相關(guān)實(shí)現(xiàn)類

實(shí)現(xiàn)類名 對應(yīng)解釋(直接甩翻譯了)
SyncTaskExecutor 該實(shí)現(xiàn)類不會(huì)執(zhí)行異步調(diào)用。 相反,每次調(diào)用都在調(diào)用的線程中進(jìn)行(翻譯過來也即同步任務(wù)執(zhí)行器)。 它主要用于不需要多線程的情況,例如在簡單的測試用例中。
SimpleAsyncTaskExecutor 此實(shí)現(xiàn)不會(huì)重用任何線程。 相反,它為每次調(diào)用啟動(dòng)一個(gè)新線程。 但是,它確實(shí)支持并發(fā)限制,該限制會(huì)阻止超出限制的任何調(diào)用,直到釋放插槽為止。(說簡單了,就是要使用了直接創(chuàng)建一個(gè)線程)
ConcurrentTaskExecutor 此實(shí)現(xiàn)是java.util.concurrent.Executor實(shí)例的適配器。很少需要直接使用ConcurrentTaskExecutor(官網(wǎng)自己都覺得很少使用,不過相對于ThreadPoolTaskExecutor,官網(wǎng)推薦如果ThreadPoolTaskExecutor不夠靈活,無法滿足需求,則可以使用ConcurrentTaskExecutor)。
ThreadPoolTaskExecutor 殺手锏級的任務(wù)調(diào)度器(最常用),可以說已經(jīng)足夠滿足我們的需求了(除非,非常非常特例才使用ConcurrentTaskExecutor)。官網(wǎng)翻譯重要片段:公開了bean屬性,用于配置java.util.concurrent.ThreadPoolExecutor并將其包裝在TaskExecutor中
WorkManagerTaskExecutor 此實(shí)現(xiàn)使用CommonJ WorkManager作為其后備服務(wù)提供程序,并且是在Spring應(yīng)用程序上下文中在WebLogic或WebSphere上設(shè)置基于CommonJ的線程池集成的中心便利類。
DefaultManagedTaskExecutor 此實(shí)現(xiàn)在JSR-236兼容的運(yùn)行時(shí)環(huán)境(例如Java EE 7+應(yīng)用程序服務(wù)器)中使用JNDI獲取的ManagedExecutorService,為此目的替換CommonJ WorkManager。(說明了就是依賴環(huán)境)

其中可能今后工作中會(huì)用到的(包括測試):SyncTaskExecutor、SimpleAsyncTaskExecutor、ConcurrentTaskExecutor、ThreadPoolTaskExecutor此四個(gè)實(shí)現(xiàn)類。重點(diǎn)關(guān)注ThreadPoolTaskExecutor此類。
:以上均實(shí)現(xiàn)了spring提供的TaskExecutor接口。

Spring官方對TaskSheduler接口的相關(guān)解釋:

用于在將來的某個(gè)時(shí)間點(diǎn)調(diào)度任務(wù)。

TaskScheduler接口相關(guān)實(shí)現(xiàn)類

實(shí)現(xiàn)類名 對應(yīng)解釋
ConcurrentTaskScheduler 該類實(shí)際繼承了ConcurrentTaskExecutor對象。只是實(shí)現(xiàn)了TaskScheduler接口,增加了相關(guān)定時(shí)調(diào)度任務(wù)的方法。
ThreadPoolTaskScheduler spring對該類設(shè)計(jì)原則同ThreadPoolTaskExecutor類。是為了定時(shí)調(diào)度任務(wù)不依賴相關(guān)的運(yùn)行容器(例如weblogic、WebSphere等)。其底層委托給ScheduledExecutorService,向外暴露相關(guān)的常見bean配置屬性。

接下來對以上相應(yīng)的實(shí)現(xiàn)類通過注解的形式進(jìn)行相應(yīng)的測試
首先進(jìn)行解釋要用到的幾個(gè)注解,對這幾個(gè)注解總結(jié)如下表

注解名 解釋
@EnableAsync 開啟異步執(zhí)行。官方文檔中解釋:該注解添加到@Configuration標(biāo)注的類上以開始異步執(zhí)行。開啟后@Async標(biāo)注的方法或類即可異步執(zhí)行。
@EnableScheduling 開啟定時(shí)調(diào)度。官方文檔解釋也是配合@Configuration一起使用。開啟后@Scheduled注解標(biāo)注的方法即可自動(dòng)定時(shí)(或延遲)執(zhí)行。
@Async 異步執(zhí)行注解??蓸?biāo)注類和方法。標(biāo)注類時(shí),則該類下所有方法均可使用異步執(zhí)行。標(biāo)注方法時(shí),則該方法可使用異步執(zhí)行。當(dāng)標(biāo)注有@Configuration注解的配置類上標(biāo)注了@EnableAsync注解后即可生效。
@Scheduled 標(biāo)注相關(guān)方法后,如果配置類標(biāo)注了@EnableScheduling后即可開啟定時(shí)調(diào)度任務(wù)。
  • 接下來先測試異步執(zhí)行器(TaskExecutor)

使用方式1(直接使用spring容器中的ThreadPoolTaskExecutor):

先通過@Bean注解將ThreadPoolTaskExecutor放入Spring容器中:

@Configuration
public class ExecutorBean {
    /**
     * 執(zhí)行器ThreadPoolTaskExecutor
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 設(shè)置核心線程池大小(這里設(shè)置為初始化10個(gè))
        executor.setMaxPoolSize(30); // 設(shè)置最大線程池大小(當(dāng)核心線程池不夠用時(shí)候,會(huì)自動(dòng)在原基礎(chǔ)上增加。最大為30個(gè))
        executor.setQueueCapacity(2000); // 設(shè)置隊(duì)列容量為2000個(gè)。
        return executor;
    }
}

然后在標(biāo)注有@Configuration注解的配置類或SpringBoot啟動(dòng)類上添加@EntityAsync注解。這里在標(biāo)有@SpringBootApplication注解的SpringBoot啟動(dòng)類的入口方法上添加@EntityAsync注解。

@SpringBootApplication
@EnableAsync
public class ScheduleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class, args);
    }
}

然后在需要采取異步執(zhí)行的方法上(或類上,此測試使用方法)標(biāo)注@Async注解:

@Async(value="threadPoolTaskExecutor")
public void testThreadPoolTaskExecutor() {
    System.err.println("---  testThreadPoolTaskExecutor  --");
}

測試用例:

@Test
public void testThreadPoolTaskExecutor() {
    System.err.println("--- 開始 ---");
    taskExecutorService.testThreadPoolTaskExecutor();
    System.err.println("--- 結(jié)束 ---");
}

執(zhí)行結(jié)果:

--- 開始 ---
--- 結(jié)束 ---
---  testThreadPoolTaskExecutor  --

使用方式2(通過修改默認(rèn)的線程池配置,即實(shí)現(xiàn)AsyncConfigurer接口,并重寫其中的getAsyncExecutor方法[因?yàn)槭荍DK8提供的default方法,所以才稱為重寫]):
首先,先實(shí)現(xiàn)AsyncConfigurer接口,重寫getAsyncExecutor方法并將此實(shí)現(xiàn)類作為配置類裝載進(jìn)spring容器中(記:對于void返回類型,異常未被捕獲且無法傳輸,所以getAsyncUncaughtExceptionHandler方法用于處理異步調(diào)用后出現(xiàn)異常的情況。這里僅僅記錄未出現(xiàn)異常的測試),同時(shí)添加@EnableAsync開啟可異步調(diào)用(也可以在springBoot啟動(dòng)類中的入口方法上添加)。

@Configuration
@EnableAsync
public class CustomAsyncConfigurer implements AsyncConfigurer {
    /**
     * 設(shè)置線程池相關(guān)的配置
     * @return ThreadPoolTaskExecutor
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 設(shè)置核心線程池大小(這里設(shè)置為初始化10個(gè))
        executor.setMaxPoolSize(30); // 設(shè)置最大線程池大小(當(dāng)核心線程池不夠用時(shí)候,會(huì)自動(dòng)在原基礎(chǔ)上增加。最大為30個(gè))
        executor.setQueueCapacity(2000); // 設(shè)置隊(duì)列容量為2000個(gè)。
        executor.initialize();
        return executor;
    }
}

然后可以直接使用@Async注解標(biāo)注需要異步執(zhí)行的方法(或類上,此測試使用方法)

@Async
public void testThreadPoolTaskExecutor(Long id) {
    System.err.println("---  testThreadPoolTaskExecutor  --" + id);
}

測試用例:

@Test
public void testThreadPoolTaskExecutor() {
    System.err.println("--- 開始前 ---");
    System.err.println("--- 開始 ---");
    for(int i =0;i<20;i++) {
        taskExecutorService.testThreadPoolTaskExecutor(new Long(i));
    }
    System.err.println("--- 結(jié)束 ---");
    System.err.println("--- 結(jié)束后 ---");
}

測試結(jié)果:

--- 開始前 ---
--- 開始 ---
---  testThreadPoolTaskExecutor  --0
---  testThreadPoolTaskExecutor  --1
---  testThreadPoolTaskExecutor  --2
---  testThreadPoolTaskExecutor  --3
---  testThreadPoolTaskExecutor  --4
---  testThreadPoolTaskExecutor  --5
---  testThreadPoolTaskExecutor  --6
---  testThreadPoolTaskExecutor  --7
---  testThreadPoolTaskExecutor  --8
---  testThreadPoolTaskExecutor  --9
---  testThreadPoolTaskExecutor  --10
---  testThreadPoolTaskExecutor  --11
---  testThreadPoolTaskExecutor  --12
---  testThreadPoolTaskExecutor  --13
---  testThreadPoolTaskExecutor  --14
---  testThreadPoolTaskExecutor  --17
---  testThreadPoolTaskExecutor  --16
---  testThreadPoolTaskExecutor  --15
--- 結(jié)束 ---
--- 結(jié)束后 ---
---  testThreadPoolTaskExecutor  --19
---  testThreadPoolTaskExecutor  --18

**使用方式3:因?yàn)門hreadPoolTaskScheduler實(shí)現(xiàn)了TaskExecutor相關(guān)的接口。所以同樣可以用ThreadPoolTaskScheduler替換ThreadPoolTaskExecutor來調(diào)用異步執(zhí)行器。
同樣實(shí)現(xiàn)AsyncConfigurer接口,重寫getAsyncExecutor方法并將此實(shí)現(xiàn)類作為配置類裝載進(jìn)spring容器中。只是此時(shí)用ThreadPoolTaskScheduler替換ThreadPoolTaskExecutor類作為任務(wù)執(zhí)行器

@Configuration
@EnableAsync
public class CustomAsyncConfigurer implements AsyncConfigurer {
    /**
     * 設(shè)置線程池相關(guān)的配置
     * @return ThreadPoolTaskExecutor
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
        executor.setPoolSize(30);
        executor.initialize();
        return executor;
    }
}

然后用@Asnyc標(biāo)注方法

@Async
public void testThreadPoolTaskScheduler(Long id) {
    System.err.println("---  testThreadPoolTaskExecutor  --" + id);
}

測試

@Test
public void testThreadPoolTaskScheduler() {
    System.err.println("--- 開始前 ---");
    System.err.println("--- 開始 ---");
    for(int i =0;i<20;i++) {
        taskExecutorService.testThreadPoolTaskScheduler(new Long(i));
    }
    System.err.println("--- 結(jié)束 ---");
    System.err.println("--- 結(jié)束后 ---");
}

經(jīng)過測試,用例3沒啥效果...測試方式可能有誤。同樣的異步調(diào)用ThreadPoolTaskScheduler類可能不能當(dāng)做ThreadPoolTaskExecutor 類使用。雖然同樣實(shí)現(xiàn)了TaskExecutor接口(看來得看底層源碼了,現(xiàn)在僅僅記錄下來先)

  • 現(xiàn)在來測試定時(shí)調(diào)度器(TaskScheduler)

使用方式1(直接使用@EnableScheduling開啟定時(shí)調(diào)度任務(wù),然后對需要定時(shí)調(diào)度的方法用@Sheduled注解標(biāo)注):
直接在SpringBoot啟動(dòng)器類中的入口方法上標(biāo)注或標(biāo)注了@Configuration注解的配置類上使用@EnableScheduling注解

//在啟動(dòng)類上添加
@SpringBootApplication
@EnableScheduling
public class ScheduleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class, args);
    }
}

//或在在配置類上標(biāo)注
@Configuration
@EnableScheduling
public class CustomScheduleConfigurer implements AsyncConfigurer {
//...
}

需要在定時(shí)任務(wù)的方法上添加@Scheduled

@Scheduled(fixedRate = 300)  //表示每300毫秒調(diào)用一次該方法
@Async
@Override
public void threadPoolTaskScheduler() {
    System.err.println("---  threadPoolTaskScheduler  --");
}

測試用例

@Test
public void schedulers() throws Exception {
    Thread.sleep(50000000);   //讓線程睡上一段時(shí)間,以便查看效果
}

因?yàn)閼?yīng)用一啟動(dòng)后定時(shí)調(diào)度器便會(huì)開始執(zhí)行。如果測試用例不使用線程睡眠的話程序會(huì)一瞬間執(zhí)行結(jié)束,有可能看不到效果。
:這里添加了@async注解,表示此方法定時(shí)調(diào)用時(shí)使用異步方式執(zhí)行。
:@Scheduled注解支持Quartz的CRON表達(dá)式來規(guī)劃定時(shí)任務(wù):SPRING官網(wǎng)示例)。
spring框架支持Quartz來使用定時(shí)調(diào)度任務(wù)。
這里簡單記錄其各個(gè)表達(dá)式含義:

@Scheduled注解的各個(gè)參數(shù)與其含義
參數(shù)名 類型 含義
cron String 使用表達(dá)式的方式定義任務(wù)執(zhí)行時(shí)間
zone String 可以通過它設(shè)定區(qū)域時(shí)間(時(shí)區(qū))
fixedDelay long 表示從上一個(gè)任務(wù)完成后到下一個(gè)任務(wù)開始時(shí)的時(shí)間間隔,單位毫秒
fixedDelayString String 同fixedDelay,只是為字符串值,可使用SPEL表達(dá)式來引入配置文件配置
inittalDelay long 初始化延時(shí)時(shí)間。spring容器初始化后首次任務(wù)延時(shí)多久開始執(zhí)行,單位毫秒
inittalDelayString String 同 inittalDelay,值為字符串值??墒褂肧PEL表達(dá)式來引入配置文件配置
fixedRate long 每次執(zhí)行任務(wù)的間隔時(shí)間,單位毫秒
fixedRateString String 每次執(zhí)行任務(wù)的間隔時(shí)間,單位為字符串值??墒褂肧PEL表達(dá)式來引入配置文件配置

CRON表達(dá)式有6~7個(gè)空格分隔得時(shí)間元素,按順序依次為:
秒 分 時(shí) 天 月 星期 年 其中年可以是不用配置的元素。
例:
0 0 0 ? * WED
表示每個(gè)星期三0點(diǎn)整執(zhí)行任務(wù)。其中因?yàn)樘旌托瞧跁?huì)產(chǎn)生定義上的沖突。所以采用了通配符。以下為通配符含義

通配符符號 含義
* 表示任意
? 表示不指定,用于處理天和星期的沖突
- 指定時(shí)間區(qū)間
/ 指定時(shí)間間隔
L 指最后
# 第幾
, 列舉多個(gè)項(xiàng)

以下為幾個(gè)難點(diǎn)的例子:

示例 描述
0 0/6 19,22 * * ? 每天19點(diǎn)至19點(diǎn)59和22點(diǎn)至22點(diǎn)59分倆時(shí)間段內(nèi)每6分鐘執(zhí)行一次
0 10-16 22 * * ? 每天的22點(diǎn)至22點(diǎn)16分每分鐘執(zhí)行一次
0 0 19 ? * MON-FRI 周一到周五的每天19點(diǎn)執(zhí)行一次
0 23 23 ? * 5L 2019-2022 2019年至2022年的每月最后一個(gè)周四的23點(diǎn)23分執(zhí)行
0 16 22 ? * 6#2 每月第二周周五的22點(diǎn)16分執(zhí)行一次

使用方式2(實(shí)現(xiàn)SchedulingConfigurer配置類,規(guī)定定時(shí)任務(wù))
首先實(shí)現(xiàn)SchedulingConfigurer配置類,并啟用定時(shí)調(diào)度任務(wù)

@Configuration
@EnableScheduling
public class CustomSchedulerConfigurer implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        IntervalTask task = new IntervalTask(new Runnable() {
            @Override
            public void run() {
                System.err.println("----間隔執(zhí)行----"); 
            }
        }, 2000);
        taskRegistrar.addFixedDelayTask(task);    //taskRegistrar有更多相關(guān)方法來執(zhí)行定時(shí)調(diào)度任務(wù)。測試用例先做簡單記錄
    }
}

然后進(jìn)行測試

@Test
public void schedulers() throws Exception {
    Thread.sleep(50000000);    //同樣讓線程睡上一段時(shí)間,以便查看效果
}

測試中并未處理異步執(zhí)行后如果出現(xiàn)異常的情況。異常情況發(fā)生后如何處理之后再做記錄。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容