5 xxl-job原理-- 執(zhí)行器注冊問題

xxl-job: v2.0.2 原理 目錄學(xué)習(xí)

xxl-job-admin 調(diào)度中心, 每隔30s, 會檢測 注冊表, 刪除90s之前的注冊信息

1. XxlJobScheduler 調(diào)度中心的代碼

// init i18n
// admin registry monitor run
//  admin  registry 開始工作,將注冊表信息注入到執(zhí)行器表中, 每隔30s檢測,刪除90s之前的數(shù)據(jù)
//  1.  auto registry group   
//  2. remove dead address (admin/executor)
//  3. fresh online address (admin/executor)
//  4. fresh group address
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
// 1、fail retry monitor
// 2、fail alarm monitor
JobFailMonitorHelper.getInstance().start();
 // init   admin-server  , 做一些準(zhǔn)備工作
 initRpcProvider();
// start-schedule      
//           schedule thread 
//           ringThread
 JobScheduleHelper.getInstance().start();
// 1、預(yù)讀10s內(nèi)調(diào)度任務(wù)
// 2、推送時間輪
     // 時間輪刻度計算
    過期超10s:本地忽略,當(dāng)前時間開始計算下次觸發(fā)時間
    過期10s內(nèi):立即觸發(fā)一次,當(dāng)前時間開始計算下次觸發(fā)時間
   未過期:正常觸發(fā),遞增計算下次觸發(fā)時間
    push async ring
3、更新trigger信息



2.觸發(fā)任務(wù)

JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);

3.修正執(zhí)行器注冊 ,dashboard能夠看到其注冊信息

JobRegistryMonitorHelper

public void start() {
        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // auto registry group
                        List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
//                      if (groupList!=null && !groupList.isEmpty()) {

                        // remove dead address (admin/executor)
                        XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(RegistryConfig.DEAD_TIMEOUT);

                        // fresh online address (admin/executor)
                        Map<String, List<String>> appAddressMap = new ConcurrentHashMap<String, List<String>>();
                        appAddressMap = XxlJobScheduler.jobGroupCache;
                        List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
                        if (list != null) {
                            for (XxlJobRegistry item : list) {
                                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                    String appName = item.getRegistryKey();
                                    List<String> registryList = appAddressMap.get(appName);
                                    if (registryList == null) {
                                        registryList = new ArrayList<String>();
                                    }

                                    if (!registryList.contains(item.getRegistryValue())) {
                                        registryList.add(item.getRegistryValue());
                                    }
                                    appAddressMap.put(appName, registryList);
                                }
                            }
                        }

                        // fresh group address
                        for (Map.Entry<String, List<String>> entry : appAddressMap.entrySet()) {
                            for (XxlJobGroup group : groupList) {
                                if (entry.getKey().equals(group.getAppName())) {
                                    List<String> registryList = appAddressMap.get(group.getAppName());
                                    String addressListStr = null;
                                    if (registryList != null && !registryList.isEmpty()) {
                                        Collections.sort(registryList);
                                        addressListStr = Joiner.on(",").join(registryList);
                                    }
                                    group.setAddressList(addressListStr);
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                                } else {
                                    if ("xxl-job-executor-sample".equals(group.getAppName())) {
                                        continue;
                                    }
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().remove(group.getId());
                                }
                            }
                            String appName = entry.getKey();
                            String addressListStr = Joiner.on(",").join(entry.getValue());
                            XxlJobGroup group = new XxlJobGroup();
                            group.setAppName(appName);
                            group.setAddressType(0);
                            group.setAddressList(addressListStr);
                            group.setTitle("自動注入的執(zhí)行器");
                            int order = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findMaxOrder();
                            group.setOrder(order + 1);
                            List<XxlJobGroup> oldjobGroup = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAppName(group);
                            if (oldjobGroup == null || oldjobGroup.isEmpty()) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().save(group);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
        registryThread.start();
    }

總結(jié)

  1. 開啟國際化i18n
  2. 開啟admin registry 注冊器
  3. 開啟jobFail 失敗執(zhí)行器
  4. 開啟admin server, 用于構(gòu)建xxlRpcProviderFactory和servletServerHandler
  5. 開啟定時任務(wù)觸發(fā)器, job scheudle

PS: 若你覺得可以、還行、過得去、甚至不太差的話,可以“關(guān)注”一下,就此謝過!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容