任務(wù)調(diào)度簡介
1、什么時候需要任務(wù)調(diào)度
業(yè)務(wù)場景:
1)賬單日或者還款日上午 9 點,給每個信用卡客戶發(fā)送賬單通知,還款通知,如何判斷客戶的賬單日、還款日,完成通知的發(fā)送?
2)銀行業(yè)務(wù)系統(tǒng),夜間要完成跑批的一系列流程,清理數(shù)據(jù),下載文件,解析文件,對賬清算、切換結(jié)算日期等等,如何觸發(fā)一系列流程的執(zhí)行?
3)金融機構(gòu)跟人民銀行二代支付系統(tǒng)對接,人民銀行要求低于 5W 的金額(小額支付)半個小時打一次包發(fā)送,以緩解并發(fā)壓力。所以,銀行的跨行轉(zhuǎn)賬分成了多個流程:
錄入、復(fù)核、發(fā)送。如何把半個小時以內(nèi)的所有數(shù)據(jù)一次性發(fā)送?
類似于這種 1、基于準(zhǔn)確的時刻或者固定的時間間隔觸發(fā)的任務(wù),或者 2、有批量數(shù)據(jù)需要處理,或者 3、要實現(xiàn)兩個動作解耦的場景,我們都可以用任務(wù)調(diào)度來實現(xiàn)
2、任務(wù)調(diào)度需求分析
基本需求:
1)可以定義觸發(fā)的規(guī)則,比如基于時刻、時間間隔、表達(dá)式
2)可以定義需要執(zhí)行的任務(wù)。比如執(zhí)行一個腳本或者一段代碼。任務(wù)和規(guī)則是分開的
3)集中管理配置,持久配置。不用把規(guī)則寫在代碼里面,可以看到所有的任務(wù)配置,方便維護,重啟之后任務(wù)可以再次調(diào)度——配置文件或者配置中心
4)支持任務(wù)的串行執(zhí)行,例如執(zhí)行 A 任務(wù)后再執(zhí)行 B 任務(wù)再執(zhí)行 C 任務(wù)
5)支持多個任務(wù)并發(fā)執(zhí)行,互不干擾(例如ScheduledThreadPoolExecutor 線程池)
6)有自己的調(diào)度器,可以啟動、中斷、停止任務(wù)
7)容易集成到 Spring
3、任務(wù)調(diào)度工具對比
| 層次 | 舉例 | 特點 |
|---|---|---|
| 操作系統(tǒng) | Linux Window 計劃任務(wù) | 只能執(zhí)行簡單腳本或者命令 |
| 數(shù)據(jù)庫 | MySQL Oracle | 可以操作數(shù)據(jù),不能執(zhí)行java代碼 |
| 工具 | kettle | 可以操作數(shù)據(jù)庫,執(zhí)行腳本,沒有集中配置 |
| 開發(fā)語言 | JDK Timer、ScheduledThreadPool | Timer:單線程 JDK1.5 之后:ScheduledThreadPool(Cache、Fiexed、Single):沒有集中配置,日常管理不夠靈活 |
| 容器 | Spring Task、@Scheduled | 不支持集群 |
| 分布式框架 | XXL-JOB、Elastic-Job | 支持集群,集中配置,容易管理 |
@Scheduled 也是用 JUC 的 ScheduledExecutorService 實現(xiàn)的
Scheduled(cron = “0/5 * * * * ?”)
1) ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法將@Scheduled 的方法包裝為指定的 task添加到 ScheduledTaskRegistrar 中
2) ScheduledAnnotationBeanPostProcessor 會監(jiān)聽 Spring 的容器初始化事件,在 Spring 容器初始化完成后進行TaskScheduler 實現(xiàn)類實例的查找,若發(fā)現(xiàn)有 SchedulingConfigurer 的實現(xiàn)類實例,則跳過 3
3) 查找 TaskScheduler 的實現(xiàn)類實例默認(rèn)是通過類型查找,若有多個實現(xiàn)則會查找名字為"taskScheduler"的實現(xiàn) Bean,若沒有找到則在 ScheduledTaskRegistrar 調(diào)度任務(wù)的時候會創(chuàng)建一個newSingleThreadScheduledExecutor,將TaskScheduler 的實現(xiàn)類實例設(shè)置到 ScheduledTaskRegistrar 屬性中
4)ScheduledTaskRegistrar 的 scheduleTasks 方法觸發(fā)任務(wù)調(diào)度
5)真正調(diào)度任務(wù)的類是 TaskScheduler 實現(xiàn)類中的 ScheduledExecutorService,由 J.U.C 提供
Quartz基本介紹
Quatz 是一個特性豐富的,開源的任務(wù)調(diào)度庫,它幾乎可以嵌入所有的 Java 程序,從很小的獨立應(yīng)用程序到大型商業(yè)系統(tǒng)。Quartz 可以用來創(chuàng)建成百上千的簡單的或者復(fù)雜的任務(wù),這些任務(wù)可以用來執(zhí)行任何程序可以做的事情。Quartz 擁有很多企業(yè)級的特性,包括支持 JTA 事務(wù)和集群
Quartz 是一個老牌的任務(wù)調(diào)度系統(tǒng),98 年構(gòu)思,01 年發(fā)布到 sourceforge。現(xiàn)在更新比較慢,因為已經(jīng)非常成熟了。
Quartz 的目的就是讓任務(wù)調(diào)度更加簡單,開發(fā)人員只需要關(guān)注業(yè)務(wù)即可。他是用 Java 語言編寫的(也有.NET 的版本)。Java 代碼能做的任何事情,Quartz 都可以調(diào)度。
特點:
a)精確到毫秒級別的調(diào)度
b)可以獨立運行,也可以集成到容器中
c)支持事務(wù)(JobStoreCMT )
d)支持集群
e)支持持久化
Quartz Java編程
1、引入依賴
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
2、默認(rèn)配置文件
org.quartz.core 包下,有一個默認(rèn)的配置文件,quartz.properties,當(dāng)我們沒有定義一個同名的配置文件的時候,就會使用默認(rèn)配置文件里面的配置
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
3、創(chuàng)建Job
實現(xiàn)唯一的方法 execute(),方法中的代碼就是任務(wù)執(zhí)行的內(nèi)容
public class MyJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("定時任務(wù)測試");
}
}
在測試類方法中,把 Job 進一步包裝成 JobDetail,必須要指定 JobName 和 groupName,兩個合起來是唯一標(biāo)識符,可以攜帶 KV 的數(shù)據(jù)(JobDataMap),用于擴展屬性,在運行的時候可以從 context獲取到
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
.withIdentity("job1", "group1")
.usingJobData("vincent","666")
.usingJobData("liao",1314)
.build();
4、創(chuàng)建Trigger
在測試類 main()方法中,基于 SimpleTrigger 定義了一個每 2 秒鐘運行一次、不斷重復(fù)的 Trigger:
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();
5、創(chuàng)建Scheduler
在測試類 main()方法中,通過 Factory 獲取調(diào)度器的實例,把 JobDetail 和 Trigger綁定,注冊到容器中
Scheduler 啟動順序無所謂,只要有 Trigger 到達(dá)觸發(fā)條件,就會執(zhí)行任務(wù)
SchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
調(diào)度器一定是單例的
6、體系結(jié)構(gòu)總結(jié)

1)JobDetail
創(chuàng)建一個實現(xiàn) Job 接口的類,使用 JobBuilder 包裝成 JobDetail,它可以攜帶KV 的數(shù)據(jù)
2)Trigger
定義任務(wù)的觸發(fā)規(guī)律:Trigger,使用 TriggerBuilder 來構(gòu)建
JobDetail 跟 Trigger 是 1:N 的關(guān)系
為什么要解耦?
Trigger 接口在 Quartz 有 4 個繼承的子接口:
| 子接口 | 描述 | 特點 |
|---|---|---|
| SimpleTrigger | 簡單觸發(fā)器 | 固定時刻或時間間隔,單位是毫秒 |
| CalendarIntervalTrigger | 基于日歷的觸發(fā)器 | 比簡單觸發(fā)器更多時間單位,支持非固定時間的觸發(fā),例如一年可能 365/366,一個月可能 28/29/30/31 |
| DailyTimeIntervalTrigger | 基于日期的觸發(fā)器 | 每天的某個時間段 |
| CronTrigger | 基于Cron表達(dá)式的觸發(fā)器 |
SimpleTrigger
SimpleTrigger 可以定義固定時刻或者固定時間間隔的調(diào)度規(guī)則(精確到毫秒)
例如:每天 9 點鐘運行;每隔 30 分鐘運行一次
CalendarIntervalTrigger
可以定義更多時間單位的調(diào)度需求,精確到秒
好處是:不需要去計算時間間隔,比如 1 個小時等于多少毫秒
例如每年、每個月、每周、每天、每小時、每分鐘、每秒、每年的月數(shù)和每個月的天數(shù)不是固定的,這種情況也適用
DailyTimeIntervalTrigger
每天的某個時間段內(nèi),以一定的時間間隔執(zhí)行任務(wù)
例如:每天早上 10 點到晚上 7 點,每隔半個小時執(zhí)行一次,并且只在周一到周六執(zhí)行
CronTrigger
可以定義基于 Cron 表達(dá)式的調(diào)度規(guī)則,是最常用的觸發(fā)器類型
Cron表達(dá)式
| 位置 | 時間域 | 特殊值 | |
|---|---|---|---|
| 1 | 秒 | 0-59 | , - * / |
| 2 | 分鐘 | 0-59 | , - * / |
| 3 | 小時 | 0-23 | , - * / |
| 4 | 日期 | 1-31 | , - * ? / L W C |
| 5 | 月份 | 1-12 | , - * / |
| 6 | 星期 | 1-7 | , - * ? / L W C |
| 7 | 年份 | 1-31 | , - * / |
解析:
星號(*):可用在所有字段中,表示對應(yīng)時間域的每一個時刻,例如,在分鐘字段時,表示“每分鐘”;
問號(?):該字符只在日期和星期字段中使用,它通常指定為“無意義的值”,相當(dāng)于點位符;
減號(-):表達(dá)一個范圍,如在小時字段中使用“10-12”,則表示從 10 到 12 點,即 10,11,12;
逗號(,):表達(dá)一個列表值,如在星期字段中使用“MON,WED,FRI”,則表示星期一,星期三和星期五;
斜杠(/):x/y 表達(dá)一個等步長序列,x 為起始值,y 為增量步長值。如在分鐘字段中使用 0/15,則表示為 0,15,30 和45 秒,而 5/15 在分鐘字段中表示 5,20,35,50,你也可以使用*/y,它等同于 0/y;
L:該字符只在日期和星期字段中使用,代表“Last”的意思,但它在兩個字段中意思不同。L 在日期字段中,表示這個月份的最后一天,如一月的 31 號,非閏年二月的 28 號;如果 L 用在星期中,則表示星期六,等同于 7。但是,如果 L 出現(xiàn)在星期字段里,而且在前面有一個數(shù)值 X,則表示“這個月的最后 X 天”,例如,6L 表示該月的最后星期五;
W:該字符只能出現(xiàn)在日期字段里,是對前導(dǎo)日期的修飾,表示離該日期最近的工作日。例如 15W 表示離該月 15號最近的工作日,如果該月 15 號是星期六,則匹配 14 號星期五;如果 15 日是星期日,則匹配 16 號星期一;如果 15號是星期二,那結(jié)果就是 15 號星期二。但必須注意關(guān)聯(lián)的匹配日期不能夠跨月,如你指定 1W,如果 1 號是星期六,結(jié)果匹配的是 3 號星期一,而非上個月最后的那天。W 字符串只能指定單一日期,而不能指定日期范圍;
LW 組合:在日期字段可以組合使用 LW,它的意思是當(dāng)月的最后一個工作日;`井號(#)`:該字符只能在星期字段中使用,表示當(dāng)月某個工作日。如 `6#3` 表示當(dāng)月的第三個星期五(6 表示星期五,`#3`表示當(dāng)前的第三個),而 `4#5` 表示當(dāng)月的第五個星期三,假設(shè)當(dāng)月沒有第五個星期三,忽略不觸發(fā);
C:該字符只在日期和星期字段中使用,代表“Calendar”的意思。它的意思是計劃所關(guān)聯(lián)的日期,如果日期沒有被關(guān)聯(lián),則相當(dāng)于日歷中所有日期。例如 5C 在日期字段中就相當(dāng)于日歷 5 日以后的第一天。1C 在星期字段中相當(dāng)于星期日后的第一天。Cron 表達(dá)式對特殊字符的大小寫不敏感,對代表星期的縮寫英文大小寫也不敏感
3)Scheduler
調(diào)度器,是 Quartz 的指揮官,由 StdSchedulerFactory 產(chǎn)生。它是單例的
Quartz 中最重要的 API,默認(rèn)是實現(xiàn)類是 StdScheduler,里面包含了一個QuartzScheduler,QuartzScheduler 里面又包含了一個 QuartzSchedulerThread
Scheduler 中的方法主要分為三大類:
1)操作調(diào)度器本身,例如調(diào)度器的啟動 start()、調(diào)度器的關(guān)閉 shutdown()
2)操作 Trigger,例如 pauseTriggers()、resumeTrigger()
3)操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()
4)Listener
需求:在每個任務(wù)運行結(jié)束之后發(fā)送通知給運維管理員,那是不是
要在每個任務(wù)的最后添加一行代碼呢?這種方式對原來的代碼造成了入侵,不利于維護,如果代碼不是寫在任務(wù)代碼的最后一行,怎么知道任務(wù)執(zhí)行完了呢?或者說,怎么監(jiān)測到任務(wù)的生命周期呢?
觀察者模式:定義對象間一種一對多的依賴關(guān)系,使得每當(dāng)一個對象改變狀態(tài),則所有依賴它的對象都會得到通知并自動更新
Quartz 中提供了三種 Listener:
監(jiān)聽 Scheduler 的,監(jiān)聽 Trigger 的,監(jiān)聽 Job 的
只需要創(chuàng)建類實現(xiàn)相應(yīng)的接口,并在 Scheduler 上注冊 Listener,便可實現(xiàn)對核心對象的監(jiān)聽
JobListener
public class MyJobListener implements JobListener {
//返回JobListener名稱
public String getName() {
String name = getClass().getSimpleName();
System.out.println( "Method 111111 :"+ "獲取到監(jiān)聽器名稱:"+name);
return name;
}
//Scheduler在JobDetail將要被執(zhí)行時調(diào)用這個方法
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 222222 :"+ jobName + " ——任務(wù)即將執(zhí)行 ");
}
//Scheduler在JobDetail即將被執(zhí)行,但又被TriggerListener否決了時調(diào)用這個方法
public void jobExecutionVetoed(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 333333 :"+ jobName + " ——任務(wù)被否決 ");
}
//Scheduler在JobDetail被執(zhí)行之后調(diào)用這個方法
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 444444 :"+ jobName + " ——執(zhí)行完畢 ");
System.out.println("------------------");
}
}
public class MyJobListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 創(chuàng)建并注冊一個全局的Job Listener
scheduler.getListenerManager().addJobListener(new MyJobListener(), EverythingMatcher.allJobs());
scheduler.start();
}
}
工具類:ListenerManager,用于添加、獲取、移除監(jiān)聽器
工具類:Matcher,主要是基于 groupName 和 keyName 進行匹配。
TriggerListener
public class MyTriggerListener implements TriggerListener {
private String name;
public MyTriggerListener(String name) {
this.name = name;
}
//返回監(jiān)聽器的名稱
public String getName() {
return name;
}
// Trigger 被觸發(fā),Job 上的 execute() 方法將要被執(zhí)行時,Scheduler就調(diào)用這個方法
public void triggerFired(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 11111 " + triggerName + " was fired");
}
// 在 Trigger 觸發(fā)后,Job 將要被執(zhí)行時由 Scheduler 調(diào)用這個方法
// TriggerListener 給了一個選擇去否決 Job 的執(zhí)行,如果返回true時,這個任務(wù)不會被觸發(fā)
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 222222 " + triggerName + " was not vetoed");
return false;
}
//Trigger 錯過觸發(fā)時調(diào)用
public void triggerMisfired(Trigger trigger) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 333333 " + triggerName + " misfired");
}
//Trigger 被觸發(fā)并且完成了 Job 的執(zhí)行時,Scheduler 調(diào)用這個方法
public void triggerComplete(Trigger trigger, JobExecutionContext context,
Trigger.CompletedExecutionInstruction triggerInstructionCode) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 444444 " + triggerName + " is complete");
System.out.println("------------");
}
}
public class MyTriggerListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 創(chuàng)建并注冊一個全局的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener1"), EverythingMatcher.allTriggers());
// 創(chuàng)建并注冊一個局部的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener2"), KeyMatcher.keyEquals(TriggerKey.triggerKey("trigger1", "gourp1")));
// 創(chuàng)建并注冊一個特定組的Trigger Listener
GroupMatcher<TriggerKey> matcher = GroupMatcher.triggerGroupEquals("gourp1");
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener3"), matcher);
scheduler.start();
}
}
SchedulerListener
public class MySchedulerListener implements SchedulerListener {
public void jobScheduled(Trigger trigger) {
String jobName = trigger.getJobKey().getName();
System.out.println( jobName + " has been scheduled");
}
public void jobUnscheduled(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being unscheduled");
}
public void triggerFinalized(Trigger trigger) {
System.out.println("Trigger is finished for " + trigger.getJobKey().getName());
}
public void triggerPaused(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being paused");
}
public void triggersPaused(String triggerGroup) {
System.out.println("trigger group "+triggerGroup + " is being paused");
}
public void triggerResumed(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being resumed");
}
public void triggersResumed(String triggerGroup) {
System.out.println("trigger group "+triggerGroup + " is being resumed");
}
public void jobAdded(JobDetail jobDetail) {
System.out.println(jobDetail.getKey()+" is added");
}
public void jobDeleted(JobKey jobKey) {
System.out.println(jobKey+" is deleted");
}
public void jobPaused(JobKey jobKey) {
System.out.println(jobKey+" is paused");
}
public void jobsPaused(String jobGroup) {
System.out.println("job group "+jobGroup+" is paused");
}
public void jobResumed(JobKey jobKey) {
System.out.println(jobKey+" is resumed");
}
public void jobsResumed(String jobGroup) {
System.out.println("job group "+jobGroup+" is resumed");
}
public void schedulerError(String msg, SchedulerException cause) {
System.out.println(msg + cause.getUnderlyingException().getStackTrace());
}
public void schedulerInStandbyMode() {
System.out.println("scheduler is in standby mode");
}
public void schedulerStarted() {
System.out.println("scheduler has been started");
}
public void schedulerStarting() {
System.out.println("scheduler is being started");
}
public void schedulerShutdown() {
System.out.println("scheduler has been shutdown");
}
public void schedulerShuttingdown() {
System.out.println("scheduler is being shutdown");
}
public void schedulingDataCleared() {
System.out.println("scheduler has cleared all data");
}
}
public class MySchedulerListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 創(chuàng)建Scheduler Listener
scheduler.getListenerManager().addSchedulerListener(new MySchedulerListener());
scheduler.start();
}
}
5)JobStore
最多可以運行多少個任務(wù)(磁盤、內(nèi)存、線程數(shù))?
Jobstore 用來存儲任務(wù)和觸發(fā)器相關(guān)的信息,例如所有任務(wù)的名稱、數(shù)量、狀態(tài)等等
Quartz 中有兩種存儲任務(wù)的方式:一種存在內(nèi)存,一種是存在數(shù)據(jù)庫
RAMJobStore
Quartz 默認(rèn)的 JobStore 是 RAMJobstore,也就是把任務(wù)和觸發(fā)器信息運行的信息存儲在內(nèi)存中,用到了 HashMap、TreeSet、HashSet 等等數(shù)據(jù)結(jié)構(gòu)
如果程序崩潰或重啟,所有存儲在內(nèi)存中的數(shù)據(jù)都會丟失,所以我們需要把這些數(shù)據(jù)持久化到磁盤
JDBCJobStore
JDBCJobStore 可以通過 JDBC 接口,將任務(wù)運行數(shù)據(jù)保存在數(shù)據(jù)庫中
JDBC 的實現(xiàn)方式有兩種,JobStoreSupport 類的兩個子類:
JobStoreTX:在獨立的程序中使用,自己管理事務(wù),不參與外部事務(wù)
JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事務(wù)時,可以使用他
使用JDBCJobStore時,需要配置數(shù)據(jù)的信息:
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties,不使用默認(rèn)配置
org.quartz.jobStore.useProperties:true
#數(shù)據(jù)庫中 quartz 表的表名前綴
org.quartz.jobStore.tablePrefix:QRTZ_
org.quartz.jobStore.dataSource:myDS
?
#配置數(shù)據(jù)源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/vincent?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual
問題來了?需要建什么表?表里面有什么字段?字段類型和長度是什么?
在官網(wǎng)的 Downloads 鏈接中,提供了 11 張表的建表語句
2.3 的版本在這個路徑下:src\org\quartz\impl\jdbcjobstore
表名和作用:
QRTZ_BLOB_TRIGGERS:Trigger 作為 Blob 類型存儲
QRTZ_CALENDARS:存儲 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS:存儲 CronTrigger,包括 Cron 表達(dá)式和時區(qū)信息
QRTZ_FIRED_TRIGGERS:存儲與已觸發(fā)的 Trigger 相關(guān)的狀態(tài)信息,以及相關(guān) Job 的執(zhí)行信息
QRTZ_JOB_DETAILS:存儲每一個已配置的 Job 的詳細(xì)信息
QRTZ_LOCKS:存儲程序的悲觀鎖的信息
QRTZ_PAUSED_TRIGGER_GRPS:存儲已暫停的 Trigger 組的信息
QRTZ_SCHEDULER_STATE:存儲少量的有關(guān) Scheduler 的狀態(tài)信息,和別的 Scheduler 實例
QRTZ_SIMPLE_TRIGGERS:存儲 SimpleTrigger 的信息,包括重復(fù)次數(shù)、間隔、以及已觸的次數(shù)
QRTZ_SIMPROP_TRIGGERS:存儲 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 兩種類型的觸發(fā)器
QRTZ_TRIGGERS:存儲已配置的 Trigger 的信息
Quartz集成到Spring
Spring 在 spring-context-support.jar 中直接提供了對Quartz 的支持

可以在配置文件中把 JobDetail、Trigger、Scheduler 定義成 Bean,交給Spring去管理
1)定義Job
<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="name" value="my_job_1"/>
<property name="vincent" value="my_group"/>
<property name="jobClass" value="com.vincent.quartz.MyJob1"/>
<property name="durability" value="true"/>
</bean>
2)定義Trigger
<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
<property name="name" value="my_trigger_1"/>
<property name="group" value="my_group"/>
<property name="jobDetail" ref="myJob1"/>
<property name="startDelay" value="1000"/>
<property name="repeatInterval" value="5000"/>
<property name="repeatCount" value="2"/>
</bean>
3)定義Scheduler
<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="simpleTrigger"/>
<ref bean="cronTrigger"/>
</list>
</property>
</bean>
既然可以在配置文件配置,當(dāng)然也可以用@Bean 注解配置。在配置類上加上@Configuration 讓 Spring 讀取到
public class QuartzConfig {
@Bean
public JobDetail printTimeJobDetail(){
return JobBuilder.newJob(MyJob1.class)
.withIdentity("vincentJob")
.usingJobData("vincent", "只為更好的你")
.storeDurably()
.build();
}
@Bean
public Trigger printTimeJobTrigger() {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
return TriggerBuilder.newTrigger()
.forJob(printTimeJobDetail())
.withIdentity("quartzTaskService")
.withSchedule(cronScheduleBuilder)
.build();
}
}
public class QuartzTest {
private static Scheduler scheduler;
public static void main(String[] args) throws SchedulerException {
// 獲取容器
ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");
// 從容器中獲取調(diào)度器
scheduler = (StdScheduler) ac.getBean("scheduler");
// 啟動調(diào)度器
scheduler.start();
}
}
動態(tài)調(diào)度的實現(xiàn)
1)配置管理
用最簡單的數(shù)據(jù)庫的實現(xiàn)
問題 1:建一張什么樣的表?參考 JobDetail 的屬性
CREATE TABLE `sys_job` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`job_name` varchar(512) NOT NULL COMMENT '任務(wù)名稱',
`job_group` varchar(512) NOT NULL COMMENT '任務(wù)組名',
`job_cron` varchar(512) NOT NULL COMMENT '時間表達(dá)式',
`job_class_path` varchar(1024) NOT NULL COMMENT '類路徑,全類型',
`job_data_map` varchar(1024) DEFAULT NULL COMMENT '傳遞 map 參數(shù)',
`job_status` int(2) NOT NULL COMMENT '狀態(tài):1 啟用 0 停用',
`job_describe` varchar(1024) DEFAULT NULL COMMENT '任務(wù)功能描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;
2)數(shù)據(jù)操作與任務(wù)調(diào)度
操作數(shù)據(jù)表非常簡單,SSM 增刪改查
但是在修改了表的數(shù)據(jù)之后,怎么讓調(diào)度器知道呢?
調(diào)度器的接口:Scheduler
在我們的需求中,我們需要做的事情:
1、 新增一個任務(wù)
2、 刪除一個任務(wù)
3、 啟動、停止一個任務(wù)
4、 修改任務(wù)的信息(包括調(diào)度規(guī)律)
因 此 可 以 把 相 關(guān) 的 操 作 封 裝 到 一 個 工 具 類 中:
public class SchedulerUtil {
private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);
/**
* 新增定時任務(wù)
* @param jobClassName 類路徑
* @param jobName 任務(wù)名稱
* @param jobGroupName 組別
* @param cronExpression Cron表達(dá)式
* @param jobDataMap 需要傳遞的參數(shù)
* @throws Exception
*/
public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {
// 通過SchedulerFactory獲取一個調(diào)度器實例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
// 啟動調(diào)度器
scheduler.start();
// 構(gòu)建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
.withIdentity(jobName, jobGroupName).build();
// JobDataMap用于傳遞任務(wù)運行時的參數(shù),比如定時發(fā)送郵件,可以用json形式存儲收件人等等信息
if (StringUtils.isNotEmpty(jobDataMap)) {
JSONObject jb = JSONObject.parseObject(jobDataMap);
Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
// 表達(dá)式調(diào)度構(gòu)建器(即任務(wù)執(zhí)行的時間)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表達(dá)式構(gòu)建一個新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
logger.info("創(chuàng)建定時任務(wù)失敗" + e);
throw new Exception("創(chuàng)建定時任務(wù)失敗");
}
}
/**
* 停用一個定時任務(wù)
* @param jobName 任務(wù)名稱
* @param jobGroupName 組別
* @throws Exception
*/
public static void jobPause(String jobName, String jobGroupName) throws Exception {
// 通過SchedulerFactory獲取一個調(diào)度器實例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 啟用一個定時任務(wù)
* @param jobName 任務(wù)名稱
* @param jobGroupName 組別
* @throws Exception
*/
public static void jobresume(String jobName, String jobGroupName) throws Exception {
// 通過SchedulerFactory獲取一個調(diào)度器實例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 刪除一個定時任務(wù)
* @param jobName 任務(wù)名稱
* @param jobGroupName 組別
* @throws Exception
*/
public static void jobdelete(String jobName, String jobGroupName) throws Exception {
// 通過SchedulerFactory獲取一個調(diào)度器實例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 更新定時任務(wù)表達(dá)式
* @param jobName 任務(wù)名稱
* @param jobGroupName 組別
* @param cronExpression Cron表達(dá)式
* @throws Exception
*/
public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {
try {
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
// 表達(dá)式調(diào)度構(gòu)建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表達(dá)式重新構(gòu)建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();
// 按新的trigger重新設(shè)置job執(zhí)行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
System.out.println("更新定時任務(wù)失敗" + e);
throw new Exception("更新定時任務(wù)失敗");
}
}
/**
* 檢查Job是否存在
* @throws Exception
*/
public static Boolean isResume(String jobName, String jobGroupName) throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
Boolean state = scheduler.checkExists(triggerKey);
return state;
}
/**
* 暫停所有任務(wù)
* @throws Exception
*/
public static void pauseAlljob() throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseAll();
}
/**
* 喚醒所有任務(wù)
* @throws Exception
*/
public static void resumeAlljob() throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
sched.resumeAll();
}
/**
* 獲取Job實例
* @param classname
* @return
* @throws Exception
*/
public static BaseJob getClass(String classname) throws Exception {
try {
Class<?> c = Class.forName(classname);
return (BaseJob) c.newInstance();
} catch (Exception e) {
throw new Exception("類["+classname+"]不存在!");
}
}
}
3)容器啟動與Service注入
a)容器啟動
任務(wù)沒有定義在 ApplicationContext.xml 中,而是放到了數(shù)據(jù)庫中,SpringBoot 啟動時,怎么讀取任務(wù)信息?怎么在 Spring 啟動完成的時候做一些事情?
創(chuàng)建一個類,實現(xiàn) CommandLineRunner 接口,實現(xiàn) run方法
從表中查出狀態(tài)是 1 的任務(wù),然后構(gòu)建
@Component
public class InitStartSchedule implements CommandLineRunner {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ISysJobService sysJobService;
@Autowired
private MyJobFactory myJobFactory;
@Override
public void run(String... args) throws Exception {
/**
* 用于程序啟動時加載定時任務(wù),并執(zhí)行已啟動的定時任務(wù)(只會執(zhí)行一次,在程序啟動完執(zhí)行)
*/
//查詢job狀態(tài)為啟用的
HashMap<String,String> map = new HashMap<String,String>();
map.put("jobStatus", "1");
List<SysJob> jobList= sysJobService.querySysJobList(map);
if( null == jobList || jobList.size() ==0){
logger.info("系統(tǒng)啟動,沒有需要執(zhí)行的任務(wù)... ...");
}
// 通過SchedulerFactory獲取一個調(diào)度器實例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
// 如果不設(shè)置JobFactory,Service注入到Job會報空指針
scheduler.setJobFactory(myJobFactory);
// 啟動調(diào)度器
scheduler.start();
for (SysJob sysJob:jobList) {
String jobClassName=sysJob.getJobName();
String jobGroupName=sysJob.getJobGroup();
//構(gòu)建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();
if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {
JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());
Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
//表達(dá)式調(diào)度構(gòu)建器(即任務(wù)執(zhí)行的時間)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());
//按新的cronExpression表達(dá)式構(gòu)建一個新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
// 任務(wù)不存在的時候才添加
if( !scheduler.checkExists(jobDetail.getKey()) ){
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
logger.info("\n創(chuàng)建定時任務(wù)失敗"+e);
throw new Exception("創(chuàng)建定時任務(wù)失敗");
}
}
}
}
public static BaseJob getClass(String classname) throws Exception
{
Class<?> c= Class.forName(classname);
return (BaseJob)c.newInstance();
}
}
b)Service類注入到Job中
Spring Bean 如何注入到實現(xiàn)了 Job 接口的類中?
如果沒有任何配置,注入會報空指針異常
因為定時任務(wù)Job 對象的實例化過程是在 Quartz 中進行的,而 Service Bean 是由Spring 容器管理的,Quartz 察覺不到 Service Bean 的存在,所以無法將 Service Bean裝配到 Job 對象中
分析:
Quartz 集成到 Spring 中,用到 SchedulerFactoryBean,其實現(xiàn)了 InitializingBean方法,在唯一的方法 afterPropertiesSet()在 Bean 的屬性初始化后調(diào)用
調(diào)度器用 AdaptableJobFactory 對 Job 對象進行實例化,如果我們可以把這個 JobFactory 指定為我們自定義的工廠的話,就可以在 Job 實例化完成之后,把 Job納入到 Spring 容器中管理
解決:
1)定義一個 AdaptableJobFactory,實現(xiàn) JobFactory 接口,實現(xiàn)接口定義的newJob 方法,在這里面返回 Job 實例
public class AdaptableJobFactory implements JobFactory {
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler arg1) throws SchedulerException {
return newJob(bundle);
}
public Job newJob(TriggerFiredBundle bundle) throws SchedulerException {
try {
// 返回Job實例
Object jobObject = createJobInstance(bundle);
return adaptJob(jobObject);
}
catch (Exception ex) {
throw new SchedulerException("Job instantiation failed", ex);
}
}
// 通過反射的方式創(chuàng)建實例
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Method getJobDetail = bundle.getClass().getMethod("getJobDetail");
Object jobDetail = ReflectionUtils.invokeMethod(getJobDetail, bundle);
Method getJobClass = jobDetail.getClass().getMethod("getJobClass");
Class jobClass = (Class) ReflectionUtils.invokeMethod(getJobClass, jobDetail);
return jobClass.newInstance();
}
protected Job adaptJob(Object jobObject) throws Exception {
if (jobObject instanceof Job) {
return (Job) jobObject;
}
else if (jobObject instanceof Runnable) {
return new DelegatingJob((Runnable) jobObject);
}
else {
throw new IllegalArgumentException("Unable to execute job class [" + jobObject.getClass().getName() +
"]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
}
}
}
2)定義一個MyJobFactory,繼承AdaptableJobFactory
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//調(diào)用父類的方法
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
3)指定Scheduler的JobFactory為自定義的JobFactory
scheduler.setJobFactory(myJobFactory);
Quartz集群部署
1)為什么要集群
a)防止單點故障,減少對業(yè)務(wù)的影響
b)減少節(jié)點的壓力,例如在 11 點要觸發(fā) 5000 個任務(wù),如果有 10 個節(jié)點,則每個節(jié)點之需要執(zhí)行 500 個任務(wù)
2)集群需要解決的問題
a)任務(wù)重跑,因為節(jié)點部署的內(nèi)容是一樣的,到 10 點的時候,每個節(jié)點都會執(zhí)行相同的操作,引起數(shù)據(jù)混亂,比如跑批,絕對不能執(zhí)行多次
b)任務(wù)漏跑,假如任務(wù)是平均分配的,本來應(yīng)該在某個節(jié)點上執(zhí)行的任務(wù),因為節(jié)點故障,一直沒有得到執(zhí)行
c)水平集群需要注意時間同步問題
d)Quartz 使用的是隨機的負(fù)載均衡算法,不能指定節(jié)點執(zhí)行
所以必須要有一種共享數(shù)據(jù)或者通信的機制,在分布式系統(tǒng)的不同節(jié)點中,我們可以采用什么樣的方式,實現(xiàn)數(shù)據(jù)共享?
兩兩通信,或者基于分布式的服務(wù),實現(xiàn)數(shù)據(jù)共享
例如:ZK、Redis、DB
在 Quartz 中,提供了一種簡單的方式,基于數(shù)據(jù)庫共享任務(wù)執(zhí)行信息。也就是說,一個節(jié)點執(zhí)行任務(wù)的時候,會操作數(shù)據(jù)庫,其他的節(jié)點查詢數(shù)據(jù)庫,便可以感知到了
3)集群配置與驗證
quartz.properties 配置。
四個配置:集群實例 ID、集群開關(guān)、數(shù)據(jù)庫持久化、數(shù)據(jù)源信息
注意先清空 quartz 所有表、改端口、兩個任務(wù)頻率改成一樣
驗證 1:先后啟動 2 個節(jié)點,任務(wù)是否重跑
驗證 2:停掉一個節(jié)點,任務(wù)是否漏跑
Quartz調(diào)度原理
帶著問題看源碼:
Job 沒有繼承 Thread 和實現(xiàn) Runnable,是怎么被調(diào)用的?通過反射還是什么?
任務(wù)是什么時候被調(diào)度的?是誰在監(jiān)視任務(wù)還是監(jiān)視 Trigger?
任務(wù)是怎么被調(diào)用的?誰執(zhí)行了任務(wù)?
任務(wù)本身有狀態(tài)嗎?還是觸發(fā)器有狀態(tài)?
源碼入口:
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
1)獲取調(diào)度器實例
a、讀取配置文件
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
// 讀取 quartz.properties 配置文件
initialize();
}
// 這個類是一個 HashMap,用來基于調(diào)度器的名稱保證調(diào)度器的唯一性
SchedulerRepository schedRep = SchedulerRepository.getInstance();
?
Scheduler sched = schedRep.lookup(getSchedulerName());
// 如果調(diào)度器已經(jīng)存在了
if (sched != null) {
// 調(diào)度器關(guān)閉了,移除
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
// 返回調(diào)度器
return sched;
}
}
// 調(diào)度器不存在,初始化
sched = instantiate();
return sched;
}
instantiate()方法中做了初始化的所有工作:
// 存儲任務(wù)信息的 JobStore
JobStore js = null;
// 創(chuàng)建線程池,默認(rèn)是 SimpleThreadPool
ThreadPool tp = null;
// 創(chuàng)建調(diào)度器
QuartzScheduler qs = null;
// 連接數(shù)據(jù)庫的連接管理器
DBConnectionManager dbMgr = null;
// 自動生成 ID
// 創(chuàng)建線程執(zhí)行器,默認(rèn)為 DefaultThreadExecutor
ThreadExecutor threadExecutor;
b、創(chuàng)建線程池(包工頭)
創(chuàng)建了一個線程池,默認(rèn)是配置文件中指定的SimpleThreadPool
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
SimpleThreadPool 里面維護了三個 list,分別存放所有的工作線程、空閑的工作線程和忙碌的工作線程,我們可以把 SimpleThreadPool 理解為包工頭
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
tp 的 runInThread()方法是線程池運行線程的接口方法。參數(shù) Runnable 是執(zhí)行的任務(wù)內(nèi)容,取出 WorkerThread 去執(zhí)行參數(shù)里面的 runnable(JobRunShell)
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
c、WorkerThread(工人)
WorkerThread 是 SimpleThreadPool的內(nèi)部類 , 用來執(zhí)行任務(wù) ,我 們 把WorkerThread理解為工人。在WorkerThread的run方法中,執(zhí)行傳入的參數(shù)runnable任務(wù):runnable.run();
d、創(chuàng)建調(diào)度線程(項目經(jīng)理)
創(chuàng)建了調(diào)度器 QuartzScheduler:
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
在 QuartzScheduler 的構(gòu)造函數(shù)中,創(chuàng)建了QuartzSchedulerThread,我們把它理解為項目經(jīng)理,它會調(diào)用包工頭的工人資源,給他們安排任務(wù)
并且創(chuàng)建了線程執(zhí)行器 schedThreadExecutor , 執(zhí) 行 了 這 個QuartzSchedulerThread,也就是調(diào)用了它的 run 方法
// 創(chuàng)建一個線程,resouces 里面有線程名稱
this.schedThread = new QuartzSchedulerThread(this, resources);
// 線程執(zhí)行器
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//執(zhí)行這個線程,也就是調(diào)用了線程的 run 方法
schedThreadExecutor.execute(this.schedThread);
在QuartzSchedulerThread 類,找到 run 方法,這個是 Quartz 任務(wù)調(diào)度的核心方法:
public void run() {
boolean lastAcquireFailed = false;
// 檢查 scheuler 是否為停止?fàn)顟B(tài)
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
// 檢查是否為暫停狀態(tài)
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
// 暫停的話會嘗試去獲得信號鎖,并 wait 一會
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
//從線程池獲取可用的線程
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 獲取需要下次執(zhí)行的 triggers
// idleWaitTime: 默認(rèn) 30s
// availThreadCount:獲取可用(空閑)的工作線程數(shù)量,總會大于 1,因為該方法會一直阻塞,直到有工作線程空閑下來。
//maxBatchSize:一次拉取 trigger 的最大數(shù)量,默認(rèn)是 1
//batchTimeWindow:時間窗口調(diào)節(jié)參數(shù),默認(rèn)是 0
//misfireThreshold: 超過這個時間還未觸發(fā)的 trigger,被認(rèn)為發(fā)生了 misfire,默認(rèn) 60s
//調(diào)度線程一次會拉取 NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個 triggers,默認(rèn)情況下,會拉取未來 30s、過去 60s 之間還未 fire 的 1 個 trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
//觸發(fā) Trigger,把 ACQUIRED 狀態(tài)改成 EXECUTING
//如果這個 trigger 的 NEXTFIRETIME 為空,也就是未來不再觸發(fā),就將其狀態(tài)改為COMPLETE
//如果trigger不允許并發(fā)執(zhí)行(即Job的實現(xiàn)類標(biāo)注了@DisallowConcurrentExecution),則將狀態(tài)變?yōu)?BLOCKED,否則就將狀態(tài)改為 WAITING
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
//循環(huán)處理 Trigger
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
//根據(jù) trigger 信息實例化JobRunShell(implements Runnable),同時依據(jù)JOB_CLASS_NAME 實例化 Job,隨后我們將 JobRunShell 實例丟入工作線。
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 執(zhí)行 JobRunShell 的 run 方法
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
JobRunShell 的作用:
JobRunShell 用來為 Job 提供安全的運行環(huán)境的,執(zhí)行 Job 中所有的作業(yè),捕獲運行中的異常,在任務(wù)執(zhí)行完畢的時候更新 Trigger 狀態(tài)等等
JobRunShell 實例是用 JobRunShellFactory 為 QuartzSchedulerThread 創(chuàng)建的,在調(diào)度器決定一個 Job 被觸發(fā)的時候,它從線程池中取出一個線程來執(zhí)行任務(wù)
e、線程模型總結(jié)
SimpleThreadPool:包工頭,管理所有 WorkerThread
WorkerThread:工人,把 Job 包裝成 JobRunShell,執(zhí)行
QuartSchedulerThread:項目經(jīng)理,獲取即將觸發(fā)的 Trigger,從包工頭拿出拿到worker,執(zhí)行 Trigger 綁定的任務(wù)
2)綁定JobDetail和Trigger
// 存儲 JobDetail 和 Trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知相關(guān)的 Listener
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
3)啟動調(diào)度器
// 通知監(jiān)聽器
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
// 通知 QuartzSchedulerThread 不再等待,開始干活
schedThread.togglePause(false);
// 通知監(jiān)聽器
notifySchedulerListenersStarted();
4)源碼總結(jié)
getScheduler 方法創(chuàng)建線程池 ThreadPool,創(chuàng)建調(diào)度器 QuartzScheduler,創(chuàng)建調(diào)度線程 QuartzSchedulerThread,調(diào)度線程初始處于暫停狀態(tài)
scheduleJob 將任務(wù)添加到 JobStore 中
scheduler.start()方法激活調(diào)度器,QuartzSchedulerThread 從 timeTrriger 取出待觸發(fā)的任務(wù),并包裝成 TriggerFiredBundle,然后由 JobRunShellFactory 創(chuàng)建TriggerFiredBundle 的 執(zhí) 行 線 程 JobRunShell , 調(diào) 度 執(zhí) 行 通 過 線 程 池SimpleThreadPool去執(zhí)行JobRunShell,而JobRunShell執(zhí)行的就是任務(wù)類的execute方法:job.execute(JobExecutionContext context)
5)集群原理
基于數(shù)據(jù)庫,如何實現(xiàn)任務(wù)的不重跑不漏跑?
問題 1:如果任務(wù)執(zhí)行中的資源是“下一個即將觸發(fā)的任務(wù)”,怎么基于數(shù)據(jù)庫實現(xiàn)這個資源的競爭?
問題 2:怎么對數(shù)據(jù)的行加鎖?

QuartzSchedulerThread 獲取下一個即將觸發(fā)的 Trigger:
triggers = qsRsrcs.getJobStore().acquireNextTriggers()
調(diào)用 JobStoreSupport 的 acquireNextTriggers()方法
調(diào)用 JobStoreSupport.executeInNonManagedTXLock()方法
return executeInNonManagedTXLock(lockName,
嘗試獲得鎖:
transOwner = getLockHandler().obtainLock(conn, lockName);
調(diào)用 DBSemaphore 的 obtainLock()方法:
public boolean obtainLock(Connection conn, String lockName)
throws LockException {
if (!isLockOwner(lockName)) {
executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
調(diào)用 StdRowLockSemaphore 的 executeSQL()方法
最終用 JDBC 執(zhí)行 SQL,語句內(nèi)容是 expandedSQL 和 expandedInsertSQL
ps = conn.prepareStatement(expandedSQL);
問題:expandedSQL 和 expandedInsertSQL 是一條什么 SQL 語句?似乎我們沒有賦值?
在 StdRowLockSemaphore 的構(gòu)造函數(shù)中,把定義的兩條 SQL 傳進去:
public StdRowLockSemaphore() {
super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK);
}
public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
?
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";
它調(diào)用了父類 DBSemaphore 的構(gòu)造函數(shù):
public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
this.tablePrefix = tablePrefix;
this.schedName = schedName;
setSQL(defaultSQL);
setInsertSQL(defaultInsertSQL);
}
在 setSQL()和 setInsertSQL()中為 expandedSQL 和expandedInsertSQL 賦值
執(zhí)行的 SQL 語句:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
在執(zhí)行官方的建表腳本的時候,QRTZ_LOCKS 表,它會為每個調(diào)度器創(chuàng)建兩行數(shù)據(jù),獲取 Trigger 和觸發(fā) Trigger 是兩把鎖:


6)任務(wù)為什么重復(fù)執(zhí)行
有多個調(diào)度器,任務(wù)沒有重復(fù)執(zhí)行,也就是默認(rèn)會加鎖,什么情況下不會上鎖呢?
JobStoreSupport 的 executeInNonManagedTXLock()方法,如果 lockName 為空,則不上鎖
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
而 上 一 步 JobStoreSupport 的 acquireNextTriggers() 方 法 , 如 果isAcquireTriggersWithinLock()值是 false 并且 maxCount>1 的話,lockName 賦值為null,否則賦值為 LOCK_TRIGGER_ACCESS,這種情況獲取 Trigger 下默認(rèn)不加鎖
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
}
acquireTriggersWithinLock 默認(rèn)是空的:
private boolean acquireTriggersWithinLock = false;
maxCount 來自 QuartzSchedulerThread:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
getMaxBatchSize()來自 QuartzSchedulerResources,代表 Scheduler 一次拉取trigger 的最大數(shù)量,默認(rèn)是 1:
private int maxBatchSize = 1;
這個值可以通過參數(shù)修改:
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=50
理論上把 batchTriggerAcquisitionMaxCount 的值改掉以后,在獲取 Trigger 的時候就不會再上鎖了,但是實際上為什么沒有出現(xiàn)頻繁的重復(fù)執(zhí)行問題?
因為每個調(diào)度器的線程持有鎖的時間太短了
QuartzSchedulerThread 的 triggersFired()方法:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
調(diào)用了 JobStoreSupport 的 triggersFired()方法,接著又調(diào)用了一個 triggerFired,triggerFired(Connection conn, OperableTrigger trigger)方法:
如果 Trigger 的狀態(tài)不是 ACQUIRED,也就是說被其他的線程 fire 了,返回空。但是這種樂觀鎖的檢查在高并發(fā)下難免會出現(xiàn) ABA 的問題,比如線程 A 拿到的時候還是ACQUIRED 狀態(tài),但是剛準(zhǔn)備執(zhí)行的時候已經(jīng)變成了 EXECUTING 狀態(tài),這個時候就會出現(xiàn)重復(fù)執(zhí)行的問題
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
總結(jié):
如果設(shè)置的數(shù)量>1,并且使用 JDBC JobStore(RAMJobStore 不支持分布式,只有 一 個 調(diào) 度 器 實 例 , 所 以 不 加 鎖 ) , 則 屬 性org.quartz.jobStore.acquireTriggersWithinLock 應(yīng)設(shè)置為 true,否則不加鎖會導(dǎo)致任務(wù)重復(fù)執(zhí)行
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
org.quartz.jobStore.acquireTriggersWithinLock=true
Quartz-Misfire
什么情況下錯過觸發(fā)?
示例:線程池只有 5 個線程,當(dāng)有 5 個任務(wù)都在執(zhí)行的時候,第六個任務(wù)即將觸發(fā),這個時候任務(wù)就不能得到執(zhí)行,在 quartz.properties 有一個屬性 misfireThreshold,用來定義觸發(fā)器超時的"臨界值",也就是超過了這個時間,就算錯過觸發(fā)了
例如,如果 misfireThreshold 是 60000(60 秒),9 點整應(yīng)該執(zhí)行的任務(wù),9 點零1 分還沒有可用線程執(zhí)行它,就會超時(misfires)
可能造成 misfired job的原因:
1、 沒有可用線程
2、 Trigger 被暫停
3、 系統(tǒng)重啟
4、 禁止并發(fā)執(zhí)行的任務(wù)在到達(dá)觸發(fā)時間時,上次執(zhí)行還沒有結(jié)束
錯過觸發(fā)怎么辦?
Misfire 策略設(shè)置,每一種 Trigger 都定義了自己的 Misfire 策略,不同的策略通過不同的方法來設(shè)置
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionNowWithExistingCount()
.withIntervalInSeconds(1)
.repeatForever()).build();
一般來說有 3 種:
1、 忽略
2、 立即跑一次
3、 下次跑
文章參考:
Quartz Scheduler misfireThreshold屬性的意義與觸發(fā)器超時后的處理策略
怎么避免任務(wù)錯過觸發(fā)?
合理地設(shè)置線程池數(shù)量,以及任務(wù)觸發(fā)間隔