作業(yè)需要執(zhí)行,必然需要一個調度器,去統(tǒng)籌作業(yè)執(zhí)行的邏輯,這也是ElasticJob的核心內容;ElasticJob依賴注冊中心實現(xiàn)分片,所以調度器主要需要做的的事就是在任務啟動的時候,將任務的信息寫到注冊中心,其次就是啟動任務,具體的執(zhí)行邏輯需要慢慢分析;
1,作業(yè)調度器-JobScheduler
首先來看看JobScheduler的構造器;
1.1,構造器
private JobScheduler(final CoordinatorRegistryCenter regCenter,
final LiteJobConfiguration liteJobConfig,
final JobEventBus jobEventBus,
final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
- 首先使用作業(yè)注冊表JobRegistry注冊當前的作業(yè)信息,標注唯一作業(yè)的方式是,使用ip地址+虛擬機進程+作業(yè)名稱,來唯一標識某臺機器上運行的某個項目的某個作業(yè)實例;
- 其實是為監(jiān)聽器設置值,這個以后再講;
- 最后就是兩個門面類的初始化賦值;
1.2,啟動調度器-init
啟動調度器使用的是init方法,這個方法可以理解為啟動一個定時任務,只不過這個定時任務功能比較強大,不僅僅是簡單的定時執(zhí)行;
首先來看init方法:
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
- 第一行首先更新該作業(yè)的配置信息到注冊中心,更新的條件是注冊中心中,名為jobName+"config"的節(jié)點不存在,或者LiteJobConfiguration的overWrite字段設置為true時,會強制更新;
- 第二行代碼是將本次作業(yè)的分片總數(shù)注冊到注冊表JobRegistry中;
- 然后構建一個作業(yè)調度器的控制器JobScheduleController,這個控制器,可以實現(xiàn)調度,暫停,恢復,關閉等操作,使用的是quartz的Scheduler來實現(xiàn)的,所以構造是需要一個Scheduler對象,同時還需要一個JobDetail對象,這兩個對象后面講;
- 控制器構造完成后,將控制器和注冊中心注冊到作業(yè)注冊表(JobRegistry)中,也就是說這個注冊表中包含了所有作業(yè)的所有信息,當后期需要時,可以從注冊表中獲??;
- 再然后使用門面裝飾類schedulerFacade,注冊初始化信息到注冊中心,這里使用了裝飾器模式,將大量的邏輯分在在里面,需要另開一篇講解,總之他的任務就是講作業(yè)的各項信息寫到注冊中心;
- 最后,使用作業(yè)調度器的控制器,啟動調度操作,也就是啟動定時任務,使用quartz來實現(xiàn)的;
再來看看作業(yè)調度器的控制器需要的JobDetail是如何構造的:
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
- 構造方式很簡單,使用一個builder對象,然后將elasticJob對象和jobFacade對象放置進去;
- 但是這里面還有一點很有意思的東西,newJob的對象LiteJob,他是quartz的job接口的實現(xiàn)類,他內部擁有兩個屬性elasticJob和jobFacade,而后面的代碼向jobDataModel中(ELASTIC_JOB_DATA_MAP_KEY->elasticJob;JOB_FACADE_DATA_MAP_KEY->jobFacade)放置的也是這兩個屬性,quartz框架會將jobDataModel中的鍵值對賦值給newJob對象中對應的屬性;
- 還需要注意的是,當jobClass為ScriptJob時,elasticJob是沒有放置到jobDataModel中的,但是沒關系,在LiteJob中調用的JobExecutorFactory在調用時,如果elasticJob為null,那么就默認執(zhí)行ScriptJobExecutor;
再來看看Scheduler對象是如何構建:
private Scheduler createScheduler() {
Scheduler result;
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
result.put("org.quartz.jobStore.misfireThreshold", "1");
result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}
- 這里注冊了一個JobTriggerListener,用來設置任務被錯過執(zhí)行的標記;
- JobShutdownHookPlugin是用來在Scheduler關閉的時候做掃尾工作的;
總結:
- 調度器的啟動邏輯較為復雜,大量邏輯包含在內部尚未解析,但是他主要做的事:
- 向注冊中心寫入各種節(jié)點信息;
- 向作業(yè)注冊表(JobRegistry)中,寫入作業(yè)的各種信息;
- 構建并啟動quartz的調度器,也就是啟動定時任務,執(zhí)行本次作業(yè);
2,作業(yè)的具體執(zhí)行
從上面代碼也可以看出,每次定時任務出發(fā)的時候,quratz會調用,實現(xiàn)了Job接口的LiteJob類的execute方法,那么我們就從這兒開始看起;
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
- 上面也分析到了,這個類的構造器只有空構造函數(shù),有quartz框架new出來,并且會設置elasticJob和jobFacade的值,值的來源是,構建JobDetail對象時,設置到jobDataMap中的同名屬性;
- 然后使用JobExecutorFactory根據elasticJob的類型的不同挑選對應的作業(yè)執(zhí)行器,代碼如下;
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
if (null == elasticJob) {
return new ScriptJobExecutor(jobFacade);
}
if (elasticJob instanceof SimpleJob) {
return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
}
if (elasticJob instanceof DataflowJob) {
return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
}
throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
}
- 在構建JobDetail時說過,如果作業(yè)類型為ScriptJob,那么elasticJob是沒有值的,在此處,如果沒有值,就返回ScriptJobExecutor,與之前相對應;
- 其他的,根據作業(yè)類型,獲取相應的執(zhí)行器,如果沒有,直接報錯;
2.1,簡單作業(yè)執(zhí)行器-SimpleJobExecutor
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
private final SimpleJob simpleJob;
public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
super(jobFacade);
this.simpleJob = simpleJob;
}
@Override
protected void process(final ShardingContext shardingContext) {
simpleJob.execute(shardingContext);
}
}
這個類沒什么講的,只是向父類傳遞了一個參數(shù),然后重寫了一個方法,但是這個方法調用的是我們實現(xiàn)SimpleJob時復寫的方法,也就是我們自己寫的業(yè)務邏輯在此處執(zhí)行;主要還是看看他的父類中干了什么,先來看看父類的構造器;
2.1.1,執(zhí)行器的構造器
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
this.jobFacade = jobFacade;
jobRootConfig = jobFacade.loadJobRootConfiguration(true);
jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
- 根據子類傳過來的JobFacade對象,獲取JobRootConfig,JobName;
- 以及執(zhí)行作業(yè)的線程池ExecutorService,和作業(yè)的異常處理器jobExceptionHandler,這兩個不同類型的字段是通過同一方法(getHandler)獲取的,值得借鑒;
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
try {
Class<?> handlerClass = Class.forName(handlerClassName);
if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
return handlerClass.newInstance();
}
return getDefaultHandler(jobPropertiesEnum, handlerClassName);
} catch (final ReflectiveOperationException ex) {
return getDefaultHandler(jobPropertiesEnum, handlerClassName);
}
}
實現(xiàn)邏輯是這樣的:
- 方法參數(shù)給定一個 JobProperties.JobPropertiesEnum 類型的參數(shù)作為默認值;
- 如果能從核心配置類(JobCoreConfiguration)中獲取,定義作業(yè)配置的值,那么就使用配置的值,如果沒有,或者配置的不是方法參數(shù)中期望的類型,那么就使用參數(shù)中的默認值;
- 獲取默認值的方法如下;
private Object getDefaultHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum, final String handlerClassName) {
log.warn("Cannot instantiation class '{}', use default '{}' class.", handlerClassName, jobPropertiesEnum.getKey());
try {
return Class.forName(jobPropertiesEnum.getDefaultValue()).newInstance();
} catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new JobSystemException(e);
}
}
2.1.2,執(zhí)行器的execute方法
該方法邏輯比較長,Elasticjob也分成了幾段來實現(xiàn),我們就一段一段的看;
- 第一段
public final void execute() {
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
try {
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
jobFacade.failoverIfNecessary();
try {
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
- 首先檢查本機與注冊中心的時間誤差秒數(shù)是否在允許范圍;
- 如果maxTimeDiffSeconds配置為-1,表示不檢查;
- 需要注意一點,即使誤差過大,拋出異常,如果使用的是默認的異常處理器,那么也只是會打印error日志,而并不會阻礙程序;
- 然后獲取分片上下文;
- 這里的獲取到的分片邏輯有點復雜,在2.2中在來具體解釋一下,因為涉及到了失效轉移的問題;
- 如果沒有失效轉移,那么獲取的分片就是當前實例在執(zhí)行分片時獲取的分片,去除標記了disable的分片;
- 然后判斷,如果當前分片項仍在運行,是否需要設置任務被錯過執(zhí)行的標記;
- 如果需要設置,那么當前任務將被跳過,并設置任務被錯過執(zhí)行的標記;
- 如果不需要設置,那么接著執(zhí)行后面的邏輯;
- 需要注意的是分片項是否正在運行的判斷邏輯;
- 首先根據LiteJobConfiguration中的monitorExecution字段判斷是否監(jiān)控執(zhí)行,默認為true,如果為false,根本就不會設置 錯過執(zhí)行,即使上一次的定時任務還在執(zhí)行,這一次的定時任務也將啟動執(zhí)行;
- 其次,本項目實例拿到的分片中,有任意一個分片還在運行中,那么所持有的所有分片,都將錯過本次定時任務的執(zhí)行;
- 然后執(zhí)行監(jiān)聽器(ElasticJobListener)的beforeJobExecuted方法;
- 然后執(zhí)行第二段邏輯,下面再講;
- 然后判斷作業(yè)是否需要執(zhí)行錯過的任務,如果需要,那么還是執(zhí)行第二段邏輯;
- 被錯過執(zhí)行有兩種情況:
- 開啟了monitorExecution,發(fā)現(xiàn)上一次的任務還在執(zhí)行中,那么本實例拿到的所有分片的這一次的定時任務都將被錯過;
- 如果定時任務時間間距過小,如10秒,而任務執(zhí)行了12秒,那么quartz會觸發(fā)監(jiān)聽器JobTriggerListener,監(jiān)聽器會設置本作業(yè)實例的所有分片為錯過執(zhí)行;
- 然后判斷如果需要失效轉移, 則執(zhí)行作業(yè)失效轉移;
- 最后執(zhí)行監(jiān)聽器的afterJobExecuted方法;
- 第二段
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
try {
process(shardingContexts, executionSource);
} finally {
// TODO 考慮增加作業(yè)失敗的狀態(tài),并且考慮如何處理作業(yè)失敗的整體回路
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
- 該段代碼如果monitorExecution屬性沒有開啟,將不會有什么意義;
- 首先需要在注冊中心中增加running節(jié)點,表示任務正在運行;
- 當處理完后,在finally中再將running移除;
- 第三段
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
- 這里對ShardingContexts中是否只包含了一個分片做了區(qū)分處理
- 如果只包含一個分片,比較好處理,直接進入第三段的處理即可;
- 如果包含多個分片,那么因為第二段代碼是夾在第一段代碼的監(jiān)聽器的兩個方法中間執(zhí)行的,必須要處理完所有的分片任務,才能執(zhí)行監(jiān)聽器的afterJobExecuted方法;
- 這里使用了CountDownLatch對象,該對象在初始化的時候需要設置一個數(shù)量值,每調用一次countDown方法,則數(shù)量會減少1,如果值不為0,那么線程會一直阻塞在await方法處,此處是為了實現(xiàn)線程等待而使用的,等待所有分片任務執(zhí)行完,再接著向下執(zhí)行;
- 第四段
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
}
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
process(new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(completeEvent);
}
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
completeEvent = startEvent.executionFailure(cause);
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtil.transform(cause));
jobExceptionHandler.handleException(jobName, cause);
}
}
protected abstract void process(ShardingContext shardingContext);
- 這一段沒什么邏輯,除了調用真實的業(yè)務邏輯外,就是發(fā)布一些事件;
- 真正執(zhí)行的業(yè)務代碼,由子類提供;
2.2,失效轉移
首先來解釋一下失效轉移的含義:
- 就是在某個定時任務,從這一次開始運行,到下一次開始運行之前,這之間的時間段內,某一個或多個作業(yè)實例出現(xiàn)了問題(或者作業(yè)實例的部分分片出了問題),那么這些出問題的作業(yè)實例,這一次的定時任務就相當于沒有執(zhí)行,那么失效轉移功能就會將這些沒有執(zhí)行的分片,轉移到其他正常機器,馬上觸發(fā)執(zhí)行;
- 當下一次任務的時間點到來時,開始重新分片,然后繼續(xù)正常執(zhí)行;
再來看看代碼層面的實現(xiàn):
- 首先是JobCrashedJobListener監(jiān)聽到了需要失效轉移的分片項;
- 然后逐一設置失效的分片項標記,具體位置在注冊中心的:{jobName}/leader/failover/items/{item};
- 然后開始逐一執(zhí)行作業(yè)失效轉移,先執(zhí)行主節(jié)點選舉爭搶當前分片,然后執(zhí)行回調;
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleException(ex);
}
}
- 回調使用的是FailoverLeaderExecutionCallback:
- 首先向{jobName}/sharding/{item}/failover中填充臨時數(shù)據;
- 然后移除先前的{jobName}/leader/failover/items/{item}中的數(shù)據;
- 最后構建一個jobScheduleController對象,調用triggerJob方法,馬上進行一次任務啟動操作;
再來看看AbstractElasticJobExecutor中獲取分片的邏輯:
public ShardingContexts getShardingContexts() {
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
shardingService.shardingIfNecessary();
List<Integer> shardingItems = shardingService.getLocalShardingItems();
if (isFailover) {
shardingItems.removeAll(failoverService.getLocalTakeOffItems());
}
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
return executionContextService.getJobShardingContext(shardingItems);
}
- 如果開啟了失效轉移,那么就去獲取{jobName}/sharding下所有item中的failover節(jié)點,并篩選出當前作業(yè)實例的failover,將這些failover對應的item構造成ShardingContexts直接返回;這里的執(zhí)行邏輯,對應的是上面jobScheduleController對象調用triggerJob方法;
- 失效轉移執(zhí)行完成后,下一次再來獲取分片的時候,會執(zhí)行shardingIfNecessary方法,重新進行分片;