xxl-job客戶端架構(gòu)流程

一、xxl-job的調(diào)度流程和配置

任務(wù)調(diào)度器和執(zhí)行器使用http協(xié)議通信,各自有輪詢線程處理不同業(yè)務(wù)。

image
image

二、XXL-JOB客戶端啟動(dòng)流程

  1. 加載Bean:

    從spring容器獲取所有對(duì)象,并遍歷查找方法上標(biāo)記XxlJob注解的方法 將xxljob配置的jobname作為key,對(duì)象Handle作為value注冊(cè)Map 中 ConcurrentMap jobHandlerRepository的Map中維護(hù);

  2. 創(chuàng)建執(zhí)行任務(wù)的線程池;

  3. 啟動(dòng)內(nèi)嵌的Netty服務(wù);

  4. 啟動(dòng)注冊(cè)線程,每隔30s上報(bào)一次注冊(cè)信息。

public class EmbedServer {

    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000));

                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                             }
                        }).childOption(ChannelOption.SO_KEEPALIVE, true);

                ChannelFuture future = bootstrap.bind(port).sync();

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();
            }
        });
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }
}

三、任務(wù)的下發(fā)與執(zhí)行

任務(wù)的下發(fā)與執(zhí)行(服務(wù)端發(fā)送給客戶端):

收到服務(wù)器不動(dòng)執(zhí)行進(jìn)行任務(wù)分發(fā):

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

      switch (uri) {
          case "/beat":
              return executorBiz.beat();
          case "/idleBeat":
              IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
              return executorBiz.idleBeat(idleBeatParam);
          case "/run":
              TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
              return executorBiz.run(triggerParam);
          case "/kill":
              KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
              return executorBiz.kill(killParam);
          case "/log":
              LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
              return executorBiz.log(logParam);
          default:
              return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
}

1 /beat:心跳?;顧z測(cè)

直接return success,用戶服務(wù)器探活;

2 /idleBeat:任務(wù)執(zhí)行策略配置為忙碌轉(zhuǎn)移時(shí)使用;

等待隊(duì)列如果存在待執(zhí)行任務(wù)時(shí),返回false;

等待隊(duì)列為空時(shí):返回true;

3 /run:接收到執(zhí)行任務(wù)指令

將任務(wù)提交到執(zhí)行隊(duì)列中,并返回true;

隊(duì)列滿或handler不存在時(shí)返回false;

4 /kill:中斷任務(wù)

對(duì)執(zhí)行任務(wù)的線程執(zhí)行 JobThread.interrupt();

每個(gè)任務(wù)Id會(huì)有一個(gè)線程,Kill僅殺死執(zhí)行該任務(wù)Id的線程,下次再下發(fā)任務(wù)發(fā)現(xiàn)線程已中斷會(huì)重新創(chuàng)建線程。

5 /log:獲取執(zhí)行l(wèi)og

返回客戶端執(zhí)行的本地log給服務(wù)端。

四、客戶端注冊(cè)和執(zhí)行結(jié)果上報(bào)

客戶端注冊(cè)和執(zhí)行結(jié)果上報(bào)(客戶端發(fā)送給服務(wù)端)

@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}

1 /registry:注冊(cè)客戶端信息

啟動(dòng)線程定時(shí)注冊(cè)自己的服務(wù)到調(diào)度器;

創(chuàng)建線程,30s輪詢一次,上報(bào)注冊(cè)信息。

2 /registryRemove:移出執(zhí)行器請(qǐng)求

將自己從執(zhí)行器列表移除;

程序退出時(shí)會(huì)調(diào)用一次,在Netty的finally代碼塊自動(dòng)執(zhí)行。

3 /callback:異步回調(diào)結(jié)果

執(zhí)行器異步回調(diào)給調(diào)度器執(zhí)行任務(wù)結(jié)果;

每次任務(wù)完成時(shí)上報(bào)。

五、附錄

1 網(wǎng)絡(luò)通信格式:

(1)客戶端注冊(cè)

http://127.0.0.1:8080/xxl-job-admin/api/registry
{
    "registryGroup": "EXECUTOR"
    "registryKey": "xxl-job-executor-sample"
    "registryValue": "http://172.30.0.67:9999/"
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}

(2)客戶端移除注冊(cè)

http://127.0.0.1:8080/xxl-job-admin/api/registryRemove
{
    "registryGroup": "EXECUTOR"
    "registryKey": "xxl-job-executor-sample"
    "registryValue": "http://xxljob-axzo.cn"
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}

(3)客戶端執(zhí)行任務(wù)結(jié)果上報(bào)

http://127.0.0.1:8080/xxl-job-admin/api/callback
{
    "logId": 1238
    "logDateTim": 1667197980007
    "handleCode": 200
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}

(4)執(zhí)行器下發(fā)任務(wù):同步回調(diào)僅代表任務(wù)是否發(fā)送成功

http://172.30.0.67:9999/run
{
    "jobId": 4
    "executorHandler": "demoJobHandler"
    "executorParams": ""
    "executorBlockStrategy": "SERIAL_EXECUTION"
    "executorTimeout": 0
    "logId": 1238
    "logDateTime": 1667197980007
    "glueType": "BEAN"
    "glueSource": ""
    "glueUpdatetime": 1666683613000
    "broadcastIndex": 0
    "broadcastTotal": 1
}

Response:
{
    "code": 200
    "msg": null
    "content": null
}

2 Token配置詳解

1.配置了token后,client發(fā)送的每隔http請(qǐng)求頭會(huì)帶上XXL-JOB-ACCESS-TOKEN :{xxl.job.accessToken} ;

2.該參數(shù)不會(huì)對(duì)請(qǐng)求參數(shù)加密;

3.如果配置不匹配,客戶端請(qǐng)求報(bào)錯(cuò):

{
"code": 500
"msg": "The access token is wrong."
"content": null
}

4.發(fā)送配置token的請(qǐng)求,Header中新增了Token參數(shù)

image

5.配置錯(cuò)token的返回

image

程序員的核心競(jìng)爭(zhēng)力其實(shí)還是技術(shù),因此對(duì)技術(shù)還是要不斷的學(xué)習(xí),關(guān)注 “IT巔峰技術(shù)” 公眾號(hào) ,該公眾號(hào)內(nèi)容定位:中高級(jí)開(kāi)發(fā)、架構(gòu)師、中層管理人員等中高端崗位服務(wù)的,除了技術(shù)交流外還有很多架構(gòu)思想和實(shí)戰(zhàn)案例。

作者是 《 消息中間件 RocketMQ 技術(shù)內(nèi)幕》 一書(shū)作者,同時(shí)也是 “RocketMQ 上海社區(qū)”聯(lián)合創(chuàng)始人,曾就職于拼多多、德邦等公司,現(xiàn)任上市快遞公司架構(gòu)負(fù)責(zé)人,主要負(fù)責(zé)開(kāi)發(fā)框架的搭建、中間件相關(guān)技術(shù)的二次開(kāi)發(fā)和運(yùn)維管理、混合云及基礎(chǔ)服務(wù)平臺(tái)的建設(shè)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容