定時調(diào)度框架:Elastic-Job

(1)簡介

elastic-job 是由當(dāng)當(dāng)網(wǎng)基于quartz 二次開發(fā)之后的分布式調(diào)度解決方案 , 由兩個相對獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成 。

elastic-job主要的設(shè)計理念是無中心化的分布式定時調(diào)度框架,思路來源于Quartz的基于數(shù)據(jù)庫的高可用方案。但數(shù)據(jù)庫沒有分布式協(xié)調(diào)功能,所以在高可用方案的基礎(chǔ)上增加了彈性擴(kuò)容和數(shù)據(jù)分片的思路,以便于更大限度的利用分布式服務(wù)器的資源。

elastic-job是當(dāng)當(dāng)網(wǎng)基于Zookepper 、Quartz開源的一個java分布式定時任務(wù),解決了Quartz不支持分布式的弊端.

elastic-job由兩個相互獨(dú)立子項(xiàng)目Elastic-Job-Lite 、 Elastic-Job-Cloud組成.

Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供最輕量級的分布式任務(wù)的協(xié)調(diào)服務(wù),外部依賴僅Zookeeper

(2)Thread

package com.zhaoyang.demo;

import java.io.IOException;

import java.time.LocalDateTime;

public class A {

? ? // 每隔3s執(zhí)行一次。

? ? public static void main(String[] args) throws IOException {

? ? ? ? new Thread(() -> {

? ? ? ? ? ? while (true) {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? Thread.sleep(3000);

? ? ? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? System.out.println("執(zhí)行任務(wù)"+ LocalDateTime.now());

? ? ? ? ? ? }

? ? ? ? }).start();

? ? ? ? System.in.read();

? ? }

}

(3)Timer

package com.zhaoyang.demo;

import java.time.LocalDateTime;

import java.util.Timer;

import java.util.TimerTask;

public class B {

? ? // 5s之后開始執(zhí)行,后續(xù)每隔3s執(zhí)行一次。

? ? public static void main(String[] args) {

? ? ? ? Timer timer = new Timer();

? ? ? ? timer.schedule(new TimerTask() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void run() {

? ? ? ? ? ? ? ? System.out.println("執(zhí)行任務(wù)"+ LocalDateTime.now());

? ? ? ? ? ? }

? ? ? ? }, 5000,3000);

? ? }

}

(4)ScheduledThreadPoolExecutor

package com.zhaoyang.demo;

import java.time.LocalDateTime;

import java.util.concurrent.ScheduledThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class C {

? ? // 5s之后開始執(zhí)行,后續(xù)每隔3s執(zhí)行一次。

? ? public static void main(String[] args) {

? ? ? ? ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

? ? ? ? executor.scheduleWithFixedDelay(new Runnable() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void run() {

? ? ? ? ? ? ? ? System.out.println("執(zhí)行任務(wù)"+ LocalDateTime.now());

? ? ? ? ? ? }

? ? ? ? }, 5, 3, TimeUnit.SECONDS);

? ? }

}

(5)Spring Task

package com.zhaoyang.demo;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

/**

* 以上幾種方式都有幾個共同的缺點(diǎn):

*

* 單線程執(zhí)行,若前一個任務(wù)執(zhí)行時間較長,會導(dǎo)致下一個任務(wù)饑餓阻塞

* 無分布式協(xié)調(diào)機(jī)制,如果只有一個節(jié)點(diǎn)就會單點(diǎn)風(fēng)險,如果部署多個節(jié)點(diǎn)就會有并發(fā)執(zhí)行的問題

* 隨著任務(wù)規(guī)模增多,無統(tǒng)一視角對其進(jìn)行任務(wù)進(jìn)度進(jìn)行追蹤和管控

* 功能比較簡單,沒有超時、重試等高級特性

*/

@Component

public class MySpringTask {

? ? @Scheduled(cron = "0/5 * * * * ?")

? ? public void test(){

? ? ? ? System.out.println("執(zhí)行SpringTask");

? ? }

}

(6)SimpleJob簡單作業(yè)-基礎(chǔ)

package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import java.time.LocalDateTime;

// (1)SimpleJob-簡單作業(yè)

public class MySimpleJob implements SimpleJob {

? ? @Override

? ? public void execute(ShardingContext context) {

? ? ? ? System.out.println("執(zhí)行任務(wù)"+ LocalDateTime.now());

? ? }

}

package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

// (2)SimpleJob-簡單作業(yè):集成springboot

@Component

public class MySimpleJob2 implements SimpleJob {

? ? @Override

? ? public void execute(ShardingContext context) {

? ? ? ? System.out.println("執(zhí)行任務(wù)2"+ LocalDateTime.now());

? ? }

}

(7)DataflowJob數(shù)據(jù)流作業(yè)-基礎(chǔ)

package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;

import java.time.LocalDateTime;

import java.util.ArrayList;

import java.util.List;

// (2)DataflowJob-數(shù)據(jù)流作業(yè)

public class MyDataflowJob implements DataflowJob<String> {

? ? @Override

? ? public List<String> fetchData(ShardingContext shardingContext) {

? ? ? ? List<String> data = new ArrayList<>();

? ? ? ? data.add("數(shù)據(jù)1");

? ? ? ? data.add("數(shù)據(jù)2");

? ? ? ? data.add("數(shù)據(jù)3");

? ? ? ? data.add("數(shù)據(jù)4");

? ? ? ? return data;

? ? }

? ? @Override

? ? public void processData(ShardingContext shardingContext, List<String> list) {

? ? ? ? System.out.println(LocalDateTime.now()+"處理數(shù)據(jù):"+list);

? ? }

}

(8)SimpleJob簡單作業(yè)-進(jìn)階

package com.zhaoyang;

import com.zhaoyang.elastic.MySimpleJob;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

// (1)SimpleJob-簡單作業(yè)

public class TestMySimpleJob {

? ? public static void main(String[] args) {

? ? ? ? new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();

? ? }

? ? // 連接Zookeeper

? ? private static CoordinatorRegistryCenter createRegistryCenter() {

? ? ? ? CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));

? ? ? ? regCenter.init();

? ? ? ? return regCenter;

? ? }

? ? // 創(chuàng)建作業(yè)配置

? ? private static JobConfiguration createJobConfiguration() {

? ? ? ? return JobConfiguration.newBuilder("MySimpleJob", 1)

? ? ? ? ? ? ? ? .cron("0/3 * * * * ?")

? ? ? ? ? ? ? ? .build();

? ? }

}

(9)DataflowJob數(shù)據(jù)流作業(yè)-進(jìn)階

package com.zhaoyang;

import com.zhaoyang.elastic.MyDataflowJob;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**

*

* (2)DataflowJob-數(shù)據(jù)流作業(yè)

*

* streaming.process=true,表示開啟流式處理,默認(rèn)為false

* overwrite=true,表示要重寫Job配置,如果不設(shè)置這個,新修改的或新增的配置將不會生效

* 一旦這么做了之后,我們會發(fā)現(xiàn)以上代碼會不停的執(zhí)行任務(wù),而不是每隔3s執(zhí)行一次了。

*

* 這是因?yàn)?,如果開啟流式處理,則作業(yè)只有在 fetchData 方法的返回值為 null 或集合容量為空時,才停止抓取,否則作業(yè)將一直運(yùn)行下去; 如果關(guān)閉流式處理,則作業(yè)只會在每次作業(yè)執(zhí)行過程中執(zhí)行一次 fetchData 和 processData 方法,隨即完成本次作業(yè)。

*

* 所以,以上代碼每次調(diào)用 fetchData 方法都能獲取到數(shù)據(jù),所以會一直執(zhí)行。

*

* 如果采用流式作業(yè)處理方式,那么就需要業(yè)務(wù)代理自己來控制什么時候從fetchData獲取不到數(shù)據(jù),從而停止本次任務(wù)的執(zhí)行。

*/

public class TestMyDataflowJob {

? ? public static void main(String[] args) {

? ? ? ? new ScheduleJobBootstrap(createRegistryCenter(), new MyDataflowJob(), createJobConfiguration()).schedule();

? ? }

? ? // 連接Zookeeper

? ? private static CoordinatorRegistryCenter createRegistryCenter() {

? ? ? ? CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));

? ? ? ? regCenter.init();

? ? ? ? return regCenter;

? ? }

? ? // 創(chuàng)建作業(yè)配置

? ? private static JobConfiguration createJobConfiguration() {

? ? ? ? return JobConfiguration.newBuilder("MyDataflowJob", 1)

? ? ? ? ? ? ? ? .cron("0/3 * * * * ?")

? ? ? ? ? ? ? ? .build();

? ? }

}

(10)ScheduleJobBootstrap腳本作業(yè)

package com.zhaoyang;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

import org.apache.shardingsphere.elasticjob.script.props.ScriptJobProperties;

// (3)腳本作業(yè)

/**

* 注意ScheduleJobBootstrap的第二個參數(shù)為"SCRIPT",另外通過設(shè)置script.command.line來配置要執(zhí)行的腳本。

*

* 其底層其實(shí)就是利用的CommandLine來執(zhí)行的命令,所以只要在你機(jī)器上能執(zhí)行的命令,那么就可以在這里進(jìn)行設(shè)置并執(zhí)行。

*/

public class TestScriptJob {

? ? public static void main(String[] args) {

? ? ? ? new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT", createJobConfiguration()).schedule();

? ? }

? ? private static CoordinatorRegistryCenter createRegistryCenter() {

? ? ? ? CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));

? ? ? ? regCenter.init();

? ? ? ? return regCenter;

? ? }

? ? private static JobConfiguration createJobConfiguration() {

? ? ? ? // 創(chuàng)建作業(yè)配置

? ? ? ? return JobConfiguration.newBuilder("MyScriptJob", 1)

? ? ? ? ? ? ? ? .cron("0/5 * * * * ?")

? ? ? ? ? ? ? ? .setProperty(ScriptJobProperties.SCRIPT_KEY, "java -version")

? ? ? ? ? ? ? ? .overwrite(true)

? ? ? ? ? ? ? ? .build();

? ? }

}

(11)HTTP作業(yè)

package com.zhaoyang;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;

import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

// (4)HTTP作業(yè)(3.0.0-beta 提供)

/**

* 注意ScheduleJobBootstrap的第二個參數(shù)為"HTTP",另外通過設(shè)置http.uri、http.method等參數(shù)來配置請求信息。

*

* 其底層其實(shí)就是利用的HttpURLConnection來實(shí)現(xiàn)的。

*

* 如果要看到調(diào)用結(jié)果,得把日志級別設(shè)置為debug,因?yàn)樵贖ttpJobExecutor源碼中中是這么打印請求結(jié)果的:

*/

public class TestHttpJob {

? ? public static void main(String[] args) {

? ? ? ? new ScheduleJobBootstrap(createRegistryCenter(), "HTTP", createJobConfiguration()).schedule();

? ? }

? ? private static CoordinatorRegistryCenter createRegistryCenter() {

? ? ? ? CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));

? ? ? ? regCenter.init();

? ? ? ? return regCenter;

? ? }

? ? private static JobConfiguration createJobConfiguration() {

? ? ? ? // 創(chuàng)建作業(yè)配置

? ? ? ? return JobConfiguration.newBuilder("MyHttpJob", 1)

? ? ? ? ? ? ? ? .cron("0/5 * * * * ?")

? ? ? ? ? ? ? ? .setProperty(HttpJobProperties.URI_KEY, "http://www.baidu.com")

? ? ? ? ? ? ? ? .setProperty(HttpJobProperties.METHOD_KEY, "GET")

? ? ? ? ? ? ? ? .setProperty(HttpJobProperties.DATA_KEY, "source=ejob") // 請求體

? ? ? ? ? ? ? ? .overwrite(true)

? ? ? ? ? ? ? ? .build();

? ? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容