Java中線程池ScheduledThreadPoolExecutor原理探究

前面講解過Java中線程池ThreadPoolExecutor原理探究,ThreadPoolExecutor是Executors中一部分功能,下面來介紹另外一部分功能也就是ScheduledThreadPoolExecutor的實現(xiàn),后者是一個可以在一定延遲時候或者定時進行任務(wù)調(diào)度的線程池。

類圖結(jié)構(gòu)

image.png

Executors其實是個工具類,里面提供了好多靜態(tài)方法,根據(jù)用戶選擇返回不同的線程池實例。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實現(xiàn)ScheduledExecutorService接口。
線程池隊列是DelayedWorkQueue,它是對delayqueue的優(yōu)化,ScheduledFutureTask是阻塞隊列元素是對任務(wù)修飾。

構(gòu)造函數(shù):

    //使用改造后的delayqueue.
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

一個例子

    // 任務(wù)間以固定時間間隔執(zhí)行,延遲1s后開始執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后間隔2s再次執(zhí)行,任務(wù)執(zhí)行完畢后間隔2s再次執(zhí)行,依次往復(fù)
    static void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {

                System.out.println(System.currentTimeMillis());

            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);

        // 由于是定時任務(wù),一直不會返回
        result.get();
        System.out.println("over");

    }
    // 相對開始加入任務(wù)的時間點固定頻率執(zhí)行:從加入任務(wù)開始算1s后開始執(zhí)行任務(wù),1+2s開始執(zhí)行,1+2*2s執(zhí)行,1+n*2s開始執(zhí)行;
    // 但是如果執(zhí)行任務(wù)時間大約2s則不會并發(fā)執(zhí)行后續(xù)任務(wù)將會延遲。

    static void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
            public void run() {

                System.out.println(System.currentTimeMillis());

            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);

        // 由于是定時任務(wù),一直不會返回
        result.get();
        System.out.println("over");
    }

    // 延遲1s后開始執(zhí)行,只執(zhí)行一次,沒有返回值
    static void scheduleRunable() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.schedule(new Runnable() {

            @Override
            public void run() {
                System.out.println("gh");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }, 1000, TimeUnit.MILLISECONDS);

        System.out.println(result.get());

    }

    // 延遲1s后開始執(zhí)行,只執(zhí)行一次,有返回值
    static void scheduleCaller() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {

            @Override
            public String call() throws Exception {

                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                return "gh";
            }

        }, 1000, TimeUnit.MILLISECONDS);

        // 阻塞,直到任務(wù)執(zhí)行完成
        System.out.print(result.get());

    }

源碼分析

schedule(Runnable command, long delay,TimeUnit unit)方法

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();

    //裝飾任務(wù),主要實現(xiàn)public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //添加任務(wù)到延遲隊列
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {

    //如果線程池關(guān)閉了,則拒絕任務(wù)
    if (isShutdown())
        reject(task);
    else {
        //添加任務(wù)到隊列
        super.getQueue().add(task);

        //再次檢查線程池關(guān)閉
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //確保至少一個線程在處理任務(wù),即使核心線程數(shù)corePoolSize為0
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    //增加核心線程數(shù)
    if (wc < corePoolSize)
        addWorker(null, true);
    //如果初始化corePoolSize==0,則也添加一個線程。
    else if (wc == 0)
        addWorker(null, false);
    }

上面做的首先吧runnable裝飾為delay隊列所需要的格式的元素,然后把元素加入到阻塞隊列,然后線程池線程會從阻塞隊列獲取超時的元素任務(wù)進行處理,下面看下隊列元素如何實現(xiàn)的。

//r為被修飾任務(wù),result=null,ns為當前時間加上delay時間后的
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

//通過適配器把runnable轉(zhuǎn)換為callable
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

修飾后把當前任務(wù)修飾為了delay隊列所需元素,下面看下元素的兩個重要方法:

  • 過期時間計算
//元素過期算法,裝飾后時間-當前時間,就是即將過期剩余時間
public long getDelay(TimeUnit unit) {
  return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
  • 元素比較
public int compareTo(Delayed other) {
  if (other == this) // compare zero ONLY if same object
      return 0;
  if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
          return -1;
      else if (diff > 0)
          return 1;
      else if (sequenceNumber < x.sequenceNumber)
          return -1;
      else
          return 1;
  }
  long d = (getDelay(TimeUnit.NANOSECONDS) -
            other.getDelay(TimeUnit.NANOSECONDS));
  return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

schedule(Callable<V> callable,
long delay,
TimeUnit unit)和schedule(Runnable command, long delay,TimeUnit unit)類似。

compareTo作用是在加入元素到dealy隊列時候進行比較,需要調(diào)整堆讓最快要過期的元素放到隊首。所以無論什么時候向隊列里面添加元素,隊首的都是最即將過期的元素。

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

定時調(diào)度:相鄰任務(wù)間時間固定

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();

        //修飾包裝,注意這里是period=-delay<0
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //添加任務(wù)到隊列
        delayedExecute(t);
        return t;
    }
       //period為 delay時間
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

我們知道任務(wù)添加到隊列后,工作線程會從隊列獲取并移除到期的元素,然后執(zhí)行run方法,所以下面看看ScheduledFutureTask的run方法如何實現(xiàn)定時調(diào)度的

public void run() {

    //是否只執(zhí)行一次
    boolean periodic = isPeriodic();

    //取消任務(wù)
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //只執(zhí)行一次,調(diào)用schdule時候
    else if (!periodic)
        ScheduledFutureTask.super.run();

    //定時執(zhí)行
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設(shè)置time=time+period
        setNextRunTime();

        //重新加入該任務(wù)到delay隊列
        reExecutePeriodic(outerTask);
    }
}
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else//由于period=-delay所以執(zhí)行這里,設(shè)置time=now()+delay
                time = triggerTime(-p);
        }

總結(jié):定時調(diào)度是先從隊列獲取任務(wù)然后執(zhí)行,然后在重新設(shè)置任務(wù)時間,在把任務(wù)放入隊列實現(xiàn)的。
如果任務(wù)執(zhí)行時間大于delay時間則等任務(wù)執(zhí)行完畢后的delay時間后在次調(diào)用任務(wù),不會同一個任務(wù)并發(fā)執(zhí)行。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

定時調(diào)度:相對起始時間點固定頻率調(diào)用

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //裝飾任務(wù)類,注意period=period>0,不是負的
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    //添加任務(wù)到隊列
    delayedExecute(t);
    return t;
}
        private void setNextRunTime() {
            long p = period;
           //period=delay;
            if (p > 0)
                time += p;//由于period>0所以執(zhí)行這里,設(shè)置time=time+delay
            else
                time = triggerTime(-p);
        }

總結(jié):相對于上面delay,rate方式執(zhí)行規(guī)則為時間為initdelday + n*period;時候啟動任務(wù),但是如果當前任務(wù)還沒有執(zhí)行完,要等到當前任務(wù)執(zhí)行完畢后在執(zhí)行一個任務(wù)。

總結(jié)

調(diào)度線程池主要用于定時器或者延遲一定時間在執(zhí)行任務(wù)時候使用。內(nèi)部使用優(yōu)化的DelayQueue來實現(xiàn),由于使用隊列來實現(xiàn)定時器,有出入隊調(diào)整堆等操作,所以定時并不是非常非常精確。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容