?開始分析elasticJob的啟動過程,首先看官方example的啟動實(shí)例:
//javaMain.java
// CHECKSTYLE:OFF
public static void main(final String[] args) throws IOException {
// CHECKSTYLE:ON
CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(null);
setUpSimpleJob(regCenter, jobEventConfig);
setUpDataflowJob(regCenter, jobEventConfig);
setUpScriptJob(regCenter, jobEventConfig);
}
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
}
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("javaSimpleJob", "0/5 * * * * ?", 3).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
}
private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0/5 * * * * ?", 3).build();
ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
}
?從官方的Example代碼中看,三種不同類型的作業(yè)完成基本的參數(shù)組裝之后,都是交給JobScheduler這個(gè)對象的init()方法去統(tǒng)一初始化,那么,這個(gè)類到底在elastic-job系統(tǒng)中,承擔(dān)著什么樣的角色,一起來看一下JobScheduler對象到底做了些什么事情:
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
?看JobScheduler的構(gòu)造方法,首先作業(yè)注冊之前,會將job任務(wù)統(tǒng)一交由注冊器JobRegistry統(tǒng)一管理,JobRegistry對象保存有job注冊的所有信息,且從名字可以看出,該對象是單例;

?看JobRegistry的API,能看到每一個(gè)job都有一個(gè)對應(yīng)的job實(shí)例,和一個(gè)jobSchedulerController(作業(yè)控制器),這就和example實(shí)例中的javaMain類中的代碼相互論證,setUpDataflowJob,setUpDataflowJob,setUpScriptJob中都new 了一個(gè)新的JobScheduler,而jobSchduler中new了一個(gè)新的SchedulerFacade(原來以為會和quartz的集群方案一樣,共用一個(gè)Scheduler,看來是自己想錯(cuò)了,elasticJob中每個(gè)作業(yè)都有一個(gè)Scheduler)。 ElasticJobListener是job監(jiān)控的listener,后面再單獨(dú)講,這里帶過,然后組裝數(shù)據(jù)。
?而在官方的example中,得知,job的初始化是在JobScheduler.init()方法中,構(gòu)造方法只是組裝jobScheduler使用到的一些服務(wù),和參數(shù),那么init方法到底做了哪些事情,去啟動job服務(wù)的。看代碼:
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
?首先,啟動的時(shí)候,會將job的配置信息通過scheduler傳遞給configService服務(wù)通過zk保存或更新最新的配置信息,然后將job的分片參數(shù),jobSchdulerController注冊到JobRegistry單例對象里,上面已經(jīng)提到,所有的job相關(guān)的信息都是通過單例JobRegistry對象去統(tǒng)一管理的,registerStartUpInfo方法中通過不同的服務(wù),做了不同服務(wù)的幾件事情,啟動所有的作業(yè)監(jiān)聽服務(wù),選舉主節(jié)點(diǎn),持久化服務(wù)器上線服務(wù),重新分片等,看代碼:
public void registerStartUpInfo(final boolean enabled) {
listenerManager.startAllListeners();
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
?然后看到JobSchdulerController.scheduleJob,只看scheduleJob方法名,答案已經(jīng)很明顯了,在這個(gè)方法中會去啟動quartz的scheduler。從init代碼中初始化JobScheduleController對象得知,每初始化一個(gè)JobScheduleController,就會創(chuàng)建一個(gè)新的quartz的Scheduler,也就是說一個(gè)job對應(yīng)一個(gè)quartz的scheduler,一個(gè)quartz的scheduler對應(yīng)一個(gè)job,一一對應(yīng)。
public void scheduleJob(final String cron) {
try {
if (!scheduler.checkExists(jobDetail.getKey())) {
//quartz的scheduler
scheduler.scheduleJob(jobDetail, createTrigger(cron));
}
scheduler.start(); //啟動quartz的scheduler
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
?看代碼已經(jīng)很清楚了。
總結(jié)一下啟動過程中的幾個(gè)對象:
JobRegistry: 保存所有job的所有相關(guān)信息。
JobScheduleController:封裝quartz的接口信息,相當(dāng)于ElasticJob調(diào)用quartz的API。
JobScheduler: 作業(yè)入口,所有job都要通過jobScheduler進(jìn)行初始化。
最后,看一下類圖

?就這樣,elasticJob和quartz結(jié)合在一起,從此不分離,最終執(zhí)行作業(yè)LiteJob。