Quartz定時任務(wù)串行調(diào)度 fixedDelay

項目需要搞分布式,出于一些原因定時器的代碼也需要部署兩份,但是定時器是不需要跑兩遍,所以考慮了分布式的定時任務(wù)框架Quartz。主要解決2個問題:

  1. 多臺服務(wù)運行,保證只有一臺服務(wù)的定時器在跑。這臺服務(wù)不掛,另一臺上的定時器永遠不啟動。
  2. 保證定時器串行調(diào)度。一個定時任務(wù)沒有執(zhí)行完,絕對不會觸發(fā)第二次。(類似于Spring的定時器的fixedDelayString參數(shù))

于是我就開始寫demo,第一個很容易就得到了解決這里就不細說了,坑在了第二個,由于我用的是比較新的版本所以找了很多資料都不合適,結(jié)果沒辦法,跑去調(diào)試源碼。(容易嗎我,寫個demo要去調(diào)源碼)

網(wǎng)上很多人都說只要concurrent 設(shè)置為false

    @Bean("testJobDetail")
    public JobDetailFactoryBean testJobDetail() {
        JobDetailFactoryBean bean = new JobDetailFactoryBean();
        bean.setDurability(true);
        bean.setRequestsRecovery(true);
        bean.setJobClass(MyDetailQuartzJobBean.class);
        Map<String, String> map = new HashMap<>();
        //配置定時任務(wù)類
        map.put("targetObject", "testScheduleTask");
        map.put("targetMethod", "execute");
        //是否允許任務(wù)并發(fā)執(zhí)行。當(dāng)值為false時,表示必須等到前一個線程處理完畢后才再啟一個新的線程
//        map.put("concurrent", "false");
        bean.setJobDataAsMap(map);
        return bean;
    }

或者,使QuartzJobBean類實現(xiàn)org.quartz.StatefulJob接口即可

public class BackCoupon implements StatefulJob {
    @Override
    public void execute(JobExecutionContext context)
              throws JobExecutionException {
    }
}

我都試過了完全沒效果StatefulJob接口已經(jīng)被廢棄了不推薦使用,沒辦法了,擼起袖子調(diào)試源碼吧。我們知道quartz是可以把jobdetail存到數(shù)據(jù)表里的


圖片.png

我發(fā)現(xiàn)了一個很可疑的字段IS_NONCONCURRENT,入口點就在這里了。段點打在類的org.quartz.impl.jdbcjobstore.JobStoreSupport類storeJob方法,第610行,這里觸發(fā)了jobdetail的一個更新操作,點進去發(fā)現(xiàn)了orcale的具體實現(xiàn)

 public int updateJobDetail(Connection conn, JobDetail job) throws IOException, SQLException {
        ByteArrayOutputStream baos = this.serializeJobData(job.getJobDataMap());
        byte[] data = baos.toByteArray();
        PreparedStatement ps = null;
        PreparedStatement ps2 = null;
        ResultSet rs = null;

        int var13;
        try {
            ps = conn.prepareStatement(this.rtp("UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = EMPTY_BLOB()  WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"));
            ps.setString(1, job.getDescription());
            ps.setString(2, job.getJobClass().getName());
            this.setBoolean(ps, 3, job.isDurable());
            this.setBoolean(ps, 4, job.isConcurrentExectionDisallowed());
            this.setBoolean(ps, 5, job.isPersistJobDataAfterExecution());
            this.setBoolean(ps, 6, job.requestsRecovery());
            ps.setString(7, job.getKey().getName());
            ps.setString(8, job.getKey().getGroup());
            ps.executeUpdate();
            ps.close();
            ps = conn.prepareStatement(this.rtp("SELECT JOB_DATA FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ? FOR UPDATE"));
            ps.setString(1, job.getKey().getName());
            ps.setString(2, job.getKey().getGroup());
            rs = ps.executeQuery();
            int res = 0;
            if (rs.next()) {
                Blob dbBlob = this.writeDataToBlob(rs, 1, data);
                ps2 = conn.prepareStatement(this.rtp("UPDATE {0}JOB_DETAILS SET JOB_DATA = ?  WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"));
                ps2.setBlob(1, dbBlob);
                ps2.setString(2, job.getKey().getName());
                ps2.setString(3, job.getKey().getGroup());
                res = ps2.executeUpdate();
            }

            var13 = res;
        } finally {
            closeResultSet(rs);
            closeStatement(ps);
            closeStatement(ps2);
        }

        return var13;
    }

這個時候發(fā)現(xiàn)IS_NONCONCURRENT 字段是通過JOB的isConcurrentExectionDisallowed屬性確定的。而且下方有個循環(huán),這個循環(huán)就是把之前JobDetailFactoryBean 配置的map里的值取出來??匆幌耂QL,根本就不能配置IS_NONCONCURRENT 嘛,那么IS_NONCONCURRENT 怎么辦呢?
點進去JobDetail的實現(xiàn)類查看相關(guān)的getter and setter發(fā)現(xiàn)如下東西:

  public boolean isConcurrentExectionDisallowed() {
        return ClassUtils.isAnnotationPresent(this.jobClass, DisallowConcurrentExecution.class);
    }

是否有出現(xiàn)標(biāo)簽?哦,原來是通過掃描標(biāo)簽啊,DisallowConcurrentExecution這個標(biāo)簽是個空標(biāo)簽。找到定義的job,在類名上加上標(biāo)簽。問題就得到了解決。

比如我這里綁定的job是MyDetailQuartzJobBean方法,用的是網(wǎng)上給的綁定方法map里面定義targetObject,targetMethod。然后通過反射調(diào)用自己寫的定時任務(wù),那么就在類名上加上注解即可。

package com.quartz.demo.service;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.lang.reflect.Method;


@DisallowConcurrentExecution
public class MyDetailQuartzJobBean extends QuartzJobBean {
    // 計劃任務(wù)所在類
    private String targetObject;

    // 具體需要執(zhí)行的計劃任務(wù)
    private String targetMethod;
    private ApplicationContext ctx;

    @Override
    protected void executeInternal(JobExecutionContext context)
            throws JobExecutionException {
        try {
            Object otargetObject = ctx.getBean(targetObject);
            Method m = null;
            try {
                m = otargetObject.getClass().getMethod(targetMethod);
                m.invoke(otargetObject);
            } catch (SecurityException e) {
                e.printStackTrace();
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.ctx = applicationContext;
    }

    public void setTargetObject(String targetObject) {
        this.targetObject = targetObject;
    }

    public void setTargetMethod(String targetMethod) {
        this.targetMethod = targetMethod;
    }
}

或者綁定JOB的時候就直接綁定到實際的定時任務(wù)類,不要通過反射(網(wǎng)友們說可能會出現(xiàn)序列化問題,我并沒有嘗試過)

環(huán)境:2.0.3的spring-boot-starter-quartz。orcale數(shù)據(jù)庫。

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.noraui</groupId>
            <artifactId>ojdbc7</artifactId>
            <version>12.1.0.2</version>
        </dependency>

        <dependency>
            <groupId>com.mchange</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

參考:

quartz任務(wù)串行并行

Scheduled with fixed delay in quartz scheduler?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容