xxl-job v2.2.1 調(diào)度中心源碼解讀

調(diào)度中心

  1. 掃描加載配置 ,XxlJobAdminConfig

  2. XxlJobAdminConfig加載完成后,初始化XxlJobScheduler
    // ---------------------- XxlJobScheduler ----------------------

      private XxlJobScheduler xxlJobScheduler;
    
      @Override
      public void afterPropertiesSet() throws Exception {
        adminConfig = this;
    
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
      }
    
      @Override
      public void destroy() throws Exception {
        xxlJobScheduler.destroy();
      }
    
    
    // ---------------------- XxlJobScheduler ----------------------
    
  3. XxlJobScheduler初始化daemon線程

    public void init() throws Exception {
    // init i18n 初始化國際化
    initI18n();

         // admin registry monitor run 
         JobRegistryMonitorHelper.getInstance().start();
    
         // admin fail-monitor run
         JobFailMonitorHelper.getInstance().start();
    
         // admin lose-monitor run
         JobLosedMonitorHelper.getInstance().start();
    
         // admin trigger pool start
         JobTriggerPoolHelper.toStart();
    
         // admin log report start
         JobLogReportHelper.getInstance().start();
    
         // start-schedule
         JobScheduleHelper.getInstance().start();
    
         logger.info(">>>>>>>>> init xxl-job admin success.");
     }
    

JobRegistryMonitorHelper.getInstance().start();

初始化執(zhí)行器注冊,異步執(zhí)行,registryThread線程,每隔30s執(zhí)行一次

// step 1. 查詢自動注冊的執(zhí)行器,對應(yīng)mysql表xxl_job_group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);

// step 2. 清除90s未發(fā)送心跳節(jié)點(admin/executor),對應(yīng)mysql表xxl_job_registry
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
  XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}

// step 3. 查詢存活節(jié)點,同步xxl_job_registry表未注冊到xxl_job_group.address_list的數(shù)據(jù)
// 3.1 數(shù)據(jù)封裝
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
  for (XxlJobRegistry item: list) {
    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
      String appname = item.getRegistryKey();
      List<String> registryList = appAddressMap.get(appname);
      if (registryList == null) {
        registryList = new ArrayList<String>();
      }

      if (!registryList.contains(item.getRegistryValue())) {
        registryList.add(item.getRegistryValue());
      }
      appAddressMap.put(appname, registryList);
    }
  }
}
// 3.2 fresh group address
for (XxlJobGroup group: groupList) {
  List<String> registryList = appAddressMap.get(group.getAppname());
  String addressListStr = null;
  if (registryList!=null && !registryList.isEmpty()) {
    Collections.sort(registryList);
    addressListStr = "";
    for (String item:registryList) {
      addressListStr += item + ",";
    }
    addressListStr = addressListStr.substring(0, addressListStr.length()-1);
  }
  group.setAddressList(addressListStr);
  XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}


JobFailMonitorHelper.getInstance().start(); 

監(jiān)控報警,異步執(zhí)行,monitorThread線程,每隔10s執(zhí)行一次

// lock log
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
    continue;
}
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

// 1、fail retry monitor
if (log.getExecutorFailRetryCount() > 0) {
  JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
  String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
  log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
  XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}

// 2、fail alarm monitor
int newAlarmStatus = 0;     // 告警狀態(tài):0-默認(rèn)、-1=鎖定狀態(tài)、1-無需告警、2-告警成功、3-告警失敗
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
  boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
  newAlarmStatus = alarmResult?2:3;
} else {
    newAlarmStatus = 1;
}

// 更新狀態(tài) 
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);

JobLosedMonitorHelper.getInstance().start();

超時job置為失敗,異步執(zhí)行,monitorThread線程,每隔60s執(zhí)行一次

// 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運行中" 狀態(tài)超過10min,且對應(yīng)執(zhí)行器心跳注冊失敗不在線,則將本地調(diào)度主動標(biāo)記失?。?Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

if (losedJobIds!=null && losedJobIds.size()>0) {
  for (Long logId: losedJobIds) {

    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setId(logId);

    jobLog.setHandleTime(new Date());
    jobLog.setHandleCode(ReturnT.FAIL_CODE);
    jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
  }

}

JobTriggerPoolHelper.toStart();

JobTriggerPoolHelper

啟動快、慢線程池

// 正常線程池
fastTriggerPool = new ThreadPoolExecutor(
  10,
  XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
  60L,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue<Runnable>(1000),
  new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
    }
  });

// 慢速線程池
slowTriggerPool = new ThreadPoolExecutor(
  10,
  XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
  60L,
  TimeUnit.SECONDS,
  new LinkedBlockingQueue<Runnable>(2000),
  new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
    }
  });

JobLogReportHelper.getInstance().start();

執(zhí)行記錄報表,異步執(zhí)行,logrThread

// 1、log-report refresh: refresh log report in 3 days
try {

  for (int i = 0; i < 3; i++) {

    // today
    Calendar itemDay = Calendar.getInstance();
    itemDay.add(Calendar.DAY_OF_MONTH, -i);
    itemDay.set(Calendar.HOUR_OF_DAY, 0);
    itemDay.set(Calendar.MINUTE, 0);
    itemDay.set(Calendar.SECOND, 0);
    itemDay.set(Calendar.MILLISECOND, 0);

    Date todayFrom = itemDay.getTime();

    itemDay.set(Calendar.HOUR_OF_DAY, 23);
    itemDay.set(Calendar.MINUTE, 59);
    itemDay.set(Calendar.SECOND, 59);
    itemDay.set(Calendar.MILLISECOND, 999);

    Date todayTo = itemDay.getTime();

    // refresh log-report every minute
    XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
    xxlJobLogReport.setTriggerDay(todayFrom);
    xxlJobLogReport.setRunningCount(0);
    xxlJobLogReport.setSucCount(0);
    xxlJobLogReport.setFailCount(0);

    Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
    if (triggerCountMap!=null && triggerCountMap.size()>0) {
      int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
      int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
      int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
      int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

      xxlJobLogReport.setRunningCount(triggerDayCountRunning);
      xxlJobLogReport.setSucCount(triggerDayCountSuc);
      xxlJobLogReport.setFailCount(triggerDayCountFail);
    }

    // do refresh
    int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
    if (ret < 1) {
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
    }
  }

} catch (Exception e) {
  if (!toStop) {
    logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
  }
}

// 2、log-clean: switch open & once each day
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
    && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

  // expire-time
  Calendar expiredDay = Calendar.getInstance();
  expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
  expiredDay.set(Calendar.HOUR_OF_DAY, 0);
  expiredDay.set(Calendar.MINUTE, 0);
  expiredDay.set(Calendar.SECOND, 0);
  expiredDay.set(Calendar.MILLISECOND, 0);
  Date clearBeforeTime = expiredDay.getTime();

  // clean expired log
  List<Long> logIds = null;
  do {
    logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
    if (logIds!=null && logIds.size()>0) {
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
    }
  } while (logIds!=null && logIds.size()>0);

  // update clean time
  lastCleanLogTime = System.currentTimeMillis();
}

try {
  TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
  if (!toStop) {
    logger.error(e.getMessage(), e);
  }
}

JobScheduleHelper.getInstance().start();

job執(zhí)行

// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
// 計算預(yù)讀數(shù)量
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

// 加鎖
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

 // tx start
// 1、pre read 查詢運行中,到達(dá)執(zhí)行時間job limit preReadCount,mysql表xxl_job_info
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
  // 2、push time-ring
  for (XxlJobInfo jobInfo: scheduleList) {

    // time-ring jump
    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
      // 2.1、trigger-expire > 5s:pass && make next-trigger-time
      // 下次執(zhí)行時間超過當(dāng)前5s不執(zhí)行,更新時間,等下一次執(zhí)行
      logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

      // fresh next
      refreshNextValidTime(jobInfo, new Date());

    } else if (nowTime > jobInfo.getTriggerNextTime()) {
      // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
            // 下次執(zhí)行在5s內(nèi),執(zhí)行
      // 1、trigger,加入線程池執(zhí)行,快慢線程池選擇策略
      JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
      logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

      // 2、fresh next 更新狀態(tài)和下次執(zhí)行時間
      refreshNextValidTime(jobInfo, new Date());

      // next-trigger-time in 5s, pre-read again
      if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

        // 1、make ring second
        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

        // 2、push time ring
        pushTimeRing(ringSecond, jobInfo.getId());

        // 3、fresh next
        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

      }

    } else {// 執(zhí)行時間 < 當(dāng)前時間
      // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

      // 1、make ring second
      int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

      // 2、push time ring
      pushTimeRing(ringSecond, jobInfo.getId());

      // 3、fresh next
      refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

    }

  }

  // 3、update trigger info
  for (XxlJobInfo jobInfo: scheduleList) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
  }

} else {
  preReadSuc = false;
}

// tx stop
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容