tbschedule源碼解讀

tbschedule源碼解讀

tbschedule部署包括兩部分,一個是負(fù)責(zé)配置管理的后臺程序,一個是客戶端接入包,這兩個程序依賴zk進行信息交互。

zk數(shù)據(jù)的大致結(jié)構(gòu)

factory部分

/app1/factory
/app1/factory/facotoryUUID1
/app1/strategy
/app1/strategy/strategy1
/app1/strategy/strategy1/factoryUUID1

可以有多個facotory,每個factory對應(yīng)一個客戶端啟動的TBScheduleManagerFactory實例,每個JVM可以有多個factory實例,factory實例也可以存在于不同的JVM中。
strategy是在后臺配置的任務(wù)策略,每個factory啟動時候回去檢查自己能處理哪幾個strategy,如果能處理則在/app1/strategy/strategy1/路徑下注冊自己,注冊的這個信息在tbschedule源碼里叫做FactoryRunningInfo

ScheduleServer部分

/app1/baseTaskType
/app1/baseTaskType/task1
/app1/baseTaskType/task1/task1
/app1/baseTaskType/task1/task1/server
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1
/app1/baseTaskType/task1/task1/taskItem
/app1/baseTaskType/task1/task1/taskItem/taskItem1
/app1/baseTaskType/task1/task1/taskItem/taskItem1/cur_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/deal_desc
/app1/baseTaskType/task1/task1/taskItem/taskItem1/parameter
/app1/baseTaskType/task1/task1/taskItem/taskItem1/req_server
/app1/baseTaskType/task1/task1/taskItem/taskItem1/sts

task1是在后臺配置的任務(wù)。
/app1/baseTaskType/task1/task1/server/scheduleServerUUID1表示可以用來處理任務(wù)的調(diào)度器,每個factory實例可以有多個ScheduleServer實例。
/app1/baseTaskType/task1/task1/taskItem表示配置任務(wù)時,每個任務(wù)可以拆分成幾個小的任務(wù)項。該節(jié)點的子節(jié)點,表示這個任務(wù)項運行時的信息,例如cur_server表示這個taskItem正在被哪個ScheduleServer處理。這些在tbschedule源碼里也叫作runningInfo。

核心類圖


TBScheduleManagerFactoryfactory實例對象,管理這個factory內(nèi)部所有的事情。
ZKManager負(fù)責(zé)與zk之間的連接,數(shù)據(jù)交換。
IScheduleDataManager負(fù)責(zé)/app1/baseTaskType及其子節(jié)點所有數(shù)據(jù)模型維護。
ScheduleDataManger4ZK負(fù)責(zé)/app1/factory``/app1/strategy及其字節(jié)點數(shù)據(jù)模型維護。
IStrategyTask每個實例代表一個線程組,每個strategy可對應(yīng)多個IStrategyTask實例,來真正處理配置的任務(wù)。

關(guān)于這幾個類的組合關(guān)系如下圖:


一個Factory處理多個strategy,每個strategy下有多個IStrategyTask對象。
TBScheduleManager實現(xiàn)IStrategyTask接口,一個TBScheduleManager實例跟ScheduleServer、ScheduleProcessor、IScheduleTaskDeal的關(guān)系都是一比一的關(guān)系。ScheduleServer是針對某一個task的的調(diào)度器。IScheduleTaskDeal是我們自己代碼里需要實現(xiàn)的任務(wù)對象。ScheduleProcessor是處理任務(wù)的多線程任務(wù)處理器,代表一個線程組。可以包含多個線程,線程的最大數(shù)量取決于后臺配置的task身上的threadNum字段。
一個Factory有多個IStrategyTask的原因是,任務(wù)需要分片處理,每個分片對應(yīng)一個IStrategyTask實例。
一個ScheduleProcessor有多個Thread的原因是,一個任務(wù)分片下一次可以取出多個任務(wù),開啟多線程可以并發(fā)處理這些任務(wù)。

初始化流程


整個初始化過程大量使用Thread、Timer,很多工作都是異步進行的,且這些線程之間通過了狀態(tài)對象、鎖等方式進行了協(xié)調(diào)。
整個初始化過程粗略來看包括以下幾步:

  1. 創(chuàng)建ZKManager對象
  2. 啟動初始化線程InitalThread,然后立即返回
    接著便是InitalThread異步做的初始化工作:
  3. 準(zhǔn)備好ZKManager、ScheduleDataManager4ZK、ScheduleStrategyDataManager4ZK對象
  4. 啟動定時Timer對象ManagerFactoryTimerTask
    接著便是ManagerFactoryTimerTask定時執(zhí)行的工作,主要是去掃描strategy配置,重新分配factory去處理這些strategy。分配完factory,會創(chuàng)建StrategyTask進行任務(wù)的處理。

factory刷新工作詳解

整個過程源碼入口在ManagerFactoryTimerTask#run()中,而主要的邏輯集中在TBScheduleManagerFactory#refresh()。這里不去關(guān)心stop factory的逆向流程,只來看正向流程,見TBScheduleManagerFactory #reRegisterManagerFactory。

  1. 遍歷strategy,重新計算factory實例跟strategy的匹配關(guān)系
  2. 找到當(dāng)前factory實例不能處理的strategy,并停止掉正在運行的StrategyTask
  3. 遍歷跟當(dāng)前factory實例相關(guān)的strategy,選舉出每個strategy的leader factory實例,由leader重新計算每個factory實例能夠分到的reqNum,即根據(jù)strategy身上的assignNum``numOfSingleServer,將assignNum平分給每個factory實例。
  4. 調(diào)整當(dāng)前factory實例分配到每個strategy的的StrategyTask的數(shù)量,確保數(shù)量等于上一步分配給自己的數(shù)量。

factory線程組數(shù)量分配算法

ScheduleUtil#assignTaskNumber

/**
   * 分配任務(wù)數(shù)量
   * @param serverNum 總的服務(wù)器數(shù)量
   * @param taskItemNum 任務(wù)項數(shù)量
   * @param maxNumOfOneServer 每個server最大任務(wù)項數(shù)目
   * @return
   */
  public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){
    int[] taskNums = new int[serverNum];
    int numOfSingle = taskItemNum / serverNum;
    int otherNum = taskItemNum % serverNum;
    //20150323 刪除, 任務(wù)分片保證分配到所有的線程組數(shù)上。 開始
//    if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) {
//      numOfSingle = maxNumOfOneServer;
//      otherNum = 0;
//    }
    //20150323 刪除, 任務(wù)分片保證分配到所有的線程組數(shù)上。 結(jié)束
    for (int i = 0; i < taskNums.length; i++) {
      if (i < otherNum) {
        taskNums[i] = numOfSingle + 1;
      } else {
        taskNums[i] = numOfSingle;
      }
    }
    return taskNums;
  }

TBScheduleManagerStatic的初始化流程

  1. 找到task配置的用戶實現(xiàn)的IScheduleTaskDeal對象
  2. 將當(dāng)前ScheduleServer實例注冊到/app1/baseTaskType/task1/task1/server/scheduleServerUUID1位置
  3. 啟動心跳TimerHearBeatTimerTask
  4. 啟動初始化線程

心跳Timer

這里主要做的事情就是重新將taskItem分配到每個SchedueServer,源碼位置在TBScheduleManagerStatic#assignScheduleTask()。首先選舉出當(dāng)前ScheduleServer對應(yīng)的task對應(yīng)的所有ScheduleServer實例,選舉出一個leader,由leader進行分配工作。

  1. 等到初始化線程完成initialRunningInfo的工作
  2. clearTaskItem,遍歷所有taskItem,查看對應(yīng)的cur_server是否還能找到,找不到則將cur_server置為null
  3. assignTaskItem,給每個taskItem分配合適的ScheduleServer實例。

初始化線程

  1. initialRunningInfo,由當(dāng)前task的leader ScheduleServer實例初始化這個task下所有的taskItem子節(jié)點的數(shù)據(jù),此時還沒有分配每個taskItem由哪個ScheduleServer實例執(zhí)行(見心跳Timer)
  2. getCurrentScheduleTaskItemListNow,重新加載當(dāng)前ScheduleServer能處理的taskItem項目
  3. computerStart,創(chuàng)建兩個Timer,一個用來計算任務(wù)下次執(zhí)行開始時間,一個用來計算任務(wù)下次終止執(zhí)行時間。停止跟恢復(fù)通過TBScheduleManager身上的isPauseSchedule字段來標(biāo)識。
  4. 恢復(fù)的時候去創(chuàng)建TBScheduleProcessorSleep``TBScheduleProcessorNotSleep對象;停止的時候,會將已經(jīng)在執(zhí)行的任務(wù)處理完,但是緩存在隊列中待執(zhí)行的任務(wù)將被丟棄。

TBScheduleProcessorSleep多線程工作原理

啟動task配置的threadNum數(shù)量的線程去處理任務(wù)。由其中某一個線程去獲取任務(wù),將入taskList隊列中,所有的線程從這個隊列中獲取任務(wù)執(zhí)行,如果是Multi任務(wù),可以一次取多個任務(wù)執(zhí)行。在一個線程獲取任務(wù)的過程中,其他線程處于休眠狀態(tài),任務(wù)獲取完畢喚醒其他線程。獲取任務(wù)代碼在TBScheduleProcessorSleep#loadScheduleData,每次獲取都是調(diào)用一次IScheduleTaskDeal對象selectTasks方法獲取一批任務(wù)放到taskList中。
兩次loadScheduleData有一個休眠時間,即在task上配置的SleepTimeInterval。
一旦TBScheduleProcessorSleep啟動了,會一直循環(huán)執(zhí)行,知道PauseTimer讓其停止,如果你沒有配置結(jié)束時間,則不會停止,而是一直運行;也可以通過后臺配置將任務(wù)停止。

總結(jié)

tbschdule通過任務(wù)分片,將一個任務(wù)分配給多個線程組(即ScheduleServer實例)執(zhí)行,這些線程組可以分布在相同或者不同的JVM上。而每個線程組支持多線程處理某一個分片的任務(wù)。
tbschedule同時支持失效任務(wù)轉(zhuǎn)移功能,并且可以通過管理后臺對任務(wù)進行調(diào)度管理。
不過官方文檔實在太少。

參考:
tbschedule
關(guān)于TbSchedule任務(wù)調(diào)度管理框架的整合部署


進入博主個人站點,閱讀更多文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、前言 上一篇文章iOS多線程淺匯-原理篇中整理了一些有關(guān)多線程的基本概念。本篇博文介紹的是iOS中常用的幾個多...
    nuclear閱讀 2,146評論 6 18
  • 不足的地方請大家多多指正,如有其它沒有想到的常問面試題請大家多多評論,一起成長,感謝!~ String可以被繼承嗎...
    啟示錄是真的閱讀 3,073評論 3 3
  • 一 基礎(chǔ)篇 1.1 Java基礎(chǔ) 面向?qū)ο蟮奶卣鞒橄?將一類對象的共同特征總結(jié)出來構(gòu)建類的過程。繼承:對已有類的一...
    essential_note閱讀 771評論 0 0
  • 25號回家,這一周很平穩(wěn),參加了賣車,扮老虎,打吃雞游戲,情緒很好!自己買藥??吹竭@樣我很開心。
    荷塘魚閱讀 154評論 0 0
  • 為人耳熟能詳?shù)氖荢FBT各種具代表性的焦點解決問題,包括成果問句、奇跡問句、假設(shè)問句、例外問句、差異問句、應(yīng)對問句...
    風(fēng)清霜潔閱讀 148評論 0 2

友情鏈接更多精彩內(nèi)容