LTS用戶文檔
LTS(light-task-scheduler)主要用于解決分布式任務(wù)調(diào)度問題,支持實(shí)時(shí)任務(wù),定時(shí)任務(wù)和Cron任務(wù)。有較好的伸縮性,擴(kuò)展性,健壯穩(wěn)定性而被多家公司使用,同時(shí)也希望開源愛好者一起貢獻(xiàn)。
項(xiàng)目地址
github地址: https://github.com/ltsopensource/light-task-scheduler
oschina地址: http://git.oschina.net/hugui/light-task-scheduler
例子: https://github.com/ltsopensource/lts-examples
框架概況
LTS 有主要有以下四種節(jié)點(diǎn):
- JobClient:主要負(fù)責(zé)提交任務(wù), 并接收任務(wù)執(zhí)行反饋結(jié)果。
- JobTracker:負(fù)責(zé)接收并分配任務(wù),任務(wù)調(diào)度。
- TaskTracker:負(fù)責(zé)執(zhí)行任務(wù),執(zhí)行完反饋給JobTracker。
- LTS-Admin:(管理后臺(tái))主要負(fù)責(zé)節(jié)點(diǎn)管理,任務(wù)隊(duì)列管理,監(jiān)控管理等。
其中JobClient,JobTracker,TaskTracker節(jié)點(diǎn)都是無狀態(tài)的。 可以部署多個(gè)并動(dòng)態(tài)的進(jìn)行刪減,來實(shí)現(xiàn)負(fù)載均衡,實(shí)現(xiàn)更大的負(fù)載量, 并且框架采用FailStore策略使LTS具有很好的容錯(cuò)能力。
LTS注冊(cè)中心提供多種實(shí)現(xiàn)(Zookeeper,redis等),注冊(cè)中心進(jìn)行節(jié)點(diǎn)信息暴露,master選舉。(Mongo or Mysql)存儲(chǔ)任務(wù)隊(duì)列和任務(wù)執(zhí)行日志, netty or mina做底層通信, 并提供多種序列化方式fastjson, hessian2, java等。
LTS支持任務(wù)類型:
- 實(shí)時(shí)任務(wù):提交了之后立即就要執(zhí)行的任務(wù)。
- 定時(shí)任務(wù):在指定時(shí)間點(diǎn)執(zhí)行的任務(wù),譬如 今天3點(diǎn)執(zhí)行(單次)。
- Cron任務(wù):CronExpression,和quartz類似(但是不是使用quartz實(shí)現(xiàn)的)譬如 0 0/1 * * * ?
支持動(dòng)態(tài)修改任務(wù)參數(shù),任務(wù)執(zhí)行時(shí)間等設(shè)置,支持后臺(tái)動(dòng)態(tài)添加任務(wù),支持Cron任務(wù)暫停,支持手動(dòng)停止正在執(zhí)行的任務(wù)(有條件),支持任務(wù)的監(jiān)控統(tǒng)計(jì),支持各個(gè)節(jié)點(diǎn)的任務(wù)執(zhí)行監(jiān)控,JVM監(jiān)控等等.
架構(gòu)圖
概念說明
節(jié)點(diǎn)組
- 英文名稱 NodeGroup,一個(gè)節(jié)點(diǎn)組等同于一個(gè)小的集群,同一個(gè)節(jié)點(diǎn)組中的各個(gè)節(jié)點(diǎn)是對(duì)等的,等效的,對(duì)外提供相同的服務(wù)。
- 每個(gè)節(jié)點(diǎn)組中都有一個(gè)master節(jié)點(diǎn),這個(gè)master節(jié)點(diǎn)是由LTS動(dòng)態(tài)選出來的,當(dāng)一個(gè)master節(jié)點(diǎn)掛掉之后,LTS會(huì)立馬選出另外一個(gè)master節(jié)點(diǎn),框架提供API監(jiān)聽接口給用戶。
FailStore
- 顧名思義,這個(gè)主要是用于失敗了存儲(chǔ)的,主要用于節(jié)點(diǎn)容錯(cuò),當(dāng)遠(yuǎn)程數(shù)據(jù)交互失敗之后,存儲(chǔ)在本地,等待遠(yuǎn)程通信恢復(fù)的時(shí)候,再將數(shù)據(jù)提交。
- FailStore主要用戶JobClient的任務(wù)提交,TaskTracker的任務(wù)反饋,TaskTracker的業(yè)務(wù)日志傳輸?shù)膱?chǎng)景下。
- FailStore目前提供幾種實(shí)現(xiàn):leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用于可以自由選擇使用哪種,用戶也可以采用SPI擴(kuò)展使用自己的實(shí)現(xiàn)。
流程圖
下圖是一個(gè)標(biāo)準(zhǔn)的實(shí)時(shí)任務(wù)執(zhí)行流程。

目前后臺(tái)帶有由ztajy提供的一個(gè)簡易的認(rèn)證功能. 用戶名密碼在auth.cfg中,用戶自行修改.
特性
1、Spring支持
LTS可以完全不用Spring框架,但是考慮到很用用戶項(xiàng)目中都是用了Spring框架,所以LTS也提供了對(duì)Spring的支持,包括Xml和注解,引入lts-spring.jar即可。
2、業(yè)務(wù)日志記錄器
在TaskTracker端提供了業(yè)務(wù)日志記錄器,供應(yīng)用程序使用,通過這個(gè)業(yè)務(wù)日志器,可以將業(yè)務(wù)日志提交到JobTracker,這些業(yè)務(wù)日志可以通過任務(wù)ID串聯(lián)起來,可以在LTS-Admin中實(shí)時(shí)查看任務(wù)的執(zhí)行進(jìn)度。
3、SPI擴(kuò)展支持
SPI擴(kuò)展可以達(dá)到零侵入,只需要實(shí)現(xiàn)相應(yīng)的接口,并實(shí)現(xiàn)即可被LTS使用,目前開放出來的擴(kuò)展接口有
對(duì)任務(wù)隊(duì)列的擴(kuò)展,用戶可以不選擇使用mysql或者mongo作為隊(duì)列存儲(chǔ),也可以自己實(shí)現(xiàn)。
對(duì)業(yè)務(wù)日志記錄器的擴(kuò)展,目前主要支持console,mysql,mongo,用戶也可以通過擴(kuò)展選擇往其他地方輸送日志。
4、故障轉(zhuǎn)移
當(dāng)正在執(zhí)行任務(wù)的TaskTracker宕機(jī)之后,JobTracker會(huì)立馬將分配在宕機(jī)的TaskTracker的所有任務(wù)再分配給其他正常的TaskTracker節(jié)點(diǎn)執(zhí)行。
5、節(jié)點(diǎn)監(jiān)控
可以對(duì)JobTracker,TaskTracker節(jié)點(diǎn)進(jìn)行資源監(jiān)控,任務(wù)監(jiān)控等,可以實(shí)時(shí)的在LTS-Admin管理后臺(tái)查看,進(jìn)而進(jìn)行合理的資源調(diào)配。
6、多樣化任務(wù)執(zhí)行結(jié)果支持
LTS框架提供四種執(zhí)行結(jié)果支持,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,并對(duì)每種結(jié)果采取相應(yīng)的處理機(jī)制,譬如重試。
EXECUTE_SUCCESS: 執(zhí)行成功,這種情況,直接反饋客戶端(如果任務(wù)被設(shè)置了要反饋給客戶端)。
EXECUTE_FAILED:執(zhí)行失敗,這種情況,直接反饋給客戶端,不進(jìn)行重試。
EXECUTE_LATER:稍后執(zhí)行(需要重試),這種情況,不反饋客戶端,重試策略采用1min,2min,3min的策略,默認(rèn)最大重試次數(shù)為10次,用戶可以通過參數(shù)設(shè)置修改這個(gè)重試次數(shù)。
EXECUTE_EXCEPTION:執(zhí)行異常, 這種情況也會(huì)重試(重試策略,同上)
7、FailStore容錯(cuò)
采用FailStore機(jī)制來進(jìn)行節(jié)點(diǎn)容錯(cuò),F(xiàn)ail And Store,不會(huì)因?yàn)檫h(yuǎn)程通信的不穩(wěn)定性而影響當(dāng)前應(yīng)用的運(yùn)行。具體FailStore說明,請(qǐng)參考概念說明中的FailStore說明。
項(xiàng)目編譯打包
項(xiàng)目主要采用maven進(jìn)行構(gòu)建,目前提供shell腳本的打包。 環(huán)境依賴:Java(jdk1.6+) Maven
用戶使用一般分為兩種:
1、Maven構(gòu)建
可以通過maven命令將lts的jar包上傳到本地倉庫中。在父pom.xml中添加相應(yīng)的repository,并用deploy命令上傳即可。具體引用方式可以參考lts中的例子即可。
2、直接Jar引用
需要將lts的各個(gè)模塊打包成單獨(dú)的jar包,并且將所有l(wèi)ts依賴包引入。具體引用哪些jar包可以參考lts中的例子即可。
JobTracker和LTS-Admin部署
提供(cmd)windows和(shell)linux兩種版本腳本來進(jìn)行編譯和部署:
運(yùn)行根目錄下的sh build.sh或build.cmd腳本,會(huì)在dist目錄下生成lts-{version}-bin文件夾
下面是其目錄結(jié)構(gòu),其中bin目錄主要是JobTracker和LTS-Admin的啟動(dòng)腳本。jobtracker 中是 JobTracker的配置文件和需要使用到的jar包,lts-admin是LTS-Admin相關(guān)的war包和配置文件。 lts-{version}-bin的文件結(jié)構(gòu)
-- lts-${version}-bin
|-- bin
| |-- jobtracker.cmd
| |-- jobtracker.sh
| |-- lts-admin.cmd
| |-- lts-admin.sh
| |-- lts-monitor.cmd
| |-- lts-monitor.sh
| |-- tasktracker.sh
|-- conf
| |-- log4j.properties
| |-- lts-admin.cfg
| |-- lts-monitor.cfg
| |-- readme.txt
| |-- tasktracker.cfg
| |-- zoo
| |-- jobtracker.cfg
| |-- log4j.properties
| |-- lts-monitor.cfg
|-- lib
| |-- *.jar
|-- war
|-- jetty
| |-- lib
| |-- *.jar
|-- lts-admin.war
JobTracker啟動(dòng)。如果你想啟動(dòng)一個(gè)節(jié)點(diǎn),直接修改下conf/zoo下的配置文件,然后運(yùn)行 sh jobtracker.sh zoo start即可,如果你想啟動(dòng)兩個(gè)JobTracker節(jié)點(diǎn),那么你需要拷貝一份zoo,譬如命名為zoo2,修改下zoo2下的配置文件,然后運(yùn)行sh jobtracker.sh zoo2 start即可。logs文件夾下生成jobtracker-zoo.out日志。
LTS-Admin啟動(dòng).修改conf/lts-monitor.cfg和conf/lts-admin.cfg下的配置,然后運(yùn)行bin下的sh lts-admin.sh或lts-admin.cmd腳本即可。logs文件夾下會(huì)生成lts-admin.out日志,啟動(dòng)成功在日志中會(huì)打印出訪問地址,用戶可以通過這個(gè)訪問地址訪問了。
JobClient(部署)使用
需要引入lts的jar包有l(wèi)ts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。
API方式啟動(dòng)
JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobClient.start();
// 提交任務(wù)
Job job = new Job();
job.setTaskId("3213213123");
job.setParam("shopId", "11111");
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
// job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表達(dá)式
// job.setTriggerTime(new Date()); // 支持指定時(shí)間執(zhí)行
Response response = jobClient.submitJob(job);
Spring XML方式啟動(dòng)
<bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean">
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="nodeGroup" value="test_jobClient"/>
<property name="masterChangeListeners">
<list>
<bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
</list>
</property>
<property name="jobFinishedHandler">
<bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/>
</property>
<property name="configs">
<props>
<!-- 參數(shù) -->
<prop key="job.fail.store">leveldb</prop>
</props>
</property>
</bean>
Spring 全注解方式
@Configuration
public class LTSSpringConfig {
@Bean(name = "jobClient")
public JobClient getJobClient() throws Exception {
JobClientFactoryBean factoryBean = new JobClientFactoryBean();
factoryBean.setClusterName("test_cluster");
factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
factoryBean.setNodeGroup("test_jobClient");
factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
new MasterChangeListenerImpl()
});
Properties configs = new Properties();
configs.setProperty("job.fail.store", "leveldb");
factoryBean.setConfigs(configs);
factoryBean.afterPropertiesSet();
return factoryBean.getObject();
}
}
TaskTracker(部署使用)
需要引入lts的jar包有l(wèi)ts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。 ###定義自己的任務(wù)執(zhí)行類
public class MyJobRunner implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
try {
// TODO 業(yè)務(wù)邏輯
// 會(huì)發(fā)送到 LTS (JobTracker上)
jobContext.getBizLogger().info("測(cè)試,業(yè)務(wù)日志啊啊啊啊啊");
} catch (Exception e) {
return new Result(Action.EXECUTE_FAILED, e.getMessage());
}
return new Result(Action.EXECUTE_SUCCESS, "執(zhí)行成功了,哈哈");
}
}
API方式啟動(dòng)
TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(MyJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20);
taskTracker.start();
Spring XML方式啟動(dòng)
<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
<property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
<property name="bizLoggerLevel" value="INFO"/>
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="nodeGroup" value="test_trade_TaskTracker"/>
<property name="workThreads" value="20"/>
<property name="masterChangeListeners">
<list>
<bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
</list>
</property>
<property name="configs">
<props>
<prop key="job.fail.store">leveldb</prop>
</props>
</property>
</bean>
Spring注解方式啟動(dòng)
@Configuration
public class LTSSpringConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Bean(name = "taskTracker")
public TaskTracker getTaskTracker() throws Exception {
TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
factoryBean.setApplicationContext(applicationContext);
factoryBean.setClusterName("test_cluster");
factoryBean.setJobRunnerClass(MyJobRunner.class);
factoryBean.setNodeGroup("test_trade_TaskTracker");
factoryBean.setBizLoggerLevel("INFO");
factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
new MasterChangeListenerImpl()
});
factoryBean.setWorkThreads(20);
Properties configs = new Properties();
configs.setProperty("job.fail.store", "leveldb");
factoryBean.setConfigs(configs);
factoryBean.afterPropertiesSet();
// factoryBean.start();
return factoryBean.getObject();
}
}
參數(shù)說明
參數(shù)說明
使用建議
一般在一個(gè)JVM中只需要一個(gè)JobClient實(shí)例即可,不要為每種任務(wù)都新建一個(gè)JobClient實(shí)例,這樣會(huì)大大的浪費(fèi)資源,因?yàn)橐粋€(gè)JobClient可以提交多種任務(wù)。相同的一個(gè)JVM一般也盡量保持只有一個(gè)TaskTracker實(shí)例即可,多了就可能造成資源浪費(fèi)。當(dāng)遇到一個(gè)TaskTracker要運(yùn)行多種任務(wù)的時(shí)候,請(qǐng)參考下面的 "一個(gè)TaskTracker執(zhí)行多種任務(wù)"。
一個(gè)TaskTracker執(zhí)行多種任務(wù)
有的時(shí)候,業(yè)務(wù)場(chǎng)景需要執(zhí)行多種任務(wù),有些人會(huì)問,是不是要每種任務(wù)類型都要一個(gè)TaskTracker去執(zhí)行。我的答案是否定的,如果在一個(gè)JVM中,最好使用一個(gè)TaskTracker去運(yùn)行多種任務(wù),因?yàn)橐粋€(gè)JVM中使用多個(gè)TaskTracker實(shí)例比較浪費(fèi)資源(當(dāng)然當(dāng)你某種任務(wù)量比較多的時(shí)候,可以將這個(gè)任務(wù)單獨(dú)使用一個(gè)TaskTracker節(jié)點(diǎn)來執(zhí)行)。那么怎么才能實(shí)現(xiàn)一個(gè)TaskTracker執(zhí)行多種任務(wù)呢。下面是我給出來的參考例子。
/**
* 總?cè)肟冢?taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
* JobClient 提交 任務(wù)時(shí)指定 Job 類型 job.setParam("type", "aType")
*/
public class JobRunnerDispatcher implements JobRunner {
private static final ConcurrentHashMap<String/*type*/, JobRunner>
JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
static {
JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以從Spring中拿
JOB_RUNNER_MAP.put("bType", new JobRunnerB());
}
@Override
public Result run(JobContext jobContext) throws Throwable {
Job job = jobContext.getJob();
String type = job.getParam("type");
return JOB_RUNNER_MAP.get(type).run(job);
}
}
class JobRunnerA implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
// TODO A類型Job的邏輯
return null;
}
}
class JobRunnerB implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
// TODO B類型Job的邏輯
return null;
}
}
TaskTracker的JobRunner測(cè)試
一般在編寫TaskTracker的時(shí)候,只需要測(cè)試JobRunner的實(shí)現(xiàn)邏輯是否正確,又不想啟動(dòng)LTS進(jìn)行遠(yuǎn)程測(cè)試。為了方便測(cè)試,LTS提供了JobRunner的快捷測(cè)試方法。自己的測(cè)試類集成com.github.ltsopensource.tasktracker.runner.JobRunnerTester即可,并實(shí)現(xiàn)initContext和newJobRunner方法即可。如lts-examples中的例子:
public class TestJobRunnerTester extends JobRunnerTester {
public static void main(String[] args) throws Throwable {
// Mock Job 數(shù)據(jù)
Job job = new Job();
job.setTaskId("2313213");
JobContext jobContext = new JobContext();
jobContext.setJob(job);
JobExtInfo jobExtInfo = new JobExtInfo();
jobExtInfo.setRetry(false);
jobContext.setJobExtInfo(jobExtInfo);
// 運(yùn)行測(cè)試
TestJobRunnerTester tester = new TestJobRunnerTester();
Result result = tester.run(jobContext);
System.out.println(JSON.toJSONString(result));
}
@Override
protected void initContext() {
// TODO 初始化Spring容器
}
@Override
protected JobRunner newJobRunner() {
return new TestJobRunner();
}
}
Spring Quartz Cron任務(wù)無縫接入
對(duì)于Quartz的Cron任務(wù)只需要在Spring配置中增加一下代碼就可以接入LTS平臺(tái)
<bean class="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean">
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="nodeGroup" value="quartz_test_group"/>
</bean>
Spring Boot 支持
@SpringBootApplication
@EnableJobTracker // 啟動(dòng)JobTracker
@EnableJobClient // 啟動(dòng)JobClient
@EnableTaskTracker // 啟動(dòng)TaskTracker
@EnableMonitor // 啟動(dòng)Monitor
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
剩下的就只是在application.properties中添加相應(yīng)的配置就行了, 具體見lts-example中的com.github.ltsopensource.examples.springboot包下的例子
多網(wǎng)卡選擇問題
當(dāng)機(jī)器有內(nèi)網(wǎng)兩個(gè)網(wǎng)卡的時(shí)候,有時(shí)候,用戶想讓LTS的流量走外網(wǎng)網(wǎng)卡,那么需要在host中,把主機(jī)名稱的映射地址改為外網(wǎng)網(wǎng)卡地址即可,內(nèi)網(wǎng)同理。
關(guān)于節(jié)點(diǎn)標(biāo)識(shí)問題
如果在節(jié)點(diǎn)啟動(dòng)的時(shí)候設(shè)置節(jié)點(diǎn)標(biāo)識(shí),LTS會(huì)默認(rèn)設(shè)置一個(gè)UUID為節(jié)點(diǎn)標(biāo)識(shí),可讀性會(huì)比較差,但是能保證每個(gè)節(jié)點(diǎn)的唯一性,如果用戶能自己保證節(jié)點(diǎn)標(biāo)識(shí)的唯一性,可以通過 setIdentity 來設(shè)置,譬如如果每個(gè)節(jié)點(diǎn)都是部署在一臺(tái)機(jī)器(一個(gè)虛擬機(jī))上,那么可以將identity設(shè)置為主機(jī)名稱
SPI擴(kuò)展說明
支持JobLogger,JobQueue等等的SPI擴(kuò)展
和其它解決方案比較
LTS-Admin使用jetty啟動(dòng)(默認(rèn)),不定期掛掉解決方案 見issue#389
歡迎關(guān)注 高廣超的簡書博客 與 收藏文章 !
歡迎關(guān)注 頭條號(hào):互聯(lián)網(wǎng)技術(shù)棧 !
個(gè)人介紹:
高廣超:多年一線互聯(lián)網(wǎng)研發(fā)與架構(gòu)設(shè)計(jì)經(jīng)驗(yàn),擅長設(shè)計(jì)與落地高可用、高性能、可擴(kuò)展的互聯(lián)網(wǎng)架構(gòu)。目前從事大數(shù)據(jù)相關(guān)研發(fā)與架構(gòu)工作。
本文首發(fā)在 高廣超的簡書博客 轉(zhuǎn)載請(qǐng)注明!