前面講解過Java中線程池ThreadPoolExecutor原理探究,ThreadPoolExecutor是Executors中一部分功能,下面來介紹另外一部分功能也就是ScheduledThreadPoolExecutor的實現(xiàn),后者是一個可以在一定延遲時候或者定時進行任務(wù)調(diào)度的線程池。
類圖結(jié)構(gòu)

Executors其實是個工具類,里面提供了好多靜態(tài)方法,根據(jù)用戶選擇返回不同的線程池實例。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實現(xiàn)ScheduledExecutorService接口。
線程池隊列是DelayedWorkQueue,它是對delayqueue的優(yōu)化,ScheduledFutureTask是阻塞隊列元素是對任務(wù)修飾。
構(gòu)造函數(shù):
//使用改造后的delayqueue.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
一個例子
// 任務(wù)間以固定時間間隔執(zhí)行,延遲1s后開始執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后間隔2s再次執(zhí)行,任務(wù)執(zhí)行完畢后間隔2s再次執(zhí)行,依次往復(fù)
static void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 由于是定時任務(wù),一直不會返回
result.get();
System.out.println("over");
}
// 相對開始加入任務(wù)的時間點固定頻率執(zhí)行:從加入任務(wù)開始算1s后開始執(zhí)行任務(wù),1+2s開始執(zhí)行,1+2*2s執(zhí)行,1+n*2s開始執(zhí)行;
// 但是如果執(zhí)行任務(wù)時間大約2s則不會并發(fā)執(zhí)行后續(xù)任務(wù)將會延遲。
static void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 由于是定時任務(wù),一直不會返回
result.get();
System.out.println("over");
}
// 延遲1s后開始執(zhí)行,只執(zhí)行一次,沒有返回值
static void scheduleRunable() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("gh");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 1000, TimeUnit.MILLISECONDS);
System.out.println(result.get());
}
// 延遲1s后開始執(zhí)行,只執(zhí)行一次,有返回值
static void scheduleCaller() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "gh";
}
}, 1000, TimeUnit.MILLISECONDS);
// 阻塞,直到任務(wù)執(zhí)行完成
System.out.print(result.get());
}
源碼分析
schedule(Runnable command, long delay,TimeUnit unit)方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//裝飾任務(wù),主要實現(xiàn)public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//添加任務(wù)到延遲隊列
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池關(guān)閉了,則拒絕任務(wù)
if (isShutdown())
reject(task);
else {
//添加任務(wù)到隊列
super.getQueue().add(task);
//再次檢查線程池關(guān)閉
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//確保至少一個線程在處理任務(wù),即使核心線程數(shù)corePoolSize為0
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//增加核心線程數(shù)
if (wc < corePoolSize)
addWorker(null, true);
//如果初始化corePoolSize==0,則也添加一個線程。
else if (wc == 0)
addWorker(null, false);
}
上面做的首先吧runnable裝飾為delay隊列所需要的格式的元素,然后把元素加入到阻塞隊列,然后線程池線程會從阻塞隊列獲取超時的元素任務(wù)進行處理,下面看下隊列元素如何實現(xiàn)的。
//r為被修飾任務(wù),result=null,ns為當前時間加上delay時間后的
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//通過適配器把runnable轉(zhuǎn)換為callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
修飾后把當前任務(wù)修飾為了delay隊列所需元素,下面看下元素的兩個重要方法:
- 過期時間計算
//元素過期算法,裝飾后時間-當前時間,就是即將過期剩余時間
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
- 元素比較
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
schedule(Callable<V> callable,
long delay,
TimeUnit unit)和schedule(Runnable command, long delay,TimeUnit unit)類似。
compareTo作用是在加入元素到dealy隊列時候進行比較,需要調(diào)整堆讓最快要過期的元素放到隊首。所以無論什么時候向隊列里面添加元素,隊首的都是最即將過期的元素。
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
定時調(diào)度:相鄰任務(wù)間時間固定
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//修飾包裝,注意這里是period=-delay<0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//添加任務(wù)到隊列
delayedExecute(t);
return t;
}
//period為 delay時間
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
我們知道任務(wù)添加到隊列后,工作線程會從隊列獲取并移除到期的元素,然后執(zhí)行run方法,所以下面看看ScheduledFutureTask的run方法如何實現(xiàn)定時調(diào)度的
public void run() {
//是否只執(zhí)行一次
boolean periodic = isPeriodic();
//取消任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
//只執(zhí)行一次,調(diào)用schdule時候
else if (!periodic)
ScheduledFutureTask.super.run();
//定時執(zhí)行
else if (ScheduledFutureTask.super.runAndReset()) {
//設(shè)置time=time+period
setNextRunTime();
//重新加入該任務(wù)到delay隊列
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else//由于period=-delay所以執(zhí)行這里,設(shè)置time=now()+delay
time = triggerTime(-p);
}
總結(jié):定時調(diào)度是先從隊列獲取任務(wù)然后執(zhí)行,然后在重新設(shè)置任務(wù)時間,在把任務(wù)放入隊列實現(xiàn)的。
如果任務(wù)執(zhí)行時間大于delay時間則等任務(wù)執(zhí)行完畢后的delay時間后在次調(diào)用任務(wù),不會同一個任務(wù)并發(fā)執(zhí)行。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
定時調(diào)度:相對起始時間點固定頻率調(diào)用
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//裝飾任務(wù)類,注意period=period>0,不是負的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//添加任務(wù)到隊列
delayedExecute(t);
return t;
}
private void setNextRunTime() {
long p = period;
//period=delay;
if (p > 0)
time += p;//由于period>0所以執(zhí)行這里,設(shè)置time=time+delay
else
time = triggerTime(-p);
}
總結(jié):相對于上面delay,rate方式執(zhí)行規(guī)則為時間為initdelday + n*period;時候啟動任務(wù),但是如果當前任務(wù)還沒有執(zhí)行完,要等到當前任務(wù)執(zhí)行完畢后在執(zhí)行一個任務(wù)。
總結(jié)
調(diào)度線程池主要用于定時器或者延遲一定時間在執(zhí)行任務(wù)時候使用。內(nèi)部使用優(yōu)化的DelayQueue來實現(xiàn),由于使用隊列來實現(xiàn)定時器,有出入隊調(diào)整堆等操作,所以定時并不是非常非常精確。