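I recently spent some time reading the xxl-job source code and analyzing it a little, hoping to extract from such a mature framework some of the essentials of distributed job scheduling.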
This article covers the following:
- Sketch the overall core design diagram, surveying xxl-job's architecture from its skeleton
- Walk through the core server-side design and the ideas behind it
- Walk through the core client-side design and the ideas behind it
Core design diagram

Server-side design
XxlJobScheduler gives a view of the most essential parts of the server-side design:
- Trigger thread pools (dispatching jobs for execution)
- Service registration and deregistration
- Compensation for failed executions
- Handling of executors that go offline abnormally, and callback processing
- Job execution statistics reports
- The scheduling loop (computing each job's next trigger time)
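Triggers
The design uses two thread pools, fastTriggerPool and slowTriggerPool. Apart from their maximum thread counts (triggerPoolFastMax and triggerPoolSlowMax, 200 and 100 by default, as the pre-read comment further below shows), they differ only in the capacity of their blocking queues: 1000 for fastTriggerPool and 2000 for slowTriggerPool.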
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// ....
}
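As the code comment notes, once a job has "timed out" more than 10 times within one minute, its triggers are routed to the slow trigger pool. The timeout here is not the HTTP request timeout. A trigger counts as timed out when the trigger call itself takes longer than 500 ms. The measured time runs from the start of the trigger call, and that call includes dispatching the request to the executor.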
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // job-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
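After each trigger completes, the code checks whether the current minute differs from the last recorded one. The minute is computed as the timestamp divided by 60000, which gives a coarse one-minute bucket. If the minute has changed, all counters are cleared.
If the trigger took longer than 500 ms, the job's timeout count is incremented. The 500 ms threshold is hard-coded rather than configurable, which is admittedly a bit crude...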
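To make the routing rule concrete, here is a minimal sketch of it. The class TriggerPoolRouter, its method names and the explicit nowMs parameter are hypothetical, not xxl-job's API. The sketch only mirrors three things from the code above: the per-minute bucket (timestamp / 60000), the 500 ms threshold, and the "more than 10" rule.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of xxl-job's fast/slow pool routing rule; not the library's API.
class TriggerPoolRouter {
    static final long SLOW_THRESHOLD_MS = 500;   // hard-coded in xxl-job as well
    static final int MAX_SLOW_PER_MINUTE = 10;

    private final ConcurrentMap<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    /** "slow" once the job was slow more than 10 times in the current minute bucket. */
    String choosePool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return (count != null && count.get() > MAX_SLOW_PER_MINUTE) ? "slow" : "fast";
    }

    /** Records one finished trigger; nowMs is passed in so the example is deterministic. */
    void recordTrigger(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;            // coarse minute bucket, as in the original
        if (minute != currentMinute) {           // a new minute started: reset all counters
            currentMinute = minute;
            slowCounts.clear();
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            slowCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

After 11 slow triggers within the same minute, choosePool returns "slow". The first trigger recorded in the next minute clears the counters again. The original uses putIfAbsent(new AtomicInteger(1)) followed by incrementAndGet, which counts the same way.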
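Service registration and deregistration
registryOrRemoveThreadPool processes the registration and deregistration requests received from clients. Notably, its rejection handler simply calls r.run(). When the pool is saturated, the task therefore runs on the thread that submitted it, i.e. the thread handling the client's request. Quite a few of xxl-job's thread pools use this policy.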
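A minimal sketch of such a pool follows. The pool and queue sizes are arbitrary demo values, not xxl-job's. The rejection handler runs the task directly on the submitting thread. The JDK's built-in ThreadPoolExecutor.CallerRunsPolicy behaves the same way, except that it silently discards the task once the pool has been shut down.

```java
import java.util.concurrent.*;

// Sketch: a pool whose rejection handler runs the task on the submitting thread,
// mirroring the r.run() policy described above. Sizes are demo values only.
class CallerRunsDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                1, 1, 30L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),              // tiny queue to force a rejection
                r -> new Thread(r, "registry-worker"),
                (r, executor) -> r.run());                 // rejected: run in the caller's thread
    }

    /** Returns the name of the thread that executed the third (rejected) task. */
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            pool.execute(() -> {                           // occupies the only worker
                try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            pool.execute(() -> { });                       // fills the queue
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // rejected
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return ranOn[0];
    }
}
```

The design choice is a trade-off. Under load, work is never dropped, but the caller's thread (here, the one serving the HTTP request) slows down, and that acts as natural back-pressure.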
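registryMonitorThread is a periodic task that runs every 30 s. Its main work:
- Check whether any executor groups use automatic registration; if so:
  - find clients that have not reported a registration for more than 90 s and delete them
  - find clients that are reporting normally (registered within the last 90 s) and write their registered addresses back to their group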
// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
//....
}
// fresh group address
// ....
}
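The refresh cycle can be sketched in memory as follows. The class and its RegistryEntry (appname, address, lastUpdateMs) are hypothetical stand-ins for the xxl_job_registry table; only the 90 s dead timeout comes from the text. Entries older than the timeout are dropped, and the survivors are grouped by appname into the address lists that would be written back to each group.

```java
import java.util.*;

// In-memory sketch of registryMonitorThread's refresh; not xxl-job code.
class RegistryRefresh {
    static final long DEAD_TIMEOUT_MS = 90_000; // three 30 s heartbeat periods

    static final class RegistryEntry {
        final String appname;
        final String address;
        final long lastUpdateMs;
        RegistryEntry(String appname, String address, long lastUpdateMs) {
            this.appname = appname; this.address = address; this.lastUpdateMs = lastUpdateMs;
        }
    }

    /** Removes dead entries in place and returns appname -> sorted live addresses. */
    static Map<String, List<String>> refresh(List<RegistryEntry> registry, long nowMs) {
        // 1. remove dead addresses (no registration within DEAD_TIMEOUT_MS)
        registry.removeIf(e -> nowMs - e.lastUpdateMs > DEAD_TIMEOUT_MS);
        // 2. group the remaining online addresses by appname, without duplicates
        Map<String, List<String>> appAddressMap = new HashMap<>();
        for (RegistryEntry e : registry) {
            List<String> addresses = appAddressMap.computeIfAbsent(e.appname, k -> new ArrayList<>());
            if (!addresses.contains(e.address)) {
                addresses.add(e.address);
            }
        }
        // 3. sort so a group's address list is stable between refreshes
        for (List<String> addresses : appAddressMap.values()) {
            Collections.sort(addresses);
        }
        return appAddressMap;
    }
}
```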
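Compensation for failed executions
This monitorThread is a periodic task that runs every 10 s. Its main work:
- Fetch up to 1000 failed job logs whose alarm status is still the default, and process them in a loop
- Move each log's alarm status from default to locked (skip the log if the update fails)
- If the log still has retries left (executorFailRetryCount > 0), trigger a retry with the count decremented
- If the job has an alarm email configured, send an alarm
- Set the final alarm status to 1, 2, or 3
Alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent, 3 = alarm failed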
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent, 3 = alarm failed
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
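The 0 → -1 update works as an optimistic lock. The update only succeeds if the status is still 0, so when several admin nodes scan the same failed log, exactly one of them goes on to process it. Below is a minimal in-memory sketch. ConcurrentHashMap.replace(key, expected, newValue) stands in for updateAlarmStatus(id, 0, -1). The class name and the boolean parameters that replace the real alarm call are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the alarm-status state machine. replace(k, old, new) plays the role of
// "UPDATE ... SET alarm_status = new WHERE id = ? AND alarm_status = old".
class AlarmStatusLock {
    static final int DEFAULT = 0, LOCKED = -1, NO_ALARM = 1, ALARM_OK = 2, ALARM_FAIL = 3;

    final Map<Long, Integer> alarmStatus = new ConcurrentHashMap<>();

    /** True only for the single caller that moves the log from DEFAULT to LOCKED. */
    boolean tryLock(long logId) {
        return alarmStatus.replace(logId, DEFAULT, LOCKED);
    }

    /** Final transition; hasAlarmEmail / alarmSent stand in for the real alarm call. */
    void finish(long logId, boolean hasAlarmEmail, boolean alarmSent) {
        int newStatus = !hasAlarmEmail ? NO_ALARM : (alarmSent ? ALARM_OK : ALARM_FAIL);
        alarmStatus.replace(logId, LOCKED, newStatus);
    }
}
```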
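Handling executors that go offline abnormally, and callback processing
callbackThreadPool processes the callbacks received from clients; its rejection policy is also r.run().
Another monitorThread, a periodic task that runs every 60 s, does the following:
- Lost-result handling: suppose a trigger log has stayed in the "running" state for more than 10 minutes and its executor is no longer online, because its heartbeat registration has lapsed. The log is then proactively marked as failed.
// lost-result handling: a log stuck in "running" for more than 10 min whose executor is offline (heartbeat lapsed) is proactively marked as failed;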
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}
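The rule can be expressed as a pure filter, as sketched below under a few assumptions. JobLog is a hypothetical stand-in for a log row. "Running" is modeled as a successful dispatch (triggerCode 200) with no result yet (handleCode 0). The onlineAddresses set stands in for the registry table. To my understanding, xxl-job does this filtering in the SQL behind findLostJobIds rather than in Java.

```java
import java.util.*;

// Sketch of the "lost result" rule; not xxl-job's implementation.
class LostJobDetector {
    static final long LOST_AFTER_MS = 10 * 60_000; // 10 minutes

    static final class JobLog {
        final long id; final int triggerCode; final int handleCode;
        final long triggerTimeMs; final String executorAddress;
        JobLog(long id, int triggerCode, int handleCode, long triggerTimeMs, String executorAddress) {
            this.id = id; this.triggerCode = triggerCode; this.handleCode = handleCode;
            this.triggerTimeMs = triggerTimeMs; this.executorAddress = executorAddress;
        }
    }

    /** IDs of logs "running" for over 10 minutes whose executor is no longer registered. */
    static List<Long> findLost(List<JobLog> logs, Set<String> onlineAddresses, long nowMs) {
        long lostBefore = nowMs - LOST_AFTER_MS;
        List<Long> lost = new ArrayList<>();
        for (JobLog log : logs) {
            boolean running = log.triggerCode == 200 && log.handleCode == 0; // dispatched, no result yet
            boolean stale = log.triggerTimeMs <= lostBefore;
            boolean executorOffline = !onlineAddresses.contains(log.executorAddress);
            if (running && stale && executorOffline) {
                lost.add(log.id);
            }
        }
        return lost;
    }
}
```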
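Job execution statistics report
The core of the statistics report is shown below. For each day (todayFrom to todayTo), the counts are aggregated from the log table. The failure count is derived as total minus running minus successful. The report row is then updated, or inserted if no row exists yet: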
Date todayTo = itemDay.getTime();
// refresh log-report every minute
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}
// do refresh
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
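The snippet uses two ideas. The failure count is derived rather than counted, and the row is written by "update, and insert if nothing was updated". A small sketch shows both. The Map standing in for the xxl_job_log_report table and all names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-day report refresh; the Map stands in for the report table.
class LogReportSketch {
    static final class DayReport {
        final String day; final int running; final int suc; final int fail;
        DayReport(String day, int running, int suc, int fail) {
            this.day = day; this.running = running; this.suc = suc; this.fail = fail;
        }
    }

    final Map<String, DayReport> table = new HashMap<>();
    int inserts = 0;

    /** Anything that is neither running nor successful is counted as failed. */
    static DayReport build(String day, int total, int running, int suc) {
        return new DayReport(day, running, suc, total - running - suc);
    }

    /** "Update, and insert if no row was updated", like update(...) followed by save(...). */
    void refresh(DayReport report) {
        int updated = update(report);           // UPDATE ... WHERE trigger_day = ?
        if (updated < 1) {
            table.put(report.day, report);      // INSERT a new row for that day
            inserts++;
        }
    }

    private int update(DayReport report) {
        if (!table.containsKey(report.day)) {
            return 0;                           // no row for that day yet
        }
        table.put(report.day, report);
        return 1;
    }
}
```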
The scheduling loop (computing each job's next trigger time)
scheduleThread is a loop whose sleep interval is computed dynamically; it is the most central part of the server.
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); // initial delay of 4-5 s, waking on a whole-second boundary (not necessarily a multiple of 5)
} catch (InterruptedException e) {
//....
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// default: (200 + 100) * 20 = 6000
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
// acquire database resources
boolean preReadSuc = true;
try {
// ...
// take the lock (when several admin nodes are deployed, nodes that fail to get it block here until it is released)
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
// fetch jobs whose next trigger time is before now + PRE_READ_MS (5 s), at most preReadCount rows (6000 by default)
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// now > next trigger time + 5 s
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
// misfire strategy
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW -> trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2、fresh next
// refresh the next trigger time
// if no next trigger time can be computed, the job is stopped
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// now is slightly past the next trigger time (by at most 5 s)
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// if the next trigger falls within 5 s, push it into the time ring for ringThread to handle
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// compute which slot of the time wheel the next trigger lands in
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);//
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// the next trigger is still ahead of now: compute which slot of the time wheel it lands in
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false; // no jobs due in this window
}
// tx stop
} catch (Exception e) {
// ....
} finally {
// commit (which releases the schedule_lock row lock) and close resources
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
// nothing was pre-read: sleep until the whole-second boundary about 5 s ahead (e.g. 12.3 s -> 17.0 s, not necessarily a multiple of 5)
// jobs were pre-read: sleep until the next whole second
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
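The three branches and the sleep arithmetic can be isolated into a small sketch. PRE_READ_MS = 5000 matches the 5 s window in the comments above. The class, the enum and the method names are hypothetical. The sketch also shows that the modulo-1000 arithmetic aligns the wake-up to a whole second about 1 s or 5 s ahead, not to a multiple of 5 s.

```java
// Hypothetical sketch of scheduleThread's per-job decision and sleep arithmetic; not xxl-job code.
class ScheduleDecision {
    static final long PRE_READ_MS = 5000; // the 5 s pre-read window from the comments above

    enum Action { MISFIRE, FIRE_NOW, PUSH_TO_RING }

    /** Classifies one pre-read job relative to nowMs, mirroring the three branches. */
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE;      // more than 5 s overdue: apply the misfire strategy, then reschedule
        } else if (nowMs > triggerNextTimeMs) {
            return Action.FIRE_NOW;     // overdue by at most 5 s: trigger now (the next run may go to the ring)
        }
        return Action.PUSH_TO_RING;     // still ahead: park it in the time wheel
    }

    /** Time-wheel slot = second-of-minute of the trigger time (0..59). */
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    /** Next sleep: to the next whole second (jobs found) or to a whole second ~5 s ahead (none). */
    static long sleepMs(long nowMs, boolean preReadSuc) {
        return (preReadSuc ? 1000 : PRE_READ_MS) - nowMs % 1000;
    }
}
```

Take a scan that ends at 12.300 s with nothing pre-read. It sleeps 4700 ms and resumes at 17.000 s, which is a whole second but not a multiple of 5.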
ringThread implements a time wheel with at most 60 slots. Internally it is a map whose key is the second (0-59) and whose value is the list of job IDs to fire in that second.
public void run() {
while (!ringThreadToStop) {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000); // align to the next whole second
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // in case processing ran long and skipped a tick, also check one tick back
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); // current tick plus the previous one: if nowSecond is 4, i=0 takes slot 4 and i=1 takes slot 3
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
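Here is a stripped-down sketch of the time wheel. It has 60 slots keyed by second-of-minute. Each tick drains the current slot plus the previous one, so a tick skipped by a slow iteration is not lost. The class and method names are hypothetical. Using compute() to append is my own choice. It keeps a concurrent tick() from removing a slot between its creation and the append.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a 60-slot, second-granularity time wheel like ringThread's ringData.
class SecondTimeWheel {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** Scheduler side: park jobId in the slot for its trigger second. */
    void push(int ringSecond, int jobId) {
        // compute() is atomic per key in ConcurrentHashMap, so a concurrent tick()
        // cannot remove the list between creating it and adding to it
        ringData.compute(ringSecond, (k, slot) -> {
            List<Integer> list = (slot == null) ? new ArrayList<>() : slot;
            list.add(jobId);
            return list;
        });
    }

    /** Ring side, once per second: drain the current slot and the previous one. */
    List<Integer> tick(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

scheduleThread only pushes triggers that fall within the next 5 s. A slot should therefore never mix triggers from different minutes, which is why 60 slots are enough.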
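Summary:
That more or less completes the breakdown of the server's core design. Its most central part is the scheduling loop, which implements a second-granularity time wheel (a bit crude...). The overall architecture is not simple. Because it depends on no external framework, it has to implement service registration, deregistration, offline detection and so on by itself. The architecture can therefore be viewed as two large blocks. Service governance maintains the connection state between the server and client roles. Service scheduling is xxl-job's core function.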
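Client-side design
The client is naturally less complex than the server; it boils down to a few points:
- Scan the Spring container for methods carrying the xxl-job annotation (@XxlJob), wrap each in a MethodJobHandler, and add it to the handler registry
- Initialize the client's logging
- Start the client's embedded server (used to receive calls from the server)
- Service registration and deregistration
- Report execution results back to the server via callbacks
The points worth focusing on are the embedded server and service registration/deregistration.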
The client's embedded server
EmbedServer is an HTTP server built on Netty. Its default port is 9999.
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & response to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));// HTTP request handler
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
startRegistry(appname, address); // service registration
// wait until stop
future.channel().closeFuture().sync();
EmbedHttpServerHandler handles HTTP requests from the server. The main ones are heartbeat (/beat), idle check (/idleBeat), run (/run), kill (/kill), and log retrieval (/log).
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// validate: only POST is supported
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
// verify the access token
if (accessToken!=null
&& accessToken.trim().length()>0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);// checks whether the job thread is running or has queued triggers
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {// stop the job thread on the client
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
// ....
}
}
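The if/else chain is effectively a routing table from URI to handler. Below is a sketch of the same checks with a map-based router. Router, the Function-based handlers and the string results are hypothetical simplifications. The real handlers take parsed parameter objects and return ReturnT.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of EmbedHttpServerHandler-style dispatch: method check,
// token check, then a table lookup instead of an if/else chain.
class Router {
    private final Map<String, Function<String, String>> routes = new HashMap<>();
    private final String accessToken;

    Router(String accessToken) { this.accessToken = accessToken; }

    Router route(String uri, Function<String, String> handler) {
        routes.put(uri, handler);
        return this;
    }

    String process(String httpMethod, String uri, String body, String tokenReq) {
        if (!"POST".equals(httpMethod)) return "FAIL: HttpMethod not support";
        if (uri == null || uri.trim().isEmpty()) return "FAIL: uri-mapping empty";
        // the token is only enforced when one is configured on the executor
        if (accessToken != null && !accessToken.trim().isEmpty() && !accessToken.equals(tokenReq)) {
            return "FAIL: access token is wrong";
        }
        Function<String, String> handler = routes.get(uri);
        return handler == null ? "FAIL: uri-mapping(" + uri + ") not found" : handler.apply(body);
    }
}
```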
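The run request deserves the most attention. It looks up the matching IJobHandler, obtains or creates the job's JobThread, and pushes the trigger onto that thread's queue for processing.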
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
//...
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// ....
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
if (jobThread != null) { // apply the configured block strategy
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard later: drop this trigger if the thread is busy
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // cover early: replace the busy thread
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else { // serial execution on this executor
// just queue trigger
}
}
// replace thread (new or exists invalid)
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); // create a job thread for this handler, register it (replacing any old one) and start it
}
// push data to queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); // enqueue; the job thread consumes its blocking queue
return pushResult;
}
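The block-strategy branch boils down to a three-way decision, made only when the job already has a thread that is running or has queued triggers. Below is a sketch. The Decision enum is hypothetical. The strategy names mirror ExecutorBlockStrategyEnum, and as far as I know SERIAL_EXECUTION is the name of the default serial strategy. The separate case where a BEAN job's handler changed is not modeled here.

```java
// Sketch of the block-strategy decision in run(); strategy names mirror ExecutorBlockStrategyEnum.
class BlockStrategySketch {
    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }
    enum Decision { REJECT, REPLACE_THREAD, ENQUEUE }

    static Decision decide(BlockStrategy strategy, boolean hasThread, boolean runningOrHasQueue) {
        if (!hasThread || !runningOrHasQueue) {
            return Decision.ENQUEUE;           // no thread yet, or it is idle: (create and) queue
        }
        if (strategy == null) {
            return Decision.ENQUEUE;           // unknown strategy falls through to serial, as in the original
        }
        switch (strategy) {
            case DISCARD_LATER: return Decision.REJECT;          // keep the running one, drop this trigger
            case COVER_EARLY:   return Decision.REPLACE_THREAD;  // stop the old thread, start a new one
            default:            return Decision.ENQUEUE;         // serial: wait behind earlier triggers
        }
    }
}
```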
The JobThread processing logic is as follows:
// init
try {
handler.init(); // IJobHandler provides an init() hook
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
running = false;
idleTimes++; // idle counter
TriggerParam triggerParam = null;
try {
// to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); // block for up to 3 s
if (triggerParam!=null) {
running = true;
idleTimes = 0; // reset the idle counter
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
// job context
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
// if an execution timeout is configured, run the handler in a separate thread and wait with a timeout
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
Thread futureThread = null;
try {
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute(); // execute
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);// the timeout is in seconds
} catch (TimeoutException e) {
// ...
// handle result
XxlJobHelper.handleTimeout("job execute timeout "); // mark the result as timed out
} finally {
futureThread.interrupt();
}
} else {
// just execute
handler.execute(); // execute directly
}
// ....
} else {
if (idleTimes > 30) { // idle for more than 30 consecutive polls
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost: only remove the thread when nothing is waiting in the queue
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); // 移除該線程
}
}
}
} catch (Throwable e) {
// handle the exception
} finally {
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// common case: push to the callback queue; the callback thread reports the result to the server
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() )
);
} else {
// is killed: the thread was stopped (e.g. by a kill request from the server), so report a failure
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_COCE_FAIL,
stopReason + " [job running, killed]" )
);
}
}
}
}
// after the thread stops, report any triggers still in the queue as killed via callbacks
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_COCE_FAIL,
stopReason + " [job not executed, in the job queue, killed.]")
);
}
}
// destroy
try {
handler.destroy(); // destroy hook when the thread stops; IJobHandler provides destroy()
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
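The timeout branch follows a common pattern. Run the work in a separate thread through a FutureTask, wait with get(timeout), and interrupt on the way out. Below is a self-contained sketch. runWithTimeout and its string results are hypothetical. Like the original, it depends on the job reacting to interruption, because Thread.interrupt() cannot forcibly stop code that ignores it.

```java
import java.util.concurrent.*;

// Sketch of the execution-timeout pattern used in JobThread; not xxl-job's code.
class TimeoutRunner {
    /** Returns "done", "timeout", "interrupted" or "failed: <msg>". */
    static String runWithTimeout(Callable<Boolean> job, long timeoutSeconds) {
        FutureTask<Boolean> futureTask = new FutureTask<>(job);
        Thread futureThread = new Thread(futureTask, "job-timeout-runner");
        futureThread.start();
        try {
            futureTask.get(timeoutSeconds, TimeUnit.SECONDS);
            return "done";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();   // the job itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            futureThread.interrupt(); // ask the job to stop; only effective if it honors interrupts
        }
    }
}
```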
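Summary: this more or less completes the analysis of the client's execution logic. The main design point is that each job gets its own thread to process triggers from the server; job threads are keyed by jobId, as loadJobThread and registJobThread show above. A thread may poll empty more than 30 times at 3 s per poll, i.e. sit idle for roughly 90 s. It is then shut down, and the next trigger creates a new one if none exists. Doesn't that mean a job whose triggers are more than about 90 s apart will have its thread created and destroyed over and over?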
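The idle-recycling behavior can be reproduced in a small worker. It polls its queue with a timeout and exits after too many empty polls in a row. The poll timeout and idle limit are constructor parameters here. xxl-job uses 3 s and 30; the demo shrinks them so it finishes quickly. IdleWorker is a hypothetical name.

```java
import java.util.concurrent.*;

// Sketch of JobThread-style idle recycling: poll with a timeout (so the stop flag is
// re-checked periodically) and exit after more than maxIdlePolls empty polls in a row.
class IdleWorker extends Thread {
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final long pollMs;
    private final int maxIdlePolls;
    volatile boolean toStop = false;
    volatile int executed = 0;          // written only by this worker thread

    IdleWorker(long pollMs, int maxIdlePolls) {
        this.pollMs = pollMs;
        this.maxIdlePolls = maxIdlePolls;
    }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            try {
                Runnable task = queue.poll(pollMs, TimeUnit.MILLISECONDS);
                if (task != null) {
                    idleTimes = 0;      // any work resets the idle counter
                    task.run();
                    executed++;
                } else if (++idleTimes > maxIdlePolls && queue.isEmpty()) {
                    toStop = true;      // in xxl-job: XxlJobExecutor.removeJobThread(...)
                }
            } catch (InterruptedException e) {
                toStop = true;
            }
        }
    }
}
```

With xxl-job's values, the thread is recycled after 31 consecutive empty 3 s polls, roughly 93 s of inactivity. A job triggered less often than that therefore gets a fresh thread on each trigger.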
Service registration and deregistration
When EmbedServer starts its HTTP server, it also starts the registration thread registryThread, which reports a heartbeat every 30 s. Once registryThread's toStop flag is set to true, the loop exits and the deregistration step runs.
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam); // register (doubles as the heartbeat)
// ...
} catch (Exception e) {}
}
} catch (Exception e) {}
try {
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); // 30s
}
} catch (InterruptedException e) {}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); // deregister
//....
} catch (Exception e) {}
}
} catch (Exception e) {}
}
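Here is a sketch of the client side of this protocol. It registers with every configured admin once per period, then sends a deregistration to each admin after it is asked to stop. The AdminClient interface, the method signatures and the beatMs parameter are hypothetical; xxl-job's real client is AdminBiz, whose registry/registryRemove take a RegistryParam. One deliberate deviation from the original is a CountDownLatch in place of sleep plus a toStop flag, so that stop() takes effect immediately instead of after up to one beat.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of registryThread's loop; AdminClient is a hypothetical stand-in for AdminBiz.
class RegistryLoop implements Runnable {
    interface AdminClient {
        void registry(String appname, String address);
        void registryRemove(String appname, String address);
    }

    private final List<AdminClient> admins;
    private final String appname, address;
    private final long beatMs;
    private final CountDownLatch stopSignal = new CountDownLatch(1);

    RegistryLoop(List<AdminClient> admins, String appname, String address, long beatMs) {
        this.admins = admins; this.appname = appname; this.address = address; this.beatMs = beatMs;
    }

    void stop() { stopSignal.countDown(); }

    @Override
    public void run() {
        try {
            do {
                for (AdminClient admin : admins) {      // register with every admin node
                    try { admin.registry(appname, address); } catch (RuntimeException ignored) { }
                }
            } while (!stopSignal.await(beatMs, TimeUnit.MILLISECONDS)); // one beat, or until stopped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (AdminClient admin : admins) {              // deregister on the way out
            try { admin.registryRemove(appname, address); } catch (RuntimeException ignored) { }
        }
    }
}
```

The server drops an executor after 90 s without a registration. With a 30 s beat, that leaves room for a couple of missed heartbeats before the executor is considered offline.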
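Summary: the client design is concise. If multiple admin addresses are configured, the client registers with each of them. That further suggests the admin nodes do not synchronize state with one another, though if they share the same database they can see each other's data. All in all, the client's main cost is threads: it is fairly thread-hungry, and some threads may be repeatedly created and destroyed.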