一、設(shè)計(jì)思路
借用官網(wǎng)的話:
將調(diào)度行為抽象形成“調(diào)度中心”公共平臺(tái),而平臺(tái)自身并不承擔(dān)業(yè)務(wù)邏輯,“調(diào)度中心”負(fù)責(zé)發(fā)起調(diào)度請(qǐng)求。
將任務(wù)抽象成分散的JobHandler,交由“執(zhí)行器”統(tǒng)一管理,“執(zhí)行器”負(fù)責(zé)接收調(diào)度請(qǐng)求并執(zhí)行對(duì)應(yīng)的JobHandler中業(yè)務(wù)邏輯。因此,“調(diào)度”和“任務(wù)”兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴(kuò)展性;
借用官網(wǎng)的圖:

簡(jiǎn)化的架構(gòu)圖:

二、啟動(dòng)原理
1.xxl-job-admin服務(wù)啟動(dòng)原理
啟動(dòng)XxlJobAdminApplication類,在spring容器實(shí)例化之前,會(huì)執(zhí)行實(shí)現(xiàn)了 InitializingBean 接口的 afterPropertiesSet() 的方法,這里是利用了springboot的拓展接口,來(lái)將xxl-job的相關(guān)bean給注冊(cè)到IOC容器當(dāng)中。然后執(zhí)行最關(guān)鍵的 xxlJobScheduler.init()

調(diào)用 XxlJobScheduler 的 init() 方法

JobRegistryHelper,JobFailMonitorHelper,JobCompleteHelper,JobLogReportHelper,JobScheduleHelper這五個(gè)類都是使用了餓漢式的單例模式(個(gè)人覺得還需要將構(gòu)造方法私有化),

a.調(diào)用 JobTriggerPoolHelper.toStart() 本質(zhì)就是調(diào)用JobTriggerPoolHelper的start()方法,構(gòu)造出兩個(gè)線程池,如下圖所示的,一個(gè)快觸發(fā)線程池,一個(gè)慢觸發(fā)線程池。

b.調(diào)用 JobRegistryHelper.getInstance().start(),方法內(nèi)部主要做了兩件事,一件事是初始化一個(gè)注冊(cè)或者移除的線程池 registryMonitorThread,然后創(chuàng)建一個(gè) registryMonitorThread 的守護(hù)線程。設(shè)置成守護(hù)線程。

單獨(dú)把線程構(gòu)造拿出來(lái)分析。
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
// 死循環(huán)
while (!toStop) {
try {
// 從 xxl_job_group 表中查詢出 自動(dòng)注冊(cè)的執(zhí)行器 (address_type:0 自動(dòng)注冊(cè),1:手動(dòng)注冊(cè))
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// 從 xxl_job_registry 表中找到更新時(shí)間 小于當(dāng)前時(shí)間+死亡間隔時(shí)間(就是找到注冊(cè)表中規(guī)定時(shí)間沒有更新的記錄)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
// 如果找到在規(guī)定時(shí)間內(nèi)沒有更新的注冊(cè),就直接刪除這些注冊(cè)執(zhí)行器
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// 從 xxl_job_registry 表中找到更新時(shí)間 小于當(dāng)前時(shí)間+死亡間隔時(shí)間(就是找到注冊(cè)表中規(guī)定時(shí)間沒有更新的記錄)
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
// 遍歷當(dāng)前過(guò)期的注冊(cè)器,將過(guò)期的注冊(cè)器的 register_value 注冊(cè)地址保存到臨時(shí)變量 appAddressMap 中
for (XxlJobRegistry item: list) {
// 如果注冊(cè)器類型(registry_group) 是 EXECUTOR
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 刷新對(duì)應(yīng)執(zhí)行器地址和最新修改時(shí)間
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
// 休眠心跳的時(shí)間
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
C.調(diào)用 JobFailMonitorHelper.getInstance().start(),方法內(nèi)部創(chuàng)建一個(gè) monitorThread 的守護(hù)線程。設(shè)置成守護(hù)線程。

下面代碼詳細(xì)介紹 JobFailMonitorHelper.getInstance().start() 里面的 構(gòu)造的線程主要做的是什么事。
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor 死循環(huán)監(jiān)控,每10秒鐘(每次執(zhí)行完休眠十秒)執(zhí)行一遍監(jiān)控的內(nèi)容
while (!toStop) {
try {
// 從 xxl_job_log 查詢出失敗的日志記錄
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// 修改 xxl_job_log 將警報(bào)狀態(tài)改成 (-1鎖定狀態(tài)) 告警狀態(tài):0-默認(rèn)、1-無(wú)需告警、2-告警成功、3-告警失敗
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
// 根據(jù)失敗的日志id,查詢出該條日志
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
// 根據(jù)這條日志記錄的 執(zhí)行器Id查詢對(duì)應(yīng)的執(zhí)行器
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 如果 日志的重試次數(shù)大于0,就直接觸發(fā)JobTriggerPoolHelper.trigger()方法,這個(gè)方法就是admin遠(yuǎn)程調(diào)用執(zhí)行器的方法。
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
// 觸發(fā)完成,將失敗重試的次數(shù)減一,更新
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警狀態(tài):0-默認(rèn)、-1=鎖定狀態(tài)、1-無(wú)需告警、2-告警成功、3-告警失敗
// 如果存在失敗的日志,發(fā)送警報(bào)郵件
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
// 最后更新一下 xxl_job_log 表的 alarm_status 狀態(tài)字段
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
// 休眠 10秒
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
調(diào)用 JobCompleteHelper.getInstance().start(),方法內(nèi)部創(chuàng)建一個(gè) monitorThread 的守護(hù)線程。設(shè)置成守護(hù)線程;以及一個(gè)callbackThreadPool線程池。

monitorThread 線程內(nèi)部的工作內(nèi)容
// for monitor
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// wait for JobTriggerPoolHelper-init
try {
// 上來(lái)休眠50毫秒,等待 JobTriggerPoolHelper 初始化完成
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
// monitor
while (!toStop) {
try {
// 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運(yùn)行中" 狀態(tài)超過(guò)10min,且對(duì)應(yīng)執(zhí)行器心跳注冊(cè)失敗不在線,則將本地調(diào)度主動(dòng)標(biāo)記失?。? Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
// 處理日志,并更新執(zhí)行器的完成結(jié)果
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
// 休眠60秒,再執(zhí)行一次
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
}
});
E.最后看一下 JobScheduleHelper.getInstance().start(); 方法。方法里面最主要的就是起兩個(gè)線程,分別將兩個(gè)線程設(shè)置成守護(hù)線程,ringThread是最干活的線程,scheduleThread是檢測(cè)并調(diào)度執(zhí)行器任務(wù)的線程。

看一下這ringThread和scheduleThread里面都是干啥的。這里先總結(jié)一下,scheduleThread線程主要就是將需要執(zhí)行的定時(shí)器任務(wù)分個(gè)類,并維護(hù)每個(gè)定時(shí)器里面的下次執(zhí)行時(shí)間,以及處理 調(diào)度過(guò)期的 執(zhí)行器,要么立刻執(zhí)行一次,要么直接忽略,等待下次執(zhí)行,最后就是將需要在5秒內(nèi)執(zhí)行的定時(shí)器放進(jìn)一個(gè)map里面,交給ringThread線程去執(zhí)行定時(shí)器。而ringThread線程就是直接從map中拿到需要執(zhí)行的執(zhí)行器去執(zhí)行,并且每輪執(zhí)行只處理兩個(gè)時(shí)間點(diǎn)(毫秒級(jí))的所有執(zhí)行器。具體的可以看代碼里面的講解。
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 這里默認(rèn)么個(gè)定時(shí)任務(wù)線程執(zhí)行耗時(shí)50毫秒,每秒1000毫秒,可以執(zhí)行20個(gè)任務(wù),快線程池默認(rèn)是200個(gè)最大線程數(shù),慢線程默認(rèn)是100個(gè)最大線程數(shù),
// 所以當(dāng)線程數(shù)拉滿的情況下,每秒鐘可以處理任務(wù)數(shù)是:(100+200)*20 = 6000 ;所以這里的 preReadCount 表示預(yù)讀數(shù)(默認(rèn)最大:6000)
// 如果想要提高并發(fā)性,通過(guò)修改快慢線程池的最大線程數(shù)這個(gè)參數(shù)調(diào)節(jié)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
logger.info("==========================preReadCount ={}",preReadCount);
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
// 利用數(shù)據(jù)庫(kù)的行鎖
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、預(yù)讀數(shù)據(jù)
long nowTime = System.currentTimeMillis();
// 由于分析了最多可以執(zhí)行6000個(gè)任務(wù),所以這里在去查任務(wù)表的時(shí)候,最多去查出來(lái)6000條滿足條件的
// 條件:下次執(zhí)行時(shí)間 小于等于 (當(dāng)前時(shí)間 + 5秒) 并且 執(zhí)行狀態(tài)是 正在運(yùn)行的狀態(tài)
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
//如果服務(wù)宕機(jī)了,或者重啟等等,導(dǎo)致超過(guò)了調(diào)度周期(5秒的調(diào)度周期),也就是本來(lái)由時(shí)間上的上一次或上很多次調(diào)度觸發(fā)的數(shù)據(jù)被本次調(diào)度查到了,
// 這就可能代表著可能中間存在多次調(diào)度未觸發(fā),而按照周期性一次一次計(jì)算下次預(yù)期調(diào)度時(shí)間,那這次調(diào)度完了計(jì)算出來(lái)的下次調(diào)度還是在當(dāng)前時(shí)間以前,
// 例如調(diào)度周期1分鐘調(diào)度一次,宕機(jī)5分鐘了,現(xiàn)在查到的預(yù)期調(diào)度時(shí)間為5分鐘前,如果直接調(diào)度成功會(huì)重復(fù)調(diào)度5次當(dāng)前時(shí)間以前的任務(wù),這里直接pass并計(jì)算下一次調(diào)度時(shí)間,
// 但是計(jì)算下一次調(diào)度時(shí)間也是傳入當(dāng)前時(shí)間,直接修正預(yù)期下次調(diào)度時(shí)間為當(dāng)前時(shí)間之后,因?yàn)檎{(diào)度時(shí)間周期為 5秒,所以會(huì)+ PRE_READ_MS 判斷,如果是一次性的調(diào)度則會(huì)補(bǔ)償這次調(diào)度
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 拿到設(shè)置的 調(diào)度過(guò)期策略
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
// 如果是忽略就不執(zhí)行,如果是立刻執(zhí)行一次,就立馬調(diào)用執(zhí)行一次該執(zhí)行器
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
refreshNextValidTime(jobInfo, new Date());
// 當(dāng)前的時(shí)間大于執(zhí)行器下次的執(zhí)行的時(shí)間,說(shuō)明上次執(zhí)行器可能遺漏了這個(gè)執(zhí)行器
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
// 如果下次發(fā)送時(shí)間在當(dāng)前時(shí)間之后5秒內(nèi),會(huì)進(jìn)行第二次觸發(fā),放到另一個(gè)線程中執(zhí)行觸發(fā)邏輯
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
// 這里算出的結(jié)果保持在 0 - 59之間
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、將當(dāng)前任務(wù)放進(jìn)一個(gè)全局變量map中,讓ringThread線程去執(zhí)行
pushTimeRing(ringSecond, jobInfo.getId());
// 3、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
// 取5秒的直接觸發(fā) 但是區(qū)別于上面是第一次觸發(fā)
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、更新執(zhí)行器,主要是更新 上次觸發(fā)時(shí)間,下次觸發(fā)時(shí)間,和當(dāng)前執(zhí)行器的狀態(tài)
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
// 如果上述操作耗時(shí)大于一秒直接進(jìn)入下次循環(huán),如果小于一秒需要再判斷
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
// 首先 System.currentTimeMillis()%1000 這里的取余,最大值是999毫秒,可以理解成極限的一秒
// 如果讀到了數(shù)據(jù) 休眠 (1 - (System.currentTimeMillis()%1000))秒,這里最大休眠1秒,最小接近不休眠直接執(zhí)行
// 因?yàn)樽x到了數(shù)據(jù),不知道接下來(lái)還有沒有數(shù)據(jù),這里為了趕工確保滿足條件的定時(shí)任務(wù)能快速被執(zhí)行。
// 如果讀不到數(shù)據(jù),休眠 (5 - (System.currentTimeMillis()%1000))秒,這里最大休眠5秒,最小休眠4秒
// 因?yàn)橐呀?jīng)讀不到數(shù)據(jù)了,多休息一下,讓定時(shí)器等到需要被執(zhí)行的時(shí)間點(diǎn)
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
// 上來(lái)休眠最大1秒,如果scheduleThread有執(zhí)行任務(wù),保證會(huì)向 ringData(map) 里面寫
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
// 從 ringData(map)里面拿到,
List<Integer> ringItemData = new ArrayList<>();
// 上面介紹過(guò) 這個(gè)map的key是在0-59之間,直接取當(dāng)前的秒數(shù)(0-59)
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免處理耗時(shí)太長(zhǎng),跨過(guò)刻度,向前校驗(yàn)一個(gè)刻度;
for (int i = 0; i < 2; i++) {
// 循環(huán)兩次從 ringData(map)中拿到兩個(gè)時(shí)間點(diǎn)的執(zhí)行器ID集合 list,然后賦值給 ringItemData
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
// 如果 ringItemData 集合不為空,說(shuō)明有需要執(zhí)行的執(zhí)行器Id,就遍歷執(zhí)行里面Id對(duì)應(yīng)的執(zhí)行器
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
// 遍歷完成后,將 ringItemData 置空,然后等待線程 下次執(zhí)行
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
三、總結(jié)
1.xxl-job-admin的啟動(dòng)是利用springboot的擴(kuò)展接口InitializingBean來(lái)實(shí)現(xiàn)的,銷毀是利用擴(kuò)展DisposableBean接口來(lái)實(shí)現(xiàn)的。
2.xxl-job-admin的核心運(yùn)行流程由JobRegistryHelper,JobFailMonitorHelper,JobCompleteHelper,JobLogReportHelper,JobScheduleHelper 這幾個(gè)Helper完成。
3.單臺(tái)xxl-job最多一秒鐘可以完成6000個(gè)執(zhí)行器任務(wù)的執(zhí)行。
4.集群環(huán)境下,同一個(gè)執(zhí)行器不出現(xiàn)并發(fā)執(zhí)行問(wèn)題其實(shí)是依賴了數(shù)據(jù)庫(kù)的行鎖實(shí)現(xiàn)的。
綜上所述,將xxl-job-admin中的啟動(dòng),以及如何調(diào)度核心部分就已經(jīng)說(shuō)完了,其實(shí)在執(zhí)行執(zhí)行器任務(wù)的時(shí)候里面還涉及到xxl-job的集群分片處理任務(wù)的原理,以及集群路由的原理,還有內(nèi)置server的設(shè)計(jì),以及xxl-job-admin遠(yuǎn)程觸發(fā)任務(wù)使用的RPC調(diào)用原理細(xì)節(jié),后面有空再整理吧。