一、簡介
ConcurrentLinkedHashMap是google 開源的線程安全的方便并發(fā)的Map, Map利用LRU緩存機(jī)制對Map中存儲對象進(jìn)行換入換出管理。采用兩套資源控制機(jī)制,一套同步機(jī)制,使用ConcurrentMap對對象數(shù)據(jù)進(jìn)行KV存儲,保證多線程并發(fā)安全地調(diào)用Map資源,而對于存儲對象的換入換出管理則采用異步機(jī)制,使用Queue buffer存儲每次的因?qū)ο笞x寫而產(chǎn)生的對象換入換出任務(wù),當(dāng)遇到讀任務(wù)超過閾值或?qū)懭蝿?wù)時,加鎖后,執(zhí)行buffer中的多個任務(wù),依次對evictionDeque進(jìn)行節(jié)點調(diào)整,需要移除的數(shù)據(jù),從map中移除。在jdk1.8后官方建議用Caffeine代替
二、代碼解析
2.1 相關(guān)類
**Weigher **是一個計算每條記錄占用存儲單元數(shù)接口,項目在類Weighers中給了許多針對不同需求的計算方式,如Byte 數(shù)組可以通過數(shù)組長度計算為存儲單元個數(shù),而就一般應(yīng)用的存儲對象,可以直接用SingletonWeigher,每條記錄占用一個存儲單元。
@ThreadSafe
public interface Weigher<V> {
/**
* Measures an object's weight to determine how many units of capacity that
* the value consumes. A value must consume a minimum of one unit.
*
* @param value the object to weigh
* @return the object's weight
*/
int weightOf(V value);
}
**WeightedValue **是對Value的裝飾,包含了Value占用的存儲單元個數(shù)weight值,以及根據(jù)weight值計算狀態(tài)
- active 存在于map和隊列中
- retire 從map里刪除但隊列里還未刪除
- dead map和隊列里都刪除
**Node **實現(xiàn)了鏈接表中Linked Node,便于LinkedDeque的雙向索引,是Map以及evictionDeque存儲對象。
**Task **是針對存儲對象LRU順序操作的抽象類,繼承自Task的有ReadTask、AddTask、UpdateTask、RemoveTask, 每一個Task有一個根據(jù)創(chuàng)建順序分配的order。
2.2 ConcurrentLinkedHashMap主要屬性
// 存儲數(shù)據(jù)
final ConcurrentMap<K, Node<K, V>> data;
// 實際存儲大小
final AtomicLong weightedSize;
// 維護(hù)對象換入換出
final LinkedDeque<Node<K, V>> evictionDeque;
// 限制存儲大小
final AtomicLong capacity;
// 回調(diào)
final EvictionListener<K, V> listener;
2.3 主要操作過程
get 操作,首先從Map中讀取,再添加一個addTask用于調(diào)整queue中LRU order
@Override
public V get(Object key) {
final Node<K, V> node = data.get(key);
if (node == null) {
return null;
}
afterRead(node); // 處理LRU 異步調(diào)整隊列順序 注意不是立刻調(diào)整,當(dāng)滿足32條時才調(diào)整
return node.getValue();
}
put 操作稍微復(fù)雜,需要判斷是否只在缺少才插入(putIfAbsent),如果不存在,直接插入,如果存在, 而不是只在不存在的情況下插入,則更新。
V put(K key, V value, boolean onlyIfAbsent) {
checkNotNull(key);
checkNotNull(value);
final int weight = weigher.weightOf(key, value);
final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
final Node<K, V> node = new Node<K, V>(key, weightedValue);
for (;;) {
final Node<K, V> prior = data.putIfAbsent(node.key, node);
if (prior == null) {
// 處理LRU 異步添加到隊列中
afterWrite(new AddTask(node, weight));
return null;
} else if (onlyIfAbsent) {
afterRead(prior);
return prior.getValue();
}
// 以下代碼是更新時更新權(quán)重大小
for (;;) {
final WeightedValue<V> oldWeightedValue = prior.get();
if (!oldWeightedValue.isAlive()) {
break;
}
if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
final int weightedDifference = weight - oldWeightedValue.weight;
if (weightedDifference == 0) {
afterRead(prior);
} else {
afterWrite(new UpdateTask(prior, weightedDifference));
}
return oldWeightedValue.value;
}
}
}
}
remove
@Override
public V remove(Object key) {
final Node<K, V> node = data.remove(key);
if (node == null) {
return null;
}
// 設(shè)置節(jié)點狀態(tài) 改為 retire
makeRetired(node);
// 處理LRU 隊列刪除節(jié)點
afterWrite(new RemovalTask(node));
return node.getValue();
}
2.4 LRU 處理過程
void afterWrite(Runnable task) {
// 添加寫任務(wù)到緩存區(qū)
writeBuffer.add(task);
// 設(shè)置處理狀態(tài)為必須
drainStatus.lazySet(REQUIRED);
// 嘗試去添加并調(diào)整順序
tryToDrainBuffers();
// 通知
notifyListener();
}
void afterRead(Node<K, V> node) {
final int bufferIndex = readBufferIndex();
// 把此次查詢節(jié)點放入緩存區(qū) 并返回待調(diào)整的數(shù)量
final long writeCount = recordRead(bufferIndex, node);
// 嘗試調(diào)整
drainOnReadIfNeeded(bufferIndex, writeCount);
notifyListener();
}
void drainOnReadIfNeeded(int bufferIndex, long writeCount) {
final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get());
// READ_BUFFER_THRESHOLD 為32 當(dāng)待調(diào)整數(shù)量大于32時進(jìn)入調(diào)整
final boolean delayable = (pending < READ_BUFFER_THRESHOLD);
final DrainStatus status = drainStatus.get();
if (status.shouldDrainBuffers(delayable)) {
tryToDrainBuffers();
}
}
void tryToDrainBuffers() {
if (evictionLock.tryLock()) {
try {
drainStatus.lazySet(PROCESSING);
drainBuffers();
} finally {
drainStatus.compareAndSet(PROCESSING, IDLE);
evictionLock.unlock();
}
}
}
@GuardedBy("evictionLock")
void drainBuffers() {
// 處理讀緩存區(qū)
drainReadBuffers();
// 處理寫緩存區(qū)
drainWriteBuffer();
}
@GuardedBy("evictionLock")
void drainReadBuffers() {
final int start = (int) Thread.currentThread().getId();
final int end = start + NUMBER_OF_READ_BUFFERS;
for (int i = start; i < end; i++) {
drainReadBuffer(i & READ_BUFFERS_MASK);
}
}
@GuardedBy("evictionLock")
void drainReadBuffer(int bufferIndex) {
final long writeCount = readBufferWriteCount[bufferIndex].get();
for (int i = 0; i < READ_BUFFER_DRAIN_THRESHOLD; i++) {
final int index = (int) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK);
final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index];
final Node<K, V> node = slot.get();
if (node == null) {
break;
}
slot.lazySet(null);
applyRead(node);
readBufferReadCount[bufferIndex]++;
}
readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount);
}
2.5 任務(wù)實現(xiàn)過程
AddTask
final class AddTask implements Runnable {
final Node<K, V> node;
final int weight;
AddTask(Node<K, V> node, int weight) {
this.weight = weight;
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
weightedSize.lazySet(weightedSize.get() + weight);
// ignore out-of-order write operations
if (node.get().isAlive()) {
evictionDeque.add(node);
// 踢出多余的
evict();
}
}
}
@GuardedBy("evictionLock")
void evict() {
// Attempts to evict entries from the map if it exceeds the maximum
// capacity. If the eviction fails due to a concurrent removal of the
// victim, that removal may cancel out the addition that triggered this
// eviction. The victim is eagerly unlinked before the removal task so
// that if an eviction is still required then a new victim will be chosen
// for removal.
while (hasOverflowed()) {
final Node<K, V> node = evictionDeque.poll();
// If weighted values are used, then the pending operations will adjust
// the size to reflect the correct weight
if (node == null) {
return;
}
// Notify the listener only if the entry was evicted
if (data.remove(node.key, node)) {
pendingNotifications.add(node);
}
makeDead(node);
}
}
RemovalTask
final class RemovalTask implements Runnable {
final Node<K, V> node;
RemovalTask(Node<K, V> node) {
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
// add may not have been processed yet
evictionDeque.remove(node);
makeDead(node);
}
}
UpdateTask
final class UpdateTask implements Runnable {
final int weightDifference;
final Node<K, V> node;
public UpdateTask(Node<K, V> node, int weightDifference) {
this.weightDifference = weightDifference;
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
weightedSize.lazySet(weightedSize.get() + weightDifference);
applyRead(node);
evict();
}
}
三、示例
- 限制內(nèi)存大小
EntryWeigher<K, V> memoryUsageWeigher = new EntryWeigher<K, V>() {
final MemoryMeter meter = new MemoryMeter();
@Override public int weightOf(K key, V value) {
long bytes = meter.measure(key) + meter.measure(value);
return (int) Math.min(bytes, Integer.MAX_VALUE);
}
};
ConcurrentMap<K, V> cache = new ConcurrentLinkedHashMap.Builder<K, V>()
.maximumWeightedCapacity(1024 * 1024) // 1 MB
.weigher(memoryUsageWeigher)
.build();
- 限制條數(shù)
ConcurrentLinkedHashMap<String, String> map = new ConcurrentLinkedHashMap.Builder<String, String>()
.maximumWeightedCapacity(2).weigher(Weighers.singleton()).build();