一、xxl-job的調(diào)度流程和配置
任務(wù)調(diào)度器和執(zhí)行器使用http協(xié)議通信,各自有輪詢線程處理不同業(yè)務(wù)。
二、XXL-JOB客戶端啟動(dòng)流程
-
加載Bean:
從spring容器獲取所有對(duì)象,并遍歷查找方法上標(biāo)記XxlJob注解的方法 將xxljob配置的jobname作為key,對(duì)象Handle作為value注冊(cè)Map 中 ConcurrentMap jobHandlerRepository的Map中維護(hù);
創(chuàng)建執(zhí)行任務(wù)的線程池;
啟動(dòng)內(nèi)嵌的Netty服務(wù);
啟動(dòng)注冊(cè)線程,每隔30s上報(bào)一次注冊(cè)信息。
public class EmbedServer {
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000));
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
}).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
// start registry
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
}
三、任務(wù)的下發(fā)與執(zhí)行
任務(wù)的下發(fā)與執(zhí)行(服務(wù)端發(fā)送給客戶端):
收到服務(wù)器不動(dòng)執(zhí)行進(jìn)行任務(wù)分發(fā):
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
}
1 /beat:心跳?;顧z測(cè)
直接return success,用戶服務(wù)器探活;
2 /idleBeat:任務(wù)執(zhí)行策略配置為忙碌轉(zhuǎn)移時(shí)使用;
等待隊(duì)列如果存在待執(zhí)行任務(wù)時(shí),返回false;
等待隊(duì)列為空時(shí):返回true;
3 /run:接收到執(zhí)行任務(wù)指令
將任務(wù)提交到執(zhí)行隊(duì)列中,并返回true;
隊(duì)列滿或handler不存在時(shí)返回false;
4 /kill:中斷任務(wù)
對(duì)執(zhí)行任務(wù)的線程執(zhí)行 JobThread.interrupt();
每個(gè)任務(wù)Id會(huì)有一個(gè)線程,Kill僅殺死執(zhí)行該任務(wù)Id的線程,下次再下發(fā)任務(wù)發(fā)現(xiàn)線程已中斷會(huì)重新創(chuàng)建線程。
5 /log:獲取執(zhí)行l(wèi)og
返回客戶端執(zhí)行的本地log給服務(wù)端。
四、客戶端注冊(cè)和執(zhí)行結(jié)果上報(bào)
客戶端注冊(cè)和執(zhí)行結(jié)果上報(bào)(客戶端發(fā)送給服務(wù)端)
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
1 /registry:注冊(cè)客戶端信息
啟動(dòng)線程定時(shí)注冊(cè)自己的服務(wù)到調(diào)度器;
創(chuàng)建線程,30s輪詢一次,上報(bào)注冊(cè)信息。
2 /registryRemove:移出執(zhí)行器請(qǐng)求
將自己從執(zhí)行器列表移除;
程序退出時(shí)會(huì)調(diào)用一次,在Netty的finally代碼塊自動(dòng)執(zhí)行。
3 /callback:異步回調(diào)結(jié)果
執(zhí)行器異步回調(diào)給調(diào)度器執(zhí)行任務(wù)結(jié)果;
每次任務(wù)完成時(shí)上報(bào)。
五、附錄
1 網(wǎng)絡(luò)通信格式:
(1)客戶端注冊(cè)
http://127.0.0.1:8080/xxl-job-admin/api/registry
{
"registryGroup": "EXECUTOR"
"registryKey": "xxl-job-executor-sample"
"registryValue": "http://172.30.0.67:9999/"
}
Response:
{
"code": 200
"msg": null
"content": null
}
(2)客戶端移除注冊(cè)
http://127.0.0.1:8080/xxl-job-admin/api/registryRemove
{
"registryGroup": "EXECUTOR"
"registryKey": "xxl-job-executor-sample"
"registryValue": "http://xxljob-axzo.cn"
}
Response:
{
"code": 200
"msg": null
"content": null
}
(3)客戶端執(zhí)行任務(wù)結(jié)果上報(bào)
http://127.0.0.1:8080/xxl-job-admin/api/callback
{
"logId": 1238
"logDateTim": 1667197980007
"handleCode": 200
}
Response:
{
"code": 200
"msg": null
"content": null
}
(4)執(zhí)行器下發(fā)任務(wù):同步回調(diào)僅代表任務(wù)是否發(fā)送成功
http://172.30.0.67:9999/run
{
"jobId": 4
"executorHandler": "demoJobHandler"
"executorParams": ""
"executorBlockStrategy": "SERIAL_EXECUTION"
"executorTimeout": 0
"logId": 1238
"logDateTime": 1667197980007
"glueType": "BEAN"
"glueSource": ""
"glueUpdatetime": 1666683613000
"broadcastIndex": 0
"broadcastTotal": 1
}
Response:
{
"code": 200
"msg": null
"content": null
}
2 Token配置詳解
1.配置了token后,client發(fā)送的每隔http請(qǐng)求頭會(huì)帶上XXL-JOB-ACCESS-TOKEN :{xxl.job.accessToken} ;
2.該參數(shù)不會(huì)對(duì)請(qǐng)求參數(shù)加密;
3.如果配置不匹配,客戶端請(qǐng)求報(bào)錯(cuò):
{
"code": 500
"msg": "The access token is wrong."
"content": null
}
4.發(fā)送配置token的請(qǐng)求,Header中新增了Token參數(shù)
5.配置錯(cuò)token的返回
程序員的核心競(jìng)爭(zhēng)力其實(shí)還是技術(shù),因此對(duì)技術(shù)還是要不斷的學(xué)習(xí),關(guān)注 “IT巔峰技術(shù)” 公眾號(hào) ,該公眾號(hào)內(nèi)容定位:中高級(jí)開(kāi)發(fā)、架構(gòu)師、中層管理人員等中高端崗位服務(wù)的,除了技術(shù)交流外還有很多架構(gòu)思想和實(shí)戰(zhàn)案例。
作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書(shū)作者,同時(shí)也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人,曾就職于拼多多、德邦等公司,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人,主要負(fù)責(zé)開(kāi)發(fā)框架的搭建、中間件相關(guān)技術(shù)的二次開(kāi)發(fā)和運(yùn)維管理、混合云及基礎(chǔ)服務(wù)平臺(tái)的建設(shè)。