TBSchedule使用簡介

TBSchedule是什么

TBSchedule是一個支持分布式的調(diào)度框架,讓批量任務(wù)或者不斷變化的任務(wù)能夠被動態(tài)的分配到多個主機的JVM中,在不同的線程組中并行執(zhí)行,所有的任務(wù)能夠被不重復(fù),不遺漏的快速處理?;赯ooKeeper的純Java實現(xiàn),由Alibaba開源。? ??

TBSchedule能干什么

TBSchedule可以將調(diào)度作業(yè)從業(yè)務(wù)系統(tǒng)中分離出來,降低或者是消除和業(yè)務(wù)系統(tǒng)的耦合度,進行高效異步任務(wù)處理。在互聯(lián)網(wǎng)和電商領(lǐng)域TBSchedule的使用非常廣泛,目前被應(yīng)用于阿里巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等很多互聯(lián)網(wǎng)企業(yè)的流程調(diào)度系統(tǒng)。

TBSchedule如何使用

1、下載TBSchdule源碼

下載地址:TBSchedule源碼

官網(wǎng)目前打不開了,可從本人的百度網(wǎng)盤下載。鏈接:TBSchedule源碼 密碼:vuzg

2、部署zookeeper

下載zookeeper并安裝部署。ZK下載地址

3、部署管理控制臺

把下載的源碼的console文件夾中ScheduleConsole.war文件部署到tomcat容器中,并啟動tomcat服務(wù)。

圖1.管理控制臺war包

訪問http://localhost:8080/ScheduleConsole/schedule/config.jsp地址進行鏈接ZK的基礎(chǔ)信息配置,如下圖:

圖2.配置ZK連接信息

4、編寫客戶端代碼

引入jar包

<dependency>

? ? <groupId>com.taobao.pamirs.schedule</groupId>

? ? <artifactId>tbschedule</artifactId>

? ? <version>3.3.3.2</version>

</dependency>

配置zk連接信息

<bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"

? ? ? init-method="init">

? ? <property name="zkConfig">

? ? ? ? ? ? <entry key="zkConnectString" value="localhost:2181" />

? ? ? ? ? ? <entry key="rootPath" value="/tb-schedule/test" />

? ? ? ? ? ? <entry key="zkSessionTimeout" value="60000" />

? ? ? ? ? ? <entry key="userName" value="admin" />

? ? ? ? ? ? <entry key="password" value="123456" />

? ? ? ? ? ? <entry key="isCheckParentPath" value="true" />

</bean>

實現(xiàn)IScheduleTaskDealSingle接口及selectTasks()方法和execute()方法。

package com.zhl.tbSchedule;

import com.alibaba.fastjson.JSONObject;

import com.taobao.pamirs.schedule.IScheduleTaskDealSingle;

import com.taobao.pamirs.schedule.TaskItemDefine;

import com.zhl.tbSchedule.business.dao.TBOrderMapper;

import com.zhl.tbSchedule.business.dao.TBOrderMapperCopy;

import com.zhl.tbSchedule.business.domain.TBOrder;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.util.ArrayList;

import java.util.Comparator;

import java.util.Date;

import java.util.List;

@Slf4j

@Component("iScheduleTaskDealSingleTest")

public class IScheduleTaskDealSingleTestimplements IScheduleTaskDealSingle {

@Autowired

? ? public TBOrderMappertbOrderMapper;

? ? @Autowired

? ? public TBOrderMapperCopytbOrderMapperCopy;

? ? @Override

? ? public ComparatorgetComparator() {

return null;

? ? }

@Override

? ? public ListselectTasks(String taskParameter, String ownSign, int taskQueueNum,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? List taskItemList, int eachFetchDataNum)throws Exception {

System.out.println(Thread.currentThread().getName() +" _IScheduleTaskDealSingleTest start to selectTasks..........");

? ? ? ? if (taskItemList ==null || taskItemList.size() <0) {

return null;

? ? ? ? }

System.out.println("IScheduleTaskDealSingleTest config params,taskParameter:{" + taskParameter +"},ownSina:{"

? ? ? ? ? ? ? ? + ownSign +"},taskQueueNum:{" + taskQueueNum +"},taskItemList:{" + JSONObject.toJSONString(taskItemList)

+"}, eachFetchDataNum:{" + eachFetchDataNum +"}");

? ? ? ? List models =new ArrayList();

? ? ? ? String billingNumber ="";

? ? ? ? for (TaskItemDefine taskItemDefine : taskItemList) {

billingNumber += taskItemDefine.getTaskItemId() +"";

? ? ? ? }

if (StringUtils.isNotBlank(billingNumber)) {

//billingNumber = billingNumber.substring(0,billingNumber.length() - 1);

? ? ? ? ? ? models =tbOrderMapper.selectByBillNumber(billingNumber, eachFetchDataNum);

? ? ? ? }

System.out.println("IScheduleTaskDealSingleTest selectTasks result..........models.size:" + models.size());

? ? ? ? return models;

? ? }

@Override

? ? public boolean execute(TBOrder model, String ownSign)throws Exception {

System.out.println(Thread.currentThread().getName() +" _IScheduleTaskDealSingleTest執(zhí)行開始啦.........." +new Date());

? ? ? ? // System.out.println(model);

? ? ? ? tbOrderMapperCopy.insertTBOrder(model);

? ? ? ? tbOrderMapper.updateStatus(model.getBillNumber());

return true;

? ? }

}

selectTasks方法參數(shù)說明:

taskParameter:對應(yīng)控制臺自定義參數(shù),可自定義傳入做邏輯上的操作

taskQueueNum:對應(yīng)控制臺任務(wù)項數(shù)量

taskItemList:集合中TaskItemDefine的id值對應(yīng)任務(wù)項值,多線程處理時,根據(jù)任務(wù)項協(xié)調(diào)數(shù)據(jù)一致性和完整性

eachFetchDataNum:對應(yīng)控制臺每次獲取數(shù)量,由于子計時單元開始后,會不斷的去取數(shù)據(jù)進行處理,直到取不到數(shù)據(jù)子計時才停止,等待下一個子計時開始。可以限制每次取數(shù),防止一次性數(shù)據(jù)記錄過大,內(nèi)存不足。

ownSign:環(huán)境參數(shù),可用于區(qū)分生產(chǎn)、測試、開發(fā)環(huán)境

5、配置任務(wù)管理和調(diào)度策略

圖3.任務(wù)管理配置

配置參數(shù)說明:

任務(wù)名稱:策略調(diào)度的標(biāo)示,一旦創(chuàng)建保存,不可更改

任務(wù)處理的SpringBean:注冊到spring的任務(wù)bean,如iScheduleTaskDealSingleTest

心跳頻率/假定服務(wù)死亡時間/處理模式/沒有數(shù)據(jù)時休眠時長/執(zhí)行結(jié)束時間:一般保持默認(rèn)即可

線程數(shù):處理該任務(wù)的線程數(shù)(一個線程組的線程數(shù)量),在沒有劃分多任務(wù)項的情況下,多線程是沒有意義的,且線程數(shù)量大于任務(wù)項也是沒有意義的(線程數(shù)小于等于任務(wù)項),注意如果開啟多線程,必須對數(shù)據(jù)做任務(wù)項過濾

單線程組最大任務(wù)項:配置單JVM處理的最大任務(wù)項數(shù)量,多任務(wù)項情況下,可按需限制,一般默認(rèn),多執(zhí)行機會均衡分配

每次獲取數(shù)量:子計時單元開始,線程會不斷的去獲取數(shù)據(jù)(selectTasks方法每次獲取的限制)并處理數(shù)據(jù),直到獲取不到數(shù)據(jù)子計時才結(jié)束(方法內(nèi)不用就可以隨意配置)

每次執(zhí)行數(shù)量:每次execute方法執(zhí)行的數(shù)據(jù)量,只在bean實現(xiàn)IScheduleTaskDealMulti才生效

每次處理完休眠時間:子計時單元開始,只要有數(shù)據(jù),就會不停的獲取不停的處理,這個時間設(shè)置后,子計時單元開始每次獲取執(zhí)行后,不管還有沒有數(shù)據(jù),都先歇會兒再獲取處理

自定義參數(shù):可自定義控制任務(wù)邏輯操作

任務(wù)項:這項很重要,在多線程情況下,劃分任務(wù)項是有意義的,但是要注意必須通過任務(wù)項參數(shù),協(xié)調(diào)待處理數(shù)據(jù)

圖4.調(diào)度策略配置

配置參數(shù)說明:

策略名稱:策略標(biāo)示,可任意填寫

任務(wù)類型:一般保持默認(rèn)Schedule

任務(wù)名稱:對應(yīng)任務(wù)欄被調(diào)度任務(wù)名稱

任務(wù)參數(shù):一般不用,保持默認(rèn)

單JVM最大線程組數(shù)量:單個JVM允許開啟的線程組數(shù)

最大線程組數(shù)量:多處理機情況下的線程總數(shù)限制(總線程為2,任務(wù)項線程為4是沒有意義的)

IP地址:127.0.0.1或者localhost會在所有機器上運行,注意多處理機若沒有根據(jù)任務(wù)子項劃分?jǐn)?shù)據(jù)處理,會導(dǎo)致多處理機重復(fù)處理數(shù)據(jù),謹(jǐn)慎配置

配置完成后啟動客戶端即可進行任務(wù)調(diào)度。

6、分布式高可用高效率保障

1)調(diào)度機的高可用有保障

多調(diào)度機向注冊中心注冊后,共享調(diào)度任務(wù),且同一調(diào)度任務(wù)僅由一臺調(diào)度機執(zhí)行調(diào)度,當(dāng)前調(diào)度機異常宕機后,其余的調(diào)度機會接上

2)執(zhí)行機的高可用有保障

多執(zhí)行機向注冊中心注冊后,配置執(zhí)行機單線程(多機總線程為1)執(zhí)行任務(wù),調(diào)度機會隨機啟動一臺執(zhí)行機執(zhí)行,當(dāng)前執(zhí)行異常機宕機后,調(diào)度機會會新調(diào)度一臺執(zhí)行機。

3)執(zhí)行機的并行高效保障

配置執(zhí)行機多線程且劃分多任務(wù)子項后,各任務(wù)子項均衡分配到所有執(zhí)行機,各執(zhí)行機均執(zhí)行,多線程數(shù)據(jù)一致性協(xié)調(diào)由任務(wù)項參數(shù)區(qū)分。

4)彈性擴展失效轉(zhuǎn)移保障

運行中的執(zhí)行機宕機,或新增執(zhí)行機,調(diào)度機將在下次任務(wù)執(zhí)行前重新分配任務(wù)項,不影響正常執(zhí)行機任務(wù)(崩潰的執(zhí)行機當(dāng)前任務(wù)處理失效);運行中的調(diào)度機宕機或動態(tài)新增調(diào)度機,不影響執(zhí)行機當(dāng)前任務(wù),調(diào)度機宕機后動態(tài)切換。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容