xxl-job源碼2-調(diào)度端

一 服務(wù)端主體流程

服務(wù)端流程
  • 任務(wù)觸發(fā)時(shí),需要進(jìn)行執(zhí)行器路由處理,并組裝任務(wù)相關(guān)配置信息,如阻塞策略,分片參數(shù),超時(shí)時(shí)間等。

二 表

2.1 XxlJobRegistry

  • 執(zhí)行器注冊信息
    類型,應(yīng)用,執(zhí)行器地址,心跳時(shí)間
  • 任務(wù)信息
public class XxlJobInfo {
    
    private int id;             // 主鍵ID     (JobKey.name)
    
    private int jobGroup;       // 執(zhí)行器主鍵ID  (JobKey.group)
    private String jobCron;     // 任務(wù)執(zhí)行CRON表達(dá)式 【base on quartz】
    private String jobDesc;
    
    private Date addTime;
    private Date updateTime;
    
    private String author;      // 負(fù)責(zé)人
    private String alarmEmail;  // 報(bào)警郵件

    private String executorRouteStrategy;   // 執(zhí)行器路由策略
    private String executorHandler;         // 執(zhí)行器,任務(wù)Handler名稱
    private String executorParam;           // 執(zhí)行器,任務(wù)參數(shù)
    private String executorBlockStrategy;   // 阻塞處理策略
    private int executorTimeout;            // 任務(wù)執(zhí)行超時(shí)時(shí)間,單位秒
    private int executorFailRetryCount;     // 失敗重試次數(shù)
    
    private String glueType;        // GLUE類型   #com.xxl.job.core.glue.GlueTypeEnum
    private String glueSource;      // GLUE源代碼
    private String glueRemark;      // GLUE備注
    private Date glueUpdatetime;    // GLUE更新時(shí)間

    private String childJobId;      // 子任務(wù)ID,多個(gè)逗號分隔
    
    // copy from quartz
    private String jobStatus;       // 任務(wù)狀態(tài) 【base on quartz】
}
  • 任務(wù)執(zhí)行記錄
public class XxlJobLog {
    
    private int id;
    
    // job info
    private int jobGroup;//執(zhí)行器主鍵id
    private int jobId;

    // execute info
    private String executorAddress;//執(zhí)行器地址
    private String executorHandler;//執(zhí)行器任務(wù)執(zhí)行函數(shù)
    private String executorParam;//參數(shù)
    private String executorShardingParam;//分片參數(shù)
    private int executorFailRetryCount;//失敗重試次數(shù)
    
    // trigger info
    private Date triggerTime;//觸發(fā)時(shí)間
    private int triggerCode;//觸發(fā)結(jié)果
    private String triggerMsg;
    
    // handle info
    private Date handleTime;//處理完成時(shí)間
    private int handleCode;//處理結(jié)果
    private String handleMsg;
}
  • 應(yīng)用執(zhí)行器信息
public class XxlJobGroup {

    private int id;
    private String appName;
    private String title;
    private int order;
    private int addressType;    // 執(zhí)行器地址類型:0=自動(dòng)注冊、1=手動(dòng)錄入
    private String addressList;    // 執(zhí)行器地址列表,多地址逗號分隔(手動(dòng)錄入)
}

三 任務(wù)觸發(fā)

  • quartz調(diào)度觸發(fā)執(zhí)行RemoteHttpBean.executeInternal
protected void executeInternal(JobExecutionContext context)
        throws JobExecutionException {

    // load jobId
    JobKey jobKey = context.getTrigger().getJobKey();
    Integer jobId = Integer.valueOf(jobKey.getName());

    // trigger
    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
}
  • JobTriggerPoolHelper使用線程池,每個(gè)任務(wù)觸發(fā)一個(gè)線程執(zhí)行
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
}

public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
    triggerPool.execute(new Runnable() {
        @Override
        public void run() {
            XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
        }
    });
}
  • 初始化任務(wù)調(diào)度的信息
TriggerParam triggerParam = new TriggerParam();
//任務(wù)id
triggerParam.setJobId(jobInfo.getId());
//任務(wù)處理函數(shù),參數(shù)
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
//阻塞策略
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
//任務(wù)執(zhí)行超時(shí)時(shí)間配置
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
//任務(wù)觸發(fā)記錄id
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
//任務(wù)執(zhí)行函數(shù)源碼信息
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
//分片信息
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
  • 簡單任務(wù)按照執(zhí)行器路由策略選擇執(zhí)行器
    executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

  • xxlrpc發(fā)送任務(wù)觸發(fā)消息

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}
  • 通知開始監(jiān)控任務(wù)觸發(fā)記錄
    JobFailMonitorHelper.monitor(jobLog.getId());

四 定時(shí)任務(wù)

  • JobRegistryMonitorHelper執(zhí)行器心跳掃描,定時(shí)掃描執(zhí)行器心跳時(shí)間,刪除過期的執(zhí)行器
  • JobFailMonitorHelper任務(wù)狀態(tài)監(jiān)控告警及失敗重試
    blockqueu存儲所有本調(diào)度器待監(jiān)控的任務(wù),定時(shí)進(jìn)行檢查任務(wù)。
    按照告警策略,進(jìn)行失敗重試或者發(fā)送告警。
    任務(wù)執(zhí)行中,則繼續(xù)監(jiān)控
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容