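This note records how I analyzed the Quartz source code while reading it, and how I went about tracking down problems.
1. Job states
See https://segmentfault.com/a/1190000015492260, which is well written and very detailed. I am borrowing a diagram from that article; it shows the Quartz state transitions:
[Figure: Quartz state transition diagram, from the article linked above]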

2. How jobs are queried
As covered in the previous note, the producer thread looks up all triggers that are due to fire. This happens in the run method of QuartzSchedulerThread:
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
- idleWaitTime: defaults to 30s; set it with the org.quartz.scheduler.idleWaitTime property.
- availThreadCount: the number of available (idle) worker threads. It is always at least 1, because the call that returns it blocks until a worker thread becomes free.
- maxBatchSize: the maximum number of triggers fetched in one pass. Defaults to 1; override it with org.quartz.scheduler.batchTriggerAcquisitionMaxCount.
- batchTimeWindow: a tuning parameter for the time window. Defaults to 0; override it with org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow.
- misfireThreshold: a trigger that has still not fired after this much time is considered to have misfired. Defaults to 60s; set it with org.quartz.jobStore.misfireThreshold.
We use database storage, so the acquireNextTriggers being called is org.quartz.impl.jdbcjobstore.JobStoreSupport#acquireNextTriggers.
    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
            throws JobPersistenceException {
        // 1. Decide whether a lock is needed and, if so, which one
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName,
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        // This is the call that actually fetches the triggers
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException("error validating trigger acquisition", e);
                        }
                    }
                });
    }
acquireNextTriggers first obtains the lock, then calls acquireNextTrigger through the callback to fetch the triggers. The SQL that queries the triggers is:
    SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY
    FROM {0}TRIGGERS
    WHERE SCHED_NAME = {1}
      AND TRIGGER_STATE = ?
      AND NEXT_FIRE_TIME <= ?
      AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?))
    ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
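From the SQL we can see that, with the default settings, Quartz picks up triggers whose NEXT_FIRE_TIME lies between 60s in the past (the misfireThreshold) and 30s in the future (the idleWaitTime). The triggers it finds are then saved to the QRTZ_FIRED_TRIGGERS table; section 3 explains what that is for.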
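The window arithmetic can be sketched in a few lines of Java. This is only an illustration under assumptions: the class and field names (AcquireWindow, noEarlierThan, noLaterThan) are made up for this note and are not Quartz API, and the sketch assumes the batch time window is simply added to the upper bound.

```java
import java.time.Duration;

// Illustrative only: computes the bounds bound to the two NEXT_FIRE_TIME
// placeholders of the acquisition query. Not part of Quartz.
final class AcquireWindow {
    final long noEarlierThan; // lower bound: now - misfireThreshold
    final long noLaterThan;   // upper bound: now + idleWaitTime + batchTimeWindow

    AcquireWindow(long now, Duration misfireThreshold, Duration idleWaitTime, Duration batchTimeWindow) {
        this.noEarlierThan = now - misfireThreshold.toMillis();
        this.noLaterThan = now + idleWaitTime.toMillis() + batchTimeWindow.toMillis();
    }

    // Uses the default values listed in this note: 60s, 30s and 0.
    static AcquireWindow withDefaults(long now) {
        return new AcquireWindow(now, Duration.ofSeconds(60), Duration.ofSeconds(30), Duration.ZERO);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        AcquireWindow w = withDefaults(now);
        System.out.println("lower bound relative to now (ms): " + (w.noEarlierThan - now));
        System.out.println("upper bound relative to now (ms): " + (w.noLaterThan - now));
    }
}
```

Raising batchTimeWindow in this sketch widens only the future side of the window; the past side is governed by misfireThreshold alone.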
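3. How Quartz makes sure jobs are not lost
When can a job be lost? Either there is no spare consumer thread to run it, or the server restarts. How does Quartz deal with each of these cases?
3.1 Jobs lost because threads are blocked
To simulate blocked threads, I set the consumer thread pool size to 1: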
    spring:
      quartz:
        job-store-type: jdbc
        overwriteExistingJobs: true
        properties:
          org.quartz.threadPool.threadCount: 1
The job is scheduled every 5 seconds but sleeps for 10s each time it runs, so some firings cannot run on time and are lost.
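    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.quartz.QuartzJobBean;

    public class TestTaskJob1 extends QuartzJobBean {
        private static final Logger logger = LoggerFactory.getLogger(TestTaskJob1.class);

        @Override
        protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            String localDateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            logger.info("TestTaskJob1-->" + localDateTime);
            try {
                // Hold the only worker thread for 10s, longer than the 5s schedule interval
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
        }
    }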
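The trigger query in section 2 contains the condition (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)). If MISFIRE_INSTR is -1, the trigger is always returned, no matter how far in the past its fire time is. So what does this value mean? It is the trigger's misfire instruction, and -1 corresponds to Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY; see http://www.itdecent.cn/p/634d2a6fae7b for details. By choosing the misfire instruction you can make sure firings are not lost; they just run later than scheduled.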
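To make the effect of that condition concrete, here is the time-related part of the WHERE clause rewritten as a plain Java predicate. It is a sketch for illustration: the class, method and parameter names are invented here, and the TRIGGER_STATE and SCHED_NAME filters are left out.

```java
// Illustrative only: the time-related part of the acquisition query as a predicate.
final class AcquirePredicate {
    // The MISFIRE_INSTR value that bypasses the lower time bound
    static final int IGNORE_MISFIRE_POLICY = -1;

    static boolean isCandidate(int misfireInstr, long nextFireTime, long noLaterThan, long noEarlierThan) {
        // NEXT_FIRE_TIME <= ?
        if (nextFireTime > noLaterThan) {
            return false;
        }
        // MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)
        return misfireInstr == IGNORE_MISFIRE_POLICY || nextFireTime >= noEarlierThan;
    }

    public static void main(String[] args) {
        long noEarlierThan = 40_000L;
        long noLaterThan = 130_000L;
        long longOverdue = 0L; // a fire time far older than the lower bound
        System.out.println("instruction -1: " + isCandidate(-1, longOverdue, noLaterThan, noEarlierThan));
        System.out.println("instruction  0: " + isCandidate(0, longOverdue, noLaterThan, noEarlierThan));
    }
}
```

In this sketch the long-overdue trigger is still selected when its instruction is -1 and is filtered out otherwise, so any other instruction leaves it to the separate misfire handling.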
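3.2 Jobs lost because of a server restart
Misfire handling keeps firings from being lost while threads are blocked. But what if the server goes down in the middle of an execution?
As mentioned in section 2, when triggers are queried they are also inserted into the QRTZ_FIRED_TRIGGERS table. When the server restarts, Quartz reads this table and resumes the interrupted jobs.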
4. How distributed consistency is ensured
To keep jobs reliable we usually deploy several servers, but then the same job could end up being executed on more than one of them. How is that handled?
In section 2, the triggers were fetched through a callback passed to executeInNonManagedTXLock. Let's look at how executeInNonManagedTXLock is implemented.
    protected <T> T executeInNonManagedTXLock(
            String lockName,
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection
                // until after acquiring the lock since it isn't needed.
                // 1. Get a database connection
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                // 2. Acquire the lock
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            if (conn == null) {
                conn = getNonManagedTXConnection();
            }
            // 3. Run the callback, which performs the query
            final T result = txCallback.execute(conn);
            try {
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean execute(Connection conn) throws JobPersistenceException {
                        return txValidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }
            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
            if(sigTime != null && sigTime >= 0) {
                signalSchedulingChangeImmediately(sigTime);
            }
            return result;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (RuntimeException e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("Unexpected runtime exception: "
                    + e.getMessage(), e);
        } finally {
            try {
                releaseLock(lockName, transOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }
The part to focus on is getLockHandler().obtainLock(conn, lockName). getLockHandler returns the lock handler object; because we are running in database mode, the call ends up in DBSemaphore#obtainLock.
    public boolean obtainLock(Connection conn, String lockName)
            throws LockException {
        if(log.isDebugEnabled()) {
            log.debug(
                "Lock '" + lockName + "' is desired by: "
                        + Thread.currentThread().getName());
        }
        // 1. Check whether this thread already holds the lock (the lock is reentrant)
        if (!isLockOwner(lockName)) {
            // 2. Execute the SQL that acquires the lock
            executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
            if(log.isDebugEnabled()) {
                log.debug(
                    "Lock '" + lockName + "' given to: "
                            + Thread.currentThread().getName());
            }
            // 3. Once the lock is acquired, record it in the ThreadLocal
            getThreadLocks().add(lockName);
            //getThreadLocksObtainer().put(lockName, new
            // Exception("Obtainer..."));
        } else if(log.isDebugEnabled()) {
            log.debug(
                "Lock '" + lockName + "' Is already owned by: "
                        + Thread.currentThread().getName());
        }
        return true;
    }
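The lock acquisition logic is:
- Check whether the current thread already holds the lock, by looking for the lock name in the ThreadLocal. This is what makes the lock reentrant.
- Otherwise, execute SQL to acquire the lock: SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'quartzScheduler' AND LOCK_NAME = ? FOR UPDATE. The row is selected by lock name, and FOR UPDATE takes a row lock on it.
Because every server contends for the same row in the shared database, only one of them can hold the lock and acquire triggers at a time.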
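The reentrancy check can be tried out without a database. The sketch below is an assumption-laden stand-in, not Quartz code: a non-reentrant java.util.concurrent.Semaphore plays the role of the locked row, the class and field names are invented, and releasing the semaphore in releaseLock is a simplification (with a real row lock the database releases it when the transaction ends).

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a ThreadLocal set of lock names makes a non-reentrant lock reentrant.
final class ReentrantRowLockSketch {
    // One non-reentrant semaphore per lock name, standing in for the locked row
    private final Map<String, Semaphore> rows = new ConcurrentHashMap<>();
    // Names of the locks held by the current thread
    private final ThreadLocal<Set<String>> threadLocks = ThreadLocal.withInitial(HashSet::new);
    // Counts how often the "row lock" was really taken
    final AtomicInteger rowLockAcquisitions = new AtomicInteger();

    boolean obtainLock(String lockName) {
        Set<String> held = threadLocks.get();
        if (!held.contains(lockName)) {
            // Stand-in for the SELECT ... FOR UPDATE; blocks while another thread holds the row
            rows.computeIfAbsent(lockName, n -> new Semaphore(1)).acquireUninterruptibly();
            rowLockAcquisitions.incrementAndGet();
            held.add(lockName);
        }
        return true;
    }

    void releaseLock(String lockName) {
        // Only release if this thread actually holds the lock
        if (threadLocks.get().remove(lockName)) {
            rows.get(lockName).release();
        }
    }

    public static void main(String[] args) {
        ReentrantRowLockSketch locks = new ReentrantRowLockSketch();
        locks.obtainLock("TRIGGER_ACCESS");
        // Without the ThreadLocal check this second call would block forever
        locks.obtainLock("TRIGGER_ACCESS");
        System.out.println("row lock acquisitions: " + locks.rowLockAcquisitions.get());
        locks.releaseLock("TRIGGER_ACCESS");
    }
}
```

The nested call returns immediately and the counter stays at 1, which mirrors why code that already runs under the trigger-access lock can call other lock-taking methods on the same thread.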