起因
一个基于Spring Cloud框架的应用,如果注册到了Eureka server,那么它就会定时更新服务列表,这个定时任务启动的代码在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,如下(来自工程eureka-client,版本1.10.7):
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
//更新服务列表
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 初始化定时拉取服务注册信息
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
...
//略去其他代碼
由此可见,TimedSupervisorTask类被使用在了定时任务的初始化中,我们具体来看看这个类的结构:
package com.netflix.discovery;
import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.LongGauge;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A supervisor task that schedules subtasks while enforcing a timeout.
 * Wrapped subtasks must be thread safe.
 *
 * <p>Each {@link #run()} submits the wrapped task to {@code executor}, waits at most
 * {@code timeoutMillis} for it to finish, and then (unless {@code scheduler} has been
 * shut down) re-schedules this same object on {@code scheduler} after {@code delay}
 * milliseconds. {@code delay} starts at {@code timeoutMillis}, is doubled (capped at
 * {@code maxDelay}) whenever a run times out, and is reset to {@code timeoutMillis}
 * whenever a run completes in time.
 *
 * @author David Qiang Liu
 */
public class TimedSupervisorTask extends TimerTask {
private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);
// Incremented when a run completes within the timeout.
private final Counter successCounter;
// Incremented when waiting for the subtask times out.
private final Counter timeoutCounter;
// Incremented when the executor rejects the submitted subtask.
private final Counter rejectedCounter;
// Incremented for any other Throwable raised during a run.
private final Counter throwableCounter;
// Set to executor.getActiveCount() right after submit and again after a successful wait.
private final LongGauge threadPoolLevelGauge;
private final String name;
// Used to re-schedule this supervisor after each run.
private final ScheduledExecutorService scheduler;
// Runs the wrapped subtask.
private final ThreadPoolExecutor executor;
// Maximum time to wait for one subtask run; also the initial/reset value of delay.
private final long timeoutMillis;
// The wrapped subtask.
private final Runnable task;
// Delay (ms) before the next run; mutated by run().
private final AtomicLong delay;
// Upper bound for delay (ms).
private final long maxDelay;
/**
 * Creates the supervisor and registers its monitors.
 *
 * @param name            name under which this object is registered with {@code Monitors}
 * @param scheduler       scheduler on which this supervisor re-schedules itself
 * @param executor        thread pool that actually runs {@code task}
 * @param timeout         maximum wait for one run of {@code task}, in {@code timeUnit};
 *                        also used as the initial delay between runs
 * @param timeUnit        unit of {@code timeout}
 * @param expBackOffBound multiplier applied to the timeout to obtain the maximum delay
 * @param task            the subtask to supervise
 */
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
// Caller-supplied timeout converted to ms (the original annotation cites 30 s as the
// usual value; that default is not defined in this class -- confirm against the caller).
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
// Back-off ceiling = timeout * expBackOffBound (300 s if timeout is 30 s and the bound
// is 10; both figures are assumptions taken from the original annotation).
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
successCounter = Monitors.newCounter("success");
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
// NOTE(review): presumably exposes the monitor fields above under `name` -- confirm in servo docs.
Monitors.registerObject(name, this);
}
/**
 * Runs the subtask once with a bounded wait, adjusts the back-off delay according to
 * the outcome, and re-schedules this supervisor unless the scheduler is shut down.
 */
@Override
public void run() {
Future<?> future = null;
try {
// Submit through a Future so the wait below can be bounded instead of blocking forever.
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// Wait at most timeoutMillis for the subtask to finish.
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// The run finished in time: reset the back-off delay to its initial value.
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
// On timeout, double the delay but never exceed maxDelay.
long newDelay = Math.min(maxDelay, currentDelay * 2);
// CAS: only write if delay still holds the value read above; the result is ignored,
// so a concurrent change to delay is left in place.
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// The executor rejected the subtask. This branch only logs (the message depends on
// whether the executor or scheduler is shut down) and counts; it does not stop the
// scheduler and does not change delay.
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
// Anything else, including ExecutionException/InterruptedException from future.get().
// Again only logs and counts; the scheduler is not stopped and delay is unchanged.
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// Whether the subtask finished, timed out or failed, cancel the Future to clean up
// (cancel(true) requests interruption of a subtask that is still running).
if (future != null) {
future.cancel(true);
}
// As long as the scheduler is still running, queue the next run of this supervisor.
if (!scheduler.isShutdown()) {
// This is what makes the task periodic: every run schedules one more one-shot run,
// delay ms from now. Example with a 30 s timeout and a 300 s maximum delay:
// - last run finished in time -> next run starts after 30 s;
// - last run timed out        -> next run starts after 60 s (delay was doubled),
//   and keeps doubling on consecutive timeouts up to the 300 s ceiling.
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
...
// remaining code omitted
}
我们可以仔细看看run方法的具体实现,因为这里有一个值得借鉴的设计思路!!!
我们简单来看看这个方法具体执行流程:
    1.执行submit()方法提交任务
    2.执行future.get()方法,如果没有在规定的时间得到返回值或者任务出现异常,则进入异常处理catch代码块。
    3.如果发生异常
      a.发生TimeoutException异常,则执行Math.min(maxDelay, currentDelay * 2);得到 当前延时时间 * 2 和 最大延时时间 的最小值,然后用它更新下一次调度的延时时间delay(delay的初始值等于timeoutMillis,示例中为30s;timeoutMillis本身是final的,不会被修改)
      b.发生RejectedExecutionException异常,则将rejectedCounter值+1
      c.发生Throwable异常,则将throwableCounter值+1
    4.如果没有发生异常,则把延时时间delay重置为timeoutMillis
    5.进入finally代码块
      a.如果future不为null,则执行future.cancel(true),中断线程停止任务
      b.如果调度器(scheduler)没有shutdown,则创建一个新的定时任务
注意:不知道有没有小伙伴发现,不管我们的定时任务执行是成功还是失败(如果还没有执行结束,也会被中断),都会再重新初始化一个新的任务。并且这个任务的延时时间还会因为不同的情况受到改变:在try代码块中如果没有发生异常,则会重新初始化延时时间;如果发生TimeoutException异常,则会更改延时时间,更改为 任务延时时间 * 2 和 最大延时时间 的最小值。所以我们会发现这样的设计会让整个延时任务很灵活。如果不发生超时,则延时时间不会变;如果发生超时,则增长延时时间(其他异常只会计数,不会改变延时时间);如果程序又恢复正常了,则延时时间又恢复成了默认值。
总结:我们在设计延时/周期性任务时就可以参考TimedSupervisorTask的实现,程序一旦遇到超时异常,就将间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再发生超时异常,间隔时间又会自动恢复为初始值。