elastic-job 源碼解讀之job的啟動過程

?開始分析elasticJob的啟動過程,首先看官方example的啟動實(shí)例:

//javaMain.java
// CHECKSTYLE:OFF
   public static void main(final String[] args) throws IOException {
       // CHECKSTYLE:ON
       CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
       JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(null);
       setUpSimpleJob(regCenter, jobEventConfig);
       setUpDataflowJob(regCenter, jobEventConfig);
       setUpScriptJob(regCenter, jobEventConfig);
   }

  private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
       JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
       SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
       new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
   }

  private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
       JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
       SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
       new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
   }

   private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
       JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0/5 * * * * ?", 3).build();
       ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
       new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
   }

?從官方的Example代碼中看,三種不同類型的作業(yè)完成基本的參數(shù)組裝之后,都是交給JobScheduler這個(gè)對象的init()方法去統(tǒng)一初始化,那么,這個(gè)類到底在elastic-job系統(tǒng)中,承擔(dān)著什么樣的角色,一起來看一下JobScheduler對象到底做了些什么事情:

private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        this.liteJobConfig = liteJobConfig;
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }

?看JobScheduler的構(gòu)造方法,首先作業(yè)注冊之前,會將job任務(wù)統(tǒng)一交由注冊器JobRegistry統(tǒng)一管理,JobRegistry對象保存有job注冊的所有信息,且從名字可以看出,該對象是單例;


JobRegistry.png

?看JobRegistry的API,能看到每一個(gè)job都有一個(gè)對應(yīng)的job實(shí)例,和一個(gè)jobSchedulerController(作業(yè)控制器),這就和example實(shí)例中的javaMain類中的代碼相互論證,setUpDataflowJob,setUpDataflowJob,setUpScriptJob中都new 了一個(gè)新的JobScheduler,而jobSchduler中new了一個(gè)新的SchedulerFacade(原來以為會和quartz的集群方案一樣,共用一個(gè)Scheduler,看來是自己想錯(cuò)了,elasticJob中每個(gè)作業(yè)都有一個(gè)Scheduler)。 ElasticJobListener是job監(jiān)控的listener,后面再單獨(dú)講,這里帶過,然后組裝數(shù)據(jù)。
?而在官方的example中,得知,job的初始化是在JobScheduler.init()方法中,構(gòu)造方法只是組裝jobScheduler使用到的一些服務(wù),和參數(shù),那么init方法到底做了哪些事情,去啟動job服務(wù)的。看代碼:

   public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

?首先,啟動的時(shí)候,會將job的配置信息通過scheduler傳遞給configService服務(wù)通過zk保存或更新最新的配置信息,然后將job的分片參數(shù),jobSchdulerController注冊到JobRegistry單例對象里,上面已經(jīng)提到,所有的job相關(guān)的信息都是通過單例JobRegistry對象去統(tǒng)一管理的,registerStartUpInfo方法中通過不同的服務(wù),做了不同服務(wù)的幾件事情,啟動所有的作業(yè)監(jiān)聽服務(wù),選舉主節(jié)點(diǎn),持久化服務(wù)器上線服務(wù),重新分片等,看代碼:

  public void registerStartUpInfo(final boolean enabled) {
        listenerManager.startAllListeners();
        leaderService.electLeader();
        serverService.persistOnline(enabled);
        instanceService.persistOnline();
        shardingService.setReshardingFlag();
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

?然后看到JobSchdulerController.scheduleJob,只看scheduleJob方法名,答案已經(jīng)很明顯了,在這個(gè)方法中會去啟動quartz的scheduler。從init代碼中初始化JobScheduleController對象得知,每初始化一個(gè)JobScheduleController,就會創(chuàng)建一個(gè)新的quartz的Scheduler,也就是說一個(gè)job對應(yīng)一個(gè)quartz的scheduler,一個(gè)quartz的scheduler對應(yīng)一個(gè)job,一一對應(yīng)。

   public void scheduleJob(final String cron) {
        try {
            if (!scheduler.checkExists(jobDetail.getKey())) {
                //quartz的scheduler
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start(); //啟動quartz的scheduler
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }

?看代碼已經(jīng)很清楚了。
總結(jié)一下啟動過程中的幾個(gè)對象:
JobRegistry: 保存所有job的所有相關(guān)信息。
JobScheduleController:封裝quartz的接口信息,相當(dāng)于ElasticJob調(diào)用quartz的API。
JobScheduler: 作業(yè)入口,所有job都要通過jobScheduler進(jìn)行初始化。
最后,看一下類圖

scheduler.png

?就這樣,elasticJob和quartz結(jié)合在一起,從此不分離,最終執(zhí)行作業(yè)LiteJob。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容