Cache eviction algorithms
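- LFU (Least Frequently Used)
Evicts the entries with the lowest access frequency. The core idea: if an entry has been accessed many times in the past, it is likely to be accessed again. The algorithm must maintain an access count for every entry, which is expensive. It also copes poorly with sudden hot spots: a newly popular entry starts with a low count and has to compete with entries that built up high counts long ago.
- LRU (Least Recently Used)
Evicts the entries that have gone longest without being accessed. The core idea: if an entry was accessed recently, it is likely to be accessed again soon. The drawback is that an occasional batch operation (for example a one-off scan) can push the working set out and lower the hit rate. Guava Cache uses this algorithm.
- TinyLFU
A lightweight LFU that stores access frequencies in far less memory than LFU. TinyLFU keeps only recent frequencies rather than counts over an entry's whole lifetime, so it handles sudden hot spots well. These frequencies act as an admission filter: a new entry is admitted only if its frequency is higher than that of the entry that would be evicted to make room.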
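TinyLFU records frequencies with the Count-Min Sketch algorithm, which needs little space and has a low false-positive rate. Sparse bursts of traffic are a problem, however. The bursty entries may not accumulate enough frequency in a short time, so the filter rejects them and TinyLFU struggles to keep them.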
- W-TinyLFU
Optimizes TinyLFU for sparse bursty traffic. W-TinyLFU first places new entries in a Window Cache, and only entries that pass the TinyLFU filter move on to the Main Cache.
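The filter is implemented with Count-Min Sketch, a variant of the Bloom filter. It works as follows:
- It keeps one counter array per hash function.
- Each item is hashed with every function, and the counter at the resulting index in the corresponding array is incremented.
- Because hashes collide, a counter can only overcount. To read a count, the sketch therefore takes the minimum across all arrays.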
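The description above can be made concrete with a minimal Count-Min Sketch in Java. This is a sketch under assumptions, not Caffeine's FrequencySketch. The class name, the seeded multiplicative hashing and the plain int counters are illustrative choices. Each row uses its own hash, and `estimate` returns the minimum over the rows, so collisions can only inflate a count, never shrink it.

```java
import java.util.Random;

/**
 * Hypothetical Count-Min Sketch: `depth` rows of `width` counters, one hash function per row.
 * Estimates can only be too high (collisions add counts), never too low.
 */
final class CountMinSketch {
    private final int width;       // counters per row, a power of two
    private final int[][] rows;    // rows[r] is the counter array of hash function r
    private final int[] seeds;     // odd multipliers that make the row hashes differ

    CountMinSketch(int depth, int width) {
        if (Integer.bitCount(width) != 1) {
            throw new IllegalArgumentException("width must be a power of two");
        }
        this.width = width;
        this.rows = new int[depth][width];
        this.seeds = new int[depth];
        Random rnd = new Random(42);                 // fixed seed: reproducible hashing
        for (int r = 0; r < depth; r++) {
            seeds[r] = rnd.nextInt() | 1;
        }
    }

    private int index(Object item, int row) {
        int h = item.hashCode() * seeds[row];
        h ^= h >>> 16;                               // fold high bits into the low bits
        return h & (width - 1);
    }

    /** Increments the item's counter in every row. */
    void add(Object item) {
        for (int r = 0; r < rows.length; r++) {
            rows[r][index(item, r)]++;
        }
    }

    /** The smallest of the item's counters is the least inflated one. */
    int estimate(Object item) {
        int min = Integer.MAX_VALUE;
        for (int r = 0; r < rows.length; r++) {
            min = Math.min(min, rows[r][index(item, r)]);
        }
        return min;
    }
}
```

A typical use is to call `add` on every access and compare `estimate` values when deciding whether to admit a new entry.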
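Caffeine overview
Compared with Guava Cache, Caffeine uses a better eviction policy, W-TinyLFU. It records access information in ring buffers (RingBuffer) and processes them asynchronously in batches.

Caffeine is also built directly on JDK 1.8's ConcurrentHashMap, so it requires JDK 1.8 or later. That map is considerably faster than the JDK 1.7 ConcurrentHashMap, which Guava Cache's internals resemble. The project claims performance close to JDK 1.8's ConcurrentHashMap itself, as shown in its official benchmark comparison charts.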

Caffeine's API is very similar to Guava's, so switching over costs little.
Read/write data structures
Guava Cache records reads and writes in the accessQueue, recencyQueue and writeQueue queues. Caffeine instead uses a readBuffer and a writeBuffer.
readBuffer
采用多個 RingBuffer(striped ring buffer 條帶環(huán)形緩沖,有損),通過線程 id 哈希到對應(yīng)的RingBuffer。當(dāng)一個RingBuffer滿了后,后續(xù)的寫入會丟棄直到這個RingBuffer可用。
當(dāng)讀命中后,會將數(shù)據(jù)寫入RingBuffer,這個寫入操作性能很高。然后由線程池(默認(rèn)forkjoin或自定義)異步消費(fèi)RingBuffer,將數(shù)據(jù)加入到數(shù)據(jù)淘汰隊列中。
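The lossy, striped read path can be sketched with the JDK's atomics. This is a simplified stand-in, not Caffeine's StripedBuffer/BoundedBuffer. The class names, the 16-slot capacity and the choice of stripe by the thread's identity hash are all assumptions. A producer that finds its ring full, or loses the CAS race, simply drops the record. That is acceptable because a missed read only makes the policy slightly less precise.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

/** Hypothetical bounded, lossy ring buffer: producers CAS the tail, a single consumer drains. */
class LossyRingBuffer<E> {
    static final int SIZE = 16;                              // capacity, a power of two
    private final AtomicReferenceArray<E> slots = new AtomicReferenceArray<>(SIZE);
    private final AtomicLong head = new AtomicLong();        // advanced only by the consumer
    private final AtomicLong tail = new AtomicLong();        // advanced by producers

    /** Returns false when the element is dropped (buffer full or CAS lost to another producer). */
    boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() >= SIZE) {
            return false;                                    // full: read records may be lost
        }
        if (!tail.compareAndSet(t, t + 1)) {
            return false;                                    // contended: drop instead of spinning
        }
        slots.lazySet((int) (t & (SIZE - 1)), e);
        return true;
    }

    /** Must be called by one thread at a time; hands every published element to the consumer. */
    void drainTo(Consumer<? super E> consumer) {
        long h = head.get();
        long t = tail.get();
        for (; h < t; h++) {
            int i = (int) (h & (SIZE - 1));
            E e = slots.get(i);
            if (e == null) {
                break;                                       // slot claimed but not yet published
            }
            slots.lazySet(i, null);
            consumer.accept(e);
        }
        head.lazySet(h);
    }
}

/** Several ring buffers; each thread is mapped to one stripe to spread contention. */
class StripedLossyBuffer<E> {
    private final LossyRingBuffer<E>[] stripes;

    @SuppressWarnings("unchecked")
    StripedLossyBuffer(int stripeCount) {
        if (Integer.bitCount(stripeCount) != 1) {
            throw new IllegalArgumentException("stripeCount must be a power of two");
        }
        stripes = (LossyRingBuffer<E>[]) new LossyRingBuffer<?>[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new LossyRingBuffer<>();
        }
    }

    boolean offer(E e) {
        // Hypothetical stripe choice: identity hash of the current thread (Caffeine uses the thread's probe)
        int h = System.identityHashCode(Thread.currentThread());
        return stripes[(h ^ (h >>> 16)) & (stripes.length - 1)].offer(e);
    }

    void drainTo(Consumer<? super E> consumer) {
        for (LossyRingBuffer<E> stripe : stripes) {
            stripe.drainTo(consumer);
        }
    }
}
```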
writeBuffer
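Implemented with MpscGrowableArrayQueue, on much the same principle as the queue of that name in JCTools. Writes differ from reads in two ways: there are far fewer of them, and they must not be lost. For MPSC queues, see [Netty study notes 17] Mpsc high-performance lock-free queue.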
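The write path, by contrast, must be lossless. The minimal sketch below assumes `ConcurrentLinkedQueue` as a stand-in for MpscGrowableArrayQueue. That queue is unbounded and safe for many consumers, so it illustrates only the contract, not the performance: many threads enqueue, one thread drains, and nothing is dropped. The class name is hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/**
 * Hypothetical lossless write buffer: many producers, one consumer, nothing dropped.
 * ConcurrentLinkedQueue is only a stand-in for MpscGrowableArrayQueue.
 */
final class LosslessWriteBuffer<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    /** Safe to call from any thread; the task is always kept. */
    void offer(T task) {
        queue.add(task);
    }

    /** To be called by a single consumer thread; returns how many tasks were drained. */
    int drainTo(Consumer<? super T> consumer) {
        int n = 0;
        for (T task; (task = queue.poll()) != null; n++) {
            consumer.accept(task);
        }
        return n;
    }
}
```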
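Custom expiration policy
Besides expireAfterAccess and expireAfterWrite, Caffeine supports expireAfter, which lets different keys have different expiration times. It is implemented with a timing wheel; see [Netty study notes 18] Netty timing wheel.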
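The timing-wheel idea can be shown with a single-level, hashed version. This is a hypothetical sketch: Caffeine's TimerWheel is hierarchical, and the class name, tick size and bucket count here are illustrative. Scheduling is O(1), because the deadline alone picks a bucket. Advancing the clock visits only the buckets whose ticks have passed, at most one full revolution. An entry more than one revolution away simply stays in its bucket until a later pass.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-level hashed timing wheel for per-entry expiry. */
final class TimingWheel<K> {
    private static final class Timer<K> {
        final K key;
        final long deadline;

        Timer(K key, long deadline) {
            this.key = key;
            this.deadline = deadline;
        }
    }

    private final List<List<Timer<K>>> buckets = new ArrayList<>();
    private final long tickNanos;
    private long currentTick;                                // tick the wheel was last advanced to

    TimingWheel(int bucketCount, long tickNanos, long startNanos) {
        for (int i = 0; i < bucketCount; i++) {
            buckets.add(new ArrayList<>());
        }
        this.tickNanos = tickNanos;
        this.currentTick = startNanos / tickNanos;
    }

    /** O(1): put the entry into the bucket of the tick its deadline falls in. */
    void schedule(K key, long deadlineNanos) {
        long tick = Math.max(deadlineNanos / tickNanos, currentTick);
        buckets.get((int) (tick % buckets.size())).add(new Timer<>(key, deadlineNanos));
    }

    /** Advances the wheel to nowNanos and returns the keys whose deadline has passed. */
    List<K> advance(long nowNanos) {
        List<K> expired = new ArrayList<>();
        long targetTick = nowNanos / tickNanos;
        long lastTick = Math.min(targetTick, currentTick + buckets.size() - 1); // at most one revolution
        for (long t = currentTick; t <= lastTick; t++) {
            Iterator<Timer<K>> it = buckets.get((int) (t % buckets.size())).iterator();
            while (it.hasNext()) {
                Timer<K> timer = it.next();
                if (timer.deadline <= nowNanos) {
                    it.remove();
                    expired.add(timer.key);
                }
            }
        }
        currentTick = targetTick;
        return expired;
    }
}
```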
Source code walkthrough
Start with the get method, which eventually calls computeIfAbsent:
public @Nullable V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction,
        boolean recordStats, boolean recordLoad) {
    long now = expirationTicker().read();
    // data is the underlying ConcurrentHashMap
    Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
    if (node != null) {
        V value = node.getValue();
        // If the key is present and not expired (checked against the expire* settings), return the value
        if ((value != null) && !hasExpired(node, now)) {
            if (!isComputingAsync(node)) {
                tryExpireAfterRead(node, key, value, expiry(), now);
                setAccessTime(node, now);
            }
            afterRead(node, now, /* recordHit */ recordStats);
            return value;
        }
    }
    if (recordStats) {
        mappingFunction = statsAware(mappingFunction, recordLoad);
    }
    Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
    return doComputeIfAbsent(key, keyRef, mappingFunction, new long[] { now }, recordStats);
}
/** Returns the current value from a computeIfAbsent invocation. */
@Nullable V doComputeIfAbsent(K key, Object keyRef,
        Function<? super K, ? extends V> mappingFunction, long[/* 1 */] now, boolean recordStats) {
    @SuppressWarnings("unchecked")
    V[] oldValue = (V[]) new Object[1];
    @SuppressWarnings("unchecked")
    V[] newValue = (V[]) new Object[1];
    @SuppressWarnings("unchecked")
    K[] nodeKey = (K[]) new Object[1];
    @SuppressWarnings({"unchecked", "rawtypes"})
    Node<K, V>[] removed = new Node[1];
    int[] weight = new int[2]; // old, new
    RemovalCause[] cause = new RemovalCause[1];
    Node<K, V> node = data.compute(keyRef, (k, n) -> {
        // n == null: no mapping yet; n != null: an existing entry, possibly expired or collected
        if (n == null) {
            // Calls the user-supplied loading function
            newValue[0] = mappingFunction.apply(key);
            if (newValue[0] == null) {
                return null;
            }
            now[0] = expirationTicker().read();
            weight[1] = weigher.weigh(key, newValue[0]);
            n = nodeFactory.newNode(key, keyReferenceQueue(),
                    newValue[0], valueReferenceQueue(), weight[1], now[0]);
            setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
            return n;
        }
        // Lock the node; if n has expired, other threads wait until it has been reloaded
        synchronized (n) {
            nodeKey[0] = n.getKey();
            weight[0] = n.getWeight();
            oldValue[0] = n.getValue();
            if ((nodeKey[0] == null) || (oldValue[0] == null)) {
                cause[0] = RemovalCause.COLLECTED;
            } else if (hasExpired(n, now[0])) {
                cause[0] = RemovalCause.EXPIRED;
            } else {
                return n;
            }
            // Remove the stale value and load a new one
            writer.delete(nodeKey[0], oldValue[0], cause[0]);
            newValue[0] = mappingFunction.apply(key);
            if (newValue[0] == null) {
                removed[0] = n;
                n.retire();
                return null;
            }
            weight[1] = weigher.weigh(key, newValue[0]);
            n.setValue(newValue[0], valueReferenceQueue());
            n.setWeight(weight[1]);
            now[0] = expirationTicker().read();
            setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
            setAccessTime(n, now[0]);
            setWriteTime(n, now[0]);
            return n;
        }
    });
    if (node == null) {
        if (removed[0] != null) {
            afterWrite(new RemovalTask(removed[0]));
        }
        return null;
    }
    if (cause[0] != null) {
        // Notify the removal listener about the old value
        if (hasRemovalListener()) {
            notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
        }
        statsCounter().recordEviction(weight[0]);
    }
    // No new value was computed: the existing entry was still live (e.g. another thread loaded it first)
    if (newValue[0] == null) {
        if (!isComputingAsync(node)) {
            tryExpireAfterRead(node, key, oldValue[0], expiry(), now[0]);
            setAccessTime(node, now[0]);
        }
        afterRead(node, now[0], /* recordHit */ recordStats);
        return oldValue[0];
    }
    // Record the write: an add if the key was absent, otherwise an update with the weight difference
    if ((oldValue[0] == null) && (cause[0] == null)) {
        afterWrite(new AddTask(node, weight[1]));
    } else {
        int weightedDifference = (weight[1] - weight[0]);
        afterWrite(new UpdateTask(node, weightedDifference));
    }
    // Return the new value
    return newValue[0];
}
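The core idea of doComputeIfAbsent is to re-check and load inside ConcurrentHashMap.compute, so that concurrent callers for the same key wait instead of loading twice. A toy expire-after-write cache can show this. Everything in it is hypothetical: `ExpiringCache`, its entry type and the injected `LongSupplier` ticker are not Caffeine classes. Like hasExpired, it treats an entry as expired once `now - writeTime >= expireAfterWriteNanos`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical expire-after-write cache mirroring the shape of doComputeIfAbsent. */
final class ExpiringCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writeTime;

        Entry(V value, long writeTime) {
            this.value = value;
            this.writeTime = writeTime;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> data = new ConcurrentHashMap<>();
    private final long expireAfterWriteNanos;
    private final LongSupplier ticker;                   // injected clock, playing the role of expirationTicker()

    ExpiringCache(long expireAfterWriteNanos, LongSupplier ticker) {
        this.expireAfterWriteNanos = expireAfterWriteNanos;
        this.ticker = ticker;
    }

    private boolean hasExpired(Entry<V> e, long now) {
        return now - e.writeTime >= expireAfterWriteNanos;
    }

    V get(K key, Function<? super K, ? extends V> loader) {
        long now = ticker.getAsLong();
        Entry<V> e = data.get(key);                      // fast path: no locking
        if (e != null && !hasExpired(e, now)) {
            return e.value;
        }
        Entry<V> result = data.compute(key, (k, old) -> {
            long t = ticker.getAsLong();
            if (old != null && !hasExpired(old, t)) {
                return old;                              // another thread already (re)loaded it
            }
            V loaded = loader.apply(k);                  // absent or expired: load while holding the bin lock
            return loaded == null ? null : new Entry<>(loaded, t);
        });
        return result == null ? null : result.value;
    }
}
```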
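Difference between refreshAfterWrite and expireAfterWrite
This differs slightly from Guava. With refreshAfterWrite, once the refresh interval has passed, the old value is returned immediately. The entry is then reloaded asynchronously on the executor, which is ForkJoinPool.commonPool() by default. expireAfterWrite behaves the same as in Guava: the new value is loaded synchronously, and other threads asking for the same key wait for it.
refreshAfterWrite is handled in BoundedLocalCache#afterRead -> refreshIfNeeded: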
void refreshIfNeeded(Node<K, V> node, long now) {
    if (!refreshAfterWrite()) {
        return;
    }
    K key;
    V oldValue;
    long oldWriteTime = node.getWriteTime();
    long refreshWriteTime = (now + ASYNC_EXPIRY);
    if (((now - oldWriteTime) > refreshAfterWriteNanos())
            && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
            && node.casWriteTime(oldWriteTime, refreshWriteTime)) {
        try {
            CompletableFuture<V> refreshFuture;
            long startTime = statsTicker().read();
            // AsyncCache: the stored value is itself a CompletableFuture
            if (isAsync) {
                @SuppressWarnings("unchecked")
                CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
                if (Async.isReady(future)) {
                    @SuppressWarnings("NullAway")
                    CompletableFuture<V> refresh = future.thenCompose(value ->
                            cacheLoader.asyncReload(key, value, executor));
                    refreshFuture = refresh;
                } else {
                    // no-op if load is pending
                    node.casWriteTime(refreshWriteTime, oldWriteTime);
                    return;
                }
            } else {
                // Reload the new value asynchronously
                @SuppressWarnings("NullAway")
                CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
                refreshFuture = refresh;
            }
            refreshFuture.whenComplete((newValue, error) -> {
                long loadTime = statsTicker().read() - startTime;
                if (error != null) {
                    logger.log(Level.WARNING, "Exception thrown during refresh", error);
                    node.casWriteTime(refreshWriteTime, oldWriteTime);
                    statsCounter().recordLoadFailure(loadTime);
                    return;
                }
                @SuppressWarnings("unchecked")
                V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
                boolean[] discard = new boolean[1];
                compute(key, (k, currentValue) -> {
                    if (currentValue == null) {
                        return value;
                    } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
                        return value;
                    }
                    discard[0] = true;
                    return currentValue;
                }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true);
                if (discard[0] && hasRemovalListener()) {
                    notifyRemoval(key, value, RemovalCause.REPLACED);
                }
                if (newValue == null) {
                    statsCounter().recordLoadFailure(loadTime);
                } else {
                    statsCounter().recordLoadSuccess(loadTime);
                }
            });
        } catch (Throwable t) {
            node.casWriteTime(refreshWriteTime, oldWriteTime);
            logger.log(Level.SEVERE, "Exception thrown when submitting refresh task", t);
        }
    }
}
That is the asynchronous refresh path. expireAfterWrite, by contrast, is checked in hasExpired: if the entry has expired, doComputeIfAbsent is called to load the new value.
boolean hasExpired(Node<K, V> node, long now) {
    return (expiresAfterAccess() && (now - node.getAccessTime() >= expiresAfterAccessNanos()))
            | (expiresAfterWrite() && (now - node.getWriteTime() >= expiresAfterWriteNanos()))
            | (expiresVariable() && (now - node.getVariableTime() >= 0));
}
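The contrast between the two settings can be shown with a toy refresh-after-write cache. This is a hypothetical sketch, not Caffeine's API. A stale read returns the old value immediately and starts at most one reload on the supplied `Executor`. The sketch guards that single reload with an `AtomicBoolean`, whereas Caffeine CASes the write time, as in refreshIfNeeded above. Passing `Runnable::run` as the executor makes the reload run synchronously, which keeps a usage example deterministic. With a real pool such as ForkJoinPool.commonPool(), later reads may keep seeing the old value until the reload finishes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical refresh-after-write cache: stale reads return the old value and trigger one async reload. */
final class RefreshingCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writeTime;
        final AtomicBoolean refreshing = new AtomicBoolean(); // guards against duplicate reloads

        Entry(V value, long writeTime) {
            this.value = value;
            this.writeTime = writeTime;
        }
    }

    private final ConcurrentHashMap<K, Entry<V>> data = new ConcurrentHashMap<>();
    private final long refreshAfterWriteNanos;
    private final LongSupplier ticker;
    private final Function<? super K, ? extends V> loader;
    private final Executor executor;

    RefreshingCache(long refreshAfterWriteNanos, LongSupplier ticker,
                    Function<? super K, ? extends V> loader, Executor executor) {
        this.refreshAfterWriteNanos = refreshAfterWriteNanos;
        this.ticker = ticker;
        this.loader = loader;
        this.executor = executor;
    }

    V get(K key) {
        long now = ticker.getAsLong();
        // The first load is synchronous; a null result is not cached
        Entry<V> e = data.computeIfAbsent(key, k -> {
            V v = loader.apply(k);
            return v == null ? null : new Entry<>(v, now);
        });
        if (e == null) {
            return null;
        }
        if (now - e.writeTime > refreshAfterWriteNanos && e.refreshing.compareAndSet(false, true)) {
            CompletableFuture.<V>supplyAsync(() -> loader.apply(key), executor)
                    .whenComplete((newValue, error) -> {
                        if (error != null || newValue == null) {
                            e.refreshing.set(false);         // keep the old value; a later read retries
                            return;
                        }
                        // Swap in the new value only if the entry was not replaced meanwhile
                        data.replace(key, e, new Entry<>(newValue, ticker.getAsLong()));
                    });
        }
        return e.value;                                      // the caller never waits for the reload
    }
}
```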
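Count-Min Sketch implementation
As described above, Caffeine uses W-TinyLFU as its eviction policy, and the filter part uses the Count-Min Sketch algorithm. Caffeine implements it in the FrequencySketch class, with a few optimizations.
FrequencySketch records frequencies in a long array whose length is the smallest power of two not less than the cache's maximum size. It caps every frequency at 15, which fits in 4 bits. Each 64-bit long is therefore split into 16 equal parts of 4 bits, each holding one counter: bits 0-3 hold counter 0, bits 4-7 counter 1, and so on up to bits 60-63 for counter 15.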

Now look at the code that records one access:
public void increment(@NonNull E e) {
    if (isNotInitialized()) {
        return;
    }
    // Like JDK HashMap's spread, this mixes the hash so its bits are more evenly distributed
    int hash = spread(e.hashCode());
    // (hash & 3) << 2 is 0, 4, 8 or 12: the first of four consecutive 4-bit slots in a long
    int start = (hash & 3) << 2;
    // Four hash functions (different seeds) give four table (long array) indexes
    int index0 = indexOf(hash, 0);
    int index1 = indexOf(hash, 1);
    int index2 = indexOf(hash, 2);
    int index3 = indexOf(hash, 3);
    // Increment slot start + i of table[index_i] for i = 0..3
    boolean added = incrementAt(index0, start);
    added |= incrementAt(index1, start + 1);
    added |= incrementAt(index2, start + 2);
    added |= incrementAt(index3, start + 3);
    // Aging: after sampleSize successful increments, halve every counter
    if (added && (++size == sampleSize)) {
        reset();
    }
}

boolean incrementAt(int i, int j) {
    // j is the 4-bit slot (0-15); offset is its bit position (0-60) within the long
    int offset = j << 2;
    // mask covers the slot; (table[i] & mask) == mask means the counter is already 15
    long mask = (0xfL << offset);
    if ((table[i] & mask) != mask) {
        // Below 15: add 1 to this slot
        table[i] += (1L << offset);
        return true;
    }
    return false;
}
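The bit arithmetic of incrementAt can be checked in isolation. In this sketch (the class and method names are hypothetical), one long holds sixteen 4-bit counters, and slot j occupies bits 4j to 4j+3. An increment is skipped once the slot reaches 15, so a counter never carries into its neighbor.

```java
/** Hypothetical helpers for sixteen saturating 4-bit counters packed into one long. */
final class PackedCounters {
    private PackedCounters() {}

    /** Returns the word with slot (0-15) incremented, unless that counter is already 15. */
    static long increment(long word, int slot) {
        int offset = slot << 2;                              // 4 bits per counter
        long mask = 0xfL << offset;
        return ((word & mask) != mask) ? word + (1L << offset) : word; // saturate at 15
    }

    /** Reads the 4-bit counter stored in the given slot. */
    static int get(long word, int slot) {
        return (int) ((word >>> (slot << 2)) & 0xfL);
    }
}
```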
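Keeping frequencies fresh
Some entries were once accessed very often but are no longer used, and they should not keep their high counts in the long array forever. Caffeine handles this with aging. When the number of recorded increments reaches a threshold (sampleSize, 10 times the maximum size by default), every counter is halved.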
if (added && (++size == sampleSize)) {
    reset();
}

/** Reduces every counter by half of its original value. */
void reset() {
    int count = 0;
    for (int i = 0; i < table.length; i++) {
        count += Long.bitCount(table[i] & ONE_MASK);
        table[i] = (table[i] >>> 1) & RESET_MASK;
    }
    size = (size >>> 1) - (count >>> 2);
}
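The halving step in reset() relies on two masks. The sketch below applies it to a single word. RESET_MASK = 0x7777777777777777L and ONE_MASK = 0x1111111111111111L are assumed to match the constants used above; the first clears the top bit of every 4-bit counter, and the second keeps only the lowest bit of each counter. Shifting the whole long right by one halves every counter at once, but the lowest bit of each counter leaks into the top bit of the counter below it. Masking with RESET_MASK removes that leaked bit. The class name is hypothetical, and the bookkeeping of `size` is left out.

```java
/** Hypothetical single-word version of the aging step in reset(). */
final class CounterAging {
    static final long RESET_MASK = 0x7777_7777_7777_7777L;  // top bit of every 4-bit counter cleared
    static final long ONE_MASK = 0x1111_1111_1111_1111L;    // lowest bit of every 4-bit counter

    private CounterAging() {}

    /** Halves all sixteen counters at once (rounding down). */
    static long halve(long word) {
        return (word >>> 1) & RESET_MASK;
    }

    /** Number of odd counters, i.e. counters that lose a remainder of 1 when halved. */
    static int oddCounters(long word) {
        return Long.bitCount(word & ONE_MASK);
    }
}
```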
Window
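As mentioned above, TinyLFU performs poorly under sparse bursts of traffic, because new entries can hardly accumulate enough frequency to pass the filter. Caffeine improves on this by adding a window in front of it, giving Window TinyLFU (W-TinyLFU).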


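When the window region is full, its least recently used entry (the candidate) is moved into the probation region. If the cache then exceeds its maximum size, the candidate competes with the victim at the head of probation, and the loser is evicted.
By default Caffeine gives the window 1% of the capacity. Of the remaining 99% (the main region), 80% is protected and 20% is probation; experiments showed this split to work best. At runtime the sizes are adjusted dynamically.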
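The default split can be worked through numerically. This sketch uses the percentages given above: window 1%, main 99%, and 80% of main as protected. It assumes truncation toward zero when converting to whole entries, and it gives the leftover to probation. The class and constant names are hypothetical, and the result is only the initial split, before any runtime adjustment. For a maximum of 1000 entries it gives a window of 10, a protected region of 792 and a probation region of 198.

```java
/** Hypothetical helper computing the initial W-TinyLFU region sizes. */
final class RegionSizes {
    static final double PERCENT_MAIN = 0.99;            // the window gets the remaining 1%
    static final double PERCENT_MAIN_PROTECTED = 0.80;  // share of main that is protected

    final long window;
    final long protectedMax;
    final long probation;

    RegionSizes(long maximum) {
        window = maximum - (long) (PERCENT_MAIN * maximum);
        long main = maximum - window;
        protectedMax = (long) (PERCENT_MAIN_PROTECTED * main);
        probation = main - protectedMax;                // whatever is left goes to probation
    }
}
```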
The eviction code:
/** Evicts entries if the cache exceeds the maximum. */
@GuardedBy("evictionLock")
void evictEntries() {
    if (!evicts()) {
        return;
    }
    // Move overflow from the window region into probation; these entries become candidates
    int candidates = evictFromWindow();
    // Evict from the main space until the cache fits
    evictFromMain(candidates);
}

/**
 * Evicts entries from the window space into the main space while the window size exceeds a
 * maximum.
 *
 * @return the number of candidate entries evicted from the window space
 */
@GuardedBy("evictionLock")
int evictFromWindow() {
    int candidates = 0;
    // Start from the head (least recently used end) of the window queue
    Node<K, V> node = accessOrderWindowDeque().peek();
    // While the window exceeds its maximum
    while (windowWeightedSize() > windowMaximum()) {
        // The pending operations will adjust the size to reflect the correct weight
        if (node == null) {
            break;
        }
        Node<K, V> next = node.getNextInAccessOrder();
        if (node.getWeight() != 0) {
            node.makeMainProbation();
            // Move the node from the window to the tail of probation
            accessOrderWindowDeque().remove(node);
            accessOrderProbationDeque().add(node);
            candidates++;
            // Shrink the window's weighted size accordingly
            setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
        }
        node = next;
    }
    return candidates;
}

@GuardedBy("evictionLock")
void evictFromMain(int candidates) {
    int victimQueue = PROBATION;
    // victim = head of probation; candidate = tail of probation (most recently moved from the window)
    Node<K, V> victim = accessOrderProbationDeque().peekFirst();
    Node<K, V> candidate = accessOrderProbationDeque().peekLast();
    // While the cache exceeds its maximum
    while (weightedSize() > maximum()) {
        // Stop trying to evict candidates and always prefer the victim
        if (candidates == 0) {
            candidate = null;
        }
        // Try evicting from the protected and window queues
        if ((candidate == null) && (victim == null)) {
            if (victimQueue == PROBATION) {
                victim = accessOrderProtectedDeque().peekFirst();
                victimQueue = PROTECTED;
                continue;
            } else if (victimQueue == PROTECTED) {
                victim = accessOrderWindowDeque().peekFirst();
                victimQueue = WINDOW;
                continue;
            }
            // The pending operations will adjust the size to reflect the correct weight
            break;
        }
        // Skip over entries with zero weight
        if ((victim != null) && (victim.getPolicyWeight() == 0)) {
            victim = victim.getNextInAccessOrder();
            continue;
        } else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
            candidate = candidate.getPreviousInAccessOrder();
            candidates--;
            continue;
        }
        // Evict immediately if only one of the entries is present
        if (victim == null) {
            @SuppressWarnings("NullAway")
            Node<K, V> previous = candidate.getPreviousInAccessOrder();
            Node<K, V> evict = candidate;
            candidate = previous;
            candidates--;
            evictEntry(evict, RemovalCause.SIZE, 0L);
            continue;
        } else if (candidate == null) {
            Node<K, V> evict = victim;
            victim = victim.getNextInAccessOrder();
            evictEntry(evict, RemovalCause.SIZE, 0L);
            continue;
        }
        // Evict immediately if an entry was collected
        K victimKey = victim.getKey();
        K candidateKey = candidate.getKey();
        if (victimKey == null) {
            @NonNull Node<K, V> evict = victim;
            victim = victim.getNextInAccessOrder();
            evictEntry(evict, RemovalCause.COLLECTED, 0L);
            continue;
        } else if (candidateKey == null) {
            candidates--;
            @NonNull Node<K, V> evict = candidate;
            candidate = candidate.getPreviousInAccessOrder();
            evictEntry(evict, RemovalCause.COLLECTED, 0L);
            continue;
        }
        // Evict immediately if the candidate's weight exceeds the maximum
        if (candidate.getPolicyWeight() > maximum()) {
            candidates--;
            Node<K, V> evict = candidate;
            candidate = candidate.getPreviousInAccessOrder();
            evictEntry(evict, RemovalCause.SIZE, 0L);
            continue;
        }
        // Evict the entry with the lowest frequency
        candidates--;
        // admit() decides by comparing the two keys' sketch frequencies
        if (admit(candidateKey, victimKey)) {
            Node<K, V> evict = victim;
            victim = victim.getNextInAccessOrder();
            evictEntry(evict, RemovalCause.SIZE, 0L);
            candidate = candidate.getPreviousInAccessOrder();
        } else {
            Node<K, V> evict = candidate;
            candidate = candidate.getPreviousInAccessOrder();
            evictEntry(evict, RemovalCause.SIZE, 0L);
        }
    }
}

// Frequency comparison between the candidate and the victim
@GuardedBy("evictionLock")
boolean admit(K candidateKey, K victimKey) {
    int victimFreq = frequencySketch().frequency(victimKey);
    int candidateFreq = frequencySketch().frequency(candidateKey);
    // Higher frequency wins
    if (candidateFreq > victimFreq) {
        return true;
    // Not higher: a candidate with frequency <= 5 loses. This accounts for aging and stops a
    // candidate from exploiting the rule to push out established entries, mainly to protect the hit rate.
    } else if (candidateFreq <= 5) {
        // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
        // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
        // candidate reduces the number of random acceptances to minimize the impact on the hit rate.
        return false;
    }
    // Not higher but warm (> 5): admit the candidate with probability 1/128
    int random = ThreadLocalRandom.current().nextInt();
    return ((random & 127) == 0);
}
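The decision table of admit() can be exercised on its own. In this hypothetical sketch, the frequency estimate and the random source are injected as a `ToIntFunction` and an `IntSupplier`, so each branch can be reached deterministically. The three outcomes are:
- A strictly more frequent candidate always wins.
- A candidate with frequency 5 or less never wins a tie or a loss.
- A warm candidate that is not more frequent is admitted only when `(random & 127) == 0`, roughly 1 time in 128.

```java
import java.util.function.IntSupplier;
import java.util.function.ToIntFunction;

/** Hypothetical standalone version of the admit() rule with injectable frequency and randomness. */
final class AdmissionDuel<K> {
    private final ToIntFunction<? super K> frequency;
    private final IntSupplier random;

    AdmissionDuel(ToIntFunction<? super K> frequency, IntSupplier random) {
        this.frequency = frequency;
        this.random = random;
    }

    /** true: admit the candidate and evict the victim; false: evict the candidate. */
    boolean admit(K candidate, K victim) {
        int victimFreq = frequency.applyAsInt(victim);
        int candidateFreq = frequency.applyAsInt(candidate);
        if (candidateFreq > victimFreq) {
            return true;                                     // strictly more popular: always admitted
        }
        if (candidateFreq <= 5) {
            return false;                                    // cold or lukewarm: rejected on a tie or loss
        }
        return (random.getAsInt() & 127) == 0;               // warm but not hotter: admitted ~1 in 128
    }
}
```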
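High-performance reads and writes
After each read or write, a cache has to do some extra bookkeeping, such as:
- checking whether the entry has expired;
- updating access frequencies;
- recording the read or write;
- tracking the hit rate, and so on.
In Guava, this bookkeeping is done together with the read or write itself. Caffeine borrows the idea of a write-ahead log (WAL) instead. After a read or write, it records the operation in a buffer and processes it asynchronously later, which improves performance.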
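The deferred-bookkeeping idea can be sketched in a few lines. This hypothetical `DeferredLru` is not Caffeine's design, which uses the lossy read buffer and an executor. Here a read only appends an event. The LRU order is brought up to date in batches by whichever thread wins `tryLock()`, so readers never block on the policy lock. The drain threshold is an arbitrary illustrative parameter.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical LRU whose access order is updated by replaying buffered read events. */
final class DeferredLru<K> {
    private final Queue<K> readEvents = new ConcurrentLinkedQueue<>();
    private final AtomicInteger pending = new AtomicInteger();
    private final LinkedHashSet<K> accessOrder = new LinkedHashSet<>(); // guarded by lock
    private final ReentrantLock lock = new ReentrantLock();
    private final int drainThreshold;

    DeferredLru(int drainThreshold) {
        this.drainThreshold = drainThreshold;
    }

    /** The read path: append an event and only occasionally try to replay the batch. */
    void recordRead(K key) {
        readEvents.add(key);
        if (pending.incrementAndGet() >= drainThreshold) {
            tryDrain();
        }
    }

    /** Replays buffered events unless another thread is already doing so. */
    void tryDrain() {
        if (!lock.tryLock()) {
            return;
        }
        try {
            for (K k; (k = readEvents.poll()) != null; ) {
                pending.decrementAndGet();
                accessOrder.remove(k);                       // re-inserting moves k to the most recent end
                accessOrder.add(k);
            }
        } finally {
            lock.unlock();
        }
    }

    /** The eviction candidate according to the replayed history (null if nothing replayed yet). */
    K leastRecent() {
        lock.lock();
        try {
            Iterator<K> it = accessOrder.iterator();
            return it.hasNext() ? it.next() : null;
        } finally {
            lock.unlock();
        }
    }
}
```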
ReadBuffer
After every read hit, afterRead is executed:
void afterRead(Node<K, V> node, long now, boolean recordHit) {
    if (recordHit) {
        statsCounter().recordHits(1);
    }
    // Offer the node to the read buffer; Buffer.FULL means this read record was dropped
    boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
    if (shouldDrainBuffers(delayable)) {
        scheduleDrainBuffers();
    }
    refreshIfNeeded(node, now);
}

// readBuffer.offer first goes through StripedBuffer, the superclass of BoundedBuffer
public int offer(E e) {
    int mask;
    int result = 0;
    Buffer<E> buffer;
    boolean uncontended = true;
    Buffer<E>[] buffers = table;
    if ((buffers == null)
            || (mask = buffers.length - 1) < 0
            // Pick the stripe from the current thread's probe hash
            || (buffer = buffers[getProbe() & mask]) == null
            || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
        expandOrRetry(e, uncontended);
    }
    return result;
}

// The selected stripe is a ring buffer, whose offer is:
public int offer(E e) {
    long head = readCounter;
    long tail = relaxedWriteCounter();
    // Occupied size; if the buffer is full the offer fails, i.e. records may be lost
    long size = (tail - head);
    if (size >= SPACED_SIZE) {
        return Buffer.FULL;
    }
    // CAS the write counter forward by OFFSET (16)
    if (casWriteCounter(tail, tail + OFFSET)) {
        // Mask with SPACED_MASK (i.e. modulo 256) to get the array index
        int index = (int) (tail & SPACED_MASK);
        buffer.lazySet(index, e);
        return Buffer.SUCCESS;
    }
    return Buffer.FAILED;
}
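Note that the ring buffer holds 16 entries by default, yet its array has 256 elements. The ring buffer stores references; assuming 4-byte references and 64-byte cache lines, 16 references fill one cache line. The CAS therefore advances the counter by 16, which puts each live entry on its own cache line. Only one in every 16 array elements is used, trading space for speed by avoiding false sharing.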
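The index arithmetic described above can be verified directly. This sketch assumes the values given in the text: 16 logical slots, a 256-element array, 4-byte references and 64-byte cache lines. It also assumes the array starts on a cache-line boundary. The class and method names are hypothetical. Because the write counter advances by 16 per offer, the n-th offer lands at index 16n mod 256, and each of those indexes falls on a different cache line.

```java
/** Hypothetical check of the padded ring-buffer layout: one live slot per 64-byte cache line. */
final class SpacedIndex {
    static final int BUFFER_SIZE = 16;
    static final int OFFSET = 16;                            // 16 refs * 4 bytes = 64 bytes
    static final int SPACED_SIZE = BUFFER_SIZE * OFFSET;     // 256
    static final int SPACED_MASK = SPACED_SIZE - 1;

    private SpacedIndex() {}

    /** Array index used by the n-th successful offer (n = 0, 1, 2, ...). */
    static int indexOf(long n) {
        long tail = n * OFFSET;                              // the counter advances by OFFSET per offer
        return (int) (tail & SPACED_MASK);
    }

    /** Cache line of an index, assuming 4-byte references and a line-aligned array start. */
    static int cacheLineOf(int index) {
        return index * 4 / 64;
    }
}
```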

