之前發(fā)布過一篇文章《分布式定時任務框架---Uncode Schedule》,在這篇文章中已經(jīng)介紹uncode-schedule分布式定時任務框架的有關功能,以及實現(xiàn)機制。本文主要結(jié)合源碼來介紹一下分布式定時任務uncode-schedule框架的實現(xiàn)原理。
1. Uncode-Schedule功能概述
Uncode-Schedule是基于zookeeper的分布式任務調(diào)度組件,非常小巧,使用簡單。
1.1. 它能夠確保所有任務在集群中不重復,不遺漏的執(zhí)行。
1.2. 單節(jié)點故障時,任務能夠自動轉(zhuǎn)移到其他節(jié)點繼續(xù)執(zhí)行。
1.3. 支持動態(tài)添加和刪除任務。
1.4. 支持添加機器ip黑名單。
1.5. 支持手動執(zhí)行任務。
2. 使用方法
2.1. 配置maven依賴,pom.xml配置如下:
<dependency>
<groupId>cn.uncode</groupId>
<artifactId>uncode-schedule</artifactId>
<version>0.8.0</version>
</dependency>
2.2. schedule.properties配置
這里主要配置固定值,而不是系統(tǒng)自動生成的,目前可配置機器編碼,配置如下:
#uncode.schedule.server.code=0000000001
2.3. 定時任務的spring配置,applicationContext.xml配置如下:
- ScheduleManager配置
<bean id="zkScheduleManager" class="cn.uncode.schedule.ZKScheduleManager" init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="192.168.7.149:2181" />
<entry key="rootPath" value="/uncode/schedule" />
<entry key="zkSessionTimeout" value="60000" />
<entry key="userName" value="ScheduleAdmin" />
<entry key="password" value="password" />
<entry key="autoRegisterTask" value="true" />
<entry key="isCheckParentPath" value="true" />
<entry key="ipBlacklist" value="192.168.7.231" />
</map>
</property>
</bean>
- spring task配置
<task:scheduled-tasks scheduler="zkScheduleManager">
<task:scheduled ref="simpleTask" method="print" cron="0/30 * * * * ?" />
</task:scheduled-tasks>
- 待執(zhí)行任務類
@Component
public class SimpleTask {
private static int i = 0;
private Logger log = LoggerFactory.getLogger(SimpleTask.class);
public void print() {
log.info("===========print start!=========");
log.info("print:"+i);i++;
log.info("===========print end !=========");
}
}
從上面的配置信息中可以看出,使用框架Uncode-Schedule可以很簡單的實現(xiàn)定時任務的分布式。從代碼上看,和原來的spring task或quartz任務寫法完全一樣。
關鍵點是,每個定時任務配置的調(diào)度器是uncode-schedule框架自定義的調(diào)度器 cn.uncode.schedule.ZKScheduleManager。上面是基于xml的配置,同樣的,基于注解的配置是<task:annotation-driven scheduler="zkScheduleManager" />,詳細的配置方式可以參考uncode-schedule-learn,或者uncode-schedule。
3. 源碼分析
從上面的Uncode-Schedule框架的使用和功能來看,源碼分析應該有5個入口:
- 類
cn.uncode.schedule.ZKScheduleManager的init方法; - 類
cn.uncode.schedule.ZKScheduleManager的定時任務初始化; - 類
cn.uncode.schedule.ZKScheduleManager的心跳檢測hearBeatTimer; - 控制管理類
cn.uncode.schedule.ConsoleManager; - 對外暴露的連個servlet接口
ManagerServlet和ManualServlet;
下面按照誰許依次進行源碼分析:
3.1. 類 cn.uncode.schedule.ZKScheduleManager 的 init 方法
該方法的主要作用是,將配置文件中的數(shù)據(jù)加載進內(nèi)存,連接zookeeper,校驗zookeeper的連接狀態(tài),注冊任務服務器,計算統(tǒng)一時間,啟動心跳檢測任務。
init方法的代碼如下:
public void init() throws Exception {
Properties properties = new Properties();
for (Map.Entry<String, String> e : this.zkConfig.entrySet()) {
properties.put(e.getKey(), e.getValue());
}
this.init(properties);
}
將xml配置文件中的配置信息加載進properties變量,然后去進一步初始化。
public void init(Properties p) throws Exception {
if (this.initialThread != null) {
this.initialThread.stopThread();
}
this.initLock.lock();
try {
this.scheduleDataManager = null;
if (this.zkManager != null) {
this.zkManager.close();
}
//連接zookeeper
this.zkManager = new ZKManager(p);
this.errorMessage = "Zookeeper connecting ......"
+ this.zkManager.getConnectStr();
initialThread = new InitialThread(this);
initialThread.setName("ScheduleManager-initialThread");
initialThread.start();
} finally {
this.initLock.unlock();
}
}
在代碼中通過this.zkManager = new ZKManager(p); 和zookeeper建立連接,然后會啟動一個初始化線程,這個線程的作業(yè)主要是等待連接zookeeper成功之后,進一步初始化之后的注冊服務器等,初始化線程的代碼如下:
class InitialThread extends Thread {
private transient Logger log = LoggerFactory.getLogger(InitialThread.class);
ZKScheduleManager sm;
public InitialThread(ZKScheduleManager sm) {
this.sm = sm;
}
boolean isStop = false;
public void stopThread() {
this.isStop = true;
}
@Override
public void run() {
sm.initLock.lock();
try {
int count = 0;
while (!sm.zkManager.checkZookeeperState()) {
count = count + 1;
if (count % 50 == 0) {
sm.errorMessage = "Zookeeper connecting ......"
+ sm.zkManager.getConnectStr() + " spendTime:"
+ count * 20 + "(ms)";
log.error(sm.errorMessage);
}
Thread.sleep(20);
if (this.isStop) {
return;
}
}
sm.initialData();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
sm.initLock.unlock();
}
}
}
看線程的 run 方法,while 循環(huán)中檢測是否連接成功zookeeper,連接成功之后,調(diào)用 sm.initialData(); 真正的初始化 ZKScheduleManager,初始化的代碼如下:
public void initialData() throws Exception {
//首先進行了框架的版本兼容性校驗
this.zkManager.initial();
this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
if (this.start) {
// 注冊調(diào)度管理器
this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer);
if (hearBeatTimer == null) {
hearBeatTimer = new Timer("ScheduleManager-"
+ this.currenScheduleServer.getUuid() + "-HearBeat");
}
hearBeatTimer.schedule(new HeartBeatTimerTask(this), 2000, this.timerInterval);
}
}
代碼中首先進行了版本兼容性校驗,然后將自身作為一個調(diào)度服務器注冊到管理器中,最后啟動檢測調(diào)度器本身的心跳任務。心跳檢測的任務在下一個小節(jié)重點分析,這里重點看一下注冊調(diào)度管理器,代碼如下:
@Override
public void registerScheduleServer(ScheduleServer server) throws Exception {
if(server.isRegister()){
throw new Exception(server.getUuid() + " 被重復注冊");
}
//clearExpireScheduleServer();
String realPath;
//此處必須增加UUID作為唯一性保障
StringBuffer id = new StringBuffer();
id.append(server.getIp()).append("$")
.append(UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
String serverCode = ScheduleUtil.getServerCode();
if(serverCode != null){ //如果配置文件schedule.properties中配置server code
String zkServerPath = pathServer + "/" + id.toString() + "$" + serverCode;
realPath = this.getZooKeeper().create(zkServerPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
}else{
String zkServerPath = pathServer + "/" + id.toString() +"$";
realPath = this.getZooKeeper().create(zkServerPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT_SEQUENTIAL);
}
server.setUuid(realPath.substring(realPath.lastIndexOf("/") + 1));
Timestamp heartBeatTime = new Timestamp(getSystemTime());
server.setHeartBeatTime(heartBeatTime);
String valueString = this.gson.toJson(server);
this.getZooKeeper().setData(realPath,valueString.getBytes(),-1);
server.setRegister(true);
}
將調(diào)度服務器信息注冊到zookeeper中,服務器信息在zk上的節(jié)點是由 ip$UUID$serverCode 組成,存儲在目錄{rootPath}/server 下,例如, 192.168.7.231$B6A47BA82F4C44389D8D066F571D51D8$1000000001。其中serverCode有兩個來源,一是配置文件schedule.properties中的 uncode.schedule.server.code,另一個是由zk的持久化順序節(jié)點生產(chǎn),這個數(shù)值關系到分布式系統(tǒng)中l(wèi)eader節(jié)點的選取,因此做成可配置的,從而控制leader節(jié)點的選取,選leader節(jié)點的算法將會在心跳檢測中詳細介紹。
并且zk中server路徑下的每一個服務器節(jié)點中都存儲有相關數(shù)據(jù),主要數(shù)據(jù)包括注冊時間、最后一次心跳時間、ip、UUID等。
3.2. 類cn.uncode.schedule.ZKScheduleManager的定時任務初始化
這里主要介紹分布式任務調(diào)度器初始化完畢之后,定時任務啟動時的任務注冊和任務啟動的代碼。
類cn.uncode.schedule.ZKScheduleManager繼承了類 org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler,它又實現(xiàn)了接口org.springframework.scheduling.TaskScheduler,重寫以下接口來實現(xiàn)在任務調(diào)度的同時將定時任務的信息注冊到zookeeper中。
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
ScheduledFuture<?> schedule(Runnable task, Date startTime);
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
重寫之后的源代碼如下:
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
TaskDefine taskDefine = getTaskDefine(task);
LOGGER.info("spring task init------taskName:{}, period:{}", taskDefine.stringKey(), period);
taskDefine.setPeriod(period);
addTask(task, taskDefine);
return super.scheduleAtFixedRate(taskWrapper(task), period);
}
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
TaskDefine taskDefine = getTaskDefine(task);
if(trigger instanceof CronTrigger){
CronTrigger cronTrigger = (CronTrigger)trigger;
taskDefine.setCronExpression(cronTrigger.getExpression());
LOGGER.info("spring task init------trigger:" + cronTrigger.getExpression());
}
addTask(task, taskDefine);
return super.schedule(taskWrapper(task), trigger);
}
public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
TaskDefine taskDefine = getTaskDefine(task);
LOGGER.info("spring task init------taskName:{}, period:{}", taskDefine.stringKey(), startTime);
taskDefine.setStartTime(startTime);
addTask(task, taskDefine);
return super.schedule(taskWrapper(task), startTime);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
TaskDefine taskDefine = getTaskDefine(task);
LOGGER.info("spring task init------taskName:{}, period:{}", taskDefine.stringKey(), period);
taskDefine.setStartTime(startTime);
taskDefine.setPeriod(period);
addTask(task, taskDefine);
return super.scheduleAtFixedRate(taskWrapper(task), startTime, period);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
TaskDefine taskDefine = getTaskDefine(task);
LOGGER.info("spring task init------taskName:{}, delay:{}", taskDefine.stringKey(), delay);
taskDefine.setStartTime(startTime);
taskDefine.setPeriod(delay);
taskDefine.setType(TaskDefine.TASK_TYPE_QSD);
addTask(task, taskDefine);
return super.scheduleWithFixedDelay(taskWrapper(task), startTime, delay);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
TaskDefine taskDefine = getTaskDefine(task);
LOGGER.info("spring task init------taskName:{}, delay:{}", taskDefine.stringKey(), delay);
taskDefine.setPeriod(delay);
taskDefine.setType(TaskDefine.TASK_TYPE_QSD);
addTask(task, taskDefine);
return super.scheduleWithFixedDelay(taskWrapper(task), delay);
}
主要是在任務調(diào)度之前,通過private TaskDefine getTaskDefine(Runnable task);獲取任務的詳細信息,然后通過private void addTask(Runnable task, TaskDefine taskDefine)將其存儲到zookeeper中。
另外一個關鍵點是,所有的task都經(jīng)過了 taskWrapper 的包裝,先看代碼:
/**
* 將Spring的定時任務進行包裝,決定任務是否在本機執(zhí)行。
* @param task
* @return
*/
private Runnable taskWrapper(final Runnable task){
return new Runnable(){
public void run(){
Method targetMethod = null;
if(task instanceof ScheduledMethodRunnable){
ScheduledMethodRunnable uncodeScheduledMethodRunnable = (ScheduledMethodRunnable)task;
targetMethod = uncodeScheduledMethodRunnable.getMethod();
}else{
org.springframework.scheduling.support.ScheduledMethodRunnable springScheduledMethodRunnable = (org.springframework.scheduling.support.ScheduledMethodRunnable)task;
targetMethod = springScheduledMethodRunnable.getMethod();
}
String[] beanNames = applicationcontext.getBeanNamesForType(targetMethod.getDeclaringClass());
if(null != beanNames && StringUtils.isNotEmpty(beanNames[0])){
String name = ScheduleUtil.getTaskNameFormBean(beanNames[0], targetMethod.getName());
boolean isOwner = false;
try {
if(!isScheduleServerRegister){
Thread.sleep(1000);
}
if(zkManager.checkZookeeperState()){
isOwner = scheduleDataManager.isOwner(name, currenScheduleServer.getUuid());
isOwnerMap.put(name, isOwner);
}else{
// 如果zk不可用,使用歷史數(shù)據(jù)
if(null != isOwnerMap){
isOwner = isOwnerMap.get(name);
}
}
if(isOwner){
task.run();
scheduleDataManager.saveRunningInfo(name, currenScheduleServer.getUuid());
LOGGER.info("Cron job has been executed.");
}
} catch (Exception e) {
LOGGER.error("Check task owner error.", e);
}
}
}
};
}
這里主要控制定時任務的執(zhí)行,在執(zhí)行時,需要檢測該任務是否屬于該服務器。并且考慮到zookeeper不可用的情況,如果不可用查看緩存的任務歸屬關系。
3.3. 類cn.uncode.schedule.ZKScheduleManager的心跳檢測hearBeatTimer
在分布式系統(tǒng)中心跳檢測任務是很重要的,負責整個分布式系統(tǒng)的穩(wěn)定性和健壯性。在3.1.節(jié)中的代碼中我們看到,心跳檢測的定時任務調(diào)度代碼 hearBeatTimer.schedule(new HeartBeatTimerTask(this), 2000, this.timerInterval); 啟動延遲2秒執(zhí)行,心跳間隔2秒。心跳檢測任務 HeartBeatTimerTask 的代碼如下:
class HeartBeatTimerTask extends java.util.TimerTask {
private transient final Logger log = LoggerFactory.getLogger(HeartBeatTimerTask.class);
ZKScheduleManager manager;
public HeartBeatTimerTask(ZKScheduleManager aManager) {
manager = aManager;
}
public void run() {
try {
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
manager.refreshScheduleServer();
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
}
}
從以上代碼中可以看到,心跳檢測通過 manager.refreshScheduleServer(); 不停在刷新調(diào)度服務器信息,代碼是:
/**
* 1. 定時向數(shù)據(jù)配置中心更新當前服務器的心跳信息。 如果發(fā)現(xiàn)本次更新的時間如果已經(jīng)超過了,服務器死亡的心跳周期,則不能在向服務器更新信息。
* 而應該當作新的服務器,進行重新注冊。
* 2. 任務分配
* 3. 檢查任務是否屬于本機,是否添加到調(diào)度器
*
* @throws Exception
*/
public void refreshScheduleServer() throws Exception {
try {
// 更新或者注冊服務器信息
rewriteScheduleInfo();
// 如果任務信息沒有初始化成功,不做任務相關的處理
if (!this.isScheduleServerRegister) {
return;
}
// 重新分配任務
this.assignScheduleTask();
// 檢查本地任務
this.checkLocalTask();
} catch (Throwable e) {
// 清除內(nèi)存中所有的已經(jīng)取得的數(shù)據(jù)和任務隊列,避免心跳線程失敗時候?qū)е碌臄?shù)據(jù)重復
this.clearMemoInfo();
if (e instanceof Exception) {
throw (Exception) e;
} else {
throw new Exception(e.getMessage(), e);
}
}
}
進入到方法之后看到,心跳檢測任務主要負責:
- 方法
rewriteScheduleInfo();的功能是,定時向數(shù)據(jù)配置中心zk更新當前服務器的心跳信息,如果更新失敗,重新注冊調(diào)度服務器信息(在3.1節(jié)中已經(jīng)介紹過了,就是方法scheduleDataManager.registerScheduleServer); - 方法
assignScheduleTask();的功能是,定時任務的分配,分配任務的時候會校驗該節(jié)點是否是leader節(jié)點,因為只有l(wèi)eader節(jié)點才能分配任務;在分配任務的時候啟用了服務器ip黑名單,在黑名單列表中的機器不參與任務分配; - 檢查本地的定時任務,添加調(diào)度器;該功能是檢查是否有通過控制臺添加uncode task 類型的定時任務,如果有的話啟動該定時任務;這是一種自定義的定時任務類型,任務的啟動方式也是自定義的,主要方法在類
DynamicTaskManager中;
下面看幾個關鍵步驟的代碼:首先是leader節(jié)點的選擇算法代碼,
private String getLeader(List<String> serverList){
if(serverList == null || serverList.size() ==0){
return "";
}
long no = Long.MAX_VALUE;
long tmpNo = -1;
String leader = null;
for(String server:serverList){
tmpNo =Long.parseLong( server.substring(server.lastIndexOf("$")+1));
if(no > tmpNo){
no = tmpNo;
leader = server;
}
}
return leader;
}
從代碼可以看出,選擇leader節(jié)點的算法是,取serverCode最小的服務器為leader。這種方法的好處是,由于serverCode是遞增的,再新增服務器的時候,leader節(jié)點不會變化,比較穩(wěn)定,算法又簡單。
3.4. 控制管理類cn.uncode.schedule.ConsoleManager
在該類的功能主要是對外提供的是一些操作任務和數(shù)據(jù)的方法,包括注冊在zk上的定時任務數(shù)據(jù)的增、刪、查;以及定時任務的執(zhí)行入口。主要代碼如下:
public static void addScheduleTask(TaskDefine taskDefine) throws Exception{
ConsoleManager.getScheduleManager().getScheduleDataManager().addTask(taskDefine);
}
public static void delScheduleTask(TaskDefine taskDefine) {
try {
ConsoleManager.scheduleManager.getScheduleDataManager().delTask(taskDefine);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
public static List<TaskDefine> queryScheduleTask() {
List<TaskDefine> taskDefines = new ArrayList<TaskDefine>();
try {
List<TaskDefine> tasks = ConsoleManager.getScheduleManager().getScheduleDataManager().selectTask();
taskDefines.addAll(tasks);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return taskDefines;
}
public static boolean isExistsTask(TaskDefine taskDefine) throws Exception{
return ConsoleManager.scheduleManager.getScheduleDataManager().isExistsTask(taskDefine);
}
/**
* 手動執(zhí)行定時任務
* @param task
*/
public static void runTask(TaskDefine task) throws Exception{
Object object = null;
if (StringUtils.isNotEmpty(task.getTargetBean())) {
object = ZKScheduleManager.getApplicationcontext().getBean(task.getTargetBean());
}
if (object == null) {
log.error("任務名稱 = [{}]---------------未啟動成功,targetBean不存在,請檢查是否配置正確!??!", task.stringKey());
throw new Exception("targetBean:"+task.getTargetBean()+"不存在");
}
Method method = null;
try {
if(StringUtils.isNotEmpty(task.getParams())){
method = object.getClass().getDeclaredMethod(task.getTargetMethod(), String.class);
}else{
method = object.getClass().getDeclaredMethod(task.getTargetMethod());
}
} catch (Exception e) {
log.error(String.format("定時任務bean[%s],method[%s]初始化失敗.", task.getTargetBean(), task.getTargetMethod()), e);
throw new Exception("定時任務:"+task.stringKey()+"初始化失敗");
}
if (method != null) {
try {
if(StringUtils.isNotEmpty(task.getParams())){
method.invoke(object, task.getParams());
}else{
method.invoke(object);
}
} catch (Exception e) {
log.error(String.format("定時任務bean[%s],method[%s]調(diào)用失敗.", task.getTargetBean(), task.getTargetMethod()), e);
throw new Exception("定時任務:"+task.stringKey()+"調(diào)用失敗");
}
}
log.info("任務名稱 = [{}]----------啟動成功", task.stringKey());
}
3.5. 對外暴露的連個servlet接口ManagerServlet和ManualServlet
servlet ManagerServlet是一個簡單管理界面,ManualServlet是一個手動執(zhí)行定時任務的接口;使用方法是要在項目中的web.xml中配置響應的servlet,配置文件代碼如下:
<!-- 配置 uncode schedule 管理后臺 -->
<servlet>
<servlet-name>UncodeSchedule</servlet-name>
<servlet-class>cn.uncode.schedule.web.ManagerServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>UncodeSchedule</servlet-name>
<url-pattern>/uncode/schedule</url-pattern>
</servlet-mapping>
<!-- 配置 uncode schedule 手動執(zhí)行器 -->
<servlet>
<servlet-name>ScheduleManual</servlet-name>
<servlet-class>cn.uncode.schedule.web.ManualServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>ScheduleManual</servlet-name>
<url-pattern>/schedule/manual</url-pattern>
</servlet-mapping>
結(jié)束語,源代碼分析結(jié)束,uncode-schedule分布式定時任務框架實現(xiàn)的主要功能都已覆蓋到,有問題的請留言!最后再一次附上源代碼 github鏈接!下一篇預告:《uncode-schedule-manage定制化管理系統(tǒng)》