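xxl-job Architecture Analysis

I recently spent some time reading through the xxl-job source code and wrote up a brief analysis, hoping to learn something about the essence of distributed task scheduling from such a mature framework.

This article covers the following points:

  • Sketch the overall core design diagram to get a bird's-eye view of the xxl-job architecture
  • Walk through the core server-side design and establish the main ideas behind it
  • Walk through the core client-side design and establish the main ideas behind it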

Core Design Diagram

[Figure: xxl-job core design diagram (image.png)]

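Server-Side Design

XxlJobScheduler gives a view of the core of the server-side design, which includes:

  • Trigger thread pools (dispatching scheduled tasks for execution)
  • Service registration and deregistration
  • Compensation for failed task executions
  • Handling of clients that go offline abnormally, and callback handling
  • Task execution statistics report
  • The scheduling loop (computing each task's next execution time)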

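Trigger

The trigger is designed around two thread pools, fastTriggerPool and slowTriggerPool. The difference highlighted here is the capacity of the blocking queue: 1000 for fastTriggerPool and 2000 for slowTriggerPool. (Their maximum thread counts are also configured separately, through getTriggerPoolFastMax() and getTriggerPoolSlowMax(), which appear again in the scheduling loop later in this article.)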

public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    // ....
}

The comment says that once a job has timed out more than 10 times within one minute, it is routed to the slow trigger pool. The "timeout" mentioned here is not the HTTP request timeout, though; it means the measured cost of the job exceeded 500 ms.

long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}

// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) {       // job-timeout threshold 500ms
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}

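Each time a job finishes, the code checks whether the current minute (the current timestamp divided by 60000, a rough way of expressing one-minute buckets) differs from the minute recorded last time; if it does, the recorded minute is updated and all timeout counters are cleared.
If the job took longer than 500 ms, its timeout count is incremented. (The 500 ms threshold is not even a configurable parameter, which is admittedly a bit crude...)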
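The routing rule described above can be sketched in isolation. This is a minimal illustration, not the xxl-job implementation: the class name SlowJobRouter, its method names, and the injected nowMs parameter are assumptions made here so the logic can be exercised without real time passing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that mirrors the per-minute slow-job counting; not part of xxl-job. */
class SlowJobRouter {
    static final long SLOW_THRESHOLD_MS = 500;  // same threshold as the excerpt
    static final int SLOW_COUNT_LIMIT = 10;     // same limit as the excerpt

    private final ConcurrentMap<Integer, AtomicInteger> slowCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    /** Records one finished trigger; nowMs is passed in so the sketch is testable. */
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {          // a new one-minute bucket started: reset all counters
            currentMinute = minute;
            slowCounts.clear();
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            slowCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    /** Returns "slow" once a job has been slow more than the limit within the current minute. */
    String choosePool(int jobId) {
        AtomicInteger count = slowCounts.get(jobId);
        return (count != null && count.get() > SLOW_COUNT_LIMIT) ? "slow" : "fast";
    }
}
```

Because the counters are cleared whenever the minute bucket changes, a job is sent back to the fast pool at the start of the next minute regardless of how slow it was before.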

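Service Registration and Deregistration

The registryOrRemoveThreadPool thread pool handles registration and deregistration requests from clients. Notably, its rejection policy is r.run(), which means a rejected task is executed directly by the thread that submitted it, i.e. the thread handling the client's request. (Quite a few thread pools in xxl-job use this policy.)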
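To make the effect of an r.run() rejection policy concrete, here is a small sketch using only java.util.concurrent. The class name CallerRunsDemo and the pool sizes are made up for illustration; the point is that a task rejected by a saturated pool runs synchronously on the submitting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    /** Returns the name of the thread that ran the task rejected by a saturated pool. */
    static String runRejectedTask() {
        // Same idea as the r.run() policy: the rejected task runs in the submitting thread.
        RejectedExecutionHandler callerRuns = (r, executor) -> r.run();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                callerRuns);

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker);   // occupies the only worker thread
        pool.execute(blocker);   // fills the queue (capacity 1)

        String[] ranOn = new String[1];
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // rejected, so it runs here

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn[0];
    }
}
```

The trade-off is back-pressure: nothing is dropped, but the submitting thread is slowed down while it runs the rejected task itself.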

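registryMonitorThread is a periodic task that runs every 30 s. Its main work is:

  • Check whether there are any auto-registered job groups (executors); if so, do the following:
  • Find clients that have not reported a registration for more than 90 s and delete them
  • Find clients that are reporting normally (registered within the last 90 s) and write their registered addresses to the job group
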
// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {

  // remove dead address (admin/executor)
  List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
  if (ids!=null && ids.size()>0) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
  }

  // fresh online address (admin/executor)
  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
  List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
  if (list != null) {
    //....
  }

  // fresh group address
  // ....
}
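The sweep performed by registryMonitorThread can be approximated with an in-memory list. This is only a sketch: RegistrySweep, Entry, and the millisecond timestamps are hypothetical stand-ins for the registry table and its DAO queries, and the 90 s value is taken from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RegistrySweep {
    static final long DEAD_TIMEOUT_MS = 90_000;  // 90 s, as described in the text

    /** Hypothetical registry row: app name, address, time of the last heartbeat. */
    static final class Entry {
        final String appname;
        final String address;
        final long updateTimeMs;

        Entry(String appname, String address, long updateTimeMs) {
            this.appname = appname;
            this.address = address;
            this.updateTimeMs = updateTimeMs;
        }
    }

    /** Removes dead entries in place and returns appname -> live addresses. */
    static Map<String, List<String>> sweep(List<Entry> registry, long nowMs) {
        // step 1: drop entries whose last heartbeat is older than the dead timeout
        registry.removeIf(e -> nowMs - e.updateTimeMs > DEAD_TIMEOUT_MS);

        // step 2: group the remaining addresses by app name
        Map<String, List<String>> live = new TreeMap<>();
        for (Entry e : registry) {
            List<String> addresses = live.computeIfAbsent(e.appname, k -> new ArrayList<>());
            if (!addresses.contains(e.address)) {
                addresses.add(e.address);
            }
        }
        return live;
    }
}
```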

Compensation for Failed Task Executions

monitorThread is a periodic task that runs every 10 s. Its main work is:

  • Query up to 1000 failed execution logs (alarm status still at the default) and process them in a loop
  • Update the log's alarm status from default to locked (skip the log if the update fails)
  • If the log still has a retry count greater than 0, trigger a retry
  • If an alarm e-mail is configured for the job, send an alarm
  • Update the alarm status to one of 1, 2, or 3

Alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm succeeded, 3 = alarm failed

List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
  for (long failLogId: failLogIds) {

    // lock log
    int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
    if (lockRet < 1) {
      continue;
    }
    XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
    XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

    // 1、fail retry monitor
    if (log.getExecutorFailRetryCount() > 0) {
      JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
      String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
      log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
    }

    // 2、fail alarm monitor
    int newAlarmStatus = 0;   // alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm succeeded, 3 = alarm failed
    if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
      boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
      newAlarmStatus = alarmResult?2:3;
    } else {
      newAlarmStatus = 1;
    }

    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
  }
}
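The updateAlarmStatus(failLogId, 0, -1) call acts as a compare-and-set lock, so that only one monitor processes a given failed log. A minimal in-memory sketch of that state machine follows; AlarmStatusLock and its methods are hypothetical, and a ConcurrentMap stands in for the database row that the excerpt updates.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AlarmStatusLock {
    // alarm status values as listed in the article
    static final int DEFAULT = 0, LOCKED = -1, NO_ALARM = 1, ALARM_OK = 2, ALARM_FAILED = 3;

    // Stand-in for the log table: logId -> alarm status (a database row in the excerpt).
    private final ConcurrentMap<Long, Integer> alarmStatus = new ConcurrentHashMap<>();

    void addFailedLog(long logId) {
        alarmStatus.put(logId, DEFAULT);
    }

    /** Compare-and-set; returns the number of "rows" changed, like the DAO call in the excerpt. */
    int updateAlarmStatus(long logId, int oldStatus, int newStatus) {
        return alarmStatus.replace(logId, oldStatus, newStatus) ? 1 : 0;
    }

    /** Returns true if this caller won the lock and processed the log. */
    boolean process(long logId, boolean hasAlarmEmail, boolean alarmSent) {
        if (updateAlarmStatus(logId, DEFAULT, LOCKED) < 1) {
            return false;   // someone else already locked or finished this log
        }
        int next = !hasAlarmEmail ? NO_ALARM : (alarmSent ? ALARM_OK : ALARM_FAILED);
        updateAlarmStatus(logId, LOCKED, next);
        return true;
    }

    int status(long logId) {
        return alarmStatus.get(logId);
    }
}
```

Since a processed log never returns to status 0, a second pass over the same log id is skipped, which is what keeps retries and alarms from firing twice.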

Handling Abnormally Offline Clients and Callbacks

The callbackThreadPool thread pool handles callbacks received from clients. Its rejection policy is also r.run().

monitorThread is a periodic task that runs every 60 s. Its main work is:

  • Lost-result handling: if a scheduling log stays in the "running" state for more than 10 min and the corresponding executor's heartbeat registration has lapsed (it is offline), the admin proactively marks that schedule as failed

// lost-result handling: a log stuck in "running" for more than 10 min whose executor is offline is marked as failed
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
  for (Long logId: losedJobIds) {
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setId(logId);
    jobLog.setHandleTime(new Date());
    jobLog.setHandleCode(ReturnT.FAIL_CODE);
    jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

    XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
  }
}
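The excerpt delegates the actual filtering to findLostJobIds, whose query is not shown. Based only on the rule stated above, the selection could look roughly like the following sketch; LostJobFinder, LogRow, and the boolean running flag are hypothetical simplifications, not the real schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class LostJobFinder {
    static final long LOST_AFTER_MS = 10 * 60 * 1000L;  // 10 min, as in the excerpt

    /** Hypothetical, simplified view of a scheduling log row. */
    static final class LogRow {
        final long id;
        final long triggerTimeMs;
        final boolean running;
        final String executorAddress;

        LogRow(long id, long triggerTimeMs, boolean running, String executorAddress) {
            this.id = id;
            this.triggerTimeMs = triggerTimeMs;
            this.running = running;
            this.executorAddress = executorAddress;
        }
    }

    /** Ids of logs still running, triggered at least 10 min ago, on an executor that is no longer registered. */
    static List<Long> findLost(List<LogRow> logs, Set<String> onlineAddresses, long nowMs) {
        List<Long> lost = new ArrayList<>();
        for (LogRow row : logs) {
            boolean stale = nowMs - row.triggerTimeMs >= LOST_AFTER_MS;
            boolean offline = !onlineAddresses.contains(row.executorAddress);
            if (row.running && stale && offline) {
                lost.add(row.id);
            }
        }
        return lost;
    }
}
```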

Task Execution Statistics Report

The core code of the statistics report is as follows:

Date todayTo = itemDay.getTime();

// refresh log-report every minute
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);

Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
    int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
    int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
    int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
    int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

    xxlJobLogReport.setRunningCount(triggerDayCountRunning);
    xxlJobLogReport.setSucCount(triggerDayCountSuc);
    xxlJobLogReport.setFailCount(triggerDayCountFail);
}

// do refresh
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}

The Scheduling Loop (Computing Each Task's Next Execution Time)

scheduleThread is a periodic task whose sleep interval is computed dynamically. It is the most central piece of server-side functionality.

@Override
public void run() {

    try {
        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); // sleep 4 to 5 s, waking up on a whole-second boundary
    } catch (InterruptedException e) {
        //....
    }
    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
    // default: (200 + 100) * 20 = 6000
    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

    while (!scheduleThreadToStop) {

        // Scan Job
        long start = System.currentTimeMillis();
        // acquire database resources
        boolean preReadSuc = true;
        try {

            // ...
            // acquire the lock (nodes that cannot get it block here, e.g. when several admin nodes are deployed)
            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
            preparedStatement.execute();

            // tx start

            // 1、pre read
            long nowTime = System.currentTimeMillis();
            // query jobs whose next trigger time is earlier than now + 5 s, up to preReadCount rows (6000 by default)
            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
            if (scheduleList!=null && scheduleList.size()>0) {
                // 2、push time-ring
                for (XxlJobInfo jobInfo: scheduleList) {

                    // time-ring jump
                    // now > next trigger time + 5 s
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                        // 1、misfire match
                        // misfire strategy
                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                            // FIRE_ONCE_NOW 》 trigger
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                        }

                        // 2、fresh next
                        // refresh the next trigger time
                        // if no next trigger time can be computed, the job is stopped
                        refreshNextValidTime(jobInfo, new Date());

                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        // now is slightly past the next trigger time (by less than 5 s)
                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                        // 1、trigger
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                        // 2、fresh next
                        refreshNextValidTime(jobInfo, new Date());
                        // if the new next trigger time is within 5 s, push it to the time ring and let the ring thread handle it
                        // next-trigger-time in 5s, pre-read again
                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                            // work out which slot of the time ring the next trigger falls into
                            // 1、make ring second
                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);//

                            // 2、push time ring
                            pushTimeRing(ringSecond, jobInfo.getId());

                            // 3、fresh next
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                        }

                    } else {
                        // the next trigger time is still in the future: work out which slot of the time ring this trigger falls into
                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                        // 1、make ring second
                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                        // 2、push time ring
                        pushTimeRing(ringSecond, jobInfo.getId());

                        // 3、fresh next
                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                    }

                }

                // 3、update trigger info
                for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                }

            } else {
                preReadSuc = false; // no jobs due
            }

            // tx stop


        } catch (Exception e) {
            // ....
        } finally {
            // release resources
        }
        long cost = System.currentTimeMillis()-start;


        // Wait seconds, align second
        if (cost < 1000) {  // scan-overtime, not wait
            try {
                // pre-read period: success > scan each second; fail > skip this period;
                // no jobs due within the next 5 s: sleep about 5 s, waking up on a whole second
                // otherwise: sleep until the next whole second
                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

    }

    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
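Stripped of database access and logging, the loop above makes two decisions per iteration: what to do with each pre-read job, and how long to sleep afterwards. The following sketch restates both as pure functions. ScheduleDecision, Action, and the method names are invented for illustration; the 5000 ms pre-read window is implied by the 5 s comments in the excerpt.

```java
class ScheduleDecision {
    static final long PRE_READ_MS = 5000;  // pre-read window, 5 s per the comments in the excerpt

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    /** Classifies a pre-read job the same way the three branches of the loop do. */
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE;        // overdue by more than 5 s: apply the misfire strategy
        }
        if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;    // overdue by less than 5 s: trigger directly
        }
        return Action.PUSH_TO_RING;       // due within the pre-read window: hand over to the time ring
    }

    /** Sleep length at the end of one iteration, aligned to a whole second. */
    static long sleepMs(boolean preReadSuc, long nowMs) {
        return (preReadSuc ? 1000 : PRE_READ_MS) - nowMs % 1000;
    }
}
```

For example, at nowMs = 10250 the loop sleeps 750 ms if it found work and 4750 ms if it did not; in both cases it wakes up on a whole second.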

The ring thread implements a time wheel with at most 60 slots. Internally it is a map whose key is the second (0 to 59) and whose value is the list of job IDs due in that second.

public void run() {

    while (!ringThreadToStop) {

        // align second
        try {
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000); // align to the next second
        } catch (InterruptedException e) {
            if (!ringThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }

        try {
            // second data
            List<Integer> ringItemData = new ArrayList<>();
            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // in case processing took too long and skipped a tick, also check the previous tick
            for (int i = 0; i < 2; i++) {
                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); // take the current slot plus the previous one; e.g. if nowSecond is 4, i=0 gives slot 4 and i=1 gives slot 3
                if (tmpData != null) {
                    ringItemData.addAll(tmpData);
                }
            }

            // ring trigger
            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
            if (ringItemData.size() > 0) {
                // do trigger
                for (int jobId: ringItemData) {
                    // do trigger
                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                }
                // clear
                ringItemData.clear();
            }
        } catch (Exception e) {
            if (!ringThreadToStop) {
                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
            }
        }
    }
    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
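The slot arithmetic of the ring is easy to check on its own. The sketch below is a single-threaded simplification: TimeRing, slotOf, push, and drain are hypothetical names, and a plain HashMap replaces whatever concurrent structure the real ringData uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TimeRing {
    // slot (second 0..59) -> job ids due in that second
    private final Map<Integer, List<Integer>> ringData = new HashMap<>();

    /** Same formula as the excerpt: the second-of-minute of the trigger time. */
    static int slotOf(long triggerTimeMs) {
        return (int) ((triggerTimeMs / 1000) % 60);
    }

    void push(long triggerTimeMs, int jobId) {
        ringData.computeIfAbsent(slotOf(triggerTimeMs), k -> new ArrayList<>()).add(jobId);
    }

    /** Drains the slot for nowSecond and the one before it, in case a tick was skipped. */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

The "+ 60" keeps the index non-negative at the wrap-around: draining at second 0 also checks slot 59. A 60-slot ring of this kind can only hold triggers less than a minute away, which is compatible with the 5 s pre-read window used by the scheduling loop.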

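Summary:
That largely completes the breakdown of the server's core design. The most central part is the scheduling loop, which implements a second-granularity time wheel internally (a bit crude...). The overall architecture is not exactly simple: because there are no external framework dependencies, xxl-job has to implement service registration, deregistration, offline detection, and similar features on its own. The architecture can therefore be viewed as two large blocks: service governance and task scheduling. Service governance maintains the connection state between the server and client roles, while task scheduling is xxl-job's core function.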

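Client-Side Design

The client is certainly less complex than the server. It basically comes down to a few points:

  • Scan the Spring container for methods carrying the XXL annotation, create a MethodJobHandler for each, and add it to the registry
  • Initialize the client's logging facilities
  • Initialize the client's embedded server (used to receive calls from the server side)
  • Service registration and deregistration
  • Handle callbacks for finished executions

The points worth focusing on are the client's embedded server and service registration and deregistration.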

Client Server

EmbedServer is an HTTP server built on Netty; the default port is 9999.

// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));// HTTP handler
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

// bind
ChannelFuture future = bootstrap.bind(port).sync();

logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

// start registry
startRegistry(appname, address); // service registration

// wait until stop
future.channel().closeFuture().sync();

EmbedHttpServerHandler handles HTTP requests from the server side, mainly heartbeat, idle check, run, kill, and log retrieval.

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid: only POST is supported
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    // validate the access token
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);// check whether the job thread is running or has queued triggers
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {// stop the job thread on the client
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
       // ....
    }
}

The run call deserves the most attention. It looks up the matching IJobHandler, builds a JobThread, and pushes the trigger onto that thread's queue to wait for processing.

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler 
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
    //...
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
    // ....
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) { // apply the configured block strategy
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard later triggers
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // cover the earlier trigger
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else { // serial execution on a single machine
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); // create the execution thread for this handler, register it, and start it
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); // put into the blocking queue; the job thread consumes it
    return pushResult;
}
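The block-strategy branch of run() boils down to a small decision table. The sketch below isolates it; BlockStrategyDecision, Outcome, and the SERIAL constant are names made up here (only DISCARD_LATER and COVER_EARLY appear in the excerpt), and runningOrHasQueue stands for the result of jobThread.isRunningOrHasQueue().

```java
class BlockStrategyDecision {
    // DISCARD_LATER and COVER_EARLY appear in the excerpt; SERIAL is this sketch's name for the default branch.
    enum Strategy { SERIAL, DISCARD_LATER, COVER_EARLY }

    enum Outcome { ENQUEUE, REJECT, REPLACE_THREAD_AND_ENQUEUE }

    /** Decides what happens to a new trigger when a job thread already exists. */
    static Outcome decide(Strategy strategy, boolean runningOrHasQueue) {
        if (!runningOrHasQueue) {
            return Outcome.ENQUEUE;                     // idle thread: every strategy just queues
        }
        switch (strategy) {
            case DISCARD_LATER:
                return Outcome.REJECT;                  // drop the new trigger
            case COVER_EARLY:
                return Outcome.REPLACE_THREAD_AND_ENQUEUE; // kill the old thread, run the new trigger
            default:
                return Outcome.ENQUEUE;                 // serial: wait behind the running trigger
        }
    }
}
```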

The processing logic of the job thread is as follows:


// init
try {
    handler.init(); // IJobHandler provides an init hook
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}

// execute
while(!toStop){
    running = false;
    idleTimes++; // idle count

    TriggerParam triggerParam = null;
    try {
        // to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
        triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); // block for up to 3 s
        if (triggerParam!=null) {
            running = true;
            idleTimes = 0; // reset the idle count
            triggerLogIdSet.remove(triggerParam.getLogId());

            // log filename, like "logPath/yyyy-MM-dd/9999.log"
            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
            // execution context
            XxlJobContext xxlJobContext = new XxlJobContext(
                    triggerParam.getJobId(),
                    triggerParam.getExecutorParams(),
                    logFileName,
                    triggerParam.getBroadcastIndex(),
                    triggerParam.getBroadcastTotal());

            // init job context
            XxlJobContext.setXxlJobContext(xxlJobContext);

            // execute
            XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
            // if an execution timeout is set, run the handler asynchronously and wait with a timeout
            if (triggerParam.getExecutorTimeout() > 0) {
                // limit timeout
                Thread futureThread = null;
                try {
                    FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {

                            // init job context
                            XxlJobContext.setXxlJobContext(xxlJobContext);

                            handler.execute(); // execute
                            return true;
                        }
                    });
                    futureThread = new Thread(futureTask);
                    futureThread.start();

                    Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);// the timeout is in seconds
                } catch (TimeoutException e) {
                    // ...
                    // handle result
                    XxlJobHelper.handleTimeout("job execute timeout "); // handle the timeout
                } finally {
                    futureThread.interrupt();
                }
            } else {
                // just execute
                handler.execute(); // execute directly
            }

            // ....
        } else {
            if (idleTimes > 30) { // idle for more than 30 consecutive polls
                if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost: only remove when the wait queue is empty
                    XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); // remove this thread
                }
            }
        }
    } catch (Throwable e) {
        // handle the exception
    } finally {
        if(triggerParam != null) {
            // callback handler info
            if (!toStop) {
                // common: push to the callback queue and let the callback thread report the result
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                );
            } else {
                // is killed: the thread was stopped (the kill is initiated from the server side)
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_COCE_FAIL,
                        stopReason + " [job running, killed]" )
                );
            }
        }
    }
}

// after the thread stops, triggers still in the queue are pushed to the callback queue as killed
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
    TriggerParam triggerParam = triggerQueue.poll();
    if (triggerParam!=null) {
        // is killed 
        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                triggerParam.getLogId(),
                triggerParam.getLogDateTime(),
                XxlJobContext.HANDLE_COCE_FAIL,
                stopReason + " [job not executed, in the job queue, killed.]")
        );
    }
}

// destroy
try {
    handler.destroy(); // destroy hook when the thread stops; IJobHandler provides destroy
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}

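Summary: that largely completes the analysis of the client's execution logic. The key design point is that each job backed by an @XXL-annotated method gets its own thread to process triggers from the server (threads are registered by job ID). When a thread has been idle for more than 30 polls of 3 s each, roughly 90 s in total, with nothing to process, it is shut down, and a new thread is created on the next trigger if none exists. Doesn't that mean a job whose consecutive triggers are more than 90 s apart will have its thread created and torn down over and over?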
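The idle-retirement behavior can be reproduced with a plain BlockingQueue. This is a sketch under assumptions, not the JobThread code: IdleWorker and its parameters are hypothetical, and the poll interval and idle limit are constructor arguments so that the behavior can be observed in milliseconds instead of 3 s times 30.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class IdleWorker implements Runnable {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final long pollMs;
    private final int maxIdlePolls;
    private volatile boolean stopped;
    private int executed;

    IdleWorker(long pollMs, int maxIdlePolls) {
        this.pollMs = pollMs;
        this.maxIdlePolls = maxIdlePolls;
    }

    void submit(Runnable task) { queue.add(task); }
    boolean isStopped() { return stopped; }
    int executedCount() { return executed; }

    @Override
    public void run() {
        int idlePolls = 0;
        while (!stopped) {
            Runnable task;
            try {
                // poll with a timeout instead of take(), so the stop flag is re-checked regularly
                task = queue.poll(pollMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (task != null) {
                idlePolls = 0;          // any work resets the idle counter
                task.run();
                executed++;
            } else if (++idlePolls > maxIdlePolls && queue.isEmpty()) {
                stopped = true;         // idle for too long with nothing queued: retire
            }
        }
    }
}
```

With pollMs = 3000 and maxIdlePolls = 30 this matches the figures in the excerpt; the worker retires on the 31st consecutive empty poll, which is where the "roughly 90 s" comes from.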

Service Registration and Deregistration

When EmbedServer builds its HTTP server it also starts the registration thread, registryThread, which reports a heartbeat every 30 s. Once registryThread's toStop flag is set to true, it enters the deregistration flow.

public void run() {

    // registry
    while (!toStop) {
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registry(registryParam); // register (report heartbeat)
                    // ...
                } catch (Exception e) {}

            }
        } catch (Exception e) {}
        try {
            if (!toStop) {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); // 30s
            }
        } catch (InterruptedException e) {}
    }

    // registry remove
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); // deregister
                //....
            } catch (Exception e) {}

        }
    } catch (Exception e) {}

}
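The shape of this thread, heartbeat until told to stop and then deregister once, can be sketched without any networking. HeartbeatLoop and its nested Admin interface are hypothetical stand-ins for registryThread and the AdminBiz client; the beat interval is a parameter so that it does not have to be 30 s.

```java
import java.util.List;

class HeartbeatLoop implements Runnable {
    /** Hypothetical stand-in for the admin RPC client. */
    interface Admin {
        void registry(String appname, String address);
        void registryRemove(String appname, String address);
    }

    private final List<Admin> admins;
    private final String appname;
    private final String address;
    private final long beatMs;
    private volatile boolean toStop;

    HeartbeatLoop(List<Admin> admins, String appname, String address, long beatMs) {
        this.admins = admins;
        this.appname = appname;
        this.address = address;
        this.beatMs = beatMs;
    }

    void stop() { toStop = true; }

    @Override
    public void run() {
        while (!toStop) {
            for (Admin admin : admins) {
                try {
                    admin.registry(appname, address);   // heartbeat to every configured admin
                } catch (RuntimeException e) {
                    // one failing admin must not prevent the others from being tried
                }
            }
            if (!toStop) {
                try {
                    Thread.sleep(beatMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    toStop = true;
                }
            }
        }
        for (Admin admin : admins) {
            try {
                admin.registryRemove(appname, address); // deregister once on the way out
            } catch (RuntimeException e) {
                // best effort
            }
        }
    }
}
```

With a 30 s beat and the 90 s dead timeout on the server side, an executor has to miss about three consecutive heartbeats before the admin drops it.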

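Summary: the client design is concise. If several admin addresses are configured, it registers with each of them, which further suggests that the server nodes do not synchronize state with each other directly (if they share the same database, they can still see each other's data). All in all, the main cost of the client design is thread resources: some threads may end up being created and destroyed repeatedly.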
