Eureka's TimedSupervisorTask class

Background

When an application built on the Spring Cloud framework registers with a Eureka server, it refreshes its service list on a schedule. The code that starts this scheduled task is in the initScheduledTasks method of the com.netflix.discovery.DiscoveryClient class, shown below (from the eureka-client project, version 1.10.7):

/**                                                                                         
 * Initializes all scheduled tasks.                                                         
 */                                                                                         
private void initScheduledTasks() {    
    // refresh the service list
    if (clientConfig.shouldFetchRegistry()) {                                               
        // registry cache refresh timer                                                     
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();  
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // set up the task that periodically fetches registry information
        cacheRefreshTask = new TimedSupervisorTask(                                         
                "cacheRefresh",                                                             
                scheduler,                                                                  
                cacheRefreshExecutor,                                                       
                registryFetchIntervalSeconds,                                               
                TimeUnit.SECONDS,                                                           
                expBackOffBound,                                                            
                new CacheRefreshThread()                                                    
        );                                                                                  
        scheduler.schedule(                                                                 
                cacheRefreshTask,                                                           
                registryFetchIntervalSeconds, TimeUnit.SECONDS);                            
    }  
    ...
    // other code omitted
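initScheduledTasks hands TimedSupervisorTask two separate executors: scheduler, which only decides when a run starts, and cacheRefreshExecutor, which actually runs CacheRefreshThread. The excerpt does not show how they are built, so the sketch below assumes a plausible shape: a ScheduledExecutorService for timing, and a single-thread ThreadPoolExecutor backed by a SynchronousQueue (that is, no task queue). ExecutorWiringDemo and secondSubmitRejected are hypothetical names. With no queue, a submit that arrives while the worker is still busy is rejected, which is the RejectedExecutionException case that TimedSupervisorTask has to handle.

```java
import java.util.concurrent.*;

public class ExecutorWiringDemo {

    // Submits two tasks to a worker pool with at most one thread and no queue,
    // and reports whether the second submission was rejected.
    public static boolean secondSubmitRejected() {
        // Assumed shape of the worker pool: core 1, max 1, SynchronousQueue (no buffering).
        ThreadPoolExecutor worker = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the only worker thread until it is released.
            worker.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                // No idle thread and no queue slot: the default AbortPolicy throws.
                worker.submit(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The timing pool only decides WHEN a run starts, like the scheduler passed to TimedSupervisorTask.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Callable<Boolean> job = ExecutorWiringDemo::secondSubmitRejected;
        ScheduledFuture<Boolean> result = scheduler.schedule(job, 100, TimeUnit.MILLISECONDS);
        // prints: second submit rejected: true
        System.out.println("second submit rejected: " + result.get());
        scheduler.shutdown();
    }
}
```

Keeping timing and execution in separate pools means a slow fetch never blocks the thread that decides when the next run should happen.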

So TimedSupervisorTask is what drives this scheduled task. Let's look at how the class is structured:

package com.netflix.discovery;

import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.LongGauge;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A supervisor task that schedules subtasks while enforce a timeout.
 * Wrapped subtasks must be thread safe.
 *
 * @author David Qiang Liu
 */
public class TimedSupervisorTask extends TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);

    private final Counter successCounter;
    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    private final String name;
    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    private final AtomicLong delay;
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.name = name;
        this.scheduler = scheduler;
        this.executor = executor;
        // 30 seconds by default
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        // 300 seconds by default (timeoutMillis * expBackOffBound)
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            // Using a Future lets us put a timeout on the subtask, so the current thread never waits indefinitely
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // wait at most timeoutMillis for the subtask (30 seconds by default)
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // delay is an important variable used further down; note that every successful run resets it to timeoutMillis
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            // on a timeout, double delay, but never beyond the maximum set by the caller (300 seconds)
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            // store the new value; CAS is used because another thread may update delay concurrently
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
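            // the executor rejected the task (for example, it is saturated or shut down); this only logs and counts, it does not stop the scheduler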
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }

            rejectedCounter.increment();
        } catch (Throwable e) {
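            // any other failure (e.g. an ExecutionException wrapping an exception thrown by the task) is logged and counted; the scheduler is not stopped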
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            // the task has either finished or failed/timed out; cancel(true) cleans it up and interrupts it if it is still running
            if (future != null) {
                future.cancel(true);
            }
            // as long as the scheduler has not been shut down, run this same task again after delay milliseconds
            if (!scheduler.isShutdown()) {
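                // This is what makes the task periodic: as long as the scheduler is running, schedule another one-shot run after delay ms.
                // Suppose the caller passed a timeout of 30 seconds (constructor argument timeout) and the cap is 300 seconds (timeout * constructor argument expBackOffBound):
                // if the last run did not time out, the next run starts after 30 seconds;
                // if the last run timed out, the next run starts after 60 seconds (the timeout handler doubles delay, and repeated timeouts keep doubling it up to 300 seconds)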
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
    ...
    // other code omitted
}
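To try the pattern without pulling in Eureka or Servo, here is a minimal sketch that keeps only the scheduling logic of run(). It is a simplified re-implementation, not Eureka's class: BackoffSupervisor and currentDelayMillis are hypothetical names, metrics and logging are dropped, and the catch-all Throwable branch is narrowed to ExecutionException and RejectedExecutionException. One addition is a guard around the final schedule() call, because the scheduler may be shut down between the isShutdown() check and the call.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, standard-library-only sketch of the TimedSupervisorTask pattern:
// no Servo metrics and no logging; the names are illustrative, not Eureka's API.
public class BackoffSupervisor implements Runnable {
    private final ScheduledExecutorService scheduler; // decides when the next run starts
    private final ExecutorService executor;           // actually runs the wrapped task
    private final long timeoutMillis;                 // per-run timeout and base interval
    private final long maxDelayMillis;                // timeoutMillis * expBackOffBound
    private final Runnable task;
    private final AtomicLong delay;                   // interval before the next run

    public BackoffSupervisor(ScheduledExecutorService scheduler, ExecutorService executor,
                             long timeoutMillis, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeoutMillis;
        this.maxDelayMillis = timeoutMillis * expBackOffBound;
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
    }

    public long currentDelayMillis() {
        return delay.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); // bounded wait
            delay.set(timeoutMillis);                         // success: reset the interval
        } catch (TimeoutException e) {
            long current = delay.get();                       // timeout: double, capped
            delay.compareAndSet(current, Math.min(maxDelayMillis, current * 2));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | RejectedExecutionException e) {
            // the task failed or was rejected: keep the current delay
        } finally {
            if (future != null) {
                future.cancel(true);                          // interrupt it if still running
            }
            if (!scheduler.isShutdown()) {
                try {
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException e) {
                    // the scheduler was shut down between the check and the call
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ExecutorService executor = Executors.newCachedThreadPool();
        BackoffSupervisor supervisor = new BackoffSupervisor(
                scheduler, executor, 200, 4, () -> System.out.println("fetching registry"));
        // schedule only the first run, as initScheduledTasks does; run() reschedules itself
        scheduler.schedule(supervisor, 200, TimeUnit.MILLISECONDS);
        Thread.sleep(1000);
        scheduler.shutdown();
        executor.shutdown();
    }
}
```

Scheduling only the first run from outside and letting run() reschedule itself with schedule(), instead of using scheduleAtFixedRate with its fixed period, is what lets the interval change from one run to the next.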

The run method deserves a closer look, because it contains a design idea well worth borrowing!

Here is a brief walk through its execution flow:
    1. Call submit() to hand the task to the worker executor.
    2. Call future.get() with a timeout; if no result arrives within timeoutMillis, or the task fails, control moves to a catch block.
    3. If an exception occurs:
      a. TimeoutException: compute Math.min(maxDelay, currentDelay * 2), i.e. the smaller of twice the current delay and the maximum delay, and store it in delay (which starts at timeoutMillis, 30 s by default). timeoutMillis itself, the future.get() timeout, never changes.
      b. RejectedExecutionException: increment rejectedCounter.
      c. Any other Throwable: increment throwableCounter.
    4. If no exception occurs, reset delay to timeoutMillis.
    5. Enter the finally block:
      a. If future is not null, call future.cancel(true), which interrupts the task's thread if it is still running.
      b. If the scheduler has not been shut down, schedule a new one-shot run after delay milliseconds.
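Steps 2 and 5a can be observed in isolation with a small standalone demo that uses plain java.util.concurrent and no Eureka classes (CancelOnTimeoutDemo and timeoutThenCancelInterruptsWorker are hypothetical names). The caller waits with a bounded future.get(), gives up on TimeoutException, and then cancel(true) interrupts the worker, which is still sleeping.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelOnTimeoutDemo {

    // Returns true if the worker observed an interrupt after the caller timed out and cancelled it.
    public static boolean timeoutThenCancelInterruptsWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch finished = new CountDownLatch(1);
        Future<?> future = executor.submit(() -> {
            try {
                Thread.sleep(10_000);               // simulates a slow registry fetch
            } catch (InterruptedException e) {
                interrupted.set(true);              // cancel(true) lands here
            } finally {
                finished.countDown();
            }
        });
        try {
            future.get(100, TimeUnit.MILLISECONDS); // step 2: bounded wait
            // not reached: the task sleeps far longer than the timeout
        } catch (TimeoutException e) {
            // step 3a: this is where TimedSupervisorTask doubles its delay
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            future.cancel(true);                    // step 5a: interrupt the still-running task
        }
        try {
            finished.await(5, TimeUnit.SECONDS);    // give the worker time to react
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return interrupted.get() && future.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + timeoutThenCancelInterruptsWorker());
    }
}
```

Without the cancel(true) call, the timed-out fetch would keep occupying a worker thread, and the next run would be more likely to hit a RejectedExecutionException.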

Note: you may have spotted that whether a run succeeds or fails (a run that has not finished yet is interrupted), a new run is always scheduled afterwards, and the delay before that run depends on what just happened. If the try block completes without an exception, the delay is reset; if a TimeoutException occurs, the delay becomes the smaller of twice the current delay and the maximum delay. This makes the whole scheduled task very adaptive: while no timeouts occur, the interval stays the same; after a timeout it grows (other exceptions leave it unchanged); and once the program is healthy again, it returns to its default value.
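The delay rule described in this note can be pulled out into a pure function and replayed over a sequence of outcomes. This is a hypothetical helper written for illustration (BackoffPolicy, Outcome, nextDelay and replay are not Eureka names); it assumes the values from the code comments above: a 30-second base and expBackOffBound = 10, hence a 300-second cap.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffPolicy {

    public enum Outcome { SUCCESS, TIMEOUT, OTHER_FAILURE }

    // Mirrors the delay rule in TimedSupervisorTask.run():
    // success -> reset to base; timeout -> double, capped at max; other failures -> unchanged.
    public static long nextDelay(long current, Outcome outcome, long base, long max) {
        switch (outcome) {
            case SUCCESS: return base;
            case TIMEOUT: return Math.min(max, current * 2);
            default:      return current;
        }
    }

    // Replays a sequence of run outcomes and records the delay (in seconds) before each next run.
    public static List<Long> replay(long base, int expBackOffBound, Outcome... outcomes) {
        long max = base * expBackOffBound;
        long delay = base;
        List<Long> delays = new ArrayList<>();
        for (Outcome o : outcomes) {
            delay = nextDelay(delay, o, base, max);
            delays.add(delay);
        }
        return delays;
    }

    public static void main(String[] args) {
        // five timeouts, one other failure, then a success
        System.out.println(replay(30, 10,
                Outcome.TIMEOUT, Outcome.TIMEOUT, Outcome.TIMEOUT, Outcome.TIMEOUT,
                Outcome.TIMEOUT, Outcome.OTHER_FAILURE, Outcome.SUCCESS));
        // prints [60, 120, 240, 300, 300, 300, 30]
    }
}
```

The fourth timeout would give 480 seconds, so the cap kicks in at 300; the other failure leaves 300 in place, and the first success drops straight back to 30.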

Summary: when designing delayed or periodic tasks, we can borrow from the TimedSupervisorTask implementation. As soon as a run times out, lengthen the interval; on consecutive timeouts the interval doubles each time until it reaches the cap set by the caller; and as soon as a run succeeds again, the interval automatically resets to its initial value.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
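Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.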
