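Preface
This post only gives an overall walkthrough of the Quartz source code. For Quartz's overall architecture, a quick search on Baidu or Google turns up plenty of articles.
Reading the code
Quartz's logic revolves around three things: the scheduler (Scheduler), the trigger (Trigger), and the job (Job). The scheduler fetches triggers. A trigger specifies a job's fire time, scheduling policy, state, priority, start time, end time, and similar information. The job is the actual business logic.
Getting into the code with an example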
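```java
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import java.util.Date;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzDemo { // placeholder class name for the snippet
    public static void main(String[] args) throws SchedulerException {
        // Build the scheduler from test_quartz.properties, then start it
        SchedulerFactory factory = new StdSchedulerFactory("test_quartz.properties");
        Scheduler scheduler = factory.getScheduler();
        scheduler.start();
        // One-shot trigger (repeat count 0); on misfire, use "next with remaining count"
        Trigger t = newTrigger().withIdentity("t1", "g1").startAt(new Date(1466746025000L))
                .withSchedule(simpleSchedule().withMisfireHandlingInstructionNextWithRemainingCount().withRepeatCount(0))
                .build();
        JobDetail job = newJob(TestJob.class).withIdentity("myJob1", "g1").build();
        // Bind the trigger to the job
        scheduler.scheduleJob(job, t);
    }
}
```

```java
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class TestJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            Thread.sleep(20000); // simulate 20 s of work
            System.out.println(context.getTrigger().getKey() + " executed successfully!!!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        }
    }
}
```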
The two pieces of code above are all it takes to write a simple job.
The main steps are:
1. Obtain a scheduler from the scheduler factory and start it.
2. Define the trigger.
3. Define the job.
4. Use the scheduler to bind the trigger to the job.
First, let's look at how the scheduler is initialized.
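Initializing the scheduler factory mainly means reading the configuration. The scheduler itself is really initialized in getScheduler(), which assembles it from that configuration. That is not the focus here, so I'll only mention it in passing.
Note: while the scheduler is being assembled, the scheduler thread (QuartzSchedulerThread) is also started:
```java
// StdSchedulerFactory, while getScheduler() assembles the scheduler
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener) resources.getJobStore());
    }
    // Create the scheduler thread and hand it to the thread executor, which starts it
    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    schedThreadExecutor.execute(this.schedThread);
    // ...
}
```
Once started, however, the thread just waits until the scheduler's start() method is called:
```java
// QuartzSchedulerThread.run()
synchronized (sigLock) {
    while (paused && !halted.get()) {
        try {
            // wait until togglePause(false) is called...
            sigLock.wait(1000L);
        } catch (InterruptedException ignore) {
        }
    }
    // ...
}
```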
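The pause/resume handshake in this note is a standard wait/notify pattern. The sketch below is not Quartz code: `PauseGate`, `awaitUnpaused()` and `demo()` are hypothetical names. It only assumes what the excerpt shows (a loop on `sigLock.wait(1000L)` while `paused` is true), plus a `togglePause(false)` that, in this sketch, clears the flag and wakes the waiting thread with `notifyAll()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the paused/halted handshake in QuartzSchedulerThread. */
class PauseGate {
    private final Object sigLock = new Object();
    private boolean paused = true;                       // starts paused, like the scheduler thread
    private final AtomicBoolean halted = new AtomicBoolean(false);

    /** Blocks until togglePause(false) or halt(); returns false if halted. */
    boolean awaitUnpaused() {
        synchronized (sigLock) {
            while (paused && !halted.get()) {
                try {
                    sigLock.wait(1000L);                 // periodic wake-up, as in the excerpt
                } catch (InterruptedException ignore) {
                    // keep looping; the condition decides when to leave
                }
            }
            return !halted.get();
        }
    }

    /** Counterpart of start() calling schedThread.togglePause(false). */
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            if (!pause) {
                sigLock.notifyAll();                     // wake the waiting thread right away
            }
        }
    }

    void halt() {
        halted.set(true);
        synchronized (sigLock) {
            sigLock.notifyAll();
        }
    }

    /** Returns {passedBeforeStart, passedAfterStart}. */
    static boolean[] demo() {
        PauseGate gate = new PauseGate();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> passed.set(gate.awaitUnpaused()));
        worker.start();
        boolean passedBeforeStart;
        try {
            Thread.sleep(200);                           // worker is blocked: still paused
            passedBeforeStart = passed.get();
            gate.togglePause(false);                     // plays the role of scheduler.start()
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new boolean[] {true, false};
        }
        return new boolean[] {passedBeforeStart, passed.get()};
    }
}
```

`PauseGate.demo()` returns `{false, true}`: the worker is still blocked after 200 ms and only passes the gate once `togglePause(false)` runs. The timed `wait(1000L)` means a missed notification costs at most about a second instead of a permanent hang.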
Next comes the scheduler startup.
Scheduler startup process
Quartz supports scheduling jobs in cluster mode, with jobs persisted in a database.
This post mainly follows how jobs are executed in cluster mode.
The startup code is as follows:
```java
// QuartzScheduler.start()
public void start() throws SchedulerException {
    if (shuttingDown || closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }
    notifySchedulerListenersStarting();
    if (initialStart == null) {
        // First start of the scheduler
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }
    // Wake up the scheduler thread, which acquires triggers and fires jobs
    schedThread.togglePause(false);
    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    notifySchedulerListenersStarted();
}

// JobStoreSupport.schedulerStarted()
public void schedulerStarted() throws SchedulerException {
    // In cluster mode, start the cluster management thread, which periodically checks the cluster's health
    if (isClustered()) {
        clusterManagementThread = new ClusterManager();
        if (initializersLoader != null)
            clusterManagementThread.setContextClassLoader(initializersLoader);
        clusterManagementThread.initialize();
    } else {
        try {
            // Not clustered: recover the jobs that were in progress before the scheduler went down
            recoverJobs();
        } catch (SchedulerException se) {
            throw new SchedulerConfigException(
                    "Failure occured during job recovery.", se);
        }
    }
    // Start a thread that checks whether any trigger has missed its fire time; if so, it updates
    // next_fire_time according to the trigger's misfire instruction so the scheduler thread can pick it up
    misfireHandler = new MisfireHandler();
    if (initializersLoader != null)
        misfireHandler.setContextClassLoader(initializersLoader);
    misfireHandler.initialize();
    schedulerRunning = true;
    getLog().debug("JobStore background threads started (as scheduler was started).");
}
```
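Now let's dig into the three main threads: QuartzSchedulerThread, ClusterManager, and MisfireHandler. This post covers QuartzSchedulerThread and MisfireHandler; ClusterManager, which deals with nodes going down, is left for the next post.
QuartzSchedulerThread
This is Quartz's main thread, and it is responsible for scheduling.
Its run method is very long, so only the key code is shown here: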
```java
public void run() {
    int acquiresFailed = 0;
    while (!halted.get()) {
        try {
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        // Until the scheduler's start() calls togglePause(false), the thread loops here
                        sigLock.wait(1000L);
                    } catch (InterruptedException ignore) {
                    }
                    // ...
                }
                // ...
            }
            // ...
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if (availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                // ...
                try {
                    // Acquire the triggers that can be fired now
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                    acquiresFailed = 0;
                    // ...
                } catch (JobPersistenceException jpe) {
                    // ...
                    continue;
                }
                // ...
                if (triggers != null && !triggers.isEmpty()) {
                    // ... (wait until the first trigger's fire time is at most 2 ms away)
                    List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                    // ...
                    if (goAhead) {
                        try {
                            // Fire the triggers: this mainly moves each trigger in qrtz_triggers from ACQUIRED
                            // to WAITING or COMPLETE and computes its next fire time so it can be picked again;
                            // it also sets the trigger's row in QRTZ_FIRED_TRIGGERS to EXECUTING.
                            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                            if (res != null)
                                bndles = res;
                        } catch (SchedulerException se) {
                            // ...
                            continue;
                        }
                    }
                    for (int i = 0; i < bndles.size(); i++) {
                        TriggerFiredResult result = bndles.get(i);
                        TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                        // ...
                        JobRunShell shell = null;
                        try {
                            shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                            shell.initialize(qs);
                        } catch (SchedulerException se) {
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            continue;
                        }
                        // Hand the JobRunShell to a worker thread: that is where the job's execute() is finally
                        // called, and once it finishes, this round of scheduling is really complete.
                        if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            // this case should never happen, as it is indicative of the
                            // scheduler being shutdown or a bug in the thread pool or
                            // a thread pool being used concurrently - which the docs
                            // say not to do...
                            getLog().error("ThreadPool.runInThread() return false!");
                            qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                        }
                    }
                    continue; // while (!halted)
                }
            }
            // ...
            // Reaching this point means no trigger was fired in this round: the thread waits for a
            // (randomized) while, which also gives other nodes a chance to schedule jobs.
            // ...
        } catch (RuntimeException re) {
            // ...
        }
    } // while (!halted)
    // ...
}
```
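To sum up, the code above works as follows:
1. At startup the thread just waits, until the scheduler's start() method is called and wakes it up.
2. It checks how many idle threads the job thread pool has, then fetches the eligible triggers from qrtz_triggers, sets their state to ACQUIRED, and inserts rows for them into qrtz_fired_triggers, where their state is also ACQUIRED.
3. Because the acquired triggers come from a time window, it takes the first trigger in the list (the list is sorted by next_fire_time in ascending order) and compares its fire time with the current time; if it is more than 2 ms away, the thread waits.
4. When the first trigger's fire time arrives, it checks qrtz_triggers again to confirm that each acquired trigger is still ACQUIRED. If so, it sets the trigger's row in qrtz_fired_triggers to EXECUTING. At the same time, the trigger in qrtz_triggers is done for this round and can wait for the next one: its next fire time is computed and its state is set to WAITING. If the computed next fire time is null, the trigger has finished and its state is set to COMPLETE. The triggers to fire in this round are then returned.
5. It loops over those triggers, takes a worker thread from the pool for each, and runs the job's execute method.
6. If no trigger was fired in a round, the scheduler thread waits for a while before acquiring and scheduling triggers again.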
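To make the state changes in steps 2 and 4 concrete, here is a small in-memory model. It is a sketch under simplifying assumptions, not Quartz's JDBC JobStore: `TriggerStoreModel` and `Row` are hypothetical, two maps stand in for qrtz_triggers and qrtz_fired_triggers, triggers repeat at a fixed interval, and there is no locking, misfire filtering, or 2 ms wait. Acquisition sorts by next fire time and then by descending priority, which is the order I assume the real query uses.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the two tables the scheduler thread updates. */
class TriggerStoreModel {
    static final class Row {
        final String name;
        final int priority;
        final long repeatInterval;
        int repeatsLeft;
        Long nextFireTime;                 // null once there is no further fire time
        String state = "WAITING";

        Row(String name, long nextFireTime, int priority, long repeatInterval, int repeatsLeft) {
            this.name = name;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.repeatInterval = repeatInterval;
            this.repeatsLeft = repeatsLeft;
        }
    }

    final Map<String, Row> triggers = new LinkedHashMap<>();         // plays qrtz_triggers
    final Map<String, String> firedTriggers = new LinkedHashMap<>(); // plays qrtz_fired_triggers

    void add(Row r) { triggers.put(r.name, r); }

    /** Step 2: pick WAITING triggers due within the window, earliest first, and mark them ACQUIRED. */
    List<Row> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
        List<Row> due = new ArrayList<>();
        for (Row r : triggers.values()) {
            if ("WAITING".equals(r.state) && r.nextFireTime != null
                    && r.nextFireTime <= noLaterThan + timeWindow) {
                due.add(r);
            }
        }
        due.sort(Comparator.comparingLong((Row r) -> r.nextFireTime)
                .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()));
        List<Row> acquired = new ArrayList<>(due.subList(0, Math.min(maxCount, due.size())));
        for (Row r : acquired) {
            r.state = "ACQUIRED";
            firedTriggers.put(r.name, "ACQUIRED");
        }
        return acquired;
    }

    /** Step 4: re-check ACQUIRED, mark the fired row EXECUTING, then compute the next fire time. */
    List<String> triggersFired(List<Row> acquired) {
        List<String> fired = new ArrayList<>();
        for (Row r : acquired) {
            if (!"ACQUIRED".equals(r.state)) {
                continue;                  // state changed in the meantime: skip it
            }
            firedTriggers.put(r.name, "EXECUTING");
            if (r.repeatsLeft > 0) {
                r.repeatsLeft--;
                r.nextFireTime += r.repeatInterval;
                r.state = "WAITING";       // can be picked up again in a later round
            } else {
                r.nextFireTime = null;
                r.state = "COMPLETE";      // no next fire time: the trigger is done
            }
            fired.add(r.name);
        }
        return fired;
    }
}
```

For example, with t1 and t2 both due at 1000 ms (priorities 5 and 9) and t3 due at 60000 ms, `acquireNextTriggers(900, 10, 200)` acquires t2 and then t1 and leaves t3 alone. `triggersFired` then moves t1 back to WAITING with its next fire time advanced by its interval, and moves t2, which has no repeats left, to COMPLETE.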
Next, let's look at JobRunShell, the Runnable that actually executes the job on a worker thread. Again the run method is long, so only part of it is excerpted:
```java
public void run() {
    qs.addInternalSchedulerListener(this);
    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();
        do {
            JobExecutionException jobExEx = null;
            Job job = jec.getJobInstance();
            // ...
            long startTime = System.currentTimeMillis();
            long endTime = startTime;
            try {
                // Run the user's job
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }
            jec.setJobRunTime(endTime - startTime);
            // ...
            // Let the trigger decide what should happen next (instruction code)
            CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
            try {
                instCode = trigger.executionComplete(jec, jobExEx);
            } catch (Exception e) {
                // If this happens, there's a bug in the trigger...
                SchedulerException se = new SchedulerException(
                        "Trigger threw an unhandled exception.", e);
                qs.notifySchedulerListenersError(
                        "Please report this error to the Quartz developers.",
                        se);
            }
            // RE_EXECUTE_JOB: run the job again in the same shell
            if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                jec.incrementRefireCount();
                try {
                    complete(false);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                }
                continue;
            }
            try {
                complete(true);
            } catch (SchedulerException se) {
                qs.notifySchedulerListenersError("Error executing Job ("
                        + jec.getJobDetail().getKey()
                        + ": couldn't finalize execution.", se);
                continue;
            }
            // Update the JobStore (database) according to instCode
            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
            break;
        } while (true);
    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}
```
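The logic here is simple: get the job and call its execute method. Once it finishes, different database operations are performed depending on the returned instruction code; if the code is RE_EXECUTE_JOB, the loop runs the job again instead. The line qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode); is what performs those database operations: it mainly updates the trigger's state in qrtz_triggers (deleting the trigger in some cases), and then deletes the current trigger's row from qrtz_fired_triggers.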
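The retry loop and the instruction-driven cleanup can be sketched without Quartz. In this hypothetical `RunShellModel`, the `Instruction` enum borrows a subset of the constant names of Quartz's CompletedExecutionInstruction, a lambda plays both the job and `trigger.executionComplete(...)`, and a map and a set stand in for qrtz_triggers and qrtz_fired_triggers. The real JobStore handles more cases (for example the SET_ALL_JOB_TRIGGERS_* variants and unblocking non-concurrent jobs), which are left out here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.IntFunction;

/** Hypothetical model of JobRunShell's retry loop plus the JobStore cleanup that follows it. */
class RunShellModel {
    // Subset of the names in Quartz's CompletedExecutionInstruction; a local stand-in enum.
    enum Instruction { NOOP, RE_EXECUTE_JOB, SET_TRIGGER_COMPLETE, DELETE_TRIGGER, SET_TRIGGER_ERROR }

    final Map<String, String> triggers = new HashMap<>(); // trigger name -> state, plays qrtz_triggers
    final Set<String> firedTriggers = new HashSet<>();    // plays qrtz_fired_triggers
    int executions;

    /**
     * Runs the job until it stops asking to be re-executed. jobAttempt receives the refire
     * count and returns the instruction that trigger.executionComplete(...) would produce.
     */
    Instruction run(String trigger, IntFunction<Instruction> jobAttempt) {
        firedTriggers.add(trigger);
        int refireCount = 0;
        Instruction inst;
        do {
            executions++;
            inst = jobAttempt.apply(refireCount);
            if (inst == Instruction.RE_EXECUTE_JOB) {
                refireCount++;            // like jec.incrementRefireCount() before "continue"
            }
        } while (inst == Instruction.RE_EXECUTE_JOB);
        jobComplete(trigger, inst);       // like qs.notifyJobStoreJobComplete(...)
        return inst;
    }

    /** Simplified stand-in for the JobStore's reaction to the final instruction. */
    void jobComplete(String trigger, Instruction inst) {
        switch (inst) {
            case DELETE_TRIGGER:
                triggers.remove(trigger);
                break;
            case SET_TRIGGER_COMPLETE:
                triggers.put(trigger, "COMPLETE");
                break;
            case SET_TRIGGER_ERROR:
                triggers.put(trigger, "ERROR");
                break;
            default:
                break;                    // NOOP: keep the state triggersFired() left behind
        }
        firedTriggers.remove(trigger);    // the fired-trigger row is removed in every case
    }
}
```

With an attempt function that asks for RE_EXECUTE_JOB twice and then returns SET_TRIGGER_COMPLETE, `run` executes the job three times, leaves the trigger in COMPLETE, and removes its fired-trigger entry.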
At this point, a normal scheduling round is complete. Along the way, many steps call methods of SchedulerListener and TriggerListener; these are extension interfaces Quartz exposes so that we can monitor each step of a job's execution.

Next, let's look at the misfire thread, MisfireHandler.
MisfireHandler is an inner class of JobStoreSupport. Its run method is as follows:
```java
public void run() {
    while (!shutdown) {
        long sTime = System.currentTimeMillis();
        // Find and handle the misfired triggers
        RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
        // If any misfired triggers were handled, wake up the scheduler thread (which may be waiting
        // for its next eligible trigger) so it can pick them up
        if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
            signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
        }
        if (!shutdown) {
            long timeToSleep = 50L; // At least a short pause to help balance threads
            if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                if (timeToSleep <= 0) {
                    timeToSleep = 50L;
                }
                if (numFails > 0) {
                    timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                }
            }
            try {
                Thread.sleep(timeToSleep);
            } catch (Exception ignore) {
            }
        } // if (!shutdown)
    } // while (!shutdown)
}
```
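The sleep calculation at the end of this loop is easier to read as a pure function. `MisfireSleep.timeToSleep` is a hypothetical restatement of it, with the scan's elapsed time passed in as a parameter instead of being read from the clock.

```java
/** Hypothetical pure-function restatement of MisfireHandler.run()'s sleep calculation. */
final class MisfireSleep {
    private MisfireSleep() {}

    static long timeToSleep(long misfireThreshold, long scanMillis, boolean hasMoreMisfired,
                            int numFails, long dbRetryInterval) {
        long timeToSleep = 50L;                          // backlog left: only a short pause
        if (!hasMoreMisfired) {
            timeToSleep = misfireThreshold - scanMillis; // rescan roughly once per threshold
            if (timeToSleep <= 0) {
                timeToSleep = 50L;
            }
            if (numFails > 0) {
                timeToSleep = Math.max(dbRetryInterval, timeToSleep); // back off after DB errors
            }
        }
        return timeToSleep;
    }
}
```

With a misfireThreshold of 60000 ms (the JobStoreSupport default), a 200 ms scan with no backlog gives 59800 ms, so the handler rescans about once per threshold. A backlog (`hasMoreMisfired == true`) gives 50 ms, and after DB failures the sleep is at least dbRetryInterval.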
The main logic is in the manage method:
```java
private RecoverMisfiredJobsResult manage() {
    try {
        getLog().debug("MisfireHandler: scanning for misfires...");
        RecoverMisfiredJobsResult res = doRecoverMisfires();
        numFails = 0;
        return res;
    } catch (Exception e) {
        if (numFails % 4 == 0) {
            getLog().error(
                    "MisfireHandler: Error handling misfires: "
                            + e.getMessage(), e);
        }
        numFails++;
    }
    return RecoverMisfiredJobsResult.NO_OP;
}
```
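The failure handling in manage() amounts to a reset-on-success counter with throttled logging. The sketch below is hypothetical: `ManageModel` uses a `BooleanSupplier` in place of doRecoverMisfires() (true meaning the scan succeeded), records which failure counts would produce an error log, and returns false where manage() would return NO_OP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical restatement of manage()'s failure accounting (not Quartz code). */
class ManageModel {
    int numFails;
    final List<Integer> loggedAtFailureCount = new ArrayList<>();

    boolean manage(BooleanSupplier scan) {
        if (scan.getAsBoolean()) {
            numFails = 0;                          // a successful scan resets the counter
            return true;
        }
        if (numFails % 4 == 0) {
            loggedAtFailureCount.add(numFails);    // stands in for getLog().error(...)
        }
        numFails++;
        return false;                              // manage() would return NO_OP here
    }
}
```

Nine consecutive failures log at counts 0, 4 and 8 (the 1st, 5th and 9th failure). The next success resets numFails to 0, which also ends the dbRetryInterval back-off in run().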
The key method here is doRecoverMisfires():
```java
protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = getNonManagedTXConnection();
    try {
        RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
        // Before we make the potentially expensive call to acquire the
        // trigger lock, peek ahead to see if it is likely we would find
        // misfired triggers requiring recovery.
        int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                getDelegate().countMisfiredTriggersInState(
                        conn, STATE_WAITING, getMisfireTime()) :
                Integer.MAX_VALUE;
        if (misfireCount == 0) {
            getLog().debug(
                    "Found 0 triggers that missed their scheduled fire-time.");
        } else {
            transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
            // Update next_fire_time of the misfired triggers so the scheduler thread can pick them up
            result = recoverMisfiredJobs(conn, false);
        }
        commitConnection(conn);
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (SQLException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Database error recovering from misfires.", e);
    } finally {
        // ... (elided) release LOCK_TRIGGER_ACCESS if it was obtained, then clean up the connection
    }
}
```
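The statement `int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate().countMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime()) : Integer.MAX_VALUE;` counts the misfired triggers before the trigger lock is taken (if doubleCheckLockMisfireHandler is off, the count is skipped and the lock is always taken). The query it runs is `select count(TRIGGER_NAME) from QRTZ_TRIGGERS where SCHED_NAME = xxx and not (MISFIRE_INSTR = -1) and NEXT_FIRE_TIME < ? and TRIGGER_STATE = 'WAITING'`, where `?` is getMisfireTime(), that is, the current time minus misfireThreshold. In other words, it selects this scheduler's triggers whose misfire instruction is not -1 (MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY), whose next fire time is earlier than that cutoff, and whose state is WAITING.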
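The WHERE clause can be mirrored in memory to see which rows count as misfired. `MisfireQueryModel` and its `Row` class are hypothetical. `misfireTime` computes the cutoff the way, to my understanding, JobStoreSupport.getMisfireTime() does (current time minus misfireThreshold, never below 0), and -1 is the MISFIRE_INSTR value the query excludes.

```java
import java.util.List;

/** Hypothetical in-memory version of the misfire count query (not Quartz's JDBC delegate). */
final class MisfireQueryModel {
    static final int IGNORE_MISFIRE_POLICY = -1;   // MISFIRE_INSTR value excluded by the query

    /** One row of a pretend QRTZ_TRIGGERS table. */
    static final class Row {
        final String schedName;
        final int misfireInstr;
        final long nextFireTime;
        final String state;

        Row(String schedName, int misfireInstr, long nextFireTime, String state) {
            this.schedName = schedName;
            this.misfireInstr = misfireInstr;
            this.nextFireTime = nextFireTime;
            this.state = state;
        }
    }

    /** Cutoff for "misfired": now minus the threshold, floored at 0. */
    static long misfireTime(long now, long misfireThreshold) {
        long t = now;
        if (misfireThreshold > 0) {
            t -= misfireThreshold;
        }
        return t > 0 ? t : 0;
    }

    /** Same filter as the SQL: scheduler, misfire instruction, next fire time, state. */
    static int countMisfiredTriggersInState(List<Row> rows, String schedName, String state, long misfireTime) {
        int count = 0;
        for (Row r : rows) {
            if (r.schedName.equals(schedName)
                    && r.misfireInstr != IGNORE_MISFIRE_POLICY
                    && r.nextFireTime < misfireTime
                    && r.state.equals(state)) {
                count++;
            }
        }
        return count;
    }
}
```

With now = 1,000,000 ms and a 60,000 ms threshold, the cutoff is 940,000 ms. A WAITING trigger due at 950,000 ms is late but not yet misfired, while one due at 900,000 ms is counted unless its misfire instruction is -1.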
`result = recoverMisfiredJobs(conn, false);` is where the misfired triggers are actually fetched and updated:
```java
protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
        throws JobPersistenceException, SQLException {
    // If recovering, we want to handle all of the misfired
    // triggers right away.
    int maxMisfiresToHandleAtATime =
            (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
    List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
    long earliestNewTime = Long.MAX_VALUE;
    // We must still look for the MISFIRED state in case triggers were left
    // in this state when upgrading to this version that does not support it.
    boolean hasMoreMisfiredTriggers =
            getDelegate().hasMisfiredTriggersInState(
                    conn, STATE_WAITING, getMisfireTime(),
                    maxMisfiresToHandleAtATime, misfiredTriggers);
    if (hasMoreMisfiredTriggers) {
        getLog().info(
                "Handling the first " + misfiredTriggers.size() +
                " triggers that missed their scheduled fire-time. " +
                "More misfired triggers remain to be processed.");
    } else if (misfiredTriggers.size() > 0) {
        getLog().info(
                "Handling " + misfiredTriggers.size() +
                " trigger(s) that missed their scheduled fire-time.");
    } else {
        getLog().debug(
                "Found 0 triggers that missed their scheduled fire-time.");
        return RecoverMisfiredJobsResult.NO_OP;
    }
    for (TriggerKey triggerKey : misfiredTriggers) {
        OperableTrigger trig =
                retrieveTrigger(conn, triggerKey);
        if (trig == null) {
            continue;
        }
        doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);
        if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
            earliestNewTime = trig.getNextFireTime().getTime();
    }
    return new RecoverMisfiredJobsResult(
            hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
}
```
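Each pass handles a limited number of misfired triggers, maxMisfiresToHandleAtATime (20 by default); any beyond that are left for the next pass. In that case hasMoreMisfiredTriggers is true, so MisfireHandler.run() sleeps only 50 ms before scanning again.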
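The "take at most N, report whether more remain" logic can be sketched as follows. `MisfireBatch.takeBatch` is a hypothetical stand-in, assumed to resemble the loop inside the delegate's hasMisfiredTriggersInState. It assumes the candidates already arrive ordered by next fire time and, matching the `recovering ? -1 : ...` line in recoverMisfiredJobs, treats a limit of -1 as unlimited.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical restatement of the batched misfire fetch (not Quartz's delegate code). */
final class MisfireBatch {
    private MisfireBatch() {}

    /**
     * Copies up to maxCount candidates into out and returns true only if at least one more
     * candidate was left behind. maxCount == -1 never matches out.size(), so it means "no limit".
     */
    static boolean takeBatch(Iterator<String> candidates, int maxCount, List<String> out) {
        boolean hasReachedLimit = false;
        while (candidates.hasNext() && !hasReachedLimit) {
            String key = candidates.next();
            if (out.size() == maxCount) {
                hasReachedLimit = true;   // one extra row exists beyond this batch
            } else {
                out.add(key);
            }
        }
        return hasReachedLimit;
    }
}
```

With 25 misfired triggers and a limit of 20, the first pass takes 20 and returns true; the next pass takes the remaining 5 and returns false. Reporting "more" requires peeking one row past the limit, so exactly 20 candidates return false.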
Each misfired trigger is then handled by doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering):
```java
private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState,
        String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
    Calendar cal = null;
    if (trig.getCalendarName() != null) {
        cal = retrieveCalendar(conn, trig.getCalendarName());
    }
    // Call the TriggerListeners' misfire callback
    schedSignaler.notifyTriggerListenersMisfired(trig);
    // Compute next_fire_time according to the trigger's misfire instruction
    // (fairly convoluted; covered in detail next time)
    trig.updateAfterMisfire(cal);
    if (trig.getNextFireTime() == null) {
        // No next fire time: the trigger is considered finished, so its state goes straight to STATE_COMPLETE
        storeTrigger(conn, trig,
                null, true, STATE_COMPLETE, forceState, recovering);
        schedSignaler.notifySchedulerListenersFinalized(trig);
    } else {
        // Store the new next fire time and state; the scheduler thread will pick the trigger up
        storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                forceState, recovering);
    }
}
```
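The final branch of doUpdateOfMisfiredTrigger decides between COMPLETE and the new state. The sketch below is hypothetical and heavily simplified: `MisfireUpdateModel` covers only one-shot triggers, with two policies named after Quartz's "fire now" and "next with remaining count" instructions, and no calendars. In this model, a one-shot trigger like the one in the opening example (repeat count 0 with withMisfireHandlingInstructionNextWithRemainingCount()) has no fire time left after a misfire and is marked COMPLETE without running. Whether Quartz's SimpleTrigger behaves exactly this way is an assumption to check against its updateAfterMisfire.

```java
/** Hypothetical, heavily simplified model of doUpdateOfMisfiredTrigger for one-shot triggers. */
final class MisfireUpdateModel {
    enum Policy { FIRE_NOW, NEXT_WITH_REMAINING_COUNT }

    static final class OneShotTrigger {
        final long startTime;
        final Policy policy;
        Long nextFireTime;              // null means "no further fire time"
        String state = "WAITING";

        OneShotTrigger(long startTime, Policy policy) {
            this.startTime = startTime;
            this.policy = policy;
            this.nextFireTime = startTime;
        }

        /** Stand-in for updateAfterMisfire(cal); assumption: no calendar, no repeats. */
        void updateAfterMisfire(long now) {
            if (policy == Policy.FIRE_NOW) {
                nextFireTime = now;     // fire as soon as possible
            } else {
                // "Next" means the first fire time after now; a one-shot trigger
                // whose start time has passed has none left.
                nextFireTime = now < startTime ? Long.valueOf(startTime) : null;
            }
        }
    }

    /** Mirrors the final branch of doUpdateOfMisfiredTrigger. */
    static void doUpdateOfMisfiredTrigger(OneShotTrigger trig, long now, String newStateIfNotComplete) {
        trig.updateAfterMisfire(now);
        if (trig.nextFireTime == null) {
            trig.state = "COMPLETE";                // like storing STATE_COMPLETE
        } else {
            trig.state = newStateIfNotComplete;     // e.g. WAITING, picked up by the scheduler thread
        }
    }
}
```

For a trigger that was due at 1000 ms and is found at 70000 ms, the FIRE_NOW policy yields next fire time 70000 and state WAITING, while NEXT_WITH_REMAINING_COUNT yields null and state COMPLETE.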
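Once the code above has run and the database transaction has been committed, the trigger can be scheduled normally again. That more or less completes misfire handling.
Control then returns through manage() to MisfireHandler.run(), which wakes up the scheduler thread. This completes one misfire scan; everything after that is handled by QuartzSchedulerThread.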
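This wake-up pairs with the scheduler thread's timed wait before firing, in which it waits until the first acquired trigger is at most 2 ms away. `SchedulingSignal` is a hypothetical sketch of that interplay: `signalSchedulingChange` records a candidate fire time and calls `notifyAll()`, and `awaitNextFire` returns early when the signaled time is earlier than the one it is waiting for. As I read QuartzSchedulerThread, the real thread would then release its acquired triggers and acquire again (with some tolerance this sketch ignores); here it simply returns the earlier time.

```java
/** Hypothetical model of a scheduler thread that waits for its next trigger but can be woken early. */
final class SchedulingSignal {
    private final Object sigLock = new Object();
    private boolean signaled;
    private long signaledNextFireTime;

    /** Roughly what the misfire handler's wake-up does: record the candidate time and notify. */
    void signalSchedulingChange(long candidateNewNextFireTime) {
        synchronized (sigLock) {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }

    /**
     * Waits until fireTime is at most 2 ms away, or until a signal announces an earlier
     * fire time. Returns the fire time the caller should act on.
     */
    long awaitNextFire(long fireTime) {
        synchronized (sigLock) {
            while (true) {
                if (signaled && signaledNextFireTime < fireTime) {
                    signaled = false;
                    return signaledNextFireTime;   // re-plan around the earlier trigger
                }
                long timeUntilTrigger = fireTime - System.currentTimeMillis();
                if (timeUntilTrigger <= 2) {
                    return fireTime;               // close enough: go fire it
                }
                try {
                    sigLock.wait(timeUntilTrigger);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return fireTime;
                }
            }
        }
    }
}
```

If a misfire scan signals a time 30 s earlier while the thread is waiting for a trigger 60 s away, `awaitNextFire` returns the earlier time right after the signal instead of sleeping the full minute.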
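Writing this took several days, but the processing in both threads is finally covered. The next post will mainly cover what happens when a node goes down, and how a trigger's next_fire_time is computed in different situations.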