分布式定時(shí)任務(wù)調(diào)度框架實(shí)踐

本文首發(fā)于 vivo互聯(lián)網(wǎng)技術(shù) 微信公眾號
https://mp.weixin.qq.com/s/l4vuYpNRjKxQRkRTDhyg2Q
作者:陳王榮

分布式任務(wù)調(diào)度框架幾乎是每個(gè)大型應(yīng)用必備的工具,本文介紹了任務(wù)調(diào)度框架使用的需求背景和痛點(diǎn),對業(yè)界普遍使用的開源分布式任務(wù)調(diào)度框架的使用進(jìn)行了探究實(shí)踐,并分析了這幾種框架的優(yōu)劣勢和對自身業(yè)務(wù)的思考。

一、業(yè)務(wù)背景

1.1 為什么需要使用定時(shí)任務(wù)調(diào)度

(1)時(shí)間驅(qū)動(dòng)處理場景:整點(diǎn)發(fā)送優(yōu)惠券,每天更新收益,每天刷新標(biāo)簽數(shù)據(jù)和人群數(shù)據(jù)。
(2) 批量處理數(shù)據(jù): 按月批量統(tǒng)計(jì)報(bào)表數(shù)據(jù),批量更新短信狀態(tài),實(shí)時(shí)性要求不高。
(3) 異步執(zhí)行解耦:活動(dòng)狀態(tài)刷新,異步執(zhí)行離線查詢,與內(nèi)部邏輯解耦。

1.2 使用需求和痛點(diǎn)

(1)任務(wù)執(zhí)行監(jiān)控告警能力。
(2)任務(wù)可靈活動(dòng)態(tài)配置,無需重啟。
(3)業(yè)務(wù)透明,低耦合,配置精簡,開發(fā)方便。
(4)易測試。
(5)高可用,無單點(diǎn)故障。
(6)任務(wù)不可重復(fù)執(zhí)行,防止邏輯異常。
(7)大任務(wù)的分發(fā)并行處理能力。

二、開源框架實(shí)踐與探索

2.1 Java 原生 Timer 和ScheduledExecutorService

2.1.1 Timer使用

timer.png

Timer缺陷:
(1)Timer底層是使用單線程來處理多個(gè)Timer任務(wù),這意味著所有任務(wù)實(shí)際上都是串行執(zhí)行,前一個(gè)任務(wù)的延遲會(huì)影響到之后的任務(wù)的執(zhí)行。
(2)由于單線程的緣故,一旦某個(gè)定時(shí)任務(wù)在運(yùn)行時(shí),產(chǎn)生未處理的異常,那么不僅當(dāng)前這個(gè)線程會(huì)停止,所有的定時(shí)任務(wù)都會(huì)停止。
(3)Timer任務(wù)執(zhí)行是依賴于系統(tǒng)絕對時(shí)間,系統(tǒng)時(shí)間變化會(huì)導(dǎo)致執(zhí)行計(jì)劃的變更。

由于上述缺陷,盡量不要使用Timer, idea中也會(huì)明確提示,使用ScheduledThreadPoolExecutor替代Timer 。

2.1.2 ScheduledExecutorService使用

ScheduledExecutorService對于Timer的缺陷進(jìn)行了修補(bǔ),首先ScheduledExecutorService內(nèi)部實(shí)現(xiàn)是ScheduledThreadPool線程池,可以支持多個(gè)任務(wù)并發(fā)執(zhí)行。
對于某一個(gè)線程執(zhí)行的任務(wù)出現(xiàn)異常,也會(huì)處理,不會(huì)影響其他線程任務(wù)的執(zhí)行,另外ScheduledExecutorService是基于時(shí)間間隔的延遲,執(zhí)行不會(huì)由于系統(tǒng)時(shí)間的改變發(fā)生變化。
當(dāng)然,ScheduledExecutorService也有自己的局限性:只能根據(jù)任務(wù)的延遲來進(jìn)行調(diào)度,無法滿足基于絕對時(shí)間和日歷調(diào)度的需求。

2.2 Spring Task

2.2.1 Spring Task 使用

spring task 是spring自主開發(fā)的輕量級定時(shí)任務(wù)框架,不需要依賴其他額外的包,配置較為簡單。
此處使用注解配置


SpringTask.png

2.2.2 Spring Task缺陷

Spring Task 本身不支持持久化,也沒有推出官方的分布式集群模式,只能靠開發(fā)者在業(yè)務(wù)應(yīng)用中自己手動(dòng)擴(kuò)展實(shí)現(xiàn),無法滿足可視化,易配置的需求。

2.3 永遠(yuǎn)經(jīng)典的 Quartz

2.3.1 基本介紹

Quartz框架是Java領(lǐng)域最著名的開源任務(wù)調(diào)度工具,也是目前事實(shí)上的定時(shí)任務(wù)標(biāo)準(zhǔn),幾乎全部的開源定時(shí)任務(wù)框架都是基于Quartz核心調(diào)度構(gòu)建而成。

2.3.2 原理解析

核心組件和架構(gòu)


quartz.png

關(guān)鍵概念
(1)Scheduler</strong>:任務(wù)調(diào)度器,是執(zhí)行任務(wù)調(diào)度的控制器。本質(zhì)上是一個(gè)計(jì)劃調(diào)度容器,注冊了全部Trigger和對應(yīng)的JobDetail, 使用線程池作為任務(wù)運(yùn)行的基礎(chǔ)組件,提高任務(wù)執(zhí)行效率。
(2)Trigger:觸發(fā)器,用于定義任務(wù)調(diào)度的時(shí)間規(guī)則,告訴任務(wù)調(diào)度器什么時(shí)候觸發(fā)任務(wù),其中CronTrigger是基于cron表達(dá)式構(gòu)建的功能強(qiáng)大的觸發(fā)器。</p>
(3)Calendar:日歷特定時(shí)間點(diǎn)的集合。一個(gè)trigger可以包含多個(gè)Calendar,可用于排除或包含某些時(shí)間點(diǎn)。
(4)JobDetail:是一個(gè)可執(zhí)行的工作,用來描述Job實(shí)現(xiàn)類及其它相關(guān)的靜態(tài)信息,如Job的名稱、監(jiān)聽器等相關(guān)信息。
(5)Job:任務(wù)執(zhí)行接口,只有一個(gè)execute方法,用于執(zhí)行真正的業(yè)務(wù)邏輯。
(6)JobStore:任務(wù)存儲(chǔ)方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存儲(chǔ)在JVM的內(nèi)存中,有丟失和數(shù)量受限的風(fēng)險(xiǎn),JDBCJobStore是將任務(wù)信息持久化到數(shù)據(jù)庫中,支持集群。

2.3.3 實(shí)踐說明

(1)關(guān)于Quartz的基本使用
可參考Quartz官方文檔和網(wǎng)上博客實(shí)踐教程。
(2)業(yè)務(wù)使用要滿足動(dòng)態(tài)修改和重啟不丟失, 一般需要使用數(shù)據(jù)庫進(jìn)行保存。
Quartz本身支持JDBCJobStore,但是其配置的數(shù)據(jù)表比較多,官方推薦配置可參照官方文檔,超過10張表,業(yè)務(wù)使用比較重。
在使用的時(shí)候只需要存在基本trigger配置和對應(yīng)任務(wù)以及相關(guān)執(zhí)行日志的表即可滿足絕大部分需求。</p>
(3)組件化
將quartz動(dòng)態(tài)任務(wù)配置信息持久化到數(shù)據(jù)庫,將數(shù)據(jù)操作包裝成基本jar包,供項(xiàng)目之間使用,引用項(xiàng)目只需要引入jar包依賴和配置對應(yīng)的數(shù)據(jù)表,使用時(shí)就可以對Quartz配置透明。
(4)擴(kuò)展
集群模式: 通過故障轉(zhuǎn)移和負(fù)載均衡實(shí)現(xiàn)了任務(wù)的高可用性,通過數(shù)據(jù)庫的鎖機(jī)制來確保任務(wù)執(zhí)行的唯一性,但是集群特性僅僅只是用來HA,節(jié)點(diǎn)數(shù)量的增加并不會(huì)提升單個(gè)任務(wù)的執(zhí)行效率,不能實(shí)現(xiàn)水平擴(kuò)展。
Quartz插件:可以對特定需要進(jìn)行擴(kuò)展,比如增加觸發(fā)器和任務(wù)執(zhí)行日志,任務(wù)依賴串行處理場景,可參考:quartz插件——實(shí)現(xiàn)任務(wù)之間的串行調(diào)度

2.3.4 缺陷和不足

(1)需要把任務(wù)信息持久化到業(yè)務(wù)數(shù)據(jù)表,和業(yè)務(wù)有耦合。
(2)調(diào)度邏輯和執(zhí)行邏輯并存于同一個(gè)項(xiàng)目中,在機(jī)器性能固定的情況下,業(yè)務(wù)和調(diào)度之間不可避免地會(huì)相互影響。
(3)quartz集群模式下,是通過數(shù)據(jù)庫獨(dú)占鎖來唯一獲取任務(wù),任務(wù)執(zhí)行并沒有實(shí)現(xiàn)完善的負(fù)載均衡機(jī)制。

2.4 輕量級神器 XXL-JOB

2.4.1 基本介紹

XXL-JOB是一個(gè)輕量級分布式任務(wù)調(diào)度平臺,主打特點(diǎn)是平臺化,易部署,開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴(kuò)展,代碼仍在持續(xù)更新中。

“調(diào)度中心”是任務(wù)調(diào)度控制臺,平臺自身并不承擔(dān)業(yè)務(wù)邏輯,只是負(fù)責(zé)任務(wù)的統(tǒng)一管理和調(diào)度執(zhí)行,并且提供任務(wù)管理平臺, “執(zhí)行器” 負(fù)責(zé)接收“調(diào)度中心”的調(diào)度并執(zhí)行,可直接部署執(zhí)行器,也可以將執(zhí)行器集成到現(xiàn)有業(yè)務(wù)項(xiàng)目中。 通過將任務(wù)的調(diào)度控制和任務(wù)的執(zhí)行解耦,業(yè)務(wù)使用只需要關(guān)注業(yè)務(wù)邏輯的開發(fā)。
主要提供了任務(wù)的動(dòng)態(tài)配置管理、任務(wù)監(jiān)控和統(tǒng)計(jì)報(bào)表以及調(diào)度日志幾大功能模塊,支持多種運(yùn)行模式和路由策略,可基于對應(yīng)執(zhí)行器機(jī)器集群數(shù)量進(jìn)行簡單分片數(shù)據(jù)處理。

2.4.2 原理解析

2.1.0版本前核心調(diào)度模塊都是基于quartz框架,2.1.0版本開始自研調(diào)度組件,移除quartz依賴 ,使用時(shí)間輪調(diào)度。


2.4.3 實(shí)踐說明

詳細(xì)配置和介紹參考官方文檔

2.4.3.1 demo使用:

示例1:實(shí)現(xiàn)簡單任務(wù)配置,只需要繼承IJobHandler 抽象類,并聲明注解

@JobHandler(value="offlineTaskJobHandler")
@Component
public class OfflineTaskJobHandler extends IJobHandler {
  
   @Reference(check = false,version = "cms-dev",group="cms-service")
   private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
  
   @Override
   public ReturnT<String> execute(String param) throws Exception {
      XxlJobLogger.log(" offlineTaskJobHandler start.");
  
      try {
         offlineTaskExecutorFacade.executeOfflineTask();
      } catch (Exception e) {
         XxlJobLogger.log("offlineTaskJobHandler-->exception." , e);
         return FAIL;
      }
  
      XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end.");
      return SUCCESS;
   }
}

示例2:分片廣播任務(wù)。

@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
  
   @Override
   public ReturnT<String> execute(String param) throws Exception {
  
      // 分片參數(shù)
      ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
      XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());
  
      // 業(yè)務(wù)邏輯
      for (int i = 0; i < shardingVO.getTotal(); i++) {
         if (i == shardingVO.getIndex()) {
            XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
         } else {
            XxlJobLogger.log("第 {} 片, 忽略", i);
         }
      }
  
      return SUCCESS;
   }
}
2.4.3.2 整合dubbo

(1)引入dubbo-spring-boot-starter和業(yè)務(wù)facade jar包依賴。

<dependency>
    <groupId>com.alibaba.spring.boot</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
  
<dependency>
    <groupId>com.demo.service</groupId>
    <artifactId>xxx-facade</artifactId>
    <version>1.9-SNAPSHOT</version>
</dependency>

(2)配置文件加入dubbo消費(fèi)端配置(可根據(jù)環(huán)境定義多個(gè)配置文件,通過profile切換)。

## Dubbo 服務(wù)消費(fèi)者配置
spring.dubbo.application.name=xxl-job
  
spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183
spring.dubbo.port=20880
  
spring.dubbo.version=demo
spring.dubbo.group=demo-service

(3)代碼中通過@Reference注入facade接口即可。

@Reference(check = false,version = demo,group = demo-service)
private OfflineTaskExecutorFacade offlineTaskExecutorFacade; 

(4)啟動(dòng)程序加入@EnableDubboConfiguration注解。

@SpringBootApplication
@EnableDubboConfiguration
public class XxlJobExecutorApplication {
   public static void main(String[] args) {
        SpringApplication.run(XxlJobExecutorApplication.class, args);
   }
}

2.4.4 任務(wù)可視化配置

內(nèi)置了平臺項(xiàng)目,方便了開發(fā)者對任務(wù)的管理和執(zhí)行日志的監(jiān)控,并提供了一些便于測試的功能。


2.4.5 擴(kuò)展

(1)任務(wù)監(jiān)控和報(bào)表的優(yōu)化。
(2)任務(wù)報(bào)警方式的擴(kuò)展,比如加入告警中心,提供內(nèi)部消息,短信告警。
(3)對實(shí)際業(yè)務(wù)內(nèi)部執(zhí)行出現(xiàn)異常情況下的不同監(jiān)控告警和重試策略。

2.5 高可用 Elastic-Job

2.5.1 基本介紹

Elastic-Job是一個(gè)分布式調(diào)度解決方案,由兩個(gè)相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。
Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)。
可惜的是已經(jīng)兩年沒有迭代更新記錄。

2.5.2 原理解析

elastic.png

2.5.3 實(shí)踐說明

2.5.3.1 demo使用

(1)安裝zookeeper,配置注冊中心config,配置文件加入注冊中心zk的配置。

@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
  
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                             @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

注冊中心zk的配置:

spring.application.name=demo_elasticjob
  
regCenter.serverList=localhost:2181
regCenter.namespace=demo_elasticjob
  
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd

(2)配置數(shù)據(jù)源config,并配置文件中加入數(shù)據(jù)源配置。

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceProperties {
    private String url;
    private String username;
    private String password;
  
    @Bean
    @Primary
    public DataSource getDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        return dataSource;
    }
}

數(shù)據(jù)源配置

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd

(3)配置事件config。

@Configuration
public class JobEventConfig {
    @Autowired
    private DataSource dataSource;
  
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
}

(4)為了便于靈活配置不同的任務(wù)觸發(fā)事件,加入ElasticSimpleJob注解。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
  
    @AliasFor("cron")
    String value() default "";
  
    @AliasFor("value")
    String cron() default "";
  
    String jobName() default "";
  
    int shardingTotalCount() default 1;
  
    String shardingItemParameters() default "";
  
    String jobParameter() default "";
}

(5)對配置進(jìn)行初始化。

@Configuration
@ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")
public class ElasticJobAutoConfiguration {
  
    @Value("${regCenter.serverList}")
    private String serverList;
  
    @Value("${regCenter.namespace}")
    private String namespace;
  
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private DataSource dataSource;
  
    @PostConstruct
    public void initElasticJob() {
        ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        regCenter.init();
        Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
  
        for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
            SimpleJob simpleJob = entry.getValue();
            ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
  
            String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
  
            JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
            SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
            jobScheduler.init();
        }
    }
}

(6)實(shí)現(xiàn) SimpleJob接口,按上文中方法整合dubbo, 完成業(yè)務(wù)邏輯。

@ElasticSimpleJob(
        cron = "*/10 * * * * ?",
        jobName = "OfflineTaskJob",
        shardingTotalCount = 2,
        jobParameter = "測試參數(shù)",
        shardingItemParameters = "0=A,1=B")
@Component
public class MySimpleJob implements SimpleJob {
    Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
  
    @Reference(check = false, version = "cms-dev", group = "cms-service")
    private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
  
  
    @Override
    public void execute(ShardingContext shardingContext) {
  
        offlineTaskExecutorFacade.executeOfflineTask();
  
        logger.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " +
                        "當(dāng)前分片項(xiàng): %s.當(dāng)前參數(shù): %s," +
                        "作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
    }
}

2.6 其余開源框架

(1) Saturn :Saturn是唯品會(huì)開源的一個(gè)分布式任務(wù)調(diào)度平臺,在Elastic Job的基礎(chǔ)上進(jìn)行了改造。
(2) SIA-TASK :是宜信開源的分布式任務(wù)調(diào)度平臺。

三、優(yōu)劣勢對比和業(yè)務(wù)場景適配思考

業(yè)務(wù)思考:
(1)豐富任務(wù)監(jiān)控?cái)?shù)據(jù)和告警策略。
(2) 接入統(tǒng)一登錄和權(quán)限控制。
(3) 進(jìn)一步簡化業(yè)務(wù)接入步驟。

四、結(jié)語

對于并發(fā)場景不是特別高的系統(tǒng)來說,xxl-job配置部署簡單易用,不需要引入多余的組件,同時(shí)提供了可視化的控制臺,使用起來非常友好,是一個(gè)比較好的選擇。希望直接利用開源分布式框架能力的系統(tǒng),建議根據(jù)自身的情況來進(jìn)行合適的選型。

附:參考文獻(xiàn)

quartz插件——實(shí)現(xiàn)任務(wù)之間的串行調(diào)度

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容