elastic-job動(dòng)態(tài)添加定時(shí)任務(wù)

在elastic-job的使用過程中,我們會(huì)遇到動(dòng)態(tài)添加定時(shí)任務(wù)的時(shí)候,但是官網(wǎng)上面并沒有對(duì)這塊內(nèi)容進(jìn)行說明。按照我的理解以及官網(wǎng)上面elastic-job的框架圖,ej的定時(shí)任務(wù)其實(shí)是存儲(chǔ)在zookeeper的一個(gè)個(gè)節(jié)點(diǎn)上面,所以通過給zookeeper添加對(duì)應(yīng)的節(jié)點(diǎn)即可完成定時(shí)任務(wù)的添加動(dòng)作。

下面上代碼:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.exception.JobSystemException;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DynamicAddJob implements SimpleJob{
    private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";

    /***
     * @param date 時(shí)間
     * @return cron類型的日期
     */
    public static String getCron(final Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat(CRON_DATE_FORMAT);
        String formatTimeStr = "";
        if (date != null) {
            formatTimeStr = sdf.format(date);
        }
        return formatTimeStr;
    }


    public static void main(String[] args){
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
        ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);
        long now = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            String cron = getCron(new Date(now + (i + 1) * 50000));
            JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("dynamicDemoJob-" + i, cron, 2).build();
            SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, DynamicAddJob.class.getCanonicalName());
            JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
            try {
                jobScheduler.init();
            }catch (JobSystemException e){
                e.printStackTrace();
            }
        }
    }

    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()){
            case 0:
                System.out.println("doing sharding 0...job name is "+shardingContext.getJobName());
                // do something by sharding item 0
                break;
            case 1:
                System.out.println("doing sharding 1...job name is "+shardingContext.getJobName());
                // do something by sharding item 1
                break;
        }
    }
}

這里用到比較重要的一個(gè)類是JobScheduler,這是lite-core里面一個(gè)比較核心的類,這個(gè)類其實(shí)就是我們的job,他的構(gòu)造方法包含以下參數(shù):

  • CoordinatorRegistryCenter regCenter:注冊(cè)中心,這里是zookeeper
  • LiteJobConfiguration liteJobConfig:定時(shí)任務(wù)的配置信息

這里可以看一下LiteJobConfiguration這個(gè)類,采用了設(shè)計(jì)模式中的建造者模式進(jìn)行構(gòu)建。可能看著會(huì)比較摸不著頭腦,里面的Builder跟平時(shí)的不太一樣,這里我們需要知道的是ej的源碼采用了lombok這個(gè)代碼簡(jiǎn)化的工具,只需要通過注解的形式就能將我們平時(shí)所需要的get/set和構(gòu)造器的內(nèi)容在編譯時(shí)創(chuàng)建出來,不需要在代碼中體現(xiàn),能夠大大簡(jiǎn)化我們的代碼。

另外還遇到一個(gè)坑。這段代碼不能重復(fù)使用,第一次跑的時(shí)候沒問題,過段時(shí)間再次跑這個(gè)代碼時(shí),會(huì)在init()處報(bào)錯(cuò),原因是我們新建的job根本不能被fire,我跟了進(jìn)去。發(fā)現(xiàn),job的cron表達(dá)式表示的時(shí)間還是以前的時(shí)間,這就奇怪了,明明我這邊配置了一個(gè)新的時(shí)間。通過debug,進(jìn)入init方法中,發(fā)現(xiàn)他會(huì)更新job信息,而更新時(shí),會(huì)去zk上面load配置信息,而zk的znode節(jié)點(diǎn)是老的節(jié)點(diǎn),上面存儲(chǔ)的配置信息也是老的,所以這塊的cron表達(dá)式也是舊的時(shí)間,根本不會(huì)被執(zhí)行,下面貼出源碼,供大家參考。

init()源碼:

    /**
     * 初始化作業(yè).
     */
    public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

updateJobConfiguration()的源碼如下:

    /**
     * 更新作業(yè)配置.
     *
     * @param liteJobConfig 作業(yè)配置
     * @return 更新后的作業(yè)配置
     */
    public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
        configService.persist(liteJobConfig);
        return configService.load(false);
    }

load()源碼如下:

    /**
     * 讀取作業(yè)配置.
     * 
     * @param fromCache 是否從緩存中讀取
     * @return 作業(yè)配置
     */
    public LiteJobConfiguration load(final boolean fromCache) {
        String result;
        if (fromCache) {
            result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
            if (null == result) {
                result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
            }
        } else {
            result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
        }
        return LiteJobConfigurationGsonFactory.fromJson(result);
    }

可以發(fā)現(xiàn)這塊load有兩種,一種是從緩存(這里的緩存使用Map來實(shí)現(xiàn)的TreeCache)中獲取getJobNodeData,一種是從注冊(cè)中心也就是zookeeper中獲取getJobNodeDataDirectly。load的時(shí)候,根據(jù)的是zk的路徑,其實(shí)也就是任務(wù)的jobName,所以我們要盡量避免任務(wù)名稱的重復(fù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,678評(píng)論 19 139
  • scheduler定時(shí)調(diào)度系統(tǒng)是大多行業(yè)項(xiàng)目都需要的,傳統(tǒng)的spring-job模式,個(gè)人感覺已經(jīng)out了,因?yàn)榇?..
    安琪拉_4b7e閱讀 2,973評(píng)論 4 6
  • 《分布式任務(wù)調(diào)度平臺(tái)XXL-JOB》 一、簡(jiǎn)介 1.1 概述 XXL-JOB是一個(gè)輕量級(jí)分布式任務(wù)調(diào)度框架,其核心...
    許雪里閱讀 16,945評(píng)論 3 29
  • 你終于要回來了! 其實(shí)我知道你對(duì)于回福建的猶豫,畢竟不是自個(gè)熟悉的地方。 但是我希望能成為你歸來的其中一個(gè)哪怕很小...
    GandA閱讀 298評(píng)論 0 1
  • 每次看書的時(shí)候都特喜歡劇情大反轉(zhuǎn)的結(jié)局,特享受那種震撼感。 所有的伏筆在開篇時(shí)早已埋下,所有的結(jié)局在開始時(shí)早已...
    汐情閱讀 484評(píng)論 2 1

友情鏈接更多精彩內(nèi)容