Reading the Quartz source code

Preface

This post only gives an overall walk-through of the Quartz source code. For Quartz's overall architecture, just search Baidu or Google; there are piles of articles on it.

Reading the code

Quartz's logic revolves around three things: the scheduler (Scheduler), the trigger (Trigger) and the job (Job). The scheduler fetches triggers. A trigger specifies when its job is scheduled, along with the scheduling policy, state, priority, start time, end time and similar information. A job is the actual business logic.

An example to get into the code

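// QuartzDemo.java (class name added for illustration)
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import java.util.Date;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
public class QuartzDemo {
    public static void main(String[] args) throws SchedulerException {
        // Build the scheduler from test_quartz.properties and start it
        SchedulerFactory factory = new StdSchedulerFactory("test_quartz.properties");
        Scheduler scheduler = factory.getScheduler();
        scheduler.start();
        // Fire once at a fixed instant; on misfire, use the "next with remaining count" policy
        Trigger t = newTrigger().withIdentity("t1", "g1").startAt(new Date(1466746025000L))
                .withSchedule(simpleSchedule().withMisfireHandlingInstructionNextWithRemainingCount().withRepeatCount(0))
                .build();
        JobDetail job = newJob(TestJob.class).withIdentity("myJob1", "g1").build();
        scheduler.scheduleJob(job, t); // bind the trigger to the job
    }
}

// TestJob.java
import org.quartz.*;
public class TestJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            Thread.sleep(20000); // simulate 20 s of work
            System.out.println(context.getTrigger().getKey() + " executed successfully!!!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        }
    }
}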

The two snippets above are all it takes to write a simple job.
The main steps are:
1. Get a scheduler from the scheduler factory and start it.
2. Define a trigger.
3. Define the job.
4. Use the scheduler to bind the trigger to the job.
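Let's start with how the scheduler is initialized.
Initializing the scheduler factory mainly means reading the configuration. The real initialization of the scheduler happens in getScheduler, which assembles the scheduler from that configuration. This is not the focus here, so I'll only touch on it. Note: while the scheduler is being assembled, the scheduler thread (QuartzSchedulerThread) is also started: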
// StdSchedulerFactory.instantiate() (excerpt)
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

// QuartzScheduler constructor (excerpt)
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
    this.resources = resources;
    if (resources.getJobStore() instanceof JobListener) {
        addInternalJobListener((JobListener)resources.getJobStore());
    }

    // create the scheduler thread and hand it to the ThreadExecutor, which starts it
    this.schedThread = new QuartzSchedulerThread(this, resources);
    ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
    schedThreadExecutor.execute(this.schedThread);
    // ... (rest of the constructor omitted)
}

However, once started, the thread just keeps waiting until the scheduler's start method is called:

// QuartzSchedulerThread.run() (excerpt)
while (paused && !halted.get()) {
    try {
        // wait until togglePause(false) is called...
        sigLock.wait(1000L);
    } catch (InterruptedException ignore) {
    }
}
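The pause-until-start handshake can be illustrated with a small standalone sketch. This is not Quartz code. The class PausableLoop and its methods are hypothetical, and the sketch only mirrors the idea above: a thread waits on a lock with a 1-second timeout until another thread calls togglePause(false) and notifies it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Quartz code) of a worker that starts paused
// and only begins working after togglePause(false) is called.
public class PausableLoop implements Runnable {
    private final Object sigLock = new Object();
    private boolean paused = true;                    // guarded by sigLock
    private final AtomicBoolean halted = new AtomicBoolean(false);
    private final AtomicInteger rounds = new AtomicInteger();

    public void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();                      // wake the waiting thread right away
        }
    }

    public void halt() {
        synchronized (sigLock) {
            halted.set(true);
            sigLock.notifyAll();
        }
    }

    public int rounds() {
        return rounds.get();
    }

    @Override
    public void run() {
        while (!halted.get()) {
            synchronized (sigLock) {
                while (paused && !halted.get()) {
                    try {
                        sigLock.wait(1000L);          // re-check the flags at least once per second
                    } catch (InterruptedException ignore) {
                    }
                }
            }
            if (halted.get()) {
                break;
            }
            rounds.incrementAndGet();                 // stand-in for "acquire and fire triggers"
            sleepQuietly(10);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {rounds counted while still paused, rounds counted by the end}.
    static int[] demo() {
        PausableLoop loop = new PausableLoop();
        Thread worker = new Thread(loop);
        worker.start();
        sleepQuietly(100);
        int whilePaused = loop.rounds();
        loop.togglePause(false);
        sleepQuietly(200);
        loop.halt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {whilePaused, loop.rounds()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("rounds while paused: " + r[0]);
        System.out.println("rounds after start: " + r[1]);
    }
}
```

While paused, the worker does no work. Once togglePause(false) notifies it, the worker starts counting rounds. The 1-second timeout on wait only limits how long the worker would take to notice a flag change if a notification were ever missed.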

Next comes the startup of the scheduler.

Scheduler startup

Quartz supports job scheduling in cluster mode, with jobs persisted in a database.
Here I focus on how jobs are executed in cluster mode.
The startup code is as follows:

// QuartzScheduler.start()
public void start() throws SchedulerException {
    if (shuttingDown || closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }
    notifySchedulerListenersStarting();
    if (initialStart == null) {
        // first start of the scheduler
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }
    // wake the scheduler thread so it starts acquiring triggers and firing jobs
    schedThread.togglePause(false);
    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    notifySchedulerListenersStarted();
}

// JobStoreSupport.schedulerStarted()
public void schedulerStarted() throws SchedulerException {
    // Clustered: start the cluster management thread, which periodically checks the health of the cluster
    if (isClustered()) {
        clusterManagementThread = new ClusterManager();
        if(initializersLoader != null)
            clusterManagementThread.setContextClassLoader(initializersLoader);
        clusterManagementThread.initialize();
    } else {
        try {
            // Not clustered: recover the jobs that were in progress before the scheduler went down
            recoverJobs();
        } catch (SchedulerException se) {
            throw new SchedulerConfigException(
                    "Failure occured during job recovery.", se);
        }
    }
    // Start the misfire thread: it looks for triggers that missed their fire time and, if it finds any,
    // updates their next fire time according to each trigger's misfire policy so the scheduler thread can pick them up
    misfireHandler = new MisfireHandler();
    if(initializersLoader != null)
        misfireHandler.setContextClassLoader(initializersLoader);
    misfireHandler.initialize();
    schedulerRunning = true;

    getLog().debug("JobStore background threads started (as scheduler was started).");
}

Now let's look in depth at the three main threads: QuartzSchedulerThread, ClusterManager and MisfireHandler.

QuartzSchedulerThread

This thread is Quartz's main thread and is responsible for scheduling. Let's look at the code.
The run method is long, so only the main parts are excerpted here.

  public void run() {
        int acquiresFailed = 0;
        while (!halted.get()) {
            try {
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // Until the scheduler's start method calls togglePause(false), the thread loops here.
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                // ...
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
                    try {
                        // Acquire the triggers that can fire within the coming time window
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        // ...

                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
                        if(goAhead) {
                            try {
                                // Fire the triggers: in qrtz_triggers the state goes from ACQUIRED to WAITING or COMPLETE
                                // and the next fire time is computed so the trigger can be picked again;
                                // in QRTZ_FIRED_TRIGGERS the state is set to EXECUTING.
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                        // ... (then, for each result in bndles, bndle is its TriggerFiredBundle)
                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            // Hand the JobRunShell to a worker thread, which finally calls the job's execute method;
                            // once execute returns, this firing is complete.
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                // ... after this, the thread waits for a while, which gives other nodes a chance to schedule jobs.
    }

In summary, the code above works like this:
1. At startup the thread keeps waiting until the scheduler's start method is called, which wakes it up.
2. It checks how many threads in the job thread pool are idle. It then fetches up to that many eligible triggers from qrtz_triggers and sets their state to ACQUIRED. It also inserts them into qrtz_fired_triggers, where their state is also ACQUIRED.
3. The triggers are taken from a time window. So the thread looks at the first acquired trigger (the list is sorted by next_fire_time ascending) and compares its fire time with the current time. If the trigger is more than 2 ms away, the thread waits.
4. When the first trigger is due, the thread checks in qrtz_triggers that each acquired trigger is still in the ACQUIRED state. If so, it sets the trigger's qrtz_fired_triggers row to EXECUTING. The trigger's row in qrtz_triggers has finished this round and can wait for the next one: the next fire time is computed and the state is set back to WAITING. If the computed next fire time is null, the trigger has finished and its state is set to COMPLETE. The triggers to fire in this round are returned.
5. For each returned trigger, the thread gets a worker thread and runs the job's execute method.
6. The scheduler thread then waits for a while before it acquires and fires the next triggers.
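To make steps 2 and 3 concrete, here is a simplified in-memory sketch. FakeTrigger, acquireNextTriggers and timeToWait are hypothetical names. The sketch ignores the database, locking, batchTimeWindow and the misfire threshold. It keeps only two ideas from the list:

- Take the triggers due by now + idleWaitTime, earliest first, capped by the number of idle threads.
- Wait only while the first trigger is more than 2 ms away.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of steps 2 and 3; not Quartz's API.
public class AcquireLoopSketch {

    record FakeTrigger(String name, long nextFireTime) {}

    // Step 2: triggers due no later than now + idleWaitTime, earliest first,
    // capped by the number of idle worker threads.
    static List<FakeTrigger> acquireNextTriggers(List<FakeTrigger> waiting, long now,
                                                 long idleWaitTime, int maxCount) {
        List<FakeTrigger> due = new ArrayList<>();
        for (FakeTrigger t : waiting) {
            if (t.nextFireTime() <= now + idleWaitTime) {
                due.add(t);
            }
        }
        due.sort(Comparator.comparingLong(FakeTrigger::nextFireTime));
        return new ArrayList<>(due.subList(0, Math.min(maxCount, due.size())));
    }

    // Step 3: how long to wait before firing; only wait if the first trigger
    // is more than 2 ms in the future.
    static long timeToWait(List<FakeTrigger> acquired, long now) {
        if (acquired.isEmpty()) {
            return 0;
        }
        long timeUntilTrigger = acquired.get(0).nextFireTime() - now;
        return timeUntilTrigger > 2 ? timeUntilTrigger : 0;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        List<FakeTrigger> waiting = List.of(
                new FakeTrigger("t3", now + 40_000),  // outside a 30 s window
                new FakeTrigger("t2", now + 5_000),
                new FakeTrigger("t1", now + 1_000));
        List<FakeTrigger> acquired = acquireNextTriggers(waiting, now, 30_000L, 10);
        System.out.println(acquired);                  // t1, then t2
        System.out.println(timeToWait(acquired, now)); // 1000
    }
}
```

The 30 s window matches Quartz's default idleWaitTime. With that window, t3 is left for a later round, and the thread sleeps about 1000 ms before firing t1.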
Next, let's look at JobRunShell, which actually runs the job. Again, the run method is long, so only part of it is excerpted:

 public void run() {
        qs.addInternalSchedulerListener(this);
        try {
            OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
            JobDetail jobDetail = jec.getJobDetail();
            do {
                JobExecutionException jobExEx = null;
                Job job = jec.getJobInstance();
                // ...
                long startTime = System.currentTimeMillis();
                long endTime = startTime;
                try {
                    job.execute(jec);
                    endTime = System.currentTimeMillis();
                } catch (JobExecutionException jee) {
                    endTime = System.currentTimeMillis();
                    jobExEx = jee;
                    getLog().info("Job " + jobDetail.getKey() +
                            " threw a JobExecutionException: ", jobExEx);
                } catch (Throwable e) {
                    endTime = System.currentTimeMillis();
                    getLog().error("Job " + jobDetail.getKey() +
                            " threw an unhandled Exception: ", e);
                    SchedulerException se = new SchedulerException(
                            "Job threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError("Job ("
                            + jec.getJobDetail().getKey()
                            + " threw an exception.", se);
                    jobExEx = new JobExecutionException(se, false);
                }

                jec.setJobRunTime(endTime - startTime);
                // ...
                CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
                try {
                    instCode = trigger.executionComplete(jec, jobExEx);
                } catch (Exception e) {
                    // If this happens, there's a bug in the trigger...
                    SchedulerException se = new SchedulerException(
                            "Trigger threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError(
                            "Please report this error to the Quartz developers.",
                            se);
                }
                if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
                    jec.incrementRefireCount();
                    try {
                        complete(false);
                    } catch (SchedulerException se) {
                        qs.notifySchedulerListenersError("Error executing Job ("
                                + jec.getJobDetail().getKey()
                                + ": couldn't finalize execution.", se);
                    }
                    continue;
                }

                try {
                    complete(true);
                } catch (SchedulerException se) {
                    qs.notifySchedulerListenersError("Error executing Job ("
                            + jec.getJobDetail().getKey()
                            + ": couldn't finalize execution.", se);
                    continue;
                }

                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                break;
            } while (true);

        } finally {
            qs.removeInternalSchedulerListener(this);
        }
    }

The logic above is simple: get the job and call its execute method. After it finishes, different database operations are performed depending on the returned instruction code. That dispatch happens in qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);. It mainly updates the trigger's state in qrtz_triggers and, in some cases, deletes the trigger. It then deletes the current trigger's row from qrtz_fired_triggers.
That completes a normal scheduling run. Many of these steps also call methods of SchedulerListener and TriggerListener. These are customization hooks that Quartz exposes so we can monitor jobs at each step.
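The RE_EXECUTE_JOB branch is what turns JobRunShell.run into a loop. The trigger's executionComplete can return that instruction, for example after the job throws a JobExecutionException that asks to be refired immediately. In that case the same shell runs the job again instead of reporting completion.

The sketch below models only that control flow. RunShellSketch and the wantsRefire/mayFireAgain parameters are hypothetical. The Instruction enum reuses a subset of the constant names in Quartz's CompletedExecutionInstruction. The event strings stand in for the real database calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical sketch of JobRunShell's do/while loop; not Quartz code.
public class RunShellSketch {

    // Subset of the constant names in Quartz's CompletedExecutionInstruction
    enum Instruction { NOOP, RE_EXECUTE_JOB, DELETE_TRIGGER }

    // wantsRefire: given the current refire count, does this execution ask to be re-run immediately?
    // mayFireAgain: does the trigger still have a next fire time?
    static List<String> run(IntPredicate wantsRefire, boolean mayFireAgain) {
        List<String> events = new ArrayList<>();
        int refireCount = 0;
        do {
            events.add("execute#" + refireCount);
            boolean refire = wantsRefire.test(refireCount);
            // roughly what trigger.executionComplete(...) decides
            Instruction inst = refire ? Instruction.RE_EXECUTE_JOB
                    : (mayFireAgain ? Instruction.NOOP : Instruction.DELETE_TRIGGER);
            if (inst == Instruction.RE_EXECUTE_JOB) {
                refireCount++;   // like jec.incrementRefireCount()
                continue;        // run the job again in the same worker thread
            }
            events.add("triggeredJobComplete(" + inst + ")");
            break;
        } while (true);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(count -> count < 2, true));
        // [execute#0, execute#1, execute#2, triggeredJobComplete(NOOP)]
        System.out.println(run(count -> false, false));
        // [execute#0, triggeredJobComplete(DELETE_TRIGGER)]
    }
}
```

Only the final, non-refire pass reaches the job store, which matches the real loop: notifyJobStoreJobComplete is called once, just before break.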

State transitions
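The trigger states this post describes can be collected into a small model. In qrtz_triggers, a trigger goes from WAITING to ACQUIRED when it is acquired. When it fires, it goes back to WAITING, or to COMPLETE if there is no next fire time. The misfire handler applies the same WAITING/COMPLETE rule. Meanwhile, the trigger's qrtz_fired_triggers row goes from ACQUIRED to EXECUTING and is deleted when the job completes.

The enum and transition table below are a hypothetical sketch, not Quartz code. Real Quartz also has states such as PAUSED, BLOCKED and ERROR, which are left out here.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the qrtz_triggers states discussed in this post.
// Real Quartz has more states (PAUSED, BLOCKED, ERROR, ...), omitted here.
public class TriggerStateSketch {

    enum State { WAITING, ACQUIRED, COMPLETE }

    static final Map<State, Set<State>> ALLOWED = Map.of(
            // acquireNextTriggers picks it up; MisfireHandler may reschedule or finish it
            State.WAITING, EnumSet.of(State.ACQUIRED, State.WAITING, State.COMPLETE),
            // triggersFired: next fire time computed (WAITING) or none left (COMPLETE)
            State.ACQUIRED, EnumSet.of(State.WAITING, State.COMPLETE),
            // a finished trigger never fires again
            State.COMPLETE, EnumSet.noneOf(State.class));

    static boolean canMove(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    // Rule used both after firing and after a misfire update:
    // no next fire time means the trigger is complete.
    static State stateAfterUpdate(Long nextFireTime) {
        return nextFireTime == null ? State.COMPLETE : State.WAITING;
    }

    public static void main(String[] args) {
        System.out.println(canMove(State.WAITING, State.ACQUIRED));   // true
        System.out.println(canMove(State.COMPLETE, State.WAITING));   // false
        System.out.println(stateAfterUpdate(null));                   // COMPLETE
        System.out.println(stateAfterUpdate(1466746025000L));         // WAITING
    }
}
```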

Next, let's look at the misfire thread, MisfireHandler.

MisfireHandler is an inner class of JobStoreSupport. Its run method is as follows:

  public void run() {
            
            while (!shutdown) {

                long sTime = System.currentTimeMillis();

                // look for misfired triggers and update them
                RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();

                // If any misfired triggers were handled, wake the scheduler thread (which may be waiting for the next trigger) so it can pick them up
                if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
                    signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
                }

                if (!shutdown) {
                    long timeToSleep = 50l;  // At least a short pause to help balance threads
                    if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
                        timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
                        if (timeToSleep <= 0) {
                            timeToSleep = 50l;
                        }

                        if(numFails > 0) {
                            timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
                        }
                    }
                    
                    try {
                        Thread.sleep(timeToSleep);
                    } catch (Exception ignore) {
                    }
                }//while !shutdown
            }
        }
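To see how often the handler scans, the sleep logic from run can be pulled out into a pure function. The sketch below is a hypothetical refactoring (MisfireSleepSketch and timeToSleep are not Quartz names) that keeps the same arithmetic:

- While more misfires remain, sleep 50 ms.
- Otherwise, sleep for the rest of one misfireThreshold period, but at least 50 ms.
- After a failed scan, sleep at least the DB retry interval.

The values in main are assumptions. 60 000 ms is Quartz's default misfireThreshold.

```java
// Hypothetical extraction of MisfireHandler's sleep computation into a pure function.
public class MisfireSleepSketch {

    static long timeToSleep(boolean hasMoreMisfiredTriggers, long misfireThreshold,
                            long elapsedMillis, int numFails, long dbRetryInterval) {
        long timeToSleep = 50L;                   // short pause when more misfires remain
        if (!hasMoreMisfiredTriggers) {
            // sleep for the rest of one misfireThreshold period
            timeToSleep = misfireThreshold - elapsedMillis;
            if (timeToSleep <= 0) {
                timeToSleep = 50L;
            }
            // after a failed scan, back off for at least the DB retry interval
            if (numFails > 0) {
                timeToSleep = Math.max(dbRetryInterval, timeToSleep);
            }
        }
        return timeToSleep;
    }

    public static void main(String[] args) {
        // assumed values: misfireThreshold 60 s, dbRetryInterval 15 s
        System.out.println(timeToSleep(true, 60_000, 100, 0, 15_000));     // 50
        System.out.println(timeToSleep(false, 60_000, 100, 0, 15_000));    // 59900
        System.out.println(timeToSleep(false, 60_000, 70_000, 0, 15_000)); // 50
        System.out.println(timeToSleep(false, 60_000, 70_000, 2, 15_000)); // 15000
    }
}
```

In steady state, then, the handler scans roughly once per misfireThreshold. It speeds up to every 50 ms only while a backlog of misfired triggers is being worked off.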

The main logic is in the manage method.

 private RecoverMisfiredJobsResult manage() {
            try {
                getLog().debug("MisfireHandler: scanning for misfires...");

                RecoverMisfiredJobsResult res = doRecoverMisfires();
                numFails = 0;
                return res;
            } catch (Exception e) {
                if(numFails % 4 == 0) {
                    getLog().error(
                        "MisfireHandler: Error handling misfires: "
                                + e.getMessage(), e);
                }
                numFails++;
            }
            return RecoverMisfiredJobsResult.NO_OP;
        }

The key method is doRecoverMisfires().

protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = getNonManagedTXConnection();
        try {
            RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
            
            // Before we make the potentially expensive call to acquire the 
            // trigger lock, peek ahead to see if it is likely we would find
            // misfired triggers requiring recovery.
            int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
                getDelegate().countMisfiredTriggersInState(
                    conn, STATE_WAITING, getMisfireTime()) : 
                Integer.MAX_VALUE;
            
            if (misfireCount == 0) {
                getLog().debug(
                    "Found 0 triggers that missed their scheduled fire-time.");
            } else {
                transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
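                // update the next fire time of the misfired triggers so the scheduler thread can pick them up
                result = recoverMisfiredJobs(conn, false);
            }
            
            commitConnection(conn);
            return result;
        } // ... catch/finally omitted: roll back on error, release LOCK_TRIGGER_ACCESS, clean up the connection
    }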

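int misfireCount = (getDoubleCheckLockMisfireHandler()) ? getDelegate().countMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime()) : Integer.MAX_VALUE;
This line counts the misfired triggers before taking the lock. The query it runs is:
    SELECT COUNT(TRIGGER_NAME) FROM QRTZ_TRIGGERS WHERE SCHED_NAME = xxx AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ?
The two parameters are getMisfireTime() (the current time minus misfireThreshold) and 'WAITING'. In other words, it selects this scheduler's triggers that meet three conditions:
- the misfire instruction is not -1 (MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
- the next fire time is earlier than the misfire cutoff;
- the state is WAITING.
result = recoverMisfiredJobs(conn, false); is where the misfired triggers are actually fetched and handled.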
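The cutoff matters. A trigger counts as misfired only once it is more than misfireThreshold late, not as soon as its fire time has passed. The hypothetical sketch below (MisfireQuerySketch, TriggerRow and countMisfired are illustrative names) applies the same four conditions to an in-memory list. It computes the cutoff the way JobStoreSupport.getMisfireTime() does: current time minus misfireThreshold, floored at 0.

```java
import java.util.List;

// Hypothetical in-memory version of the misfire COUNT query; not Quartz code.
public class MisfireQuerySketch {

    // Same value as Quartz's Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY
    static final int IGNORE_MISFIRE_POLICY = -1;

    record TriggerRow(String schedName, String name, int misfireInstr,
                      long nextFireTime, String state) {}

    // Cutoff used for NEXT_FIRE_TIME < ?: now minus misfireThreshold, never negative
    static long misfireTime(long now, long misfireThreshold) {
        long misfireTime = now;
        if (misfireThreshold > 0) {
            misfireTime -= misfireThreshold;
        }
        return misfireTime > 0 ? misfireTime : 0;
    }

    static long countMisfired(List<TriggerRow> rows, String schedName, long misfireTime) {
        return rows.stream()
                .filter(r -> r.schedName().equals(schedName))
                .filter(r -> r.misfireInstr() != IGNORE_MISFIRE_POLICY)
                .filter(r -> r.nextFireTime() < misfireTime)
                .filter(r -> r.state().equals("WAITING"))
                .count();
    }

    public static void main(String[] args) {
        long cutoff = misfireTime(100_000L, 60_000L);             // 40000
        List<TriggerRow> rows = List.of(
                new TriggerRow("s1", "a", 0, 30_000, "WAITING"),   // counted
                new TriggerRow("s1", "b", -1, 30_000, "WAITING"),  // ignores misfires
                new TriggerRow("s1", "c", 1, 50_000, "WAITING"),   // late, but within the threshold
                new TriggerRow("s1", "d", 1, 10_000, "ACQUIRED"),  // not WAITING
                new TriggerRow("s2", "e", 1, 10_000, "WAITING"));  // other scheduler
        System.out.println(cutoff);                                // 40000
        System.out.println(countMisfired(rows, "s1", cutoff));     // 1
    }
}
```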

protected RecoverMisfiredJobsResult recoverMisfiredJobs(
        Connection conn, boolean recovering)
        throws JobPersistenceException, SQLException {

        // If recovering, we want to handle all of the misfired
        // triggers right away.
        int maxMisfiresToHandleAtATime = 
            (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
        
        List<TriggerKey> misfiredTriggers = new LinkedList<TriggerKey>();
        long earliestNewTime = Long.MAX_VALUE;
        // We must still look for the MISFIRED state in case triggers were left 
        // in this state when upgrading to this version that does not support it. 
        boolean hasMoreMisfiredTriggers =
            getDelegate().hasMisfiredTriggersInState(
                conn, STATE_WAITING, getMisfireTime(), 
                maxMisfiresToHandleAtATime, misfiredTriggers);

        if (hasMoreMisfiredTriggers) {
            getLog().info(
                "Handling the first " + misfiredTriggers.size() +
                " triggers that missed their scheduled fire-time.  " +
                "More misfired triggers remain to be processed.");
        } else if (misfiredTriggers.size() > 0) { 
            getLog().info(
                "Handling " + misfiredTriggers.size() + 
                " trigger(s) that missed their scheduled fire-time.");
        } else {
            getLog().debug(
                "Found 0 triggers that missed their scheduled fire-time.");
            return RecoverMisfiredJobsResult.NO_OP; 
        }

        for (TriggerKey triggerKey: misfiredTriggers) {
            
            OperableTrigger trig = 
                retrieveTrigger(conn, triggerKey);

            if (trig == null) {
                continue;
            }

            doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);

            if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
                earliestNewTime = trig.getNextFireTime().getTime();
        }

        return new RecoverMisfiredJobsResult(
                hasMoreMisfiredTriggers, misfiredTriggers.size(), earliestNewTime);
    }

Each pass handles only a limited number of misfired triggers: 20 by default (maxMisfiresToHandleAtATime). Any beyond that are handled on the next pass.
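The batching rule can be simulated in a few lines. In this hypothetical sketch (MisfireBatchSketch and its methods are illustrative names), takeBatch follows the fill-until-limit contract that recoverMisfiredJobs relies on from hasMisfiredTriggersInState. It collects at most maxMisfiresToHandleAtATime triggers and returns true when more are left. A true result is what makes the handler sleep only 50 ms before the next pass.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of handling misfired triggers in batches; not Quartz code.
public class MisfireBatchSketch {

    // Copy at most `count` names into resultList (count = -1 means no limit) and
    // report whether more misfired triggers remain beyond the limit.
    static boolean takeBatch(List<String> misfired, int count, List<String> resultList) {
        for (String name : misfired) {
            if (resultList.size() == count) {
                return true;                  // limit reached, more remain
            }
            resultList.add(name);
        }
        return false;
    }

    // Simulate successive MisfireHandler passes; each handled trigger gets a new
    // next fire time and therefore drops out of the misfired set.
    static List<Integer> batchSizes(int totalMisfired, int maxPerPass) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < totalMisfired; i++) {
            remaining.add("trigger" + i);
        }
        List<Integer> sizes = new ArrayList<>();
        boolean more = true;
        while (more) {
            List<String> batch = new ArrayList<>();
            more = takeBatch(remaining, maxPerPass, batch);
            sizes.add(batch.size());
            remaining.removeAll(batch);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(45, 20));   // [20, 20, 5]
        System.out.println(batchSizes(45, -1));   // [45]
    }
}
```

The -1 case corresponds to recovery mode (recovering == true), where all misfired triggers are handled in one go.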
Each misfired trigger is then processed by doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering):

 private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException {
        Calendar cal = null;
        if (trig.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trig.getCalendarName());
        }

        // notify the TriggerListeners that the trigger misfired
        schedSignaler.notifyTriggerListenersMisfired(trig);

        // compute the next fire time according to the trigger's misfire policy (this is fairly involved; covered in detail next time)
        trig.updateAfterMisfire(cal);

        if (trig.getNextFireTime() == null) {
            // no next fire time: the trigger is considered finished, so set its state straight to COMPLETE
            storeTrigger(conn, trig,
                null, true, STATE_COMPLETE, forceState, recovering);
            schedSignaler.notifySchedulerListenersFinalized(trig);
        } else {
            // store the new next fire time and state, and wait for the scheduler thread to pick it up
            storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                    forceState, recovering);
        }
    }

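Once the code above has run and the database transaction has been committed, the trigger can be scheduled normally again. At this point the handling of misfired triggers is essentially done.
Control then returns through manage() to run(), which wakes the scheduler thread. That completes the current misfire scan; the rest is up to QuartzSchedulerThread.
After several days of writing, I have finally covered how two of these threads work. The next post will cover how Quartz handles a node going down, and how a trigger's next_fire_time is computed in different situations.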

© Copyright belongs to the author. For reprints or content cooperation, please contact the author.