方法鏈路
在閱讀整個代碼流程前,先羅列一下整個調(diào)度過程經(jīng)過的主要類和方法:
xxl-job-admin(調(diào)度器項目)->XxlJobScheduler.afterPropertiesSet()->JobScheduleHelper.getInstance().start()->JobTriggerPoolHelper.trigger()->JobTriggerPoolHelper.addTrigger()->XxlJobTrigger.trigger()->processTrigger()->runExecutor()。runExecutor方法中通過代理及netty請求,返回執(zhí)行結(jié)果。
具體源碼
直接從管理器Admin中的配置文類XxlJobScheduler.java開始,源碼如下:
@Override
public void afterPropertiesSet() throws Exception {
// init i18n
initI18n();
//執(zhí)行器地址信息維護,比如刪除失效的
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
//查詢執(zhí)行失敗,并且根據(jù)配置,進行告警
// admin monitor run
JobFailMonitorHelper.getInstance().start();
//初始化管理器RPC工廠,并且指定管理器中/api實現(xiàn)類,給executor調(diào)用做準備
// admin-server
initRpcProvider();
//啟動admin中的調(diào)度器
//start-schedule
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
JobScheduleHelper.getInstance().start() 就是管理器的調(diào)度入口,接著查看調(diào)度器中具體內(nèi)容,
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
//針對多個節(jié)點,防止調(diào)度器出現(xiàn)并發(fā)調(diào)度一個任務(wù),調(diào)度器執(zhí)行頻率控制,最長睡眠5秒,最短4秒多點,即(5000-999)毫秒
//如A節(jié)點的調(diào)度器剛啟動,并且獲取一個任務(wù),然后加鎖,如果同時B節(jié)點也啟動,也獲取到這個任務(wù),防止重復調(diào)用,隨機睡眠
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//鎖定資源
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
//獲取5秒內(nèi)狀態(tài)為正常運行的任務(wù)
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
//這里的觸發(fā)時間處理,用一個時間橫軸就比較好理解
//當前時間已經(jīng)超過觸發(fā)時間+5秒,不調(diào)度,直接計算下次調(diào)度時間,對應(yīng)時間段A
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
// fresh next
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date());
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
//當前時間已超過調(diào)度時間,但超過時間在5秒內(nèi),直接觸發(fā)調(diào)度,并且更新下次調(diào)度時間,即觸發(fā)時間在當前時間前5秒內(nèi)
//對應(yīng)時間段B
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();
// 1、trigger
//將任務(wù)放到觸發(fā)線程池中
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextTime);
//如果下次觸發(fā)時間在當前時間之后的5秒內(nèi),并且將這個時間段的任務(wù)單獨放在ringThread線程中處理,即觸發(fā)時間在當前時間的后5秒內(nèi)
//特別處理當前時間之后的5秒內(nèi),是因為本循環(huán)最長5秒循環(huán)一次,防止有漏掉的定時任務(wù),對應(yīng)時間段D
//注:scheduleThread和ringThread兩個線程的執(zhí)行評率不一樣
// next-trigger-time in 5s, pre-read again
if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {
// 1、make ring second
//此處計算出的ringSecond的值范圍是0-59
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
//將job放入ringThread線程
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
}
} else {//處理下次執(zhí)行時間在當前之后的時間,對應(yīng)時間段C
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
//該處ringSecond計算的值為0-59秒,
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
}
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException ignore) {
if (!scheduleThreadToStop) {
logger.error(ignore.getMessage(), ignore);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
//preReadSuc 有5秒內(nèi)正常運行的任務(wù),則睡眠一秒以內(nèi),沒有則睡眠5-(0至999)秒
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
時間軸如下:

接著看每秒掃描一次的調(diào)度線程
//此線程是處理當前時間以后在每秒時是否有定時任務(wù),有則直接啟動,
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
// 避免處理耗時太長,跨過刻度,向前校驗一個刻度;如果有的話,需要重新取回
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData!=null && ringItemData.size()>0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
//睡眠1秒以內(nèi)
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
上面兩個調(diào)度線程,觸發(fā)任務(wù)時,都是通過trigger方法觸發(fā)
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
接著看觸發(fā)器JobTriggerPoolHelper.java,觸發(fā)器中定義了一個快線程池和一個慢線程池,兩個線程池只是線程池大小和任務(wù)緩存隊列大小稍有不同,slowTriggerPool線程執(zhí)行那種一分鐘內(nèi),慢執(zhí)行超過10次(執(zhí)行時間超過500毫秒)的任務(wù),其他任務(wù)則使用fastTriggerPool 線程池執(zhí)行,具體線程池定義如下:
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
50,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
10,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
具體看線程池中執(zhí)行任務(wù)的策略
/**
* 線程池執(zhí)行任務(wù),及任務(wù)轉(zhuǎn)變?yōu)槁€程策略
* add trigger
*/
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
//一分鐘內(nèi),慢執(zhí)行超過10次,則使用slowTriggerPool執(zhí)行任務(wù)
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
//具體任務(wù)觸發(fā)
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
//每分鐘清理一次jobTimeoutCountMap,這個簡單實用達到一分鐘內(nèi)計數(shù)的目的
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
//一分鐘類執(zhí)行時間超過500毫秒,則將任務(wù)用慢的線程池執(zhí)行
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) {
//執(zhí)行時間超過500毫秒,則jobTimeoutCountMap中當前任務(wù)慢情況加1
// ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
接著看觸發(fā)器XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam)中的實現(xiàn)
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
// load data
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// sharding param
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
//如果是路由是分片 廣播,則將注冊地址中的都觸發(fā)一遍
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
接著processTrigger方法
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
//阻塞策略(eg:單機串行,丟棄后續(xù)調(diào)度,覆蓋之前調(diào)度)
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
//路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
//使用接口策略模式,獲取到執(zhí)行器地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
//執(zhí)行job
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
//搜集日志信息,保存結(jié)果
// 5、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
接著看runExecutor(triggerParam, address)方法
/**
* run executor
* @param triggerParam
* @param address
* @return
*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
//通過反射中的getObject和netty 調(diào)用到執(zhí)行器的service中,
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
//通過netty直接調(diào)度到執(zhí)行器的executorBiz.run()方法,并返回結(jié)果
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
ExecutorBiz.class,
null,
3000,
address,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
在來看示列執(zhí)行器xxl-job-executor-sample-springboot項目中的啟動配置類中的一部分配置,XxlJobConfig->XxlJobSpringExecutor.start()->XxlJobExecutor->initRpcProvider().
XxlJobExecutor中的start()方法如下:
public void start() throws Exception {
//設(shè)置日志路徑
// init logpath
XxlJobFileAppender.initLogPath(logPath);
//設(shè)置admin地址及執(zhí)行器訪問口令
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
//設(shè)置日志清理線程參數(shù)
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
//任務(wù)執(zhí)行結(jié)果回調(diào)線程(包含回調(diào)失敗后重試機制)
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
//設(shè)置執(zhí)行器ip和port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
//注冊執(zhí)行器及初始化執(zhí)行器上面的netty服務(wù)器信息
initRpcProvider(ip, port, appName, accessToken);
}
進入initRpcProvider方法查看
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
//指定執(zhí)行器注冊類為ExecutorServiceRegistry
xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
//注冊執(zhí)行器上面的service,用來執(zhí)行任務(wù)的入口
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
//啟動執(zhí)行器注冊工廠
// start
xxlRpcProviderFactory.start();
}
initRpcProvider方法就只做了兩件事情,第一是通過ExecutorServiceRegistry.java將執(zhí)行器注冊到調(diào)度器(這個屬于將執(zhí)行器注冊到調(diào)度器內(nèi)容,在另外一篇文章中有介紹(http://www.itdecent.cn/p/247c6cf53dca)),第二件事情是初始化一個netty客戶端,并且將ExecutorBiz的service實例注冊到netty服務(wù)器中,供調(diào)度器調(diào)度任務(wù)時使用。
總結(jié)
本框架有多容易上手和牛掰,就不在此夸贊了,整條業(yè)務(wù)邏輯拜讀完之后,收獲蠻多,簡單列舉一下本次主要的收獲;
1、對節(jié)點啟動時防止并發(fā)做的微調(diào)睡眠控制;
2、廢棄quartz框架,用6張表就實現(xiàn)定時器功能,
3、觸發(fā)器采用兩個線程,通過休眠頻率不同,實現(xiàn)秒級觸發(fā)
4、對慢任務(wù)的定義及轉(zhuǎn)移執(zhí)行
5、通過接口策略模式,獲取執(zhí)行器的路由地址
6、netty中自研RPC框架部分