tbschedule源碼解讀
tbschedule部署包括兩部分,一個是負(fù)責(zé)配置管理的后臺程序,一個是客戶端接入包,這兩個程序依賴zk進行信息交互。
zk數(shù)據(jù)的大致結(jié)構(gòu)
factory部分:
/app1/factory
/app1/factory/facotoryUUID1
/app1/strategy
/app1/strategy/strategy1
/app1/strategy/strategy1/factoryUUID1
可以有多個facotory,每個factory對應(yīng)一個客戶端啟動的TBScheduleManagerFactory實例,每個JVM可以有多個factory實例,factory實例也可以存在于不同的JVM中。
strategy是在后臺配置的任務(wù)策略,每個factory啟動時候回去檢查自己能處理哪幾個strategy,如果能處理則在/app1/strategy/strategy1/路徑下注冊自己,注冊的這個信息在tbschedule源碼里叫做FactoryRunningInfo。
ScheduleServer部分:
/app1/baseTaskType
/app1/baseTaskType/task1
/app1/baseTaskType/task1/task1
/app1/baseTaskType/task1/task1/server
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
/app1/baseTaskType/task1/task1/taskItem
/app1/baseTaskType/task1/task1/taskItem/taskItem1
/app1/baseTaskType/task1/task1/taskItem/taskItem1/cur_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/deal_desc
/app1/baseTaskType/task1/task1/taskItem/taskItem1/parameter
/app1/baseTaskType/task1/task1/taskItem/taskItem1/req_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/sts
task1是在后臺配置的任務(wù)。
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1表示可以用來處理任務(wù)的調(diào)度器,每個factory實例可以有多個ScheduleServer實例。
/app1/baseTaskType/task1/task1/taskItem表示配置任務(wù)時,每個任務(wù)可以拆分成幾個小的任務(wù)項。該節(jié)點的子節(jié)點,表示這個任務(wù)項運行時的信息,例如cur_server表示這個taskItem正在被哪個ScheduleServer處理。這些在tbschedule源碼里也叫作runningInfo。
核心類圖

TBScheduleManagerFactoryfactory實例對象,管理這個factory內(nèi)部所有的事情。ZKManager負(fù)責(zé)與zk之間的連接,數(shù)據(jù)交換。IScheduleDataManager負(fù)責(zé)/app1/baseTaskType及其子節(jié)點所有數(shù)據(jù)模型維護。ScheduleDataManger4ZK負(fù)責(zé)/app1/factory``/app1/strategy及其字節(jié)點數(shù)據(jù)模型維護。IStrategyTask每個實例代表一個線程組,每個strategy可對應(yīng)多個IStrategyTask實例,來真正處理配置的任務(wù)。
關(guān)于這幾個類的組合關(guān)系如下圖:

一個Factory處理多個strategy,每個strategy下有多個
IStrategyTask對象。TBScheduleManager實現(xiàn)IStrategyTask接口,一個TBScheduleManager實例跟ScheduleServer、ScheduleProcessor、IScheduleTaskDeal的關(guān)系都是一比一的關(guān)系。ScheduleServer是針對某一個task的的調(diào)度器。IScheduleTaskDeal是我們自己代碼里需要實現(xiàn)的任務(wù)對象。ScheduleProcessor是處理任務(wù)的多線程任務(wù)處理器,代表一個線程組。可以包含多個線程,線程的最大數(shù)量取決于后臺配置的task身上的threadNum字段。一個Factory有多個
IStrategyTask的原因是,任務(wù)需要分片處理,每個分片對應(yīng)一個IStrategyTask實例。一個
ScheduleProcessor有多個Thread的原因是,一個任務(wù)分片下一次可以取出多個任務(wù),開啟多線程可以并發(fā)處理這些任務(wù)。
初始化流程

整個初始化過程大量使用Thread、Timer,很多工作都是異步進行的,且這些線程之間通過了狀態(tài)對象、鎖等方式進行了協(xié)調(diào)。
整個初始化過程粗略來看包括以下幾步:
- 創(chuàng)建ZKManager對象
- 啟動初始化線程
InitalThread,然后立即返回
接著便是InitalThread異步做的初始化工作: - 準(zhǔn)備好ZKManager、ScheduleDataManager4ZK、ScheduleStrategyDataManager4ZK對象
- 啟動定時Timer對象
ManagerFactoryTimerTask
接著便是ManagerFactoryTimerTask定時執(zhí)行的工作,主要是去掃描strategy配置,重新分配factory去處理這些strategy。分配完factory,會創(chuàng)建StrategyTask進行任務(wù)的處理。
factory刷新工作詳解
整個過程源碼入口在ManagerFactoryTimerTask#run()中,而主要的邏輯集中在TBScheduleManagerFactory#refresh()。這里不去關(guān)心stop factory的逆向流程,只來看正向流程,見TBScheduleManagerFactory #reRegisterManagerFactory。
- 遍歷strategy,重新計算factory實例跟strategy的匹配關(guān)系
- 找到當(dāng)前factory實例不能處理的strategy,并停止掉正在運行的
StrategyTask - 遍歷跟當(dāng)前factory實例相關(guān)的strategy,選舉出每個strategy的leader factory實例,由leader重新計算每個factory實例能夠分到的reqNum,即根據(jù)strategy身上的
assignNum``numOfSingleServer,將assignNum平分給每個factory實例。 - 調(diào)整當(dāng)前factory實例分配到每個strategy的的StrategyTask的數(shù)量,確保數(shù)量等于上一步分配給自己的數(shù)量。
factory線程組數(shù)量分配算法
見ScheduleUtil#assignTaskNumber
/**
* 分配任務(wù)數(shù)量
* @param serverNum 總的服務(wù)器數(shù)量
* @param taskItemNum 任務(wù)項數(shù)量
* @param maxNumOfOneServer 每個server最大任務(wù)項數(shù)目
* @return
*/
public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){
int[] taskNums = new int[serverNum];
int numOfSingle = taskItemNum / serverNum;
int otherNum = taskItemNum % serverNum;
//20150323 刪除, 任務(wù)分片保證分配到所有的線程組數(shù)上。 開始
// if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) {
// numOfSingle = maxNumOfOneServer;
// otherNum = 0;
// }
//20150323 刪除, 任務(wù)分片保證分配到所有的線程組數(shù)上。 結(jié)束
for (int i = 0; i < taskNums.length; i++) {
if (i < otherNum) {
taskNums[i] = numOfSingle + 1;
} else {
taskNums[i] = numOfSingle;
}
}
return taskNums;
}
TBScheduleManagerStatic的初始化流程

- 找到task配置的用戶實現(xiàn)的
IScheduleTaskDeal對象 - 將當(dāng)前ScheduleServer實例注冊到
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1位置 - 啟動心跳Timer
HearBeatTimerTask - 啟動初始化線程
心跳Timer
這里主要做的事情就是重新將taskItem分配到每個SchedueServer,源碼位置在TBScheduleManagerStatic#assignScheduleTask()。首先選舉出當(dāng)前ScheduleServer對應(yīng)的task對應(yīng)的所有ScheduleServer實例,選舉出一個leader,由leader進行分配工作。
- 等到初始化線程完成initialRunningInfo的工作
- clearTaskItem,遍歷所有taskItem,查看對應(yīng)的cur_server是否還能找到,找不到則將cur_server置為null
- assignTaskItem,給每個taskItem分配合適的ScheduleServer實例。
初始化線程
- initialRunningInfo,由當(dāng)前task的leader ScheduleServer實例初始化這個task下所有的taskItem子節(jié)點的數(shù)據(jù),此時還沒有分配每個taskItem由哪個ScheduleServer實例執(zhí)行(見心跳Timer)
- getCurrentScheduleTaskItemListNow,重新加載當(dāng)前ScheduleServer能處理的taskItem項目
- computerStart,創(chuàng)建兩個Timer,一個用來計算任務(wù)下次執(zhí)行開始時間,一個用來計算任務(wù)下次終止執(zhí)行時間。停止跟恢復(fù)通過
TBScheduleManager身上的isPauseSchedule字段來標(biāo)識。 - 恢復(fù)的時候去創(chuàng)建
TBScheduleProcessorSleep``TBScheduleProcessorNotSleep對象;停止的時候,會將已經(jīng)在執(zhí)行的任務(wù)處理完,但是緩存在隊列中待執(zhí)行的任務(wù)將被丟棄。
TBScheduleProcessorSleep多線程工作原理
啟動task配置的threadNum數(shù)量的線程去處理任務(wù)。由其中某一個線程去獲取任務(wù),將入taskList隊列中,所有的線程從這個隊列中獲取任務(wù)執(zhí)行,如果是Multi任務(wù),可以一次取多個任務(wù)執(zhí)行。在一個線程獲取任務(wù)的過程中,其他線程處于休眠狀態(tài),任務(wù)獲取完畢喚醒其他線程。獲取任務(wù)代碼在TBScheduleProcessorSleep#loadScheduleData,每次獲取都是調(diào)用一次IScheduleTaskDeal對象selectTasks方法獲取一批任務(wù)放到taskList中。
兩次loadScheduleData有一個休眠時間,即在task上配置的SleepTimeInterval。
一旦TBScheduleProcessorSleep啟動了,會一直循環(huán)執(zhí)行,知道PauseTimer讓其停止,如果你沒有配置結(jié)束時間,則不會停止,而是一直運行;也可以通過后臺配置將任務(wù)停止。
總結(jié)
tbschdule通過任務(wù)分片,將一個任務(wù)分配給多個線程組(即ScheduleServer實例)執(zhí)行,這些線程組可以分布在相同或者不同的JVM上。而每個線程組支持多線程處理某一個分片的任務(wù)。
tbschedule同時支持失效任務(wù)轉(zhuǎn)移功能,并且可以通過管理后臺對任務(wù)進行調(diào)度管理。
不過官方文檔實在太少。
參考:
tbschedule
關(guān)于TbSchedule任務(wù)調(diào)度管理框架的整合部署