elastic-job-master 運(yùn)行實(shí)例

1.環(huán)境搭建

使用的官方的example-job-example中的elastic-job-example-lite-java
這里粘上github的地址:https://github.com/dangdangdotcom/elastic-job
下載后解壓導(dǎo)入elastic-job-example maven工程
沒裝maven插件的需要裝個(gè)maven插件,下載個(gè)最新的eclipse都配著有
項(xiàng)目導(dǎo)進(jìn)去了,好像扯得有點(diǎn)偏,我們回到環(huán)境的搭建上
需要配置:
(1)zookeeper,這個(gè)網(wǎng)上的教程很詳細(xì)
(2)mysql,還需要建個(gè)庫(kù)命名為:elastic_job_log,后面會(huì)用到

(3)elastic-job-lite-console-2.1.2.tar.gz,這個(gè)百度搜一下吧,應(yīng)該能下到。

2.環(huán)境的啟動(dòng)

(1)zookeeper的配置這里就不說了,打開目錄進(jìn)入bin


運(yùn)行zkServer.cmd;
(2)把下載的這個(gè)解壓elastic-job-lite-console-2.1.2.tar.gz,進(jìn)入目錄bin文件,運(yùn)行start.bat啟動(dòng)服務(wù)后,用瀏覽器進(jìn)入http://localhost:8899 看是否啟動(dòng)成功,默認(rèn)的用戶名:root,密碼:root;
進(jìn)入注冊(cè)中心添加zookeeper

進(jìn)入事件追蹤數(shù)據(jù)添加數(shù)據(jù)庫(kù)(數(shù)據(jù)這里要開啟的,名字,地址也要對(duì),不然連不上的)

到這里我們的環(huán)境也就配好了,現(xiàn)在進(jìn)入最后一步。

3.修改代碼并運(yùn)行

添加aliyun倉(cāng)庫(kù)

http://maven.aliyun.com/nexus/content/groups/public/

然后修改一下代碼
修改com.dangdang.ddframe.job.example.JavaMain.java

package com.dangdang.ddframe.job.example;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.example.job.dataflow.JavaDataflowJob;
import com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.commons.dbcp.BasicDataSource;
import javax.sql.DataSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
public final class JavaMain {
// zookeeper config
private static final int EMBED_ZOOKEEPER_PORT = 2181;
private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
private static final String DIGEST_STR = "admin.admin";
private static final String JOB_NAMESPACE = "elastic-job-example-lite-java";
// MySQL config
private static final String EVENT_RDB_STORAGE_DRIVER = "com.mysql.jdbc.Driver";
private static final String EVENT_RDB_STORAGE_URL = "jdbc:mysql://localhost:3306/elastic_job_log";
private static final String EVENT_RDB_STORAGE_USERNAME = "root";
private static final String EVENT_RDB_STORAGE_PASSWORD = "root";
public static void main(final String[] args) throws IOException {
// 連接到注冊(cè)中心
CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
// 數(shù)據(jù)源配置
JobEventConfiguration jobEventConfig =
new JobEventRdbConfiguration(setUpEventTraceDataSource());
// 設(shè)置簡(jiǎn)單的任務(wù)
setUpSimpleJob(regCenter, jobEventConfig);
//setUpDataflowJob(regCenter, jobEventConfig);
//setUpScriptJob(regCenter, jobEventConfig);
}
private static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig =
new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
zkConfig.setDigest(DIGEST_STR); // 設(shè)置digest
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
private static DataSource setUpEventTraceDataSource() {
BasicDataSource result = new BasicDataSource();
result.setDriverClassName(EVENT_RDB_STORAGE_DRIVER);
result.setUrl(EVENT_RDB_STORAGE_URL);
result.setUsername(EVENT_RDB_STORAGE_USERNAME);
result.setPassword(EVENT_RDB_STORAGE_PASSWORD);
return result;
}
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder("javaSimpleJob", "0 0/2 * * * ?", 3)
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
// 全局參數(shù)
.jobParameter("jobParameter=jobParameter")
// 自定義的異常處理類
.jobProperties("job_exception_handler",
"com.dangdang.ddframe.job.example.TestHandler")
// 自定義的線程池
.jobProperties("executor_service_handler", "com.dangdang.ddframe.job.lite.spring.fixture.handler.SimpleExecutorServiceHandler")
.build();
SimpleJobConfiguration simpleJobConfig =
new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration
.newBuilder(simpleJobConfig)
.monitorPort(9888)  // 啟動(dòng)監(jiān)聽  監(jiān)聽的端口為9888
.build();
new JobScheduler(regCenter,        // 注冊(cè)中心
liteJobConfiguration,      // job的配置
jobEventConfig,              // 數(shù)據(jù)源的配置
new MyElasticJobListener()  // 任務(wù)的監(jiān)聽器
).init();                            // 啟動(dòng)任務(wù)
}
private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder("javaDataflowElasticJobfalse", "0 0/3 * * * ?", 3)
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
.build();
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig,
JavaDataflowJob.class.getCanonicalName(),
false);
new JobScheduler(regCenter,
LiteJobConfiguration.newBuilder(dataflowJobConfig).build(),
jobEventConfig).init();
}
private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0 0/2 * * * ?", 3).build();
ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
}
private static String buildScriptCommandLine() throws IOException {
if (System.getProperties().getProperty("os.name").contains("Windows")) {
return Paths.get(JavaMain.class.getResource("/script/demo.bat").getPath().substring(1))
.toString();
}
Path result = Paths.get(JavaMain.class.getResource("/script/demo.sh").getPath());
Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x"));
return result.toString();
}
}

修改 com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob

packagecom.dangdang.ddframe.job.example.job.simple;
importcom.dangdang.ddframe.job.api.ShardingContext;
importcom.dangdang.ddframe.job.api.simple.SimpleJob;
importcom.dangdang.ddframe.job.example.fixture.entity.Foo;
import com.dangdang.ddframe.job.example.fixture.repository.FooRepository;
importcom.dangdang.ddframe.job.example.fixture.repository.FooRepositoryFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class JavaSimpleJob implementsSimpleJob {
private FooRepository fooRepository =FooRepositoryFactory.getFooRepository();
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s| %s",
shardingContext.getShardingItem(),new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId(), "SIMPLE"));
System.out.println("----------------------------" +shardingContext.getShardingParameter());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------------------------------------------------"+ shardingContext.getShardingParameter());
List data =fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}

創(chuàng)建類 com.dangdang.ddframe.job.example.MyElasticJobListener

package com.dangdang.ddframe.job.example;
importcom.dangdang.ddframe.job.executor.ShardingContexts;
importcom.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
publicclass MyElasticJobListener implements ElasticJobListener {
@Override
publicvoid afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println("-----------------afterJobExecuted---------");
}
@Override
publicvoid beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println("-----------------beforeJobExecuted---------");
}
}

運(yùn)行JavaMain.java

4.運(yùn)行結(jié)果分析

如圖三個(gè)分片都給一個(gè)實(shí)例運(yùn)行

運(yùn)行成功。
我們來試一下開兩個(gè)JavaMain,看看它是怎么分配的

運(yùn)行實(shí)例變成了兩個(gè)

作業(yè)維度分片狀態(tài)中的分片也改變了,把分片1分給了另一個(gè)實(shí)例。


在歷史狀態(tài)中可以看到,0,2片分給一個(gè)實(shí)例,1分給一個(gè)實(shí)例,他們的狀態(tài)如圖。可以看到分片1是先運(yùn)行中,然后等待運(yùn)行,時(shí)間是一樣的,可能是后臺(tái)響應(yīng)的問題吧。
看到這里也就告一段落了,第一次寫這樣的文檔,還請(qǐng)大家指教,有什么不懂的下面留言,有時(shí)間我就會(huì)答復(fù)。

070721更新

剛才遇到一個(gè)朋友導(dǎo)入項(xiàng)目之后項(xiàng)目出錯(cuò),看了一下是log.,或者.set方法出錯(cuò)。試著重裝了一下lombok,就可以了。原來是它的lombok裝到另一個(gè)eclipse里面了。。。。
雙擊.jar文件,選取你用的eclipse,安裝重啟后clean一下工程即可。

捕獲.PNG
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,659評(píng)論 19 139
  • 最近打算研究一下elastic-job,同事都沒有這方面的使用經(jīng)驗(yàn),自己打算看官方文檔跟蹤源碼一步步完善一系列的使...
    二月_春風(fēng)閱讀 5,088評(píng)論 1 3
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,282評(píng)論 6 342
  • 手機(jī)快沒電了,先草草的表達(dá)一下自己的心情。 今天早上十一點(diǎn)的火車從濟(jì)南出發(fā),明天早上四點(diǎn)半到達(dá)西安,我的大陜西?;?..
    海爾兄弟姐妹1026閱讀 251評(píng)論 0 0
  • 她記得她五歲的時(shí)候第一次做噩夢(mèng)。她驚醒在床上,睜著大大的眼睛,盯著灰白色的天花板和長(zhǎng)長(zhǎng)的架蚊帳的竹竿,不敢繼續(xù)睡。...
    大張家大小姐閱讀 244評(píng)論 0 0

友情鏈接更多精彩內(nèi)容