1.環(huán)境搭建
使用的官方的example-job-example中的elastic-job-example-lite-java
這里粘上github的地址:https://github.com/dangdangdotcom/elastic-job
下載后解壓導(dǎo)入elastic-job-example maven工程
沒裝maven插件的需要裝個(gè)maven插件,下載個(gè)最新的eclipse都配著有
項(xiàng)目導(dǎo)進(jìn)去了,好像扯得有點(diǎn)偏,我們回到環(huán)境的搭建上
需要配置:
(1)zookeeper,這個(gè)網(wǎng)上的教程很詳細(xì)
(2)mysql,還需要建個(gè)庫(kù)命名為:elastic_job_log,后面會(huì)用到
(3)elastic-job-lite-console-2.1.2.tar.gz,這個(gè)百度搜一下吧,應(yīng)該能下到。
2.環(huán)境的啟動(dòng)
(1)zookeeper的配置這里就不說了,打開目錄進(jìn)入bin
運(yùn)行zkServer.cmd;
(2)把下載的這個(gè)解壓elastic-job-lite-console-2.1.2.tar.gz,進(jìn)入目錄bin文件,運(yùn)行start.bat啟動(dòng)服務(wù)后,用瀏覽器進(jìn)入http://localhost:8899 看是否啟動(dòng)成功,默認(rèn)的用戶名:root,密碼:root;
進(jìn)入注冊(cè)中心添加zookeeper
進(jìn)入事件追蹤數(shù)據(jù)添加數(shù)據(jù)庫(kù)(數(shù)據(jù)這里要開啟的,名字,地址也要對(duì),不然連不上的)
到這里我們的環(huán)境也就配好了,現(xiàn)在進(jìn)入最后一步。
3.修改代碼并運(yùn)行
添加aliyun倉(cāng)庫(kù)
http://maven.aliyun.com/nexus/content/groups/public/
然后修改一下代碼
修改com.dangdang.ddframe.job.example.JavaMain.java
package com.dangdang.ddframe.job.example;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.example.job.dataflow.JavaDataflowJob;
import com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.commons.dbcp.BasicDataSource;
import javax.sql.DataSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
public final class JavaMain {
// zookeeper config
private static final int EMBED_ZOOKEEPER_PORT = 2181;
private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
private static final String DIGEST_STR = "admin.admin";
private static final String JOB_NAMESPACE = "elastic-job-example-lite-java";
// MySQL config
private static final String EVENT_RDB_STORAGE_DRIVER = "com.mysql.jdbc.Driver";
private static final String EVENT_RDB_STORAGE_URL = "jdbc:mysql://localhost:3306/elastic_job_log";
private static final String EVENT_RDB_STORAGE_USERNAME = "root";
private static final String EVENT_RDB_STORAGE_PASSWORD = "root";
public static void main(final String[] args) throws IOException {
// 連接到注冊(cè)中心
CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
// 數(shù)據(jù)源配置
JobEventConfiguration jobEventConfig =
new JobEventRdbConfiguration(setUpEventTraceDataSource());
// 設(shè)置簡(jiǎn)單的任務(wù)
setUpSimpleJob(regCenter, jobEventConfig);
//setUpDataflowJob(regCenter, jobEventConfig);
//setUpScriptJob(regCenter, jobEventConfig);
}
private static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig =
new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
zkConfig.setDigest(DIGEST_STR); // 設(shè)置digest
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
private static DataSource setUpEventTraceDataSource() {
BasicDataSource result = new BasicDataSource();
result.setDriverClassName(EVENT_RDB_STORAGE_DRIVER);
result.setUrl(EVENT_RDB_STORAGE_URL);
result.setUsername(EVENT_RDB_STORAGE_USERNAME);
result.setPassword(EVENT_RDB_STORAGE_PASSWORD);
return result;
}
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder("javaSimpleJob", "0 0/2 * * * ?", 3)
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
// 全局參數(shù)
.jobParameter("jobParameter=jobParameter")
// 自定義的異常處理類
.jobProperties("job_exception_handler",
"com.dangdang.ddframe.job.example.TestHandler")
// 自定義的線程池
.jobProperties("executor_service_handler", "com.dangdang.ddframe.job.lite.spring.fixture.handler.SimpleExecutorServiceHandler")
.build();
SimpleJobConfiguration simpleJobConfig =
new SimpleJobConfiguration(coreConfig, JavaSimpleJob.class.getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration
.newBuilder(simpleJobConfig)
.monitorPort(9888) // 啟動(dòng)監(jiān)聽 監(jiān)聽的端口為9888
.build();
new JobScheduler(regCenter, // 注冊(cè)中心
liteJobConfiguration, // job的配置
jobEventConfig, // 數(shù)據(jù)源的配置
new MyElasticJobListener() // 任務(wù)的監(jiān)聽器
).init(); // 啟動(dòng)任務(wù)
}
private static void setUpDataflowJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) {
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder("javaDataflowElasticJobfalse", "0 0/3 * * * ?", 3)
.shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
.build();
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig,
JavaDataflowJob.class.getCanonicalName(),
false);
new JobScheduler(regCenter,
LiteJobConfiguration.newBuilder(dataflowJobConfig).build(),
jobEventConfig).init();
}
private static void setUpScriptJob(final CoordinatorRegistryCenter regCenter, final JobEventConfiguration jobEventConfig) throws IOException {
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0 0/2 * * * ?", 3).build();
ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(scriptJobConfig).build(), jobEventConfig).init();
}
private static String buildScriptCommandLine() throws IOException {
if (System.getProperties().getProperty("os.name").contains("Windows")) {
return Paths.get(JavaMain.class.getResource("/script/demo.bat").getPath().substring(1))
.toString();
}
Path result = Paths.get(JavaMain.class.getResource("/script/demo.sh").getPath());
Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x"));
return result.toString();
}
}
修改 com.dangdang.ddframe.job.example.job.simple.JavaSimpleJob
packagecom.dangdang.ddframe.job.example.job.simple;
importcom.dangdang.ddframe.job.api.ShardingContext;
importcom.dangdang.ddframe.job.api.simple.SimpleJob;
importcom.dangdang.ddframe.job.example.fixture.entity.Foo;
import com.dangdang.ddframe.job.example.fixture.repository.FooRepository;
importcom.dangdang.ddframe.job.example.fixture.repository.FooRepositoryFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class JavaSimpleJob implementsSimpleJob {
private FooRepository fooRepository =FooRepositoryFactory.getFooRepository();
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s| %s",
shardingContext.getShardingItem(),new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId(), "SIMPLE"));
System.out.println("----------------------------" +shardingContext.getShardingParameter());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------------------------------------------------"+ shardingContext.getShardingParameter());
List data =fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}
創(chuàng)建類 com.dangdang.ddframe.job.example.MyElasticJobListener
package com.dangdang.ddframe.job.example;
importcom.dangdang.ddframe.job.executor.ShardingContexts;
importcom.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
publicclass MyElasticJobListener implements ElasticJobListener {
@Override
publicvoid afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println("-----------------afterJobExecuted---------");
}
@Override
publicvoid beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println("-----------------beforeJobExecuted---------");
}
}
運(yùn)行JavaMain.java
4.運(yùn)行結(jié)果分析
如圖三個(gè)分片都給一個(gè)實(shí)例運(yùn)行
運(yùn)行成功。
我們來試一下開兩個(gè)JavaMain,看看它是怎么分配的
運(yùn)行實(shí)例變成了兩個(gè)
作業(yè)維度分片狀態(tài)中的分片也改變了,把分片1分給了另一個(gè)實(shí)例。
在歷史狀態(tài)中可以看到,0,2片分給一個(gè)實(shí)例,1分給一個(gè)實(shí)例,他們的狀態(tài)如圖。可以看到分片1是先運(yùn)行中,然后等待運(yùn)行,時(shí)間是一樣的,可能是后臺(tái)響應(yīng)的問題吧。
看到這里也就告一段落了,第一次寫這樣的文檔,還請(qǐng)大家指教,有什么不懂的下面留言,有時(shí)間我就會(huì)答復(fù)。
070721更新
剛才遇到一個(gè)朋友導(dǎo)入項(xiàng)目之后項(xiàng)目出錯(cuò),看了一下是log.,或者.set方法出錯(cuò)。試著重裝了一下lombok,就可以了。原來是它的lombok裝到另一個(gè)eclipse里面了。。。。
雙擊.jar文件,選取你用的eclipse,安裝重啟后clean一下工程即可。