足以讓你軟件穩(wěn)定運(yùn)行,躺著把錢賺了的細(xì)節(jié)!
去年我的小團(tuán)伙,幫一家公司開發(fā)一個終端推送服務(wù)系統(tǒng)。在剛上線不久出現(xiàn)了穩(wěn)定性問題,716臺終端數(shù)據(jù)不推送(如果無法解決,將對該公司造成716*2000的經(jīng)濟(jì)損失),經(jīng)過幾輪測試,發(fā)現(xiàn)問題很有可能在于定時器部分,然后我們閱讀大量文獻(xiàn),基本上定位到問題在于定時器遇到exception的關(guān)閉與掛起。
Timer的缺陷
Timer被設(shè)計成支持多個定時任務(wù),通過源碼發(fā)現(xiàn)它有一個任務(wù)隊列用來存放這些定時任務(wù),并且啟動了一個線程來處理
通過這種單線程的方式實現(xiàn),在存在多個定時任務(wù)的時候便會存在問題:若任務(wù)B執(zhí)行時間過長,將導(dǎo)致任務(wù)A延遲了啟動時間!
還存在另外一個問題,應(yīng)該是屬于設(shè)計的問題:若任務(wù)線程在執(zhí)行隊列中某個任務(wù)時,該任務(wù)拋出異常,將導(dǎo)致線程因跳出循環(huán)體而終止,即Timer停止了工作!
同樣是舉個栗子:
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " A: do task");
}
}, 0, 5*1000);
timer.schedule(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " B: sleep");
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 10*1000, 5000);
timer.schedule(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " C: throw Exception");
throw new RuntimeException("test");
}
}, 30*1000, 5000);
}
通過以上程序發(fā)現(xiàn):一開始,任務(wù)A能正常每隔5秒運(yùn)行一次。在任務(wù)B啟動后,由于任務(wù)B運(yùn)行時間需要20秒,導(dǎo)致任務(wù)A要等到任務(wù)B執(zhí)行完才能執(zhí)行。更可怕的是,任務(wù)C啟動后,拋了個異常,定時任務(wù)掛了!
不過這種單線程的實現(xiàn)也有優(yōu)點(diǎn):線程安全!
ScheduledThreadPoolExecutor簡介
ScheduledThreadPoolExecutor可以說是Timer的多線程實現(xiàn)版本,連JDK官方都推薦使用ScheduledThreadPoolExecutor替代Timer。它是接口ScheduledExecutorService的子類
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以本質(zhì)上說ScheduledThreadPoolExecutor還是一個線程池(可參考《Java線程池ThreadPoolExecutor簡介》)。它也有coorPoolSize和workQueue,接受Runnable的子類作為任務(wù)。
特殊的地方在于它實現(xiàn)了自己的工作隊列DelayedWorkQueue,該任務(wù)隊列的作用是按照一定順序?qū)﹃犃兄械娜蝿?wù)進(jìn)行排序。比如,按照距離下次執(zhí)行時間的長短的升序方式排列,讓需要盡快執(zhí)行的任務(wù)排在隊首,“不那么著急”的任務(wù)排在隊列后方,從而方便線程獲取到“應(yīng)該”被執(zhí)行的任務(wù)。除此之外,ScheduledThreadPoolExecutor還在任務(wù)執(zhí)行結(jié)束后,計算出下次執(zhí)行的時間,重新放到工作隊列中,等待下次調(diào)用。
上面通過一個程序說明了Timer存在的問題!這里我將Timer換成了用ScheduledThreadPoolExecutor來實現(xiàn),注意TimerTask也是Runnable的子類。
public static void main(String[] args) {
int corePoolSize = 3;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize);
pool.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " A: do task");
}
}, 0 ,5, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " B: sleep");
try {
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 10, 5, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println(sdf.format(new Date()) + " C: throw Exception");
throw new RuntimeException("test");
}
}, 30, 5, TimeUnit.SECONDS);
}
由于有3個任務(wù)需要調(diào)度,因此我將corePoolSize設(shè)置為3。通過控制臺打印可以看到這次任務(wù)A一直都在正常運(yùn)行(任務(wù)時間間隔為5秒),并不受任務(wù)B的影響。任務(wù)C拋出異常后,雖然本身停止了調(diào)度,但沒有影響到其他任務(wù)的調(diào)度??梢哉fScheduledThreadPoolExecutor解決Timer存在的問題!
那要是將corePoolSize設(shè)置為1,變成單線程跑呢?結(jié)果當(dāng)然是和Timer一樣,任務(wù)B會導(dǎo)致任務(wù)A延遲執(zhí)行,不過比較好的是任務(wù)C拋異常不會影響到其他任務(wù)的調(diào)度。
可以說ScheduledThreadPoolExecutor適用于大部分場景,甚至就算timer提供的Date參數(shù)類型的開始時間也可以通過自己轉(zhuǎn)的方式來實現(xiàn)。任務(wù)調(diào)度框架Quatz也是在ScheduledThreadPoolExecutor基礎(chǔ)上實現(xiàn)的。
一般我們都使用單線程版的ScheduledThreadPoolExecutor居多,推薦通過以下方式來構(gòu)建(構(gòu)建后其線程數(shù)就不可更改)
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
Timer異常后,任務(wù)就終止
如果Timer報錯的時候還要繼續(xù)執(zhí)行任務(wù),解決方法:在異常處理中加代碼。
另一種解決辦法:java.util.concurrent.ScheduledExecutorService;
ScheduledExecutorService異常后,任務(wù)會被掛起,解決方法:在異常處理中加代碼。
public static ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(1);
/**
* 1分鐘執(zhí)行一次
*/
public static void runTimer() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
.....
} catch (Exception e) {
e.printStackTrace();
......
}
}
}, 0, 60, TimeUnit.SECONDS);
}
RocketMQ源碼分析:
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
if (timeDelay != null) {
this.timer.schedule(new DeliverScanJobTimerTask(level), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// log.info("scheduleAtFixedRate");
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 0, 2000);
}
public synchronized void persist() {
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("persist file [{}] exception", e);
}
}
public void shutdown() {
log.info("Shutdown");
this.timer.cancel();
}
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(32);
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.store.getBrokerConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
public Long getLevelDelayTime(int delayLevel) {
return delayLevelTable.get(delayLevel);
}
class DeliverScanJobTimerTask extends TimerTask {
private final int delayLevel;
public DeliverScanJobTimerTask(int delayLevel) {
this.delayLevel = delayLevel;
}
@Override
public void run() {
try {
//
TarminalManager tarminalManager = ScheduleScanJobService.this.store.getTarminalManager();
List<TarminalPO> tarminals = tarminalManager.scan(delayLevel);
for (TarminalPO tarminalPO : tarminals) {
this.executeOnTimeup(tarminalPO);
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleScanJobService.this.timer.schedule(new DeliverScanJobTimerTask(
this.delayLevel), DELAY_FOR_A_PERIOD);
} finally {
ScheduleScanJobService.this.timer.schedule(
new DeliverScanJobTimerTask(this.delayLevel), ScheduleScanJobService.this.getLevelDelayTime(delayLevel));
}
}
public void executeOnTimeup(final TarminalPO tarminalPO) {
ScheduleScanJobService.this.store.getSendMessageExecutor().submit(new Runnable() {
@Override
public void run() {
ScheduleScanJobService.log.info("Send Message Executor : {} => {} : {}", delayLevel, ScheduleScanJobService.this.getLevelDelayTime(delayLevel), tarminalPO.getCode());
TarminalSet tarminalSet = null;
byte[] data = null;
{
// TODO 通過接口獲取消息和終端信息
}
{
//發(fā)送內(nèi)容
String content = "這是一個特殊時刻,左手右手一個慢動…作!一起搖擺";
//屏參數(shù)設(shè)置
tarminalSet = new TarminalSet();
tarminalSet.setInfoModelNormal(5);
tarminalSet.setInfoSpeed(0x00);
tarminalSet.setPropertyWidth(256);
tarminalSet.setPropertyHeight(64);
tarminalSet.setInfoTimeStay(4);
//bytes
data = LedManager.getInstance().sendOntimeMessage(tarminalPO.getCode(), content, tarminalSet, null);
}
// 發(fā)送
SendFactory.factory(tarminalSet).doSend(tarminalPO.getCode(), data);
}
});
}
}