場(chǎng)景
統(tǒng)計(jì)一個(gè)批量接口會(huì)有多少數(shù)據(jù),這個(gè)接口的QPS在100萬級(jí)別。有幾種方案:
- 每次調(diào)用都串行計(jì)算一次;
- 每次調(diào)用使用線程池并行計(jì)算。
由于并發(fā)量特別的大,第1種場(chǎng)景肯定不適合,這會(huì)把相應(yīng)時(shí)間拉長(zhǎng)。第二種方法每次請(qǐng)求過來都放到一個(gè)線程池里面請(qǐng)求,比第一種強(qiáng)很多,用這種方式基本上可以解決80%左右的需求了。那么還有能優(yōu)化的地方么?答案是有的。
Cache + 線程池
一般在大的公司都有一些監(jiān)控系統(tǒng),可以將監(jiān)控的數(shù)據(jù)上報(bào)到監(jiān)控系統(tǒng)中。上面兩個(gè)場(chǎng)景都是每次請(qǐng)求都會(huì)調(diào)用上報(bào)接口,這樣特別浪費(fèi)資源也可能出現(xiàn)性能問題。是否可以想一個(gè)辦法減少上報(bào)次數(shù)呢?我們可以使用cache匯總在一起,打包通過線程池異步上報(bào)。是不是這種方式會(huì)更好一些。
實(shí)現(xiàn)
怎么實(shí)現(xiàn)呢? 首先我們需要一個(gè)cache,這次我們使用Guava Cache。
Guava Cache 是google開發(fā)開源項(xiàng)目Guava中帶有的功能,只提供堆緩存,也就是說重啟機(jī)器后就沒有了,特點(diǎn):小巧玲瓏,性能最好。
private volatile static Cache<String, MutableInt> metricCache = null;
public static Cache<String, MutableInt> getMetricCache(){
if (metricCache == null) {
synchronized (this) {
if (metricCache == null) {
metricCache = initMetricCache();
return metricCache;
}
}
}
return metricCache;
}
private static Cache<String, MutableInt> initMetricCache(){
Cache<String, MutableInt> initMetricCache = CacheBuilder.newBuilder()
// 設(shè)置緩存?zhèn)€數(shù)
.maximumSize(1024)
// 設(shè)置cache中的數(shù)據(jù)在寫入之后的存活時(shí)間為1秒
.expireAfterWrite(1, TimeUnit.MINUTES)
// 設(shè)置并發(fā)數(shù)為8,即同一時(shí)間最多只能有5個(gè)線程往cache執(zhí)行寫入操作
.concurrencyLevel(8)
// 聲明一個(gè)監(jiān)聽器,緩存項(xiàng)被移除時(shí)做一些額外操作。這里使用異步線程池的形式實(shí)現(xiàn),更加高效。
.removalListener(RemovalListeners.asynchronous(new RemovalListener<String, MutableInt>(){
@Override
public void onRemoval(RemovalNotification<String, MutableInt> notification) {
// 刪除后的邏輯操作,這里是上報(bào)到監(jiān)控系統(tǒng)中
metricForCount(notification.getKey(), notification.getValue().intValue());
}
},
// 自定義線程池,這里就不在把實(shí)現(xiàn)的代碼粘進(jìn)來了
taskExecutor.getTaskExecutor()))
.build();
return initMetricCache;
}
對(duì)上面的代碼進(jìn)行分析:
-
CacheBuilder.newBuilder()創(chuàng)建一個(gè)Guava Cache,設(shè)置一些配置; - 在調(diào)用時(shí)考慮到高效性,使用了一個(gè)小技巧延遲加載,參考
getMetricCache()實(shí)現(xiàn); - 在Guava Cache中使用
removalListener特性,結(jié)合我們的需求,當(dāng)統(tǒng)計(jì)記錄達(dá)到一定的數(shù)量后,刪除掉并在監(jiān)聽的線程池中實(shí)現(xiàn)上報(bào)。
應(yīng)用
看著很牛B,怎么使用呢?
public static void logMetricForCount(final String key, final int count) {
try {
MutableInt logMetric = getMetricCache().get(key, new Callable<MutableInt>() {
@Override
public MutableInt call() throws Exception {
return new MutableInt(0);
}
});
// 計(jì)數(shù)
logMetric.add(count);
if(logMetric.intValue() > 500){
// 當(dāng)計(jì)數(shù)達(dá)到500個(gè)時(shí)刪除此key,從而觸發(fā)上面配置的removalListener
getMetricCache().invalidate(key);
}
} catch (Exception e) {
logger.warn("統(tǒng)計(jì){}信息次數(shù){}異常", key, count, e);
}
}
在實(shí)戰(zhàn)的計(jì)數(shù)操作,apache提供了MutableInt專門用于高效計(jì)數(shù)的類。還使用到Guava Cache的特性。
MutableInt logMetric = getMetricCache().get(key, new Callable<MutableInt>() {
@Override
public MutableInt call() throws Exception {
return new MutableInt(0);
}
});
當(dāng)沒有g(shù)et到數(shù)據(jù)時(shí),自動(dòng)初始化一個(gè)。是不是很棒!
代碼是不是就到此結(jié)束了? 不是的。我們?cè)陂_發(fā)代碼時(shí)需要考慮高效。Guava Cache在設(shè)計(jì)時(shí)也考慮到高效性,不過如果不仔細(xì)閱讀使用文檔,也會(huì)給自己買坑。
Guava Cache清理什么時(shí)候發(fā)生?使用CacheBuilder構(gòu)建的緩存不會(huì)"自動(dòng)"執(zhí)行清理和回收工作,也不會(huì)在某個(gè)緩存項(xiàng)過期后馬上清理,也沒有諸如此類的清理機(jī)制。相反,它會(huì)在寫操作時(shí)順帶做少量的維護(hù)工作,或者偶爾在讀操作時(shí)做——如果寫操作實(shí)在太少的話。
如果你的緩存是高吞吐的,那就無需擔(dān)心緩存的維護(hù)和清理等工作。如果你的 緩存只會(huì)偶爾有寫操作,而你又不想清理工作阻礙了讀操作,那么可以創(chuàng)建自己的維護(hù)線程,以固定的時(shí)間間隔調(diào)用Cache.cleanUp()。ScheduledExecutorService可以幫助你很好地實(shí)現(xiàn)這樣的定時(shí)調(diào)度。
對(duì)于高并發(fā)量的情況下,我們還需要寫一個(gè)線程去定時(shí)cleanUp。
Runnable metrciCacheCleanUpTask = new Runnable() {
@Override
public void run() {
getMetricCache().cleanUp();
} catch (Exception e) {
logger.error("定時(shí)cleanUp方法異常",e);
}
}
};
// 使用線程池每分鐘執(zhí)行一次
commTaskScheduler.scheduleWithFixedDelay(metrciCacheCleanUpTask, 60000);
線程池相關(guān)的實(shí)現(xiàn)可以參考我以前的blog,微分享-spring線程池實(shí)戰(zhàn)
Guava Cache CacheLoader還提供了數(shù)據(jù)加載機(jī)制,有興趣的話可以研究一下。