xxl-job-admin的源碼分析

一、設(shè)計(jì)思路

借用官網(wǎng)的話:
將調(diào)度行為抽象形成“調(diào)度中心”公共平臺(tái),而平臺(tái)自身并不承擔(dān)業(yè)務(wù)邏輯,“調(diào)度中心”負(fù)責(zé)發(fā)起調(diào)度請(qǐng)求。
將任務(wù)抽象成分散的JobHandler,交由“執(zhí)行器”統(tǒng)一管理,“執(zhí)行器”負(fù)責(zé)接收調(diào)度請(qǐng)求并執(zhí)行對(duì)應(yīng)的JobHandler中業(yè)務(wù)邏輯。因此,“調(diào)度”和“任務(wù)”兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴(kuò)展性;

借用官網(wǎng)的圖:

xxl_job架構(gòu)圖.png

簡(jiǎn)化的架構(gòu)圖:
簡(jiǎn)化架構(gòu)圖.png

二、啟動(dòng)原理

1.xxl-job-admin服務(wù)啟動(dòng)原理

啟動(dòng)XxlJobAdminApplication類,在spring容器實(shí)例化之前,會(huì)執(zhí)行實(shí)現(xiàn)了 InitializingBean 接口的 afterPropertiesSet() 的方法,這里是利用了springboot的拓展接口,來(lái)將xxl-job的相關(guān)bean給注冊(cè)到IOC容器當(dāng)中。然后執(zhí)行最關(guān)鍵的 xxlJobScheduler.init()

XxlJobAdminConfig.png

調(diào)用 XxlJobSchedulerinit() 方法

XxlJobScheduler.png

JobRegistryHelperJobFailMonitorHelper,JobCompleteHelper,JobLogReportHelper,JobScheduleHelper這五個(gè)類都是使用了餓漢式的單例模式(個(gè)人覺得還需要將構(gòu)造方法私有化),

image.png

a.調(diào)用 JobTriggerPoolHelper.toStart() 本質(zhì)就是調(diào)用JobTriggerPoolHelper的start()方法,構(gòu)造出兩個(gè)線程池,如下圖所示的,一個(gè)快觸發(fā)線程池,一個(gè)慢觸發(fā)線程池。

image.png

b.調(diào)用 JobRegistryHelper.getInstance().start(),方法內(nèi)部主要做了兩件事,一件事是初始化一個(gè)注冊(cè)或者移除的線程池 registryMonitorThread,然后創(chuàng)建一個(gè) registryMonitorThread 的守護(hù)線程。設(shè)置成守護(hù)線程。

JobRegistryHelper.start()方法.png

單獨(dú)把線程構(gòu)造拿出來(lái)分析。

// for monitor
        registryMonitorThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // 死循環(huán)
                while (!toStop) {
                    try {
                        // 從 xxl_job_group 表中查詢出 自動(dòng)注冊(cè)的執(zhí)行器  (address_type:0 自動(dòng)注冊(cè),1:手動(dòng)注冊(cè))
                        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                        if (groupList!=null && !groupList.isEmpty()) {
                            // 從 xxl_job_registry 表中找到更新時(shí)間 小于當(dāng)前時(shí)間+死亡間隔時(shí)間(就是找到注冊(cè)表中規(guī)定時(shí)間沒有更新的記錄)
                            List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (ids!=null && ids.size()>0) {
                                // 如果找到在規(guī)定時(shí)間內(nèi)沒有更新的注冊(cè),就直接刪除這些注冊(cè)執(zhí)行器
                                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                            }

                            // fresh online address (admin/executor)
                            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                            // 從 xxl_job_registry 表中找到更新時(shí)間 小于當(dāng)前時(shí)間+死亡間隔時(shí)間(就是找到注冊(cè)表中規(guī)定時(shí)間沒有更新的記錄)
                            List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                            if (list != null) {
                                // 遍歷當(dāng)前過(guò)期的注冊(cè)器,將過(guò)期的注冊(cè)器的 register_value 注冊(cè)地址保存到臨時(shí)變量 appAddressMap 中
                                for (XxlJobRegistry item: list) {
                                    // 如果注冊(cè)器類型(registry_group) 是 EXECUTOR
                                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                        String appname = item.getRegistryKey();
                                        List<String> registryList = appAddressMap.get(appname);
                                        if (registryList == null) {
                                            registryList = new ArrayList<String>();
                                        }

                                        if (!registryList.contains(item.getRegistryValue())) {
                                            registryList.add(item.getRegistryValue());
                                        }
                                        appAddressMap.put(appname, registryList);
                                    }
                                }
                            }

                            // 刷新對(duì)應(yīng)執(zhí)行器地址和最新修改時(shí)間
                            for (XxlJobGroup group: groupList) {
                                List<String> registryList = appAddressMap.get(group.getAppname());
                                String addressListStr = null;
                                if (registryList!=null && !registryList.isEmpty()) {
                                    Collections.sort(registryList);
                                    StringBuilder addressListSB = new StringBuilder();
                                    for (String item:registryList) {
                                        addressListSB.append(item).append(",");
                                    }
                                    addressListStr = addressListSB.toString();
                                    addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                                }
                                group.setAddressList(addressListStr);
                                group.setUpdateTime(new Date());

                                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                    try {
                        // 休眠心跳的時(shí)間
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });

C.調(diào)用 JobFailMonitorHelper.getInstance().start(),方法內(nèi)部創(chuàng)建一個(gè) monitorThread 的守護(hù)線程。設(shè)置成守護(hù)線程。

image.png

下面代碼詳細(xì)介紹 JobFailMonitorHelper.getInstance().start() 里面的 構(gòu)造的線程主要做的是什么事。

    public void start(){
        monitorThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // monitor 死循環(huán)監(jiān)控,每10秒鐘(每次執(zhí)行完休眠十秒)執(zhí)行一遍監(jiān)控的內(nèi)容
                while (!toStop) {
                    try {
                        // 從 xxl_job_log 查詢出失敗的日志記錄
                        List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                        if (failLogIds!=null && !failLogIds.isEmpty()) {
                            for (long failLogId: failLogIds) {

                                // 修改 xxl_job_log 將警報(bào)狀態(tài)改成 (-1鎖定狀態(tài)) 告警狀態(tài):0-默認(rèn)、1-無(wú)需告警、2-告警成功、3-告警失敗
                                int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                                if (lockRet < 1) {
                                    continue;
                                }
                                // 根據(jù)失敗的日志id,查詢出該條日志
                                XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                                // 根據(jù)這條日志記錄的 執(zhí)行器Id查詢對(duì)應(yīng)的執(zhí)行器
                                XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                                // 如果 日志的重試次數(shù)大于0,就直接觸發(fā)JobTriggerPoolHelper.trigger()方法,這個(gè)方法就是admin遠(yuǎn)程調(diào)用執(zhí)行器的方法。
                                if (log.getExecutorFailRetryCount() > 0) {
                                    JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                    String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                                    log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                    // 觸發(fā)完成,將失敗重試的次數(shù)減一,更新
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                                }

                                // 2、fail alarm monitor
                                int newAlarmStatus = 0;     // 告警狀態(tài):0-默認(rèn)、-1=鎖定狀態(tài)、1-無(wú)需告警、2-告警成功、3-告警失敗
                                // 如果存在失敗的日志,發(fā)送警報(bào)郵件
                                if (info != null) {
                                    boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                    newAlarmStatus = alarmResult?2:3;
                                } else {
                                    newAlarmStatus = 1;
                                }
                                // 最后更新一下 xxl_job_log 表的 alarm_status 狀態(tài)字段
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                        }
                    }

                    try {
                          // 休眠 10秒
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

            }
        });
        monitorThread.setDaemon(true);
        monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
        monitorThread.start();
    }

調(diào)用 JobCompleteHelper.getInstance().start(),方法內(nèi)部創(chuàng)建一個(gè) monitorThread 的守護(hù)線程。設(shè)置成守護(hù)線程;以及一個(gè)callbackThreadPool線程池。

image.png

monitorThread 線程內(nèi)部的工作內(nèi)容

// for monitor
        monitorThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // wait for JobTriggerPoolHelper-init
                try {
                    // 上來(lái)休眠50毫秒,等待 JobTriggerPoolHelper 初始化完成
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                // monitor
                while (!toStop) {
                    try {
                        // 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運(yùn)行中" 狀態(tài)超過(guò)10min,且對(duì)應(yīng)執(zhí)行器心跳注冊(cè)失敗不在線,則將本地調(diào)度主動(dòng)標(biāo)記失?。?                        Date losedTime = DateUtil.addMinutes(new Date(), -10);
                        List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                        if (losedJobIds!=null && losedJobIds.size()>0) {
                            for (Long logId: losedJobIds) {

                                XxlJobLog jobLog = new XxlJobLog();
                                jobLog.setId(logId);

                                jobLog.setHandleTime(new Date());
                                jobLog.setHandleCode(ReturnT.FAIL_CODE);
                                jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
                                // 處理日志,并更新執(zhí)行器的完成結(jié)果
                                XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                        }
                    }

                    try {
                            // 休眠60秒,再執(zhí)行一次
                        TimeUnit.SECONDS.sleep(60);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

            }
        });

E.最后看一下 JobScheduleHelper.getInstance().start(); 方法。方法里面最主要的就是起兩個(gè)線程,分別將兩個(gè)線程設(shè)置成守護(hù)線程,ringThread是最干活的線程,scheduleThread是檢測(cè)并調(diào)度執(zhí)行器任務(wù)的線程。

image.png

看一下這ringThread和scheduleThread里面都是干啥的。這里先總結(jié)一下,scheduleThread線程主要就是將需要執(zhí)行的定時(shí)器任務(wù)分個(gè)類,并維護(hù)每個(gè)定時(shí)器里面的下次執(zhí)行時(shí)間,以及處理 調(diào)度過(guò)期的 執(zhí)行器,要么立刻執(zhí)行一次,要么直接忽略,等待下次執(zhí)行,最后就是將需要在5秒內(nèi)執(zhí)行的定時(shí)器放進(jìn)一個(gè)map里面,交給ringThread線程去執(zhí)行定時(shí)器。而ringThread線程就是直接從map中拿到需要執(zhí)行的執(zhí)行器去執(zhí)行,并且每輪執(zhí)行只處理兩個(gè)時(shí)間點(diǎn)(毫秒級(jí))的所有執(zhí)行器。具體的可以看代碼里面的講解。

public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                // 這里默認(rèn)么個(gè)定時(shí)任務(wù)線程執(zhí)行耗時(shí)50毫秒,每秒1000毫秒,可以執(zhí)行20個(gè)任務(wù),快線程池默認(rèn)是200個(gè)最大線程數(shù),慢線程默認(rèn)是100個(gè)最大線程數(shù),
                // 所以當(dāng)線程數(shù)拉滿的情況下,每秒鐘可以處理任務(wù)數(shù)是:(100+200)*20 = 6000 ;所以這里的  preReadCount 表示預(yù)讀數(shù)(默認(rèn)最大:6000)
                // 如果想要提高并發(fā)性,通過(guò)修改快慢線程池的最大線程數(shù)這個(gè)參數(shù)調(diào)節(jié)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
                logger.info("==========================preReadCount ={}",preReadCount);

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        // 利用數(shù)據(jù)庫(kù)的行鎖
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、預(yù)讀數(shù)據(jù)
                        long nowTime = System.currentTimeMillis();
                        // 由于分析了最多可以執(zhí)行6000個(gè)任務(wù),所以這里在去查任務(wù)表的時(shí)候,最多去查出來(lái)6000條滿足條件的
                        // 條件:下次執(zhí)行時(shí)間 小于等于 (當(dāng)前時(shí)間 + 5秒) 并且  執(zhí)行狀態(tài)是  正在運(yùn)行的狀態(tài)
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // time-ring jump
                                //如果服務(wù)宕機(jī)了,或者重啟等等,導(dǎo)致超過(guò)了調(diào)度周期(5秒的調(diào)度周期),也就是本來(lái)由時(shí)間上的上一次或上很多次調(diào)度觸發(fā)的數(shù)據(jù)被本次調(diào)度查到了,
                                // 這就可能代表著可能中間存在多次調(diào)度未觸發(fā),而按照周期性一次一次計(jì)算下次預(yù)期調(diào)度時(shí)間,那這次調(diào)度完了計(jì)算出來(lái)的下次調(diào)度還是在當(dāng)前時(shí)間以前,
                                // 例如調(diào)度周期1分鐘調(diào)度一次,宕機(jī)5分鐘了,現(xiàn)在查到的預(yù)期調(diào)度時(shí)間為5分鐘前,如果直接調(diào)度成功會(huì)重復(fù)調(diào)度5次當(dāng)前時(shí)間以前的任務(wù),這里直接pass并計(jì)算下一次調(diào)度時(shí)間,
                                // 但是計(jì)算下一次調(diào)度時(shí)間也是傳入當(dāng)前時(shí)間,直接修正預(yù)期下次調(diào)度時(shí)間為當(dāng)前時(shí)間之后,因?yàn)檎{(diào)度時(shí)間周期為 5秒,所以會(huì)+ PRE_READ_MS 判斷,如果是一次性的調(diào)度則會(huì)補(bǔ)償這次調(diào)度
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 拿到設(shè)置的 調(diào)度過(guò)期策略
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    // 如果是忽略就不執(zhí)行,如果是立刻執(zhí)行一次,就立馬調(diào)用執(zhí)行一次該執(zhí)行器
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW 》 trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
                                    refreshNextValidTime(jobInfo, new Date());

                                    // 當(dāng)前的時(shí)間大于執(zhí)行器下次的執(zhí)行的時(shí)間,說(shuō)明上次執(zhí)行器可能遺漏了這個(gè)執(zhí)行器
                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    // 如果下次發(fā)送時(shí)間在當(dāng)前時(shí)間之后5秒內(nèi),會(huì)進(jìn)行第二次觸發(fā),放到另一個(gè)線程中執(zhí)行觸發(fā)邏輯
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second
                                        // 這里算出的結(jié)果保持在 0 - 59之間
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、將當(dāng)前任務(wù)放進(jìn)一個(gè)全局變量map中,讓ringThread線程去執(zhí)行
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                    // 取5秒的直接觸發(fā) 但是區(qū)別于上面是第一次觸發(fā)
                                } else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、從當(dāng)前時(shí)間算起,算出這個(gè)定時(shí)器下次應(yīng)該執(zhí)行時(shí)間
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3、更新執(zhí)行器,主要是更新 上次觸發(fā)時(shí)間,下次觸發(fā)時(shí)間,和當(dāng)前執(zhí)行器的狀態(tài)
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    // 如果上述操作耗時(shí)大于一秒直接進(jìn)入下次循環(huán),如果小于一秒需要再判斷
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            // 首先 System.currentTimeMillis()%1000 這里的取余,最大值是999毫秒,可以理解成極限的一秒
                            // 如果讀到了數(shù)據(jù) 休眠 (1 - (System.currentTimeMillis()%1000))秒,這里最大休眠1秒,最小接近不休眠直接執(zhí)行
                            // 因?yàn)樽x到了數(shù)據(jù),不知道接下來(lái)還有沒有數(shù)據(jù),這里為了趕工確保滿足條件的定時(shí)任務(wù)能快速被執(zhí)行。
                            // 如果讀不到數(shù)據(jù),休眠 (5 - (System.currentTimeMillis()%1000))秒,這里最大休眠5秒,最小休眠4秒
                            // 因?yàn)橐呀?jīng)讀不到數(shù)據(jù)了,多休息一下,讓定時(shí)器等到需要被執(zhí)行的時(shí)間點(diǎn)
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        // 上來(lái)休眠最大1秒,如果scheduleThread有執(zhí)行任務(wù),保證會(huì)向 ringData(map) 里面寫
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        // 從 ringData(map)里面拿到,
                        List<Integer> ringItemData = new ArrayList<>();
                        // 上面介紹過(guò) 這個(gè)map的key是在0-59之間,直接取當(dāng)前的秒數(shù)(0-59)
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免處理耗時(shí)太長(zhǎng),跨過(guò)刻度,向前校驗(yàn)一個(gè)刻度;
                        for (int i = 0; i < 2; i++) {
                            // 循環(huán)兩次從 ringData(map)中拿到兩個(gè)時(shí)間點(diǎn)的執(zhí)行器ID集合 list,然后賦值給 ringItemData
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        // 如果 ringItemData 集合不為空,說(shuō)明有需要執(zhí)行的執(zhí)行器Id,就遍歷執(zhí)行里面Id對(duì)應(yīng)的執(zhí)行器
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            // 遍歷完成后,將 ringItemData 置空,然后等待線程 下次執(zhí)行
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

三、總結(jié)

1.xxl-job-admin的啟動(dòng)是利用springboot的擴(kuò)展接口InitializingBean來(lái)實(shí)現(xiàn)的,銷毀是利用擴(kuò)展DisposableBean接口來(lái)實(shí)現(xiàn)的。
2.xxl-job-admin的核心運(yùn)行流程由JobRegistryHelper,JobFailMonitorHelper,JobCompleteHelperJobLogReportHelper,JobScheduleHelper 這幾個(gè)Helper完成。
3.單臺(tái)xxl-job最多一秒鐘可以完成6000個(gè)執(zhí)行器任務(wù)的執(zhí)行。
4.集群環(huán)境下,同一個(gè)執(zhí)行器不出現(xiàn)并發(fā)執(zhí)行問(wèn)題其實(shí)是依賴了數(shù)據(jù)庫(kù)的行鎖實(shí)現(xiàn)的。

綜上所述,將xxl-job-admin中的啟動(dòng),以及如何調(diào)度核心部分就已經(jīng)說(shuō)完了,其實(shí)在執(zhí)行執(zhí)行器任務(wù)的時(shí)候里面還涉及到xxl-job的集群分片處理任務(wù)的原理,以及集群路由的原理,還有內(nèi)置server的設(shè)計(jì),以及xxl-job-admin遠(yuǎn)程觸發(fā)任務(wù)使用的RPC調(diào)用原理細(xì)節(jié),后面有空再整理吧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容