TBSchedule是什么
TBSchedule是一個支持分布式的調(diào)度框架,讓批量任務(wù)或者不斷變化的任務(wù)能夠被動態(tài)的分配到多個主機的JVM中,在不同的線程組中并行執(zhí)行,所有的任務(wù)能夠被不重復(fù),不遺漏的快速處理?;赯ooKeeper的純Java實現(xiàn),由Alibaba開源。? ??
TBSchedule能干什么
TBSchedule可以將調(diào)度作業(yè)從業(yè)務(wù)系統(tǒng)中分離出來,降低或者是消除和業(yè)務(wù)系統(tǒng)的耦合度,進行高效異步任務(wù)處理。在互聯(lián)網(wǎng)和電商領(lǐng)域TBSchedule的使用非常廣泛,目前被應(yīng)用于阿里巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等很多互聯(lián)網(wǎng)企業(yè)的流程調(diào)度系統(tǒng)。
TBSchedule如何使用
1、下載TBSchdule源碼
下載地址:TBSchedule源碼
官網(wǎng)目前打不開了,可從本人的百度網(wǎng)盤下載。鏈接:TBSchedule源碼 密碼:vuzg
2、部署zookeeper
下載zookeeper并安裝部署。ZK下載地址
3、部署管理控制臺
把下載的源碼的console文件夾中ScheduleConsole.war文件部署到tomcat容器中,并啟動tomcat服務(wù)。

訪問http://localhost:8080/ScheduleConsole/schedule/config.jsp地址進行鏈接ZK的基礎(chǔ)信息配置,如下圖:

4、編寫客戶端代碼
引入jar包
<dependency>
? ? <groupId>com.taobao.pamirs.schedule</groupId>
? ? <artifactId>tbschedule</artifactId>
? ? <version>3.3.3.2</version>
</dependency>
配置zk連接信息
<bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
? ? ? init-method="init">
? ? <property name="zkConfig">
? ? ? ? ? ? <entry key="zkConnectString" value="localhost:2181" />
? ? ? ? ? ? <entry key="rootPath" value="/tb-schedule/test" />
? ? ? ? ? ? <entry key="zkSessionTimeout" value="60000" />
? ? ? ? ? ? <entry key="userName" value="admin" />
? ? ? ? ? ? <entry key="password" value="123456" />
? ? ? ? ? ? <entry key="isCheckParentPath" value="true" />
</bean>
實現(xiàn)IScheduleTaskDealSingle接口及selectTasks()方法和execute()方法。
package com.zhl.tbSchedule;
import com.alibaba.fastjson.JSONObject;
import com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import com.taobao.pamirs.schedule.TaskItemDefine;
import com.zhl.tbSchedule.business.dao.TBOrderMapper;
import com.zhl.tbSchedule.business.dao.TBOrderMapperCopy;
import com.zhl.tbSchedule.business.domain.TBOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
@Slf4j
@Component("iScheduleTaskDealSingleTest")
public class IScheduleTaskDealSingleTestimplements IScheduleTaskDealSingle {
@Autowired
? ? public TBOrderMappertbOrderMapper;
? ? @Autowired
? ? public TBOrderMapperCopytbOrderMapperCopy;
? ? @Override
? ? public ComparatorgetComparator() {
return null;
? ? }
@Override
? ? public ListselectTasks(String taskParameter, String ownSign, int taskQueueNum,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? List taskItemList, int eachFetchDataNum)throws Exception {
System.out.println(Thread.currentThread().getName() +" _IScheduleTaskDealSingleTest start to selectTasks..........");
? ? ? ? if (taskItemList ==null || taskItemList.size() <0) {
return null;
? ? ? ? }
System.out.println("IScheduleTaskDealSingleTest config params,taskParameter:{" + taskParameter +"},ownSina:{"
? ? ? ? ? ? ? ? + ownSign +"},taskQueueNum:{" + taskQueueNum +"},taskItemList:{" + JSONObject.toJSONString(taskItemList)
+"}, eachFetchDataNum:{" + eachFetchDataNum +"}");
? ? ? ? List models =new ArrayList();
? ? ? ? String billingNumber ="";
? ? ? ? for (TaskItemDefine taskItemDefine : taskItemList) {
billingNumber += taskItemDefine.getTaskItemId() +"";
? ? ? ? }
if (StringUtils.isNotBlank(billingNumber)) {
//billingNumber = billingNumber.substring(0,billingNumber.length() - 1);
? ? ? ? ? ? models =tbOrderMapper.selectByBillNumber(billingNumber, eachFetchDataNum);
? ? ? ? }
System.out.println("IScheduleTaskDealSingleTest selectTasks result..........models.size:" + models.size());
? ? ? ? return models;
? ? }
@Override
? ? public boolean execute(TBOrder model, String ownSign)throws Exception {
System.out.println(Thread.currentThread().getName() +" _IScheduleTaskDealSingleTest執(zhí)行開始啦.........." +new Date());
? ? ? ? // System.out.println(model);
? ? ? ? tbOrderMapperCopy.insertTBOrder(model);
? ? ? ? tbOrderMapper.updateStatus(model.getBillNumber());
return true;
? ? }
}
selectTasks方法參數(shù)說明:
taskParameter:對應(yīng)控制臺自定義參數(shù),可自定義傳入做邏輯上的操作
taskQueueNum:對應(yīng)控制臺任務(wù)項數(shù)量
taskItemList:集合中TaskItemDefine的id值對應(yīng)任務(wù)項值,多線程處理時,根據(jù)任務(wù)項協(xié)調(diào)數(shù)據(jù)一致性和完整性
eachFetchDataNum:對應(yīng)控制臺每次獲取數(shù)量,由于子計時單元開始后,會不斷的去取數(shù)據(jù)進行處理,直到取不到數(shù)據(jù)子計時才停止,等待下一個子計時開始。可以限制每次取數(shù),防止一次性數(shù)據(jù)記錄過大,內(nèi)存不足。
ownSign:環(huán)境參數(shù),可用于區(qū)分生產(chǎn)、測試、開發(fā)環(huán)境
5、配置任務(wù)管理和調(diào)度策略

配置參數(shù)說明:
任務(wù)名稱:策略調(diào)度的標(biāo)示,一旦創(chuàng)建保存,不可更改
任務(wù)處理的SpringBean:注冊到spring的任務(wù)bean,如iScheduleTaskDealSingleTest
心跳頻率/假定服務(wù)死亡時間/處理模式/沒有數(shù)據(jù)時休眠時長/執(zhí)行結(jié)束時間:一般保持默認(rèn)即可
線程數(shù):處理該任務(wù)的線程數(shù)(一個線程組的線程數(shù)量),在沒有劃分多任務(wù)項的情況下,多線程是沒有意義的,且線程數(shù)量大于任務(wù)項也是沒有意義的(線程數(shù)小于等于任務(wù)項),注意如果開啟多線程,必須對數(shù)據(jù)做任務(wù)項過濾
單線程組最大任務(wù)項:配置單JVM處理的最大任務(wù)項數(shù)量,多任務(wù)項情況下,可按需限制,一般默認(rèn),多執(zhí)行機會均衡分配
每次獲取數(shù)量:子計時單元開始,線程會不斷的去獲取數(shù)據(jù)(selectTasks方法每次獲取的限制)并處理數(shù)據(jù),直到獲取不到數(shù)據(jù)子計時才結(jié)束(方法內(nèi)不用就可以隨意配置)
每次執(zhí)行數(shù)量:每次execute方法執(zhí)行的數(shù)據(jù)量,只在bean實現(xiàn)IScheduleTaskDealMulti才生效
每次處理完休眠時間:子計時單元開始,只要有數(shù)據(jù),就會不停的獲取不停的處理,這個時間設(shè)置后,子計時單元開始每次獲取執(zhí)行后,不管還有沒有數(shù)據(jù),都先歇會兒再獲取處理
自定義參數(shù):可自定義控制任務(wù)邏輯操作
任務(wù)項:這項很重要,在多線程情況下,劃分任務(wù)項是有意義的,但是要注意必須通過任務(wù)項參數(shù),協(xié)調(diào)待處理數(shù)據(jù)

配置參數(shù)說明:
策略名稱:策略標(biāo)示,可任意填寫
任務(wù)類型:一般保持默認(rèn)Schedule
任務(wù)名稱:對應(yīng)任務(wù)欄被調(diào)度任務(wù)名稱
任務(wù)參數(shù):一般不用,保持默認(rèn)
單JVM最大線程組數(shù)量:單個JVM允許開啟的線程組數(shù)
最大線程組數(shù)量:多處理機情況下的線程總數(shù)限制(總線程為2,任務(wù)項線程為4是沒有意義的)
IP地址:127.0.0.1或者localhost會在所有機器上運行,注意多處理機若沒有根據(jù)任務(wù)子項劃分?jǐn)?shù)據(jù)處理,會導(dǎo)致多處理機重復(fù)處理數(shù)據(jù),謹(jǐn)慎配置
配置完成后啟動客戶端即可進行任務(wù)調(diào)度。
6、分布式高可用高效率保障
1)調(diào)度機的高可用有保障
多調(diào)度機向注冊中心注冊后,共享調(diào)度任務(wù),且同一調(diào)度任務(wù)僅由一臺調(diào)度機執(zhí)行調(diào)度,當(dāng)前調(diào)度機異常宕機后,其余的調(diào)度機會接上
2)執(zhí)行機的高可用有保障
多執(zhí)行機向注冊中心注冊后,配置執(zhí)行機單線程(多機總線程為1)執(zhí)行任務(wù),調(diào)度機會隨機啟動一臺執(zhí)行機執(zhí)行,當(dāng)前執(zhí)行異常機宕機后,調(diào)度機會會新調(diào)度一臺執(zhí)行機。
3)執(zhí)行機的并行高效保障
配置執(zhí)行機多線程且劃分多任務(wù)子項后,各任務(wù)子項均衡分配到所有執(zhí)行機,各執(zhí)行機均執(zhí)行,多線程數(shù)據(jù)一致性協(xié)調(diào)由任務(wù)項參數(shù)區(qū)分。
4)彈性擴展失效轉(zhuǎn)移保障
運行中的執(zhí)行機宕機,或新增執(zhí)行機,調(diào)度機將在下次任務(wù)執(zhí)行前重新分配任務(wù)項,不影響正常執(zhí)行機任務(wù)(崩潰的執(zhí)行機當(dāng)前任務(wù)處理失效);運行中的調(diào)度機宕機或動態(tài)新增調(diào)度機,不影響執(zhí)行機當(dāng)前任務(wù),調(diào)度機宕機后動態(tài)切換。