Quartz Q&A

These notes record how I analyzed the Quartz source code while reading it, and how I went about tracking down answers.

1. Job states

See https://segmentfault.com/a/1190000015492260, which is well written and very detailed. I have borrowed its diagram; Quartz's state transitions are as follows:

[Figure: Quartz state transition diagram]

2. How jobs are queried

As described in the previous part, the producer thread queries all triggers that are due to fire. This happens in the run method of QuartzSchedulerThread:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
        now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
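  • idleWaitTime: defaults to 30s (30000 ms); set it with org.quartz.scheduler.idleWaitTime.
  • availThreadCount: the number of available (idle) worker threads. It is always at least 1, because the call that obtains it blocks until a worker thread becomes free.
  • maxBatchSize: the maximum number of triggers acquired in one batch; defaults to 1; override it with org.quartz.scheduler.batchTriggerAcquisitionMaxCount.
  • batchTimeWindow: how far ahead of its scheduled fire time (in ms) a trigger may be acquired and fired; defaults to 0; override it with org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow.
  • misfireThreshold: a trigger that is still unfired this long after its scheduled fire time is considered misfired; defaults to 60s; set it with org.quartz.jobStore.misfireThreshold.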
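The settings above combine into a small piece of arithmetic. The following is a minimal Java sketch, not Quartz code: the class AcquireArgs and its method are hypothetical names, the upper bound follows the call above plus batchTimeWindow, and the lower bound assumes the query uses now - misfireThreshold clamped at 0 (as JobStoreSupport#getMisfireTime does).

```java
// Hypothetical helper: models how the scheduler settings turn into the arguments
// of acquireNextTriggers and the time window the trigger query later uses.
final class AcquireArgs {
    final long noLaterThan;  // passed to acquireNextTriggers: now + idleWaitTime
    final int maxCount;      // min(availThreadCount, maxBatchSize)
    final long upperBound;   // NEXT_FIRE_TIME <= noLaterThan + batchTimeWindow
    final long lowerBound;   // NEXT_FIRE_TIME >= now - misfireThreshold (when MISFIRE_INSTR != -1)

    private AcquireArgs(long noLaterThan, int maxCount, long upperBound, long lowerBound) {
        this.noLaterThan = noLaterThan;
        this.maxCount = maxCount;
        this.upperBound = upperBound;
        this.lowerBound = lowerBound;
    }

    static AcquireArgs compute(long now, long idleWaitTime, int availThreadCount,
                               int maxBatchSize, long batchTimeWindow, long misfireThreshold) {
        long noLaterThan = now + idleWaitTime;
        int maxCount = Math.min(availThreadCount, maxBatchSize);
        long upper = noLaterThan + batchTimeWindow;
        // Assumed to mirror getMisfireTime(): subtract only a positive threshold, never go below 0
        long lower = misfireThreshold > 0 ? now - misfireThreshold : now;
        return new AcquireArgs(noLaterThan, maxCount, upper, Math.max(lower, 0));
    }

    public static void main(String[] args) {
        // Defaults: idleWaitTime 30s, maxBatchSize 1, batchTimeWindow 0, misfireThreshold 60s
        AcquireArgs a = compute(1_000_000L, 30_000L, 10, 1, 0L, 60_000L);
        // prints maxCount=1 window=[940000, 1030000]
        System.out.println("maxCount=" + a.maxCount
                + " window=[" + a.lowerBound + ", " + a.upperBound + "]");
    }
}
```

With the defaults, even ten idle threads only get one trigger per acquisition, because maxBatchSize caps maxCount at 1.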

We use database storage, so acquireNextTriggers resolves to org.quartz.impl.jdbcjobstore.JobStoreSupport#acquireNextTriggers:

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
    // 1. Decide which lock (if any) to take; it is obtained inside executeInNonManagedTXLock
    String lockName;
    if (isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    // 2. This call does the actual trigger acquisition
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                // 3. Used only when the commit throws: returns true if the acquisition was in fact persisted
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> fireInstanceIds = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            fireInstanceIds.add(ft.getFireInstanceId());
                        }
                        for (OperableTrigger tr : result) {
                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
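The two decisions made in acquireNextTriggers can be pulled out into plain functions: whether a lock is taken, and how the validator concludes that a commit which threw an exception actually succeeded. This is a hypothetical, database-free model for illustration (AcquireDecisions and its methods are invented names); only the string TRIGGER_ACCESS corresponds to the value of Quartz's LOCK_TRIGGER_ACCESS constant.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the lock decision and the TransactionValidator in acquireNextTriggers.
final class AcquireDecisions {

    // A lock is taken when configured (acquireTriggersWithinLock) or when a batch of
    // more than one trigger may be acquired; otherwise no lock name is used.
    static String lockNameFor(boolean acquireTriggersWithinLock, int maxCount) {
        return (acquireTriggersWithinLock || maxCount > 1) ? "TRIGGER_ACCESS" : null;
    }

    // Mirrors the validator: if any acquired trigger's fireInstanceId is already among this
    // instance's fired-trigger records, the commit that reported an error did go through.
    static boolean commitActuallySucceeded(List<String> firedRecordIds, List<String> acquiredIds) {
        Set<String> recorded = new HashSet<>(firedRecordIds);
        for (String id : acquiredIds) {
            if (recorded.contains(id)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(lockNameFor(false, 1));  // null
        System.out.println(lockNameFor(false, 5));  // TRIGGER_ACCESS
        System.out.println(commitActuallySucceeded(List.of("f1", "f2"), List.of("f2")));  // true
    }
}
```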

acquireNextTriggers first takes the lock (if one is needed), then, through the callback, calls acquireNextTrigger to fetch the triggers. The SQL that queries the triggers is:

SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS
WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?
  AND NEXT_FIRE_TIME <= ?
  AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?))
ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

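In this SQL, TRIGGER_STATE is bound to WAITING, the upper bound on NEXT_FIRE_TIME is now + idleWaitTime + batchTimeWindow, and the lower bound is now - misfireThreshold. So with the default settings, Quartz fetches triggers due anywhere from 60s in the past to 30s in the future; triggers with MISFIRE_INSTR = -1 have no lower bound. Each acquired trigger is then switched to the ACQUIRED state and inserted into the QRTZ_FIRED_TRIGGERS table; section 3 explains what that table is for.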
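To see which rows this query returns, here is a hypothetical in-memory version of it in Java. TriggerRow and select are illustrative names, not Quartz APIs; the bounds are passed in already computed (upper = now + idleWaitTime + batchTimeWindow, lower = now - misfireThreshold), and capping the result at maxCount rows assumes the delegate limits the result to the batch size.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the trigger-acquisition query (not Quartz code).
final class TriggerQuerySketch {

    record TriggerRow(String name, String state, long nextFireTime, int priority, int misfireInstr) {}

    // WHERE TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= upper
    //   AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= lower))
    // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC, keeping at most maxCount rows
    static List<TriggerRow> select(List<TriggerRow> rows, long upper, long lower, int maxCount) {
        List<TriggerRow> out = new ArrayList<>();
        for (TriggerRow r : rows) {
            boolean waiting = "WAITING".equals(r.state());
            boolean dueInWindow = r.nextFireTime() <= upper;
            // "A OR (NOT A AND B)" is equivalent to "A OR B"
            boolean notTooLate = r.misfireInstr() == -1 || r.nextFireTime() >= lower;
            if (waiting && dueInWindow && notTooLate) {
                out.add(r);
            }
        }
        out.sort(Comparator.comparingLong(TriggerRow::nextFireTime)
                .thenComparing(Comparator.comparingInt(TriggerRow::priority).reversed()));
        return out.size() > maxCount ? new ArrayList<>(out.subList(0, maxCount)) : out;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        long upper = now + 30_000L;  // now + idleWaitTime (+ batchTimeWindow, here 0)
        long lower = now - 60_000L;  // now - misfireThreshold
        List<TriggerRow> rows = List.of(
                new TriggerRow("tooOld", "WAITING", now - 90_000L, 5, 0),
                new TriggerRow("ignoreMisfire", "WAITING", now - 90_000L, 5, -1),
                new TriggerRow("dueNow", "WAITING", now, 5, 0),
                new TriggerRow("tooFar", "WAITING", now + 40_000L, 5, 0),
                new TriggerRow("paused", "PAUSED", now, 5, 0));
        for (TriggerRow r : select(rows, upper, lower, 10)) {
            System.out.println(r.name());  // prints ignoreMisfire, then dueNow
        }
    }
}
```

The 90-second-old trigger is only returned when its misfire instruction is -1, which is exactly the case section 3.1 returns to.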

3. How Quartz keeps jobs from being lost

When can a job be lost? Either there is no free consumer thread to run it, or the server restarts. How does Quartz handle each case?

3.1 Jobs lost because threads are blocked

To simulate blocked threads, I set the consumer thread pool size to 1:

spring:
  quartz:
    job-store-type: jdbc
    overwriteExistingJobs: true
    properties:
      org.quartz.threadPool.threadCount: 1

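The job fires every 5 seconds but sleeps for 10 seconds each time it runs, so the single worker thread can never keep up: the job falls further and further behind its schedule until fire times are lost.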

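import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class TestTaskJob1 extends QuartzJobBean {
    private static final Logger logger = LoggerFactory.getLogger(TestTaskJob1.class);

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        String localDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        logger.info("TestTaskJob1-->" + localDateTime);

        try {
            // Hold the only worker thread for 10s, twice the 5s trigger interval
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }
}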
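The backlog in this example can be worked out by hand: each run occupies the only worker for 10s while a new fire time comes due every 5s, so the k-th run starts 5k seconds late. The Java sketch below simulates that under a simplified model (the class name is hypothetical; it ignores misfire handling and scheduling overhead, and assumes a run counts as misfired once it is more than misfireThreshold late). With the values from the example, run 13 is the first to exceed the 60s threshold.

```java
// Hypothetical simulation: one worker thread, a job scheduled every intervalMs that runs
// for durationMs. Misfire handling is deliberately NOT modeled; the sketch only shows
// how far each run lags behind its scheduled time.
final class BacklogSimulation {

    // How late (ms) the k-th scheduled run starts, for k = 0..runs-1.
    static long[] startDelays(long intervalMs, long durationMs, int runs) {
        long[] delays = new long[runs];
        long workerFreeAt = 0;  // when the single worker thread becomes idle again
        for (int k = 0; k < runs; k++) {
            long scheduled = k * intervalMs;
            long start = Math.max(scheduled, workerFreeAt);
            delays[k] = start - scheduled;
            workerFreeAt = start + durationMs;
        }
        return delays;
    }

    // Index of the first run more than misfireThresholdMs late, or -1 within the horizon.
    static int firstMisfire(long intervalMs, long durationMs, long misfireThresholdMs, int runs) {
        long[] delays = startDelays(intervalMs, durationMs, runs);
        for (int k = 0; k < runs; k++) {
            if (delays[k] > misfireThresholdMs) {
                return k;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Values from the example: fires every 5s, each run sleeps 10s, threshold 60s
        long[] delays = startDelays(5_000, 10_000, 15);
        for (int k = 0; k < delays.length; k++) {
            System.out.println("run " + k + " starts " + delays[k] / 1000 + "s late");
        }
        System.out.println("first misfire: run " + firstMisfire(5_000, 10_000, 60_000, 100));
    }
}
```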

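The trigger query in section 2 contains the condition (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)). A trigger whose MISFIRE_INSTR is -1 is therefore always selected, no matter how late it is. What does this value mean? See http://www.itdecent.cn/p/634d2a6fae7b. In short, MISFIRE_INSTR stores the trigger's misfire instruction, and -1 is MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY. By choosing the misfire instruction, you can make sure jobs are not lost; they merely run later than scheduled.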
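The effect of each misfire instruction can be sketched for a CronTrigger. The constants below carry the values of Quartz's MISFIRE_INSTRUCTION_* fields for CronTrigger (SimpleTrigger defines different ones); the class, the method, and the fixed-interval schedule standing in for a cron expression that fires every 5 seconds are hypothetical simplifications, not Quartz's implementation.

```java
// Hypothetical model: what a misfired trigger's next fire time becomes under each
// CronTrigger misfire instruction, for a schedule firing every intervalMs from time 0.
final class MisfirePolicySketch {
    static final int IGNORE_MISFIRE_POLICY = -1; // Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY
    static final int SMART_POLICY = 0;           // Trigger.MISFIRE_INSTRUCTION_SMART_POLICY
    static final int FIRE_ONCE_NOW = 1;          // CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW
    static final int DO_NOTHING = 2;             // CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING

    static long nextFireTimeAfterMisfire(int instr, long staleNextFireTime, long now, long intervalMs) {
        switch (instr) {
            case IGNORE_MISFIRE_POLICY:
                // Never treated as misfired: the stale time is kept, so the query keeps
                // returning the trigger and the missed fire times are worked off in order
                return staleNextFireTime;
            case SMART_POLICY:   // for CronTrigger, the smart policy means FIRE_ONCE_NOW
            case FIRE_ONCE_NOW:
                return now;
            case DO_NOTHING:
                // Skip the missed runs and wait for the first scheduled time after now
                return (now / intervalMs + 1) * intervalMs;
            default:
                throw new IllegalArgumentException("unknown misfire instruction: " + instr);
        }
    }

    public static void main(String[] args) {
        long interval = 5_000, stale = 5_000, now = 70_000;
        for (int instr : new int[] {IGNORE_MISFIRE_POLICY, SMART_POLICY, FIRE_ONCE_NOW, DO_NOTHING}) {
            // prints -1 -> 5000, 0 -> 70000, 1 -> 70000, 2 -> 75000
            System.out.println(instr + " -> " + nextFireTimeAfterMisfire(instr, stale, now, interval));
        }
    }
}
```

In real code the instruction is chosen on the schedule builder, for example CronScheduleBuilder's withMisfireHandlingInstructionIgnoreMisfires(), withMisfireHandlingInstructionFireAndProceed() (FIRE_ONCE_NOW) or withMisfireHandlingInstructionDoNothing(). Only the first keeps every missed run; DO_NOTHING deliberately drops them.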

3.2 Jobs lost because the server restarts

Misfire handling keeps jobs from being lost while threads are blocked, but what if the server dies while a job is running?
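As section 2 mentioned, every acquired trigger is inserted into the QRTZ_FIRED_TRIGGERS table. When the server restarts, Quartz reads this table and recovers the work: triggers left in the ACQUIRED or BLOCKED state are reset to WAITING, and jobs that were executing and are marked requestsRecovery are fired again.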
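The recovery steps can be sketched as a hypothetical in-memory model in Java. FiredRecord, recover, and the plain map and list standing in for the QRTZ_TRIGGERS and QRTZ_FIRED_TRIGGERS tables are illustrative assumptions; the real JobStoreSupport#recoverJobs also handles misfires, paused triggers, and more.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of restart recovery (not Quartz code).
final class RecoverySketch {

    // One row of QRTZ_FIRED_TRIGGERS, reduced to the columns this sketch needs
    record FiredRecord(String instanceId, String jobName, boolean requestsRecovery) {}

    // triggerStates: trigger name -> TRIGGER_STATE; firedTriggers: the fired-trigger rows.
    // Returns the jobs that get a one-shot recovery run.
    static List<String> recover(String instanceId, Map<String, String> triggerStates,
                                List<FiredRecord> firedTriggers) {
        // 1. Triggers caught mid-flight go back to WAITING so they can be acquired again
        triggerStates.replaceAll((name, state) ->
                (state.equals("ACQUIRED") || state.equals("BLOCKED")) ? "WAITING" : state);

        // 2. Jobs that were running on this instance and requested recovery are fired again
        List<String> toRerun = new ArrayList<>();
        for (FiredRecord r : firedTriggers) {
            if (r.instanceId().equals(instanceId) && r.requestsRecovery()) {
                toRerun.add(r.jobName());
            }
        }

        // 3. The instance's stale fired-trigger rows are deleted
        firedTriggers.removeIf(r -> r.instanceId().equals(instanceId));
        return toRerun;
    }

    public static void main(String[] args) {
        Map<String, String> states = new HashMap<>(Map.of("t1", "ACQUIRED", "t2", "PAUSED"));
        List<FiredRecord> fired = new ArrayList<>(List.of(
                new FiredRecord("node-1", "reportJob", true),
                new FiredRecord("node-1", "cleanupJob", false),
                new FiredRecord("node-2", "otherJob", true)));
        System.out.println(recover("node-1", states, fired));       // [reportJob]
        System.out.println(states.get("t1") + " " + fired.size());  // WAITING 1
    }
}
```

Whether an interrupted job is re-run therefore depends on it requesting recovery, which is set when the job is defined, for example with JobBuilder's requestRecovery().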

4. How distributed consistency is ensured

To keep jobs highly available, we usually deploy several servers. With several servers, however, the same job could end up running on more than one of them. How is that prevented?

In section 2, triggers are acquired through a callback run by executeInNonManagedTXLock. Here is how executeInNonManagedTXLock works:

protected <T> T executeInNonManagedTXLock(
        String lockName,
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // If we aren't using db locks, then delay getting DB connection
            // until after acquiring the lock since it isn't needed.
            // 1. Get a database connection (only if the lock handler needs one)
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }
            // 2. Obtain the lock
            transOwner = getLockHandler().obtainLock(conn, lockName);
        }

        if (conn == null) {
            conn = getNonManagedTXConnection();
        }
        // 3. Run the callback and keep its result
        final T result = txCallback.execute(conn);
        try {
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            // 4. The commit failed: ask the validator whether the work was persisted anyway
            if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                @Override
                public Boolean execute(Connection conn) throws JobPersistenceException {
                    return txValidator.validate(conn, result);
                }
            })) {
                throw e;
            }
        }

        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if (sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }

        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: "
                + e.getMessage(), e);
    } finally {
        try {
            // 5. Always release the lock, then clean up the connection
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
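Stripped of JDBC details, executeInNonManagedTXLock is a template: obtain the lock, run the callback, commit, and always release the lock, with the validator as a fallback when the commit throws. The Java sketch below models that shape under stated assumptions: a ReentrantLock stands in for the database lock, the Tx interface is a hypothetical stand-in for the connection, and the validator is called once, whereas Quartz's retryExecuteInNonManagedTXLock keeps retrying it in a new transaction.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, simplified version of the executeInNonManagedTXLock flow (not Quartz code).
final class LockedTxTemplate {

    // Stand-in for the JDBC connection's transaction control
    interface Tx {
        void commit() throws Exception;
        void rollback();
    }

    static <T> T execute(ReentrantLock lock, Tx tx, Supplier<T> callback, Predicate<T> validator) {
        lock.lock();                    // obtain the lock (obtainLock)
        try {
            T result = callback.get();  // run the callback inside the transaction
            try {
                tx.commit();
            } catch (Exception e) {
                tx.rollback();
                // The commit threw, but it may still have reached the database:
                // only fail if the validator cannot confirm the work was persisted.
                if (validator == null || !validator.test(result)) {
                    throw new IllegalStateException("commit failed", e);
                }
            }
            return result;
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            lock.unlock();              // releaseLock always runs
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Tx flakyCommit = new Tx() {
            public void commit() throws Exception { throw new Exception("connection reset"); }
            public void rollback() { }
        };
        // The validator confirms the result was persisted, so no exception escapes.
        String r = execute(lock, flakyCommit, () -> "acquired", res -> true);
        System.out.println(r + " lockHeld=" + lock.isLocked());  // acquired lockHeld=false
    }
}
```

The validator matters because a commit can succeed on the database side while the client sees an exception (for example, a dropped connection); without the check, triggers that were really acquired would be reported as a failure.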

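The key line is getLockHandler().obtainLock(conn, lockName). getLockHandler returns the lock handler. With the database job store in clustered mode (org.quartz.jobStore.isClustered=true), the handler uses database row locks (StdRowLockSemaphore by default), so the call lands in DBSemaphore#obtainLock: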

public boolean obtainLock(Connection conn, String lockName)
        throws LockException {

    if (log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' is desired by: "
                    + Thread.currentThread().getName());
    }
    // 1. Check whether this thread already holds the lock (the lock is reentrant)
    if (!isLockOwner(lockName)) {

        // 2. Execute SQL to obtain the lock
        executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);

        if (log.isDebugEnabled()) {
            log.debug(
                "Lock '" + lockName + "' given to: "
                        + Thread.currentThread().getName());
        }
        // 3. Once obtained, record the lock name in the ThreadLocal set
        getThreadLocks().add(lockName);
        //getThreadLocksObtainer().put(lockName, new
        // Exception("Obtainer..."));
    } else if (log.isDebugEnabled()) {
        log.debug(
            "Lock '" + lockName + "' Is already owned by: "
                    + Thread.currentThread().getName());
    }

    return true;
}

The lock is obtained as follows:

  1. Check whether the current thread already holds the lock, i.e. whether the lock name is in the ThreadLocal set. This makes the lock reentrant.

  2. Otherwise, execute SQL to obtain the lock:

    SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'quartzScheduler' AND LOCK_NAME = ? FOR UPDATE
    

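    FOR UPDATE takes a row lock on the QRTZ_LOCKS row for that lock name. The row lock is held until the surrounding transaction commits or rolls back, so while one node holds TRIGGER_ACCESS, the other nodes block on the same statement. Only one node at a time can run the acquisition query and mark triggers ACQUIRED, which is why a trigger is not picked up by two servers. (When no lock is taken, i.e. maxCount is 1 and acquireTriggersWithinLock is false, acquireNextTrigger still only claims a trigger if its conditional update from WAITING to ACQUIRED succeeds.)

  3. Once the lock is obtained, add the lock name to the ThreadLocal set, so a later call on the same thread skips the SQL.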
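The reentrancy check in the steps above can be modeled in a few lines of Java. This is a hypothetical sketch, not DBSemaphore itself: a Semaphore per lock name stands in for the QRTZ_LOCKS row locked by SELECT ... FOR UPDATE, and releaseLock frees it explicitly, whereas in Quartz the row lock ends when the transaction commits or rolls back and releaseLock only removes the name from the ThreadLocal set.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical model of DBSemaphore's ThreadLocal-based reentrancy (not Quartz code).
final class ThreadLocalLockSketch {
    // Stand-in for the QRTZ_LOCKS rows: one permit per lock name
    private final Map<String, Semaphore> rowLocks = new ConcurrentHashMap<>();
    // Lock names the current thread holds, like getThreadLocks()
    private final ThreadLocal<Set<String>> threadLocks = ThreadLocal.withInitial(HashSet::new);

    boolean isLockOwner(String lockName) {
        return threadLocks.get().contains(lockName);
    }

    void obtainLock(String lockName) {
        if (!isLockOwner(lockName)) {
            // Like executeSQL(...): blocks until no other thread holds the "row"
            rowLocks.computeIfAbsent(lockName, k -> new Semaphore(1)).acquireUninterruptibly();
            threadLocks.get().add(lockName);
        }
        // Already owned by this thread: return immediately (reentrant)
    }

    void releaseLock(String lockName) {
        if (threadLocks.get().remove(lockName)) {
            // In Quartz the row lock ends with commit/rollback; here it is released explicitly
            rowLocks.get(lockName).release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalLockSketch locks = new ThreadLocalLockSketch();
        locks.obtainLock("TRIGGER_ACCESS");
        locks.obtainLock("TRIGGER_ACCESS");  // reentrant: does not block
        Thread other = new Thread(() -> {
            locks.obtainLock("TRIGGER_ACCESS");  // blocks until main releases
            System.out.println("other thread got the lock");
            locks.releaseLock("TRIGGER_ACCESS");
        });
        other.start();
        Thread.sleep(100);
        System.out.println("main releases the lock");
        locks.releaseLock("TRIGGER_ACCESS");
        other.join();
    }
}
```

As in Quartz, ownership is not counted: obtaining the same lock twice and releasing it once leaves the thread without the lock.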

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
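Platform statement: the article (including any images or videos) was uploaded and published by the author and represents only the author's views. Jianshu is an information publishing platform and provides only information storage services.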
