任務(wù)調(diào)度框架之Quartz

任務(wù)調(diào)度簡介

1、什么時候需要任務(wù)調(diào)度

業(yè)務(wù)場景:
1)賬單日或者還款日上午 9 點,給每個信用卡客戶發(fā)送賬單通知,還款通知,如何判斷客戶的賬單日、還款日,完成通知的發(fā)送?
2)銀行業(yè)務(wù)系統(tǒng),夜間要完成跑批的一系列流程,清理數(shù)據(jù),下載文件,解析文件,對賬清算、切換結(jié)算日期等等,如何觸發(fā)一系列流程的執(zhí)行?
3)金融機構(gòu)跟人民銀行二代支付系統(tǒng)對接,人民銀行要求低于 5W 的金額(小額支付)半個小時打一次包發(fā)送,以緩解并發(fā)壓力。所以,銀行的跨行轉(zhuǎn)賬分成了多個流程:
錄入、復(fù)核、發(fā)送。如何把半個小時以內(nèi)的所有數(shù)據(jù)一次性發(fā)送?

類似于這種 1、基于準(zhǔn)確的時刻或者固定的時間間隔觸發(fā)的任務(wù),或者 2、有批量數(shù)據(jù)需要處理,或者 3、要實現(xiàn)兩個動作解耦的場景,我們都可以用任務(wù)調(diào)度來實現(xiàn)

2、任務(wù)調(diào)度需求分析
基本需求:
1)可以定義觸發(fā)的規(guī)則,比如基于時刻、時間間隔、表達(dá)式

2)可以定義需要執(zhí)行的任務(wù)。比如執(zhí)行一個腳本或者一段代碼。任務(wù)和規(guī)則是分開的

3)集中管理配置,持久配置。不用把規(guī)則寫在代碼里面,可以看到所有的任務(wù)配置,方便維護,重啟之后任務(wù)可以再次調(diào)度——配置文件或者配置中心

4)支持任務(wù)的串行執(zhí)行,例如執(zhí)行 A 任務(wù)后再執(zhí)行 B 任務(wù)再執(zhí)行 C 任務(wù)

5)支持多個任務(wù)并發(fā)執(zhí)行,互不干擾(例如ScheduledThreadPoolExecutor 線程池)

6)有自己的調(diào)度器,可以啟動、中斷、停止任務(wù)

7)容易集成到 Spring

3、任務(wù)調(diào)度工具對比

層次 舉例 特點
操作系統(tǒng) Linux Window 計劃任務(wù) 只能執(zhí)行簡單腳本或者命令
數(shù)據(jù)庫 MySQL Oracle 可以操作數(shù)據(jù),不能執(zhí)行java代碼
工具 kettle 可以操作數(shù)據(jù)庫,執(zhí)行腳本,沒有集中配置
開發(fā)語言 JDK Timer、ScheduledThreadPool Timer:單線程 JDK1.5 之后:ScheduledThreadPool(Cache、Fiexed、Single):沒有集中配置,日常管理不夠靈活
容器 Spring Task、@Scheduled 不支持集群
分布式框架 XXL-JOB、Elastic-Job 支持集群,集中配置,容易管理

@Scheduled 也是用 JUC 的 ScheduledExecutorService 實現(xiàn)的
Scheduled(cron = “0/5 * * * * ?”)

1) ScheduledAnnotationBeanPostProcessorpostProcessAfterInitialization 方法將@Scheduled 的方法包裝為指定的 task添加到 ScheduledTaskRegistrar

2) ScheduledAnnotationBeanPostProcessor 會監(jiān)聽 Spring 的容器初始化事件,在 Spring 容器初始化完成后進行TaskScheduler 實現(xiàn)類實例的查找,若發(fā)現(xiàn)有 SchedulingConfigurer 的實現(xiàn)類實例,則跳過 3

3) 查找 TaskScheduler 的實現(xiàn)類實例默認(rèn)是通過類型查找,若有多個實現(xiàn)則會查找名字為"taskScheduler"的實現(xiàn) Bean,若沒有找到則在 ScheduledTaskRegistrar 調(diào)度任務(wù)的時候會創(chuàng)建一個newSingleThreadScheduledExecutor,將TaskScheduler 的實現(xiàn)類實例設(shè)置到 ScheduledTaskRegistrar 屬性中

4)ScheduledTaskRegistrarscheduleTasks 方法觸發(fā)任務(wù)調(diào)度

5)真正調(diào)度任務(wù)的類是 TaskScheduler 實現(xiàn)類中的 ScheduledExecutorService,由 J.U.C 提供

Quartz基本介紹

Quatz 是一個特性豐富的,開源的任務(wù)調(diào)度庫,它幾乎可以嵌入所有的 Java 程序,從很小的獨立應(yīng)用程序到大型商業(yè)系統(tǒng)。Quartz 可以用來創(chuàng)建成百上千的簡單的或者復(fù)雜的任務(wù),這些任務(wù)可以用來執(zhí)行任何程序可以做的事情。Quartz 擁有很多企業(yè)級的特性,包括支持 JTA 事務(wù)和集群

Quartz 是一個老牌的任務(wù)調(diào)度系統(tǒng),98 年構(gòu)思,01 年發(fā)布到 sourceforge。現(xiàn)在更新比較慢,因為已經(jīng)非常成熟了。

Quartz 的目的就是讓任務(wù)調(diào)度更加簡單,開發(fā)人員只需要關(guān)注業(yè)務(wù)即可。他是用 Java 語言編寫的(也有.NET 的版本)。Java 代碼能做的任何事情,Quartz 都可以調(diào)度。

特點:
a)精確到毫秒級別的調(diào)度
b)可以獨立運行,也可以集成到容器中
c)支持事務(wù)(JobStoreCMT )
d)支持集群
e)支持持久化

Quartz Java編程

1、引入依賴

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.0</version>
</dependency>

2、默認(rèn)配置文件
org.quartz.core 包下,有一個默認(rèn)的配置文件,quartz.properties,當(dāng)我們沒有定義一個同名的配置文件的時候,就會使用默認(rèn)配置文件里面的配置

org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

3、創(chuàng)建Job
實現(xiàn)唯一的方法 execute(),方法中的代碼就是任務(wù)執(zhí)行的內(nèi)容

public class MyJob implements Job {
    public void execute(JobExecutionContext context) throws     JobExecutionException {
        System.out.println("定時任務(wù)測試");
    }
}

在測試類方法中,把 Job 進一步包裝成 JobDetail,必須要指定 JobNamegroupName,兩個合起來是唯一標(biāo)識符,可以攜帶 KV 的數(shù)據(jù)(JobDataMap),用于擴展屬性,在運行的時候可以從 context獲取到

JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
    .withIdentity("job1", "group1")
    .usingJobData("vincent","666")
    .usingJobData("liao",1314)
    .build();

4、創(chuàng)建Trigger
在測試類 main()方法中,基于 SimpleTrigger 定義了一個每 2 秒鐘運行一次、不斷重復(fù)的 Trigger:

Trigger trigger = TriggerBuilder.newTrigger()
    .withIdentity("trigger1", "group1")
    .startNow()
    .withSchedule(SimpleScheduleBuilder.simpleSchedule()
        .withIntervalInSeconds(2)
        .repeatForever())
.build();

5、創(chuàng)建Scheduler
在測試類 main()方法中,通過 Factory 獲取調(diào)度器的實例,把 JobDetailTrigger綁定,注冊到容器中

Scheduler 啟動順序無所謂,只要有 Trigger 到達(dá)觸發(fā)條件,就會執(zhí)行任務(wù)

SchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

調(diào)度器一定是單例的

6、體系結(jié)構(gòu)總結(jié)


總體架構(gòu).png

1)JobDetail
創(chuàng)建一個實現(xiàn) Job 接口的類,使用 JobBuilder 包裝成 JobDetail,它可以攜帶KV 的數(shù)據(jù)

2)Trigger
定義任務(wù)的觸發(fā)規(guī)律:Trigger,使用 TriggerBuilder 來構(gòu)建
JobDetailTrigger1:N 的關(guān)系

為什么要解耦?
Trigger 接口在 Quartz 有 4 個繼承的子接口:

子接口 描述 特點
SimpleTrigger 簡單觸發(fā)器 固定時刻或時間間隔,單位是毫秒
CalendarIntervalTrigger 基于日歷的觸發(fā)器 比簡單觸發(fā)器更多時間單位,支持非固定時間的觸發(fā),例如一年可能 365/366,一個月可能 28/29/30/31
DailyTimeIntervalTrigger 基于日期的觸發(fā)器 每天的某個時間段
CronTrigger 基于Cron表達(dá)式的觸發(fā)器

SimpleTrigger
SimpleTrigger 可以定義固定時刻或者固定時間間隔的調(diào)度規(guī)則(精確到毫秒)
例如:每天 9 點鐘運行;每隔 30 分鐘運行一次

CalendarIntervalTrigger
可以定義更多時間單位的調(diào)度需求,精確到秒
好處是:不需要去計算時間間隔,比如 1 個小時等于多少毫秒

例如每年、每個月、每周、每天、每小時、每分鐘、每秒、每年的月數(shù)和每個月的天數(shù)不是固定的,這種情況也適用

DailyTimeIntervalTrigger
每天的某個時間段內(nèi),以一定的時間間隔執(zhí)行任務(wù)
例如:每天早上 10 點到晚上 7 點,每隔半個小時執(zhí)行一次,并且只在周一到周六執(zhí)行

CronTrigger
可以定義基于 Cron 表達(dá)式的調(diào)度規(guī)則,是最常用的觸發(fā)器類型
Cron表達(dá)式

位置 時間域 特殊值
1 0-59 , - * /
2 分鐘 0-59 , - * /
3 小時 0-23 , - * /
4 日期 1-31 , - * ? / L W C
5 月份 1-12 , - * /
6 星期 1-7 , - * ? / L W C
7 年份 1-31 , - * /

解析:

星號(*):可用在所有字段中,表示對應(yīng)時間域的每一個時刻,例如,在分鐘字段時,表示“每分鐘”;

問號(?):該字符只在日期和星期字段中使用,它通常指定為“無意義的值”,相當(dāng)于點位符;

減號(-):表達(dá)一個范圍,如在小時字段中使用“10-12”,則表示從 10 到 12 點,即 10,11,12;

逗號(,):表達(dá)一個列表值,如在星期字段中使用“MON,WED,FRI”,則表示星期一,星期三和星期五;

斜杠(/):x/y 表達(dá)一個等步長序列,x 為起始值,y 為增量步長值。如在分鐘字段中使用 0/15,則表示為 0,15,30 和45 秒,而 5/15 在分鐘字段中表示 5,20,35,50,你也可以使用*/y,它等同于 0/y;

L:該字符只在日期和星期字段中使用,代表“Last”的意思,但它在兩個字段中意思不同。L 在日期字段中,表示這個月份的最后一天,如一月的 31 號,非閏年二月的 28 號;如果 L 用在星期中,則表示星期六,等同于 7。但是,如果 L 出現(xiàn)在星期字段里,而且在前面有一個數(shù)值 X,則表示“這個月的最后 X 天”,例如,6L 表示該月的最后星期五;

W:該字符只能出現(xiàn)在日期字段里,是對前導(dǎo)日期的修飾,表示離該日期最近的工作日。例如 15W 表示離該月 15號最近的工作日,如果該月 15 號是星期六,則匹配 14 號星期五;如果 15 日是星期日,則匹配 16 號星期一;如果 15號是星期二,那結(jié)果就是 15 號星期二。但必須注意關(guān)聯(lián)的匹配日期不能夠跨月,如你指定 1W,如果 1 號是星期六,結(jié)果匹配的是 3 號星期一,而非上個月最后的那天。W 字符串只能指定單一日期,而不能指定日期范圍;

LW 組合:在日期字段可以組合使用 LW,它的意思是當(dāng)月的最后一個工作日;`井號(#)`:該字符只能在星期字段中使用,表示當(dāng)月某個工作日。如 `6#3` 表示當(dāng)月的第三個星期五(6 表示星期五,`#3`表示當(dāng)前的第三個),而 `4#5` 表示當(dāng)月的第五個星期三,假設(shè)當(dāng)月沒有第五個星期三,忽略不觸發(fā);

C:該字符只在日期和星期字段中使用,代表“Calendar”的意思。它的意思是計劃所關(guān)聯(lián)的日期,如果日期沒有被關(guān)聯(lián),則相當(dāng)于日歷中所有日期。例如 5C 在日期字段中就相當(dāng)于日歷 5 日以后的第一天。1C 在星期字段中相當(dāng)于星期日后的第一天。Cron 表達(dá)式對特殊字符的大小寫不敏感,對代表星期的縮寫英文大小寫也不敏感

3)Scheduler
調(diào)度器,是 Quartz 的指揮官,由 StdSchedulerFactory 產(chǎn)生。它是單例的

Quartz 中最重要的 API,默認(rèn)是實現(xiàn)類是 StdScheduler,里面包含了一個QuartzScheduler,QuartzScheduler 里面又包含了一個 QuartzSchedulerThread

Scheduler 中的方法主要分為三大類:
1)操作調(diào)度器本身,例如調(diào)度器的啟動 start()、調(diào)度器的關(guān)閉 shutdown()
2)操作 Trigger,例如 pauseTriggers()、resumeTrigger()
3)操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()

4)Listener
需求:在每個任務(wù)運行結(jié)束之后發(fā)送通知給運維管理員,那是不是
要在每個任務(wù)的最后添加一行代碼呢?這種方式對原來的代碼造成了入侵,不利于維護,如果代碼不是寫在任務(wù)代碼的最后一行,怎么知道任務(wù)執(zhí)行完了呢?或者說,怎么監(jiān)測到任務(wù)的生命周期呢?

觀察者模式:定義對象間一種一對多的依賴關(guān)系,使得每當(dāng)一個對象改變狀態(tài),則所有依賴它的對象都會得到通知并自動更新

Quartz 中提供了三種 Listener:
監(jiān)聽 Scheduler 的,監(jiān)聽 Trigger 的,監(jiān)聽 Job 的

只需要創(chuàng)建類實現(xiàn)相應(yīng)的接口,并在 Scheduler 上注冊 Listener,便可實現(xiàn)對核心對象的監(jiān)聽

JobListener
public class MyJobListener implements JobListener {
    //返回JobListener名稱
    public String getName() {
        String name = getClass().getSimpleName();
        System.out.println( "Method 111111 :"+ "獲取到監(jiān)聽器名稱:"+name);
        return name;
    }
    //Scheduler在JobDetail將要被執(zhí)行時調(diào)用這個方法
    public void jobToBeExecuted(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        System.out.println("Method 222222 :"+ jobName + " ——任務(wù)即將執(zhí)行 ");
    }
    //Scheduler在JobDetail即將被執(zhí)行,但又被TriggerListener否決了時調(diào)用這個方法
    public void jobExecutionVetoed(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        System.out.println("Method 333333 :"+ jobName + " ——任務(wù)被否決 ");
    }
    //Scheduler在JobDetail被執(zhí)行之后調(diào)用這個方法
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        String jobName = context.getJobDetail().getKey().getName();
        System.out.println("Method 444444 :"+ jobName + " ——執(zhí)行完畢 ");
        System.out.println("------------------");
    }
}

public class MyJobListenerTest {
    public static void main(String[] args) throws SchedulerException {

        // JobDetail
        JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();

        // Trigger
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();

        // SchedulerFactory
        SchedulerFactory  factory = new StdSchedulerFactory();

        // Scheduler
        Scheduler scheduler = factory.getScheduler();

        scheduler.scheduleJob(jobDetail, trigger);

        // 創(chuàng)建并注冊一個全局的Job Listener
        scheduler.getListenerManager().addJobListener(new MyJobListener(), EverythingMatcher.allJobs());

        scheduler.start();
    }
}

工具類:ListenerManager,用于添加、獲取、移除監(jiān)聽器
工具類:Matcher,主要是基于 groupName 和 keyName 進行匹配。

TriggerListener
public class MyTriggerListener implements TriggerListener {
    private String name;

    public MyTriggerListener(String name) {
        this.name = name;
    }
    //返回監(jiān)聽器的名稱
    public String getName() {
        return name;
    }

    // Trigger 被觸發(fā),Job 上的 execute() 方法將要被執(zhí)行時,Scheduler就調(diào)用這個方法
    public void triggerFired(Trigger trigger, JobExecutionContext context) {
        String triggerName = trigger.getKey().getName();
        System.out.println("Method 11111 " + triggerName + " was fired");
    }

    // 在 Trigger 觸發(fā)后,Job 將要被執(zhí)行時由 Scheduler 調(diào)用這個方法
    // TriggerListener 給了一個選擇去否決 Job 的執(zhí)行,如果返回true時,這個任務(wù)不會被觸發(fā)
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        String triggerName = trigger.getKey().getName();
        System.out.println("Method 222222 " + triggerName + " was not vetoed");
        return false;
    }
    //Trigger 錯過觸發(fā)時調(diào)用
    public void triggerMisfired(Trigger trigger) {
        String triggerName = trigger.getKey().getName();
        System.out.println("Method 333333 " + triggerName + " misfired");
    }
    //Trigger 被觸發(fā)并且完成了 Job 的執(zhí)行時,Scheduler 調(diào)用這個方法
    public void triggerComplete(Trigger trigger, JobExecutionContext context,
                                Trigger.CompletedExecutionInstruction triggerInstructionCode) {
        String triggerName = trigger.getKey().getName();
        System.out.println("Method 444444 " + triggerName + " is complete");
        System.out.println("------------");
    }
}

public class MyTriggerListenerTest {
    public static void main(String[] args) throws SchedulerException {

        // JobDetail
        JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();

        // Trigger
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever()).build();

        // SchedulerFactory
        SchedulerFactory  factory = new StdSchedulerFactory();

        // Scheduler
        Scheduler scheduler = factory.getScheduler();

        scheduler.scheduleJob(jobDetail, trigger);

        // 創(chuàng)建并注冊一個全局的Trigger Listener
        scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener1"), EverythingMatcher.allTriggers());

        // 創(chuàng)建并注冊一個局部的Trigger Listener
        scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener2"), KeyMatcher.keyEquals(TriggerKey.triggerKey("trigger1", "gourp1")));

        // 創(chuàng)建并注冊一個特定組的Trigger Listener
        GroupMatcher<TriggerKey> matcher = GroupMatcher.triggerGroupEquals("gourp1");
        scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener3"), matcher);

        scheduler.start();
    }
}
SchedulerListener
public class MySchedulerListener implements SchedulerListener {
    
    public void jobScheduled(Trigger trigger) {
        String jobName = trigger.getJobKey().getName();
        System.out.println( jobName + " has been scheduled");
    }

    public void jobUnscheduled(TriggerKey triggerKey) {
        System.out.println(triggerKey + " is being unscheduled");
    }

    public void triggerFinalized(Trigger trigger) {
        System.out.println("Trigger is finished for " + trigger.getJobKey().getName());
    }

    public void triggerPaused(TriggerKey triggerKey) {
        System.out.println(triggerKey + " is being paused");
    }
    
    public void triggersPaused(String triggerGroup) {
        System.out.println("trigger group "+triggerGroup + " is being paused");
    }
    
    public void triggerResumed(TriggerKey triggerKey) {
        System.out.println(triggerKey + " is being resumed");
    }

    public void triggersResumed(String triggerGroup) {
        System.out.println("trigger group "+triggerGroup + " is being resumed");
    }

    
    public void jobAdded(JobDetail jobDetail) {
        System.out.println(jobDetail.getKey()+" is added");
    }
    
    public void jobDeleted(JobKey jobKey) {
        System.out.println(jobKey+" is deleted");
    }
    
    public void jobPaused(JobKey jobKey) {
        System.out.println(jobKey+" is paused");
    }

    public void jobsPaused(String jobGroup) {
        System.out.println("job group "+jobGroup+" is paused");
    }

    public void jobResumed(JobKey jobKey) {
        System.out.println(jobKey+" is resumed");
    }

    public void jobsResumed(String jobGroup) {
        System.out.println("job group "+jobGroup+" is resumed");
    }

    public void schedulerError(String msg, SchedulerException cause) {
        System.out.println(msg + cause.getUnderlyingException().getStackTrace());
    }
    
    public void schedulerInStandbyMode() {
        System.out.println("scheduler is in standby mode");
    }

    public void schedulerStarted() {
        System.out.println("scheduler has been started");
    }

    
    public void schedulerStarting() {
        System.out.println("scheduler is being started");
    }

    public void schedulerShutdown() {
        System.out.println("scheduler has been shutdown");
    }

    public void schedulerShuttingdown() {
        System.out.println("scheduler is being shutdown");
    }

    public void schedulingDataCleared() {
        System.out.println("scheduler has cleared all data");
    }
}

public class MySchedulerListenerTest {
    public static void main(String[] args) throws SchedulerException {

        // JobDetail
        JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();

        // Trigger
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();

        // SchedulerFactory
        SchedulerFactory  factory = new StdSchedulerFactory();

        // Scheduler
        Scheduler scheduler = factory.getScheduler();

        scheduler.scheduleJob(jobDetail, trigger);

        // 創(chuàng)建Scheduler Listener
        scheduler.getListenerManager().addSchedulerListener(new MySchedulerListener());

        scheduler.start();
    }
}

5)JobStore
最多可以運行多少個任務(wù)(磁盤、內(nèi)存、線程數(shù))?
Jobstore 用來存儲任務(wù)和觸發(fā)器相關(guān)的信息,例如所有任務(wù)的名稱、數(shù)量、狀態(tài)等等

Quartz 中有兩種存儲任務(wù)的方式:一種存在內(nèi)存,一種是存在數(shù)據(jù)庫

RAMJobStore

Quartz 默認(rèn)的 JobStore 是 RAMJobstore,也就是把任務(wù)和觸發(fā)器信息運行的信息存儲在內(nèi)存中,用到了 HashMap、TreeSet、HashSet 等等數(shù)據(jù)結(jié)構(gòu)

如果程序崩潰或重啟,所有存儲在內(nèi)存中的數(shù)據(jù)都會丟失,所以我們需要把這些數(shù)據(jù)持久化到磁盤

JDBCJobStore

JDBCJobStore 可以通過 JDBC 接口,將任務(wù)運行數(shù)據(jù)保存在數(shù)據(jù)庫中

JDBC 的實現(xiàn)方式有兩種,JobStoreSupport 類的兩個子類:
JobStoreTX:在獨立的程序中使用,自己管理事務(wù),不參與外部事務(wù)
JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事務(wù)時,可以使用他

使用JDBCJobStore時,需要配置數(shù)據(jù)的信息:

org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties,不使用默認(rèn)配置
org.quartz.jobStore.useProperties:true
#數(shù)據(jù)庫中 quartz 表的表名前綴
org.quartz.jobStore.tablePrefix:QRTZ_
org.quartz.jobStore.dataSource:myDS
?
#配置數(shù)據(jù)源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/vincent?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual

問題來了?需要建什么表?表里面有什么字段?字段類型和長度是什么?
在官網(wǎng)的 Downloads 鏈接中,提供了 11 張表的建表語句
2.3 的版本在這個路徑下:src\org\quartz\impl\jdbcjobstore

表名和作用:

QRTZ_BLOB_TRIGGERS:Trigger 作為 Blob 類型存儲

QRTZ_CALENDARS:存儲 Quartz 的 Calendar 信息

QRTZ_CRON_TRIGGERS:存儲 CronTrigger,包括 Cron 表達(dá)式和時區(qū)信息

QRTZ_FIRED_TRIGGERS:存儲與已觸發(fā)的 Trigger 相關(guān)的狀態(tài)信息,以及相關(guān) Job 的執(zhí)行信息

QRTZ_JOB_DETAILS:存儲每一個已配置的 Job 的詳細(xì)信息

QRTZ_LOCKS:存儲程序的悲觀鎖的信息

QRTZ_PAUSED_TRIGGER_GRPS:存儲已暫停的 Trigger 組的信息

QRTZ_SCHEDULER_STATE:存儲少量的有關(guān) Scheduler 的狀態(tài)信息,和別的 Scheduler 實例

QRTZ_SIMPLE_TRIGGERS:存儲 SimpleTrigger 的信息,包括重復(fù)次數(shù)、間隔、以及已觸的次數(shù)

QRTZ_SIMPROP_TRIGGERS:存儲 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 兩種類型的觸發(fā)器

QRTZ_TRIGGERS:存儲已配置的 Trigger 的信息

Quartz集成到Spring

Spring 在 spring-context-support.jar 中直接提供了對Quartz 的支持

image.png

可以在配置文件中把 JobDetail、Trigger、Scheduler 定義成 Bean,交給Spring去管理

1)定義Job

<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
    <property name="name" value="my_job_1"/>
    <property name="vincent" value="my_group"/>
    <property name="jobClass" value="com.vincent.quartz.MyJob1"/>
    <property name="durability" value="true"/>
</bean>

2)定義Trigger

<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
    <property name="name" value="my_trigger_1"/>
    <property name="group" value="my_group"/>
    <property name="jobDetail" ref="myJob1"/>
    <property name="startDelay" value="1000"/>
    <property name="repeatInterval" value="5000"/>
    <property name="repeatCount" value="2"/>
</bean>

3)定義Scheduler

<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
        <list>
            <ref bean="simpleTrigger"/>
            <ref bean="cronTrigger"/>
        </list>
    </property>
</bean>

既然可以在配置文件配置,當(dāng)然也可以用@Bean 注解配置。在配置類上加上@Configuration 讓 Spring 讀取到

public class QuartzConfig {
    @Bean
    public JobDetail printTimeJobDetail(){
        return JobBuilder.newJob(MyJob1.class)
            .withIdentity("vincentJob")
            .usingJobData("vincent", "只為更好的你")
            .storeDurably()
            .build();
    }
    @Bean
    public Trigger printTimeJobTrigger() {
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
        return TriggerBuilder.newTrigger()
            .forJob(printTimeJobDetail())
            .withIdentity("quartzTaskService")
            .withSchedule(cronScheduleBuilder)
            .build();
    }
}


public class QuartzTest {
    private static Scheduler scheduler;
    public static void main(String[] args) throws SchedulerException {
        // 獲取容器
        ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");

        // 從容器中獲取調(diào)度器
        scheduler = (StdScheduler) ac.getBean("scheduler");

        // 啟動調(diào)度器
        scheduler.start();
    }
}

動態(tài)調(diào)度的實現(xiàn)

1)配置管理
用最簡單的數(shù)據(jù)庫的實現(xiàn)
問題 1:建一張什么樣的表?參考 JobDetail 的屬性

CREATE TABLE `sys_job` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `job_name` varchar(512) NOT NULL COMMENT '任務(wù)名稱',
    `job_group` varchar(512) NOT NULL COMMENT '任務(wù)組名',
    `job_cron` varchar(512) NOT NULL COMMENT '時間表達(dá)式',
    `job_class_path` varchar(1024) NOT NULL COMMENT '類路徑,全類型',
    `job_data_map` varchar(1024) DEFAULT NULL COMMENT '傳遞 map 參數(shù)',
    `job_status` int(2) NOT NULL COMMENT '狀態(tài):1 啟用 0 停用',
    `job_describe` varchar(1024) DEFAULT NULL COMMENT '任務(wù)功能描述',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;

2)數(shù)據(jù)操作與任務(wù)調(diào)度
操作數(shù)據(jù)表非常簡單,SSM 增刪改查

但是在修改了表的數(shù)據(jù)之后,怎么讓調(diào)度器知道呢?
調(diào)度器的接口:Scheduler

在我們的需求中,我們需要做的事情:
1、 新增一個任務(wù)
2、 刪除一個任務(wù)
3、 啟動、停止一個任務(wù)
4、 修改任務(wù)的信息(包括調(diào)度規(guī)律)
因 此 可 以 把 相 關(guān) 的 操 作 封 裝 到 一 個 工 具 類 中:

public class SchedulerUtil {
    private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);

    /**
     * 新增定時任務(wù)
     * @param jobClassName 類路徑
     * @param jobName 任務(wù)名稱
     * @param jobGroupName 組別
     * @param cronExpression Cron表達(dá)式
     * @param jobDataMap 需要傳遞的參數(shù)
     * @throws Exception
     */
    public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {
        // 通過SchedulerFactory獲取一個調(diào)度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        // 啟動調(diào)度器
        scheduler.start();
        // 構(gòu)建job信息
        JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
                .withIdentity(jobName, jobGroupName).build();
        // JobDataMap用于傳遞任務(wù)運行時的參數(shù),比如定時發(fā)送郵件,可以用json形式存儲收件人等等信息
        if (StringUtils.isNotEmpty(jobDataMap)) {
            JSONObject jb = JSONObject.parseObject(jobDataMap);
            Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");
            for (Map.Entry<String, Object> m:dataMap.entrySet()) {
                jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
            }
        }
        // 表達(dá)式調(diào)度構(gòu)建器(即任務(wù)執(zhí)行的時間)
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
        // 按新的cronExpression表達(dá)式構(gòu)建一個新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                .withSchedule(scheduleBuilder).startNow().build();
        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            logger.info("創(chuàng)建定時任務(wù)失敗" + e);
            throw new Exception("創(chuàng)建定時任務(wù)失敗");
        }
    }
    
    /**
     * 停用一個定時任務(wù)
     * @param jobName 任務(wù)名稱
     * @param jobGroupName 組別
     * @throws Exception
     */
    public static void jobPause(String jobName, String jobGroupName) throws Exception {
        // 通過SchedulerFactory獲取一個調(diào)度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
    }
    
    /**
     * 啟用一個定時任務(wù)
     * @param jobName 任務(wù)名稱
     * @param jobGroupName 組別
     * @throws Exception
     */
    public static void jobresume(String jobName, String jobGroupName) throws Exception {
        // 通過SchedulerFactory獲取一個調(diào)度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
    }
    
    /**
     * 刪除一個定時任務(wù)
     * @param jobName 任務(wù)名稱
     * @param jobGroupName 組別
     * @throws Exception
     */
    public static void jobdelete(String jobName, String jobGroupName) throws Exception {
        // 通過SchedulerFactory獲取一個調(diào)度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
        scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));
        scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
    }
    
    /**
     * 更新定時任務(wù)表達(dá)式
     * @param jobName 任務(wù)名稱
     * @param jobGroupName 組別
     * @param cronExpression Cron表達(dá)式
     * @throws Exception
     */
    public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {
        try {
            SchedulerFactory schedulerFactory = new StdSchedulerFactory();
            Scheduler scheduler = schedulerFactory.getScheduler();
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
            // 表達(dá)式調(diào)度構(gòu)建器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            // 按新的cronExpression表達(dá)式重新構(gòu)建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();
            // 按新的trigger重新設(shè)置job執(zhí)行
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (SchedulerException e) {
            System.out.println("更新定時任務(wù)失敗" + e);
            throw new Exception("更新定時任務(wù)失敗");
        }
    }
    
    /**
     * 檢查Job是否存在
     * @throws Exception
     */
    public static Boolean isResume(String jobName, String jobGroupName) throws Exception {
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
        Boolean state = scheduler.checkExists(triggerKey);
         
        return state;
    }

    /**
     * 暫停所有任務(wù)
     * @throws Exception
     */
    public static void pauseAlljob() throws Exception {
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        scheduler.pauseAll();
    }

    /**
     * 喚醒所有任務(wù)
     * @throws Exception
     */
    public static void resumeAlljob() throws Exception {
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler sched = sf.getScheduler();
        sched.resumeAll();
    }

    /**
     * 獲取Job實例
     * @param classname
     * @return
     * @throws Exception
     */
    public static BaseJob getClass(String classname) throws Exception {
        try {
            Class<?> c = Class.forName(classname);
            return (BaseJob) c.newInstance();
        } catch (Exception e) {
            throw new Exception("類["+classname+"]不存在!");
        }
    }
}

3)容器啟動與Service注入
a)容器啟動
任務(wù)沒有定義在 ApplicationContext.xml 中,而是放到了數(shù)據(jù)庫中,SpringBoot 啟動時,怎么讀取任務(wù)信息?怎么在 Spring 啟動完成的時候做一些事情?

創(chuàng)建一個類,實現(xiàn) CommandLineRunner 接口,實現(xiàn) run方法
從表中查出狀態(tài)是 1 的任務(wù),然后構(gòu)建

@Component
public class InitStartSchedule implements CommandLineRunner {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Autowired
    private ISysJobService sysJobService;
    @Autowired
    private MyJobFactory myJobFactory;
    
    @Override
    public void run(String... args) throws Exception {
        /**
         * 用于程序啟動時加載定時任務(wù),并執(zhí)行已啟動的定時任務(wù)(只會執(zhí)行一次,在程序啟動完執(zhí)行)
         */
        
        //查詢job狀態(tài)為啟用的
        HashMap<String,String> map = new HashMap<String,String>();
        map.put("jobStatus", "1");
        List<SysJob> jobList= sysJobService.querySysJobList(map);
        if( null == jobList || jobList.size() ==0){
            logger.info("系統(tǒng)啟動,沒有需要執(zhí)行的任務(wù)... ...");
        }
        // 通過SchedulerFactory獲取一個調(diào)度器實例
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler scheduler = sf.getScheduler();
        // 如果不設(shè)置JobFactory,Service注入到Job會報空指針
        scheduler.setJobFactory(myJobFactory);
        // 啟動調(diào)度器
        scheduler.start();

        for (SysJob sysJob:jobList) {
            String jobClassName=sysJob.getJobName();
            String jobGroupName=sysJob.getJobGroup();
            //構(gòu)建job信息
            JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();
            if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {
                JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());
                Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");
                for (Map.Entry<String, Object> m:dataMap.entrySet()) {
                    jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
                }
            }
            //表達(dá)式調(diào)度構(gòu)建器(即任務(wù)執(zhí)行的時間)
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());
            //按新的cronExpression表達(dá)式構(gòu)建一個新的trigger
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
                .withSchedule(scheduleBuilder).startNow().build();
            // 任務(wù)不存在的時候才添加
            if( !scheduler.checkExists(jobDetail.getKey()) ){
                try {
                    scheduler.scheduleJob(jobDetail, trigger);
                } catch (SchedulerException e) {
                    logger.info("\n創(chuàng)建定時任務(wù)失敗"+e);
                    throw new Exception("創(chuàng)建定時任務(wù)失敗");
                }
            }
        }
    }
    
    public static BaseJob getClass(String classname) throws Exception
    {
        Class<?>  c= Class.forName(classname);
        return (BaseJob)c.newInstance();
    }
}

b)Service類注入到Job中
Spring Bean 如何注入到實現(xiàn)了 Job 接口的類中?

如果沒有任何配置,注入會報空指針異常

因為定時任務(wù)Job 對象的實例化過程是在 Quartz 中進行的,而 Service Bean 是由Spring 容器管理的,Quartz 察覺不到 Service Bean 的存在,所以無法將 Service Bean裝配到 Job 對象中

分析:
Quartz 集成到 Spring 中,用到 SchedulerFactoryBean,其實現(xiàn)了 InitializingBean方法,在唯一的方法 afterPropertiesSet()Bean 的屬性初始化后調(diào)用

調(diào)度器用 AdaptableJobFactoryJob 對象進行實例化,如果我們可以把這個 JobFactory 指定為我們自定義的工廠的話,就可以在 Job 實例化完成之后,把 Job納入到 Spring 容器中管理

解決:
1)定義一個 AdaptableJobFactory,實現(xiàn) JobFactory 接口,實現(xiàn)接口定義的newJob 方法,在這里面返回 Job 實例

public class AdaptableJobFactory implements JobFactory {
    @Override
    public Job newJob(TriggerFiredBundle bundle, Scheduler arg1) throws SchedulerException {
         return newJob(bundle);
    }

     public Job newJob(TriggerFiredBundle bundle) throws SchedulerException {
            try {
                // 返回Job實例
                Object jobObject = createJobInstance(bundle);
                return adaptJob(jobObject);
            }
            catch (Exception ex) {
                throw new SchedulerException("Job instantiation failed", ex);
            }
        }

        // 通過反射的方式創(chuàng)建實例
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Method getJobDetail = bundle.getClass().getMethod("getJobDetail");
            Object jobDetail = ReflectionUtils.invokeMethod(getJobDetail, bundle);
            Method getJobClass = jobDetail.getClass().getMethod("getJobClass");
            Class jobClass = (Class) ReflectionUtils.invokeMethod(getJobClass, jobDetail);
            return jobClass.newInstance();
        }

        protected Job adaptJob(Object jobObject) throws Exception {
            if (jobObject instanceof Job) {
                return (Job) jobObject;
            }
            else if (jobObject instanceof Runnable) {
                return new DelegatingJob((Runnable) jobObject);
            }
            else {
                throw new IllegalArgumentException("Unable to execute job class [" + jobObject.getClass().getName() +
                        "]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
            }
        }
}

2)定義一個MyJobFactory,繼承AdaptableJobFactory

@Component
public class MyJobFactory extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        //調(diào)用父類的方法
        Object jobInstance = super.createJobInstance(bundle);
        capableBeanFactory.autowireBean(jobInstance);

        return jobInstance;
    }
}

3)指定Scheduler的JobFactory為自定義的JobFactory
scheduler.setJobFactory(myJobFactory);

Quartz集群部署

1)為什么要集群
a)防止單點故障,減少對業(yè)務(wù)的影響
b)減少節(jié)點的壓力,例如在 11 點要觸發(fā) 5000 個任務(wù),如果有 10 個節(jié)點,則每個節(jié)點之需要執(zhí)行 500 個任務(wù)

2)集群需要解決的問題
a)任務(wù)重跑,因為節(jié)點部署的內(nèi)容是一樣的,到 10 點的時候,每個節(jié)點都會執(zhí)行相同的操作,引起數(shù)據(jù)混亂,比如跑批,絕對不能執(zhí)行多次
b)任務(wù)漏跑,假如任務(wù)是平均分配的,本來應(yīng)該在某個節(jié)點上執(zhí)行的任務(wù),因為節(jié)點故障,一直沒有得到執(zhí)行
c)水平集群需要注意時間同步問題
d)Quartz 使用的是隨機的負(fù)載均衡算法,不能指定節(jié)點執(zhí)行

所以必須要有一種共享數(shù)據(jù)或者通信的機制,在分布式系統(tǒng)的不同節(jié)點中,我們可以采用什么樣的方式,實現(xiàn)數(shù)據(jù)共享?

兩兩通信,或者基于分布式的服務(wù),實現(xiàn)數(shù)據(jù)共享

例如:ZK、Redis、DB
在 Quartz 中,提供了一種簡單的方式,基于數(shù)據(jù)庫共享任務(wù)執(zhí)行信息。也就是說,一個節(jié)點執(zhí)行任務(wù)的時候,會操作數(shù)據(jù)庫,其他的節(jié)點查詢數(shù)據(jù)庫,便可以感知到了

3)集群配置與驗證
quartz.properties 配置。
四個配置:集群實例 ID、集群開關(guān)、數(shù)據(jù)庫持久化、數(shù)據(jù)源信息
注意先清空 quartz 所有表、改端口、兩個任務(wù)頻率改成一樣
驗證 1:先后啟動 2 個節(jié)點,任務(wù)是否重跑
驗證 2:停掉一個節(jié)點,任務(wù)是否漏跑

Quartz調(diào)度原理

帶著問題看源碼:
Job 沒有繼承 Thread 和實現(xiàn) Runnable,是怎么被調(diào)用的?通過反射還是什么?
任務(wù)是什么時候被調(diào)度的?是誰在監(jiān)視任務(wù)還是監(jiān)視 Trigger?
任務(wù)是怎么被調(diào)用的?誰執(zhí)行了任務(wù)?
任務(wù)本身有狀態(tài)嗎?還是觸發(fā)器有狀態(tài)?

源碼入口:
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

1)獲取調(diào)度器實例

a、讀取配置文件

public Scheduler getScheduler() throws SchedulerException {
    if (cfg == null) {
        // 讀取 quartz.properties 配置文件
        initialize();
    }
    // 這個類是一個 HashMap,用來基于調(diào)度器的名稱保證調(diào)度器的唯一性
    SchedulerRepository schedRep = SchedulerRepository.getInstance();
?
    Scheduler sched = schedRep.lookup(getSchedulerName());
    // 如果調(diào)度器已經(jīng)存在了
    if (sched != null) {
        // 調(diào)度器關(guān)閉了,移除
        if (sched.isShutdown()) {
            schedRep.remove(getSchedulerName());
        } else {
            // 返回調(diào)度器
            return sched;
        }
    }
    // 調(diào)度器不存在,初始化
    sched = instantiate();
    return sched;
}

instantiate()方法中做了初始化的所有工作:

// 存儲任務(wù)信息的 JobStore
JobStore js = null;
// 創(chuàng)建線程池,默認(rèn)是 SimpleThreadPool
ThreadPool tp = null;
// 創(chuàng)建調(diào)度器
QuartzScheduler qs = null;
// 連接數(shù)據(jù)庫的連接管理器
DBConnectionManager dbMgr = null;
// 自動生成 ID
// 創(chuàng)建線程執(zhí)行器,默認(rèn)為 DefaultThreadExecutor
ThreadExecutor threadExecutor;

b、創(chuàng)建線程池(包工頭)
創(chuàng)建了一個線程池,默認(rèn)是配置文件中指定的SimpleThreadPool

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();

SimpleThreadPool 里面維護了三個 list,分別存放所有的工作線程、空閑的工作線程和忙碌的工作線程,我們可以把 SimpleThreadPool 理解為包工頭

private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

tp 的 runInThread()方法是線程池運行線程的接口方法。參數(shù) Runnable 是執(zhí)行的任務(wù)內(nèi)容,取出 WorkerThread 去執(zhí)行參數(shù)里面的 runnable(JobRunShell)

WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);

c、WorkerThread(工人)
WorkerThreadSimpleThreadPool的內(nèi)部類 , 用來執(zhí)行任務(wù) ,我 們 把WorkerThread理解為工人。在WorkerThread的run方法中,執(zhí)行傳入的參數(shù)runnable任務(wù):runnable.run();

d、創(chuàng)建調(diào)度線程(項目經(jīng)理)
創(chuàng)建了調(diào)度器 QuartzScheduler:
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

QuartzScheduler 的構(gòu)造函數(shù)中,創(chuàng)建了QuartzSchedulerThread,我們把它理解為項目經(jīng)理,它會調(diào)用包工頭的工人資源,給他們安排任務(wù)

并且創(chuàng)建了線程執(zhí)行器 schedThreadExecutor , 執(zhí) 行 了 這 個QuartzSchedulerThread,也就是調(diào)用了它的 run 方法

// 創(chuàng)建一個線程,resouces 里面有線程名稱
this.schedThread = new QuartzSchedulerThread(this, resources);
// 線程執(zhí)行器
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//執(zhí)行這個線程,也就是調(diào)用了線程的 run 方法
schedThreadExecutor.execute(this.schedThread);

在QuartzSchedulerThread 類,找到 run 方法,這個是 Quartz 任務(wù)調(diào)度的核心方法:

public void run() {
        boolean lastAcquireFailed = false;
        // 檢查 scheuler 是否為停止?fàn)顟B(tài)
        while (!halted.get()) {    
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    // 檢查是否為暫停狀態(tài)
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            // 暫停的話會嘗試去獲得信號鎖,并 wait 一會
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }
                //從線程池獲取可用的線程
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers = null;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        // 獲取需要下次執(zhí)行的 triggers
                        // idleWaitTime: 默認(rèn) 30s
                        // availThreadCount:獲取可用(空閑)的工作線程數(shù)量,總會大于 1,因為該方法會一直阻塞,直到有工作線程空閑下來。
                        //maxBatchSize:一次拉取 trigger 的最大數(shù)量,默認(rèn)是 1
                        //batchTimeWindow:時間窗口調(diào)節(jié)參數(shù),默認(rèn)是 0
                        //misfireThreshold: 超過這個時間還未觸發(fā)的 trigger,被認(rèn)為發(fā)生了 misfire,默認(rèn) 60s
                        //調(diào)度線程一次會拉取 NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個 triggers,默認(rèn)情況下,會拉取未來 30s、過去 60s 之間還未 fire 的 1 個 trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        lastAcquireFailed = false;
                        if (log.isDebugEnabled()) 
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if(!lastAcquireFailed) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        lastAcquireFailed = true;
                        continue;
                    } catch (RuntimeException e) {
                        if(!lastAcquireFailed) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        lastAcquireFailed = true;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                //觸發(fā) Trigger,把 ACQUIRED 狀態(tài)改成 EXECUTING
                                //如果這個 trigger 的 NEXTFIRETIME 為空,也就是未來不再觸發(fā),就將其狀態(tài)改為COMPLETE
                                //如果trigger不允許并發(fā)執(zhí)行(即Job的實現(xiàn)類標(biāo)注了@DisallowConcurrentExecution),則將狀態(tài)變?yōu)?BLOCKED,否則就將狀態(tài)改為 WAITING
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                //循環(huán)處理 Trigger
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                //根據(jù) trigger 信息實例化JobRunShell(implements Runnable),同時依據(jù)JOB_CLASS_NAME 實例化 Job,隨后我們將 JobRunShell 實例丟入工作線。
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            // 執(zhí)行 JobRunShell 的 run 方法
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

JobRunShell 的作用:
JobRunShell 用來為 Job 提供安全的運行環(huán)境的,執(zhí)行 Job 中所有的作業(yè),捕獲運行中的異常,在任務(wù)執(zhí)行完畢的時候更新 Trigger 狀態(tài)等等

JobRunShell 實例是用 JobRunShellFactoryQuartzSchedulerThread 創(chuàng)建的,在調(diào)度器決定一個 Job 被觸發(fā)的時候,它從線程池中取出一個線程來執(zhí)行任務(wù)

e、線程模型總結(jié)
SimpleThreadPool:包工頭,管理所有 WorkerThread
WorkerThread:工人,把 Job 包裝成 JobRunShell,執(zhí)行
QuartSchedulerThread:項目經(jīng)理,獲取即將觸發(fā)的 Trigger,從包工頭拿出拿到worker,執(zhí)行 Trigger 綁定的任務(wù)

2)綁定JobDetail和Trigger

// 存儲 JobDetail 和 Trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知相關(guān)的 Listener
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);

3)啟動調(diào)度器

// 通知監(jiān)聽器
notifySchedulerListenersStarting();
if (initialStart == null) {
    initialStart = new Date();
    this.resources.getJobStore().schedulerStarted();
    startPlugins();
} else {
    resources.getJobStore().schedulerResumed();
}
// 通知 QuartzSchedulerThread 不再等待,開始干活
schedThread.togglePause(false);
// 通知監(jiān)聽器
notifySchedulerListenersStarted();

4)源碼總結(jié)
getScheduler 方法創(chuàng)建線程池 ThreadPool,創(chuàng)建調(diào)度器 QuartzScheduler,創(chuàng)建調(diào)度線程 QuartzSchedulerThread,調(diào)度線程初始處于暫停狀態(tài)

scheduleJob 將任務(wù)添加到 JobStore

scheduler.start()方法激活調(diào)度器,QuartzSchedulerThread 從 timeTrriger 取出待觸發(fā)的任務(wù),并包裝成 TriggerFiredBundle,然后由 JobRunShellFactory 創(chuàng)建TriggerFiredBundle 的 執(zhí) 行 線 程 JobRunShell , 調(diào) 度 執(zhí) 行 通 過 線 程 池SimpleThreadPool去執(zhí)行JobRunShell,而JobRunShell執(zhí)行的就是任務(wù)類的execute方法:job.execute(JobExecutionContext context)

5)集群原理
基于數(shù)據(jù)庫,如何實現(xiàn)任務(wù)的不重跑不漏跑?
問題 1:如果任務(wù)執(zhí)行中的資源是“下一個即將觸發(fā)的任務(wù)”,怎么基于數(shù)據(jù)庫實現(xiàn)這個資源的競爭?
問題 2:怎么對數(shù)據(jù)的行加鎖?


image.png

QuartzSchedulerThread 獲取下一個即將觸發(fā)的 Trigger:
triggers = qsRsrcs.getJobStore().acquireNextTriggers()

調(diào)用 JobStoreSupport 的 acquireNextTriggers()方法

調(diào)用 JobStoreSupport.executeInNonManagedTXLock()方法
return executeInNonManagedTXLock(lockName,

嘗試獲得鎖:
transOwner = getLockHandler().obtainLock(conn, lockName);

調(diào)用 DBSemaphore 的 obtainLock()方法:

public boolean obtainLock(Connection conn, String lockName)
throws LockException {
    if (!isLockOwner(lockName)) {
        executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);

調(diào)用 StdRowLockSemaphore 的 executeSQL()方法

最終用 JDBC 執(zhí)行 SQL,語句內(nèi)容是 expandedSQL 和 expandedInsertSQL
ps = conn.prepareStatement(expandedSQL);

問題:expandedSQL 和 expandedInsertSQL 是一條什么 SQL 語句?似乎我們沒有賦值?

在 StdRowLockSemaphore 的構(gòu)造函數(shù)中,把定義的兩條 SQL 傳進去:

public StdRowLockSemaphore() {
    super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK);
}

public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
?
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";

它調(diào)用了父類 DBSemaphore 的構(gòu)造函數(shù):

public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
    this.tablePrefix = tablePrefix;
    this.schedName = schedName;
    setSQL(defaultSQL);
    setInsertSQL(defaultInsertSQL);
}

在 setSQL()和 setInsertSQL()中為 expandedSQL 和expandedInsertSQL 賦值

執(zhí)行的 SQL 語句:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update

在執(zhí)行官方的建表腳本的時候,QRTZ_LOCKS 表,它會為每個調(diào)度器創(chuàng)建兩行數(shù)據(jù),獲取 Trigger 和觸發(fā) Trigger 是兩把鎖:


image.png
image.png

6)任務(wù)為什么重復(fù)執(zhí)行
有多個調(diào)度器,任務(wù)沒有重復(fù)執(zhí)行,也就是默認(rèn)會加鎖,什么情況下不會上鎖呢?
JobStoreSupportexecuteInNonManagedTXLock()方法,如果 lockName 為空,則不上鎖

 if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection 
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            
            if (conn == null) {
                conn = getNonManagedTXConnection();
            }

而 上 一 步 JobStoreSupportacquireNextTriggers() 方 法 , 如 果isAcquireTriggersWithinLock()值是 false 并且 maxCount>1 的話,lockName 賦值為null,否則賦值為 LOCK_TRIGGER_ACCESS,這種情況獲取 Trigger 下默認(rèn)不加鎖

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
}

acquireTriggersWithinLock 默認(rèn)是空的:
private boolean acquireTriggersWithinLock = false;

maxCount 來自 QuartzSchedulerThread:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
    now + idleWaitTime, Math.min(availThreadCount,     qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());

getMaxBatchSize()來自 QuartzSchedulerResources,代表 Scheduler 一次拉取trigger 的最大數(shù)量,默認(rèn)是 1
private int maxBatchSize = 1;

這個值可以通過參數(shù)修改:
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=50

理論上把 batchTriggerAcquisitionMaxCount 的值改掉以后,在獲取 Trigger 的時候就不會再上鎖了,但是實際上為什么沒有出現(xiàn)頻繁的重復(fù)執(zhí)行問題?
因為每個調(diào)度器的線程持有鎖的時間太短了

QuartzSchedulerThread 的 triggersFired()方法:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

調(diào)用了 JobStoreSupport 的 triggersFired()方法,接著又調(diào)用了一個 triggerFired,triggerFired(Connection conn, OperableTrigger trigger)方法:
如果 Trigger 的狀態(tài)不是 ACQUIRED,也就是說被其他的線程 fire 了,返回空。但是這種樂觀鎖的檢查在高并發(fā)下難免會出現(xiàn) ABA 的問題,比如線程 A 拿到的時候還是ACQUIRED 狀態(tài),但是剛準(zhǔn)備執(zhí)行的時候已經(jīng)變成了 EXECUTING 狀態(tài),這個時候就會出現(xiàn)重復(fù)執(zhí)行的問題

if (!state.equals(STATE_ACQUIRED)) {
    return null;
}

總結(jié):
如果設(shè)置的數(shù)量>1,并且使用 JDBC JobStoreRAMJobStore 不支持分布式,只有 一 個 調(diào) 度 器 實 例 , 所 以 不 加 鎖 ) , 則 屬 性org.quartz.jobStore.acquireTriggersWithinLock 應(yīng)設(shè)置為 true,否則不加鎖會導(dǎo)致任務(wù)重復(fù)執(zhí)行

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
org.quartz.jobStore.acquireTriggersWithinLock=true

Quartz-Misfire

什么情況下錯過觸發(fā)?

示例:線程池只有 5 個線程,當(dāng)有 5 個任務(wù)都在執(zhí)行的時候,第六個任務(wù)即將觸發(fā),這個時候任務(wù)就不能得到執(zhí)行,在 quartz.properties 有一個屬性 misfireThreshold,用來定義觸發(fā)器超時的"臨界值",也就是超過了這個時間,就算錯過觸發(fā)了

例如,如果 misfireThreshold 是 60000(60 秒),9 點整應(yīng)該執(zhí)行的任務(wù),9 點零1 分還沒有可用線程執(zhí)行它,就會超時(misfires)

可能造成 misfired job的原因:
1、 沒有可用線程
2、 Trigger 被暫停
3、 系統(tǒng)重啟
4、 禁止并發(fā)執(zhí)行的任務(wù)在到達(dá)觸發(fā)時間時,上次執(zhí)行還沒有結(jié)束

錯過觸發(fā)怎么辦?

Misfire 策略設(shè)置,每一種 Trigger 都定義了自己的 Misfire 策略,不同的策略通過不同的方法來設(shè)置

Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
    .withSchedule(SimpleScheduleBuilder.simpleSchedule()
    .withMisfireHandlingInstructionNowWithExistingCount()
    .withIntervalInSeconds(1)
    .repeatForever()).build();

一般來說有 3 種:
1、 忽略
2、 立即跑一次
3、 下次跑

文章參考:
Quartz Scheduler misfireThreshold屬性的意義與觸發(fā)器超時后的處理策略

quartz-misfire 錯失、補償執(zhí)行

怎么避免任務(wù)錯過觸發(fā)?

合理地設(shè)置線程池數(shù)量,以及任務(wù)觸發(fā)間隔

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容