本地緩存:mybatis實(shí)現(xiàn):裝飾器模式實(shí)踐
- PerpetualCache:永久緩存:通過(guò)HashMap實(shí)現(xiàn)最大容量為Integer.MaxValue 的不過(guò)期緩存
- LruCache:固定緩存大小,實(shí)現(xiàn)最近最少使用的Key過(guò)期:通過(guò)LinkedHashMap實(shí)現(xiàn)LRU算法的緩存
- 覆寫(xiě)removeEldestEntry方法(達(dá)到最大容量返回true)
- accessOrder 設(shè)為true
public void setSize(final int size) {
keyMap = new LinkedHashMap<Object, Object>(size, .75F, true) {
private static final long serialVersionUID = 4267176411845948333L;
@Override
protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
boolean tooBig = size() > size;
if (tooBig) {
eldestKey = eldest.getKey();
}
return tooBig;
}
};
}
- FifoCache:固定緩存大小,實(shí)現(xiàn)按添加順序,最老的key過(guò)期:利用LinkedList 實(shí)現(xiàn)FIFO算法的緩存,使用Deque 的addLast removeFirst 方法
private void cycleKeyList(Object key) {
keyList.addLast(key);
if (keyList.size() > size) {
Object oldestKey = keyList.removeFirst();
delegate.removeObject(oldestKey);
}
}
- BlockingCache:通過(guò)ReentrantLock + ConcurrentHashMap 實(shí)現(xiàn)阻塞式緩存,鎖粒度為單個(gè)緩存Key,以達(dá)到單機(jī)多線(xiàn)程環(huán)境下,只觸發(fā)一次緩存數(shù)據(jù)load(業(yè)務(wù)邏輯為,get加鎖,先get為null,從db load出來(lái)后put進(jìn)緩存,并釋放鎖)
public class BlockingCache implements Cache {
private long timeout;
private final Cache delegate;
private final ConcurrentHashMap<Object, ReentrantLock> locks;
}
- TransactionalCache:實(shí)現(xiàn)類(lèi)似不可重復(fù)讀的事務(wù)緩存
集中式緩存:Memcached
業(yè)務(wù)邏輯:查詢(xún)緩存,若未命中,觸發(fā)加載db數(shù)據(jù)進(jìn)緩存
//查詢(xún)緩存
Object value = cache.get(key);
if (value != null) return value;
//查詢(xún)db數(shù)據(jù)放入緩存中再返回。
value = loadingFromDb(key);
cache.put(key, value)
return value;
考慮有一下幾個(gè)問(wèn)題:
- 緩存穿透:若一直在查詢(xún)一條不存在的數(shù)據(jù),每次訪(fǎng)問(wèn)緩存拿到的結(jié)果都是null,那么每次都會(huì)觸發(fā)一次db加載數(shù)據(jù)的操作:改進(jìn),維護(hù)一個(gè)黑名單,直接返回空或者默認(rèn)數(shù)據(jù)。
- 緩存雪崩:若某一時(shí)間點(diǎn),有大量請(qǐng)求需要查詢(xún)緩存,這時(shí)緩存沒(méi)數(shù)據(jù)或者緩存集中過(guò)期,會(huì)觸發(fā)大量從數(shù)據(jù)庫(kù) 加載數(shù)據(jù)的請(qǐng)求,給數(shù)據(jù)庫(kù)造成很大壓力,嚴(yán)重時(shí)可能產(chǎn)生數(shù)據(jù)庫(kù)雪崩:
- 改進(jìn)思路:
- 首先要避免緩存集中過(guò)期的情況,設(shè)置過(guò)期時(shí)間,給個(gè)浮動(dòng)范圍,讓其可以在幾分鐘內(nèi)平滑過(guò)期;
- 大量請(qǐng)求進(jìn)來(lái),還是可能會(huì)出現(xiàn)緩存沒(méi)值的場(chǎng)景,觸發(fā)大量db loading,分析可知,只要有一次db loading成功,刷新過(guò)緩存,后續(xù)請(qǐng)求進(jìn)來(lái)后就可以直接走緩存了,加個(gè)ReentrantLock,然后tryLock等待個(gè)幾秒鐘就可以解決這個(gè)問(wèn)題,實(shí)現(xiàn)預(yù)熱緩存
//查詢(xún)緩存
Object value = cache.get(key);
if (value != null) return value;
//多線(xiàn)程環(huán)境下加鎖防止加載邏輯被觸發(fā)多次,或者加分布式鎖
reentrantLock.tryLock(ms);
//查詢(xún)db數(shù)據(jù)放入緩存中再返回。
value = loadingFromDb(key);
cache.put(key, value);
//解鎖
reentrantLock.unlock();
return value;
- 緩存一致性問(wèn)題:存在場(chǎng)景需要緩存多個(gè)不同Key的訂單數(shù)據(jù),但由于訂單數(shù)據(jù)會(huì)在多個(gè)環(huán)節(jié)被修改,一旦數(shù)據(jù)被修改了,就會(huì)出現(xiàn)數(shù)據(jù)不一致,即緩存的是臟數(shù)據(jù)
- 改進(jìn)一(偏向時(shí)效性):利用MQ來(lái)保證一致性,約定一個(gè)topic,在數(shù)據(jù)被修改時(shí),發(fā)消息觸發(fā)刷新邏輯從數(shù)據(jù)庫(kù)加載最新改動(dòng)的數(shù)據(jù)進(jìn)緩存。
- 改進(jìn)二(業(yè)務(wù)容忍一定的延遲):為緩存記錄設(shè)定過(guò)期時(shí)間,在緩存過(guò)期后到來(lái)的請(qǐng)求,會(huì)自動(dòng)觸發(fā)緩存刷新邏輯,這里面帶來(lái)個(gè)小問(wèn)題,一般對(duì)熱點(diǎn)數(shù)據(jù)的訪(fǎng)問(wèn)請(qǐng)求都會(huì)持續(xù)的進(jìn)行。在熱點(diǎn)數(shù)據(jù)過(guò)期后,再請(qǐng)求,會(huì)進(jìn)入緩存刷新邏輯,表現(xiàn)出來(lái)會(huì)有周期性的接口請(qǐng)求慢的場(chǎng)景,對(duì)用戶(hù)體驗(yàn)不友好。對(duì)此在進(jìn)行優(yōu)化,設(shè)計(jì)二階段緩存實(shí)現(xiàn)+異步刷新邏輯:即通過(guò)給緩存對(duì)象設(shè)定 軟過(guò)期和硬過(guò)期時(shí)間(軟過(guò)期<硬過(guò)期),每次查詢(xún)緩存對(duì)象時(shí),判斷下軟過(guò)期是否小于當(dāng)前時(shí)間,如果是,先返回緩存對(duì)象,提交一個(gè)Runable,通過(guò)線(xiàn)程池觸發(fā)異步刷新緩存的邏輯:考慮到微服務(wù)下有多個(gè)實(shí)例部署,加一個(gè)redis鎖,同一時(shí)刻,保證數(shù)據(jù)加載邏輯只會(huì)被觸發(fā)一次
/**
* 異步緩存重刷
*
* @param cachedObject 緩存數(shù)據(jù)實(shí)體
* @param realKey 真實(shí)的緩存KEY
* @param logicalExpireSeconds 邏輯失效
* @param physicalExpireSeconds 物理失效
* @param cacheNullable 是否允許cacheNull
* @param func 異步reload業(yè)務(wù)回調(diào)
* @param clazz 數(shù)據(jù)返回類(lèi)型
* @param <T> 數(shù)據(jù)加載回調(diào)返回類(lèi)型
*/
private <T> T asyncReloadAndGetCachedResult(CacheObject<T> cachedObject, String realKey, int logicalExpireSeconds,
int physicalExpireSeconds, boolean cacheNullable, Func<T> func,
Class<T> clazz, Lock lock) {
T result = null;
if (cachedObject != null) {
if (cachedObject.value != null) {
if (clazz == null || clazz.isAssignableFrom(cachedObject.value.getClass())) {
//clazz沒(méi)設(shè)定,類(lèi)型轉(zhuǎn)換異常只能再業(yè)務(wù)層發(fā)現(xiàn)了
result = cachedObject.value;
if (cachedObject.isLogicExpire()) {
//軟過(guò)期,觸發(fā)異步重刷機(jī)制,返回緩存中數(shù)據(jù)
asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable, func, clazz,
lock);
}
} else {
//如果緩存中的數(shù)據(jù)會(huì)導(dǎo)致類(lèi)型轉(zhuǎn)換異常
LOGGER.error("cached result class type is not match! expect:{}, actual:{}", clazz,
cachedObject.value.getClass());
result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable,
func, clazz);
}
} else {
if (cacheNullable) {
if (cachedObject.isLogicExpire()) {
//軟過(guò)期,觸發(fā)異步重刷機(jī)制,返回緩存中數(shù)據(jù)
asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, true, func, clazz, lock);
}
} else {
//不允許cache緩存卻拿到了null(可能修改了nullable參數(shù)),這里強(qiáng)制進(jìn)行一次緩存刷新,避免業(yè)務(wù)層發(fā)生錯(cuò)誤
result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, false,
func, clazz);
}
}
}
return result;
}
/**
* 異步緩存重刷
*
* @param realKey 真實(shí)的緩存KEY
* @param logicalExpireSeconds 邏輯失效
* @param physicalExpireSeconds 物理失效
* @param cacheNullable 是否允許cacheNull
* @param func 異步reload業(yè)務(wù)回調(diào)
* @param tClass 緩存的對(duì)象類(lèi)型
* @param <T> 數(shù)據(jù)加載回調(diào)返回類(lèi)型
*/
private <T> void asyncReload(String realKey, int logicalExpireSeconds, int physicalExpireSeconds,
boolean cacheNullable, Func<T> func, Class<T> tClass, Lock lock) {
try {
//if (!LOCAL_QUEUED_WORKING_TASK_SET.contains(realKey)) {
JedisLock jedisLock = cacheRefreshLockFactory.buildLock(realKey);
if (jedisLock.tryLock()) {
try {
//更新待處理任務(wù)列表,避免重入
//LOCAL_QUEUED_WORKING_TASK_SET.add(realKey);
CACHE_REFRESH_EXECUTOR.execute(new CacheRefreshTask(realKey, () -> {
try {
if (lock != null) {
//阻塞等待主線(xiàn)程完成剩余操作
lock.lock();
}
try {
T value = func.invoke();
if (cacheNullable) {
refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds, tClass);
} else {
if (value != null) {
refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds,
tClass);
}
}
} finally {
if (lock != null) {
lock.unlock();
}
}
} catch (Exception ex) {
LOGGER.error("reload cache of {} failed!", realKey, ex);
} finally {
//移除任務(wù)
//LOCAL_QUEUED_WORKING_TASK_SET.remove(realKey);
}
}));
} finally {
//do nothing
}
} else {
//若任務(wù)正在處理中,不做任何處理。
//此處不能更新LOCAL_QUEUED_WORKING_TASK_SET,因?yàn)槿蝿?wù)不再當(dāng)前實(shí)例上得到運(yùn)行,任務(wù)不會(huì)得到更新。
}
//} else {
// LOGGER.warn("task is in queue waiting for being executed!");
//}
} catch (Exception e) {
LOGGER.error("asyncReload failed! realKey:{}", realKey, e);
}
}
分布式緩存
參考:https://www.cnblogs.com/moonandstar08/p/5405991.html
當(dāng)緩存的數(shù)據(jù)量超過(guò)單機(jī)緩存的上線(xiàn),就需要引入多臺(tái)緩存服務(wù)器 組成緩存集群,對(duì)緩存請(qǐng)求做負(fù)載均衡,每臺(tái)機(jī)器負(fù)責(zé) 1/n的 請(qǐng)求,常用算法有:輪循算法(Round Robin)、哈希算法(HASH)、最少連接算法(Least Connection)、響應(yīng)速度算法(Response Time)、加權(quán)法(Weighted ),但考慮到緩存服務(wù)的特殊性,他需要在機(jī)器上存儲(chǔ)數(shù)據(jù),常見(jiàn)的這些算法(除hash算法外)會(huì)造成緩存數(shù)據(jù)冗余,并且命中率不高,造成集群利用率降低,而且隨著系統(tǒng)訪(fǎng)問(wèn)量上升,壓力越來(lái)越大,實(shí)際壓力超過(guò)服務(wù)器的能承受的上限,出現(xiàn)節(jié)點(diǎn)宕機(jī)的情況,后續(xù)會(huì)通過(guò)加節(jié)點(diǎn)機(jī)器的形式來(lái)緩解壓力,在分布式緩存中,經(jīng)常會(huì)出現(xiàn)增刪機(jī)器的情況,引發(fā)緩存的重分布,在重分布過(guò)程中,大量請(qǐng)求到達(dá)db,極端情況下引起數(shù)據(jù)庫(kù)宕機(jī)
哈希算法:i = hash(key) mod N,對(duì)將要存入的key做hash計(jì)算,然后對(duì)可用的機(jī)器總數(shù)取模,得出的編號(hào)i,就是實(shí)際緩存需要存放的目標(biāo)機(jī)器編號(hào),在增刪節(jié)點(diǎn)機(jī)器的情況下,N發(fā)生變化,導(dǎo)致現(xiàn)有緩存失效,引發(fā)大量的緩存重分布,造成db壓力過(guò)大,極端情況下引起數(shù)據(jù)庫(kù)宕機(jī)
一致性哈希算法:Consistent Hashing,思路是將所有緩存機(jī)器編號(hào),計(jì)算hash值,放入一個(gè)擁有2的31次方的hash環(huán)中,對(duì)將要存入的緩存key做hash計(jì)算,得出hash值,取hash環(huán)按順時(shí)針尋找,找到的第一個(gè)可用的hash值,它所對(duì)應(yīng)的機(jī)器,就是目標(biāo)緩存機(jī)器。
分析:
- 場(chǎng)景1:假如集群中原先有N個(gè)節(jié)點(diǎn),第N個(gè)節(jié)點(diǎn)宕機(jī),重新發(fā)布業(yè)務(wù)系統(tǒng)后,那按照一致性哈希算法,原先分步在第N個(gè)節(jié)點(diǎn)上的數(shù)據(jù)(hash(cacheKey) > hash(N-1))會(huì)重分步到第一個(gè)節(jié)點(diǎn)上,前N-1個(gè)節(jié)點(diǎn)的緩存數(shù)據(jù)依然有效可用,
- 假如集群中原先有N個(gè)節(jié)點(diǎn),現(xiàn)增加一個(gè)節(jié)點(diǎn),重新發(fā)布業(yè)務(wù)系統(tǒng)后,那按照一致性哈希算法,原先分步在第N個(gè)節(jié)點(diǎn)上的一部分?jǐn)?shù)據(jù)(hash(N)< hash(cacheKey)< hash(N + 1))會(huì)重分布到新增節(jié)點(diǎn)上
結(jié)論:由此可見(jiàn),一致性哈希算法,在增減集群節(jié)點(diǎn)時(shí),只會(huì)引起一部分緩存數(shù)據(jù)重分布,相比普通的hash算法,減少了緩存失效的范圍,減少了系統(tǒng)顛簸
缺點(diǎn):一致性哈希算法的使用,因?yàn)榧簷C(jī)器的hash值在hash環(huán)中的位置問(wèn)題,可能會(huì)引發(fā)數(shù)據(jù)傾斜,出現(xiàn)集群中某些節(jié)點(diǎn)量很大,其他節(jié)點(diǎn)存儲(chǔ)量很小,可以引入虛擬節(jié)點(diǎn)(即為集群中每臺(tái)機(jī)器計(jì)算多次hash值)hash環(huán)中放置虛擬節(jié)點(diǎn),多個(gè)虛擬節(jié)點(diǎn)和機(jī)器之間維護(hù)一份映射,存儲(chǔ)數(shù)據(jù)時(shí),通過(guò)虛擬節(jié)點(diǎn)找到機(jī)器,通過(guò)這總方式,讓集群機(jī)器在hash環(huán)中的分布更分散,已達(dá)到均勻分布的效率。
舉例:dubbo消費(fèi)方對(duì)提供方的調(diào)用,采用一致性hash算法來(lái)做的話(huà),會(huì)為每個(gè)提供方Invoker生成160份虛擬節(jié)點(diǎn),以此讓請(qǐng)求的分發(fā)更加均勻。
實(shí)現(xiàn):TreeMap(存儲(chǔ)hash環(huán)) + hashMap(維護(hù)虛擬節(jié)點(diǎn)和機(jī)器的映射),參考dubbo的一致性hash算法實(shí)現(xiàn)
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
//初始化hash環(huán)
public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = System.identityHashCode(invokers);
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i ++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker.getUrl().toFullString() + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
//順時(shí)針查找滿(mǎn)足條件的機(jī)器
private Invoker<T> sekectForKey(long hash) {
Invoker<T> invoker;
Long key = hash;
if (!virtualInvokers.containsKey(key)) {
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
if (tailMap.isEmpty()) {
key = virtualInvokers.firstKey();
} else {
key = tailMap.firstKey();
}
}
invoker = virtualInvokers.get(key);
return invoker;
}
}