Timer與ScheduledExecutorService異常處理

足以讓你軟件穩(wěn)定運(yùn)行,躺著把錢賺了的細(xì)節(jié)!
去年我的小團(tuán)伙,幫一家公司開發(fā)一個終端推送服務(wù)系統(tǒng)。在剛上線不久出現(xiàn)了穩(wěn)定性問題,716臺終端數(shù)據(jù)不推送(如果無法解決,將對該公司造成716*2000的經(jīng)濟(jì)損失),經(jīng)過幾輪測試,發(fā)現(xiàn)問題很有可能在于定時器部分,然后我們閱讀大量文獻(xiàn),基本上定位到問題在于定時器遇到exception的關(guān)閉與掛起。

Timer的缺陷

Timer被設(shè)計成支持多個定時任務(wù),通過源碼發(fā)現(xiàn)它有一個任務(wù)隊列用來存放這些定時任務(wù),并且啟動了一個線程來處理
通過這種單線程的方式實現(xiàn),在存在多個定時任務(wù)的時候便會存在問題:若任務(wù)B執(zhí)行時間過長,將導(dǎo)致任務(wù)A延遲了啟動時間!
還存在另外一個問題,應(yīng)該是屬于設(shè)計的問題:若任務(wù)線程在執(zhí)行隊列中某個任務(wù)時,該任務(wù)拋出異常,將導(dǎo)致線程因跳出循環(huán)體而終止,即Timer停止了工作!
同樣是舉個栗子:

public static void main(String[] args) {  
      
    Timer timer = new Timer();  
      
    timer.schedule(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " A: do task");  
        }  
    }, 0, 5*1000);  
      
    timer.schedule(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " B: sleep");  
            try {  
                Thread.sleep(20*1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }, 10*1000, 5000);  
      
    timer.schedule(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " C: throw Exception");  
            throw new RuntimeException("test");  
        }  
    }, 30*1000, 5000);  
}  

通過以上程序發(fā)現(xiàn):一開始,任務(wù)A能正常每隔5秒運(yùn)行一次。在任務(wù)B啟動后,由于任務(wù)B運(yùn)行時間需要20秒,導(dǎo)致任務(wù)A要等到任務(wù)B執(zhí)行完才能執(zhí)行。更可怕的是,任務(wù)C啟動后,拋了個異常,定時任務(wù)掛了!
不過這種單線程的實現(xiàn)也有優(yōu)點(diǎn):線程安全!

ScheduledThreadPoolExecutor簡介

ScheduledThreadPoolExecutor可以說是Timer的多線程實現(xiàn)版本,連JDK官方都推薦使用ScheduledThreadPoolExecutor替代Timer。它是接口ScheduledExecutorService的子類
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以本質(zhì)上說ScheduledThreadPoolExecutor還是一個線程池(可參考《Java線程池ThreadPoolExecutor簡介》)。它也有coorPoolSize和workQueue,接受Runnable的子類作為任務(wù)。

特殊的地方在于它實現(xiàn)了自己的工作隊列DelayedWorkQueue,該任務(wù)隊列的作用是按照一定順序?qū)﹃犃兄械娜蝿?wù)進(jìn)行排序。比如,按照距離下次執(zhí)行時間的長短的升序方式排列,讓需要盡快執(zhí)行的任務(wù)排在隊首,“不那么著急”的任務(wù)排在隊列后方,從而方便線程獲取到“應(yīng)該”被執(zhí)行的任務(wù)。除此之外,ScheduledThreadPoolExecutor還在任務(wù)執(zhí)行結(jié)束后,計算出下次執(zhí)行的時間,重新放到工作隊列中,等待下次調(diào)用。

上面通過一個程序說明了Timer存在的問題!這里我將Timer換成了用ScheduledThreadPoolExecutor來實現(xiàn),注意TimerTask也是Runnable的子類。

public static void main(String[] args) {  
    int corePoolSize = 3;  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize);    
         
       pool.scheduleAtFixedRate(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " A: do task");  
        }  
    }, 0 ,5, TimeUnit.SECONDS);    
      
       pool.scheduleAtFixedRate(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " B: sleep");  
            try {  
                Thread.sleep(20*1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }, 10, 5, TimeUnit.SECONDS);  
      
       pool.scheduleAtFixedRate(new TimerTask() {  
        @Override  
        public void run() {  
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");  
            System.out.println(sdf.format(new Date()) + " C: throw Exception");  
            throw new RuntimeException("test");  
        }  
    }, 30, 5, TimeUnit.SECONDS);  
}  

由于有3個任務(wù)需要調(diào)度,因此我將corePoolSize設(shè)置為3。通過控制臺打印可以看到這次任務(wù)A一直都在正常運(yùn)行(任務(wù)時間間隔為5秒),并不受任務(wù)B的影響。任務(wù)C拋出異常后,雖然本身停止了調(diào)度,但沒有影響到其他任務(wù)的調(diào)度??梢哉fScheduledThreadPoolExecutor解決Timer存在的問題!
那要是將corePoolSize設(shè)置為1,變成單線程跑呢?結(jié)果當(dāng)然是和Timer一樣,任務(wù)B會導(dǎo)致任務(wù)A延遲執(zhí)行,不過比較好的是任務(wù)C拋異常不會影響到其他任務(wù)的調(diào)度。

可以說ScheduledThreadPoolExecutor適用于大部分場景,甚至就算timer提供的Date參數(shù)類型的開始時間也可以通過自己轉(zhuǎn)的方式來實現(xiàn)。任務(wù)調(diào)度框架Quatz也是在ScheduledThreadPoolExecutor基礎(chǔ)上實現(xiàn)的。

一般我們都使用單線程版的ScheduledThreadPoolExecutor居多,推薦通過以下方式來構(gòu)建(構(gòu)建后其線程數(shù)就不可更改)

ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();  
Timer異常后,任務(wù)就終止

如果Timer報錯的時候還要繼續(xù)執(zhí)行任務(wù),解決方法:在異常處理中加代碼。

另一種解決辦法:java.util.concurrent.ScheduledExecutorService;

ScheduledExecutorService異常后,任務(wù)會被掛起,解決方法:在異常處理中加代碼。

public static ScheduledExecutorService executorService = Executors  
        .newScheduledThreadPool(1);  
  
/** 
 * 1分鐘執(zhí)行一次 
 */  
public static void runTimer() {  
    executorService.scheduleAtFixedRate(new Runnable() {  
        @Override  
        public void run() {  
            try {  
                .....               
            } catch (Exception e) {  
                e.printStackTrace();  
                ......
            }  
        }  
    }, 0, 60, TimeUnit.SECONDS);  
}  
RocketMQ源碼分析:
public void start() {
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            if (timeDelay != null) {
                this.timer.schedule(new DeliverScanJobTimerTask(level), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
//                    log.info("scheduleAtFixedRate");
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 0, 2000);
    }

    public synchronized void persist() {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            log.error("persist file [{}] exception", e);
        }
    }

    public void shutdown() {
        log.info("Shutdown");
        this.timer.cancel();
    }

    public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(32);
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        String levelString = this.store.getBrokerConfig().getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);

                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }

        return true;
    }

    public Long getLevelDelayTime(int delayLevel) {
        return delayLevelTable.get(delayLevel);
    }

    class DeliverScanJobTimerTask extends TimerTask {

        private final int delayLevel;

        public DeliverScanJobTimerTask(int delayLevel) {
            this.delayLevel = delayLevel;
        }

        @Override
        public void run() {
            try {
                //
                TarminalManager tarminalManager = ScheduleScanJobService.this.store.getTarminalManager();
                List<TarminalPO> tarminals = tarminalManager.scan(delayLevel);
                for (TarminalPO tarminalPO : tarminals) {
                    this.executeOnTimeup(tarminalPO);
                }

            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleScanJobService.this.timer.schedule(new DeliverScanJobTimerTask(
                        this.delayLevel), DELAY_FOR_A_PERIOD);
            } finally {
                ScheduleScanJobService.this.timer.schedule(
                        new DeliverScanJobTimerTask(this.delayLevel), ScheduleScanJobService.this.getLevelDelayTime(delayLevel));
            }
        }

        public void executeOnTimeup(final TarminalPO tarminalPO) {
            ScheduleScanJobService.this.store.getSendMessageExecutor().submit(new Runnable() {
                @Override
                public void run() {
                    ScheduleScanJobService.log.info("Send Message Executor : {} => {} : {}", delayLevel, ScheduleScanJobService.this.getLevelDelayTime(delayLevel), tarminalPO.getCode());
                    TarminalSet tarminalSet = null;
                    byte[] data = null;
                    {
                        // TODO 通過接口獲取消息和終端信息
                    }
                    {
                        //發(fā)送內(nèi)容
                        String content = "這是一個特殊時刻,左手右手一個慢動…作!一起搖擺";
                        //屏參數(shù)設(shè)置
                        tarminalSet = new TarminalSet();
                        tarminalSet.setInfoModelNormal(5);
                        tarminalSet.setInfoSpeed(0x00);
                        tarminalSet.setPropertyWidth(256);
                        tarminalSet.setPropertyHeight(64);
                        tarminalSet.setInfoTimeStay(4);
                        //bytes
                        data = LedManager.getInstance().sendOntimeMessage(tarminalPO.getCode(), content, tarminalSet, null);
                    }
                    // 發(fā)送
                    SendFactory.factory(tarminalSet).doSend(tarminalPO.getCode(), data);
                }
            });
        }

    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容