ConcurrentLinkedHashMap源碼分析

一、簡介

ConcurrentLinkedHashMap是google 開源的線程安全的方便并發(fā)的Map, Map利用LRU緩存機(jī)制對Map中存儲對象進(jìn)行換入換出管理。采用兩套資源控制機(jī)制,一套同步機(jī)制,使用ConcurrentMap對對象數(shù)據(jù)進(jìn)行KV存儲,保證多線程并發(fā)安全地調(diào)用Map資源,而對于存儲對象的換入換出管理則采用異步機(jī)制,使用Queue buffer存儲每次的因?qū)ο笞x寫而產(chǎn)生的對象換入換出任務(wù),當(dāng)遇到讀任務(wù)超過閾值或?qū)懭蝿?wù)時,加鎖后,執(zhí)行buffer中的多個任務(wù),依次對evictionDeque進(jìn)行節(jié)點調(diào)整,需要移除的數(shù)據(jù),從map中移除。在jdk1.8后官方建議用Caffeine代替

二、代碼解析

2.1 相關(guān)類
**Weigher **是一個計算每條記錄占用存儲單元數(shù)接口,項目在類Weighers中給了許多針對不同需求的計算方式,如Byte 數(shù)組可以通過數(shù)組長度計算為存儲單元個數(shù),而就一般應(yīng)用的存儲對象,可以直接用SingletonWeigher,每條記錄占用一個存儲單元。

@ThreadSafe
public interface Weigher<V> {

  /**
   * Measures an object's weight to determine how many units of capacity that
   * the value consumes. A value must consume a minimum of one unit.
   *
   * @param value the object to weigh
   * @return the object's weight
   */
  int weightOf(V value);
}

**WeightedValue **是對Value的裝飾,包含了Value占用的存儲單元個數(shù)weight值,以及根據(jù)weight值計算狀態(tài)

  • active 存在于map和隊列中
  • retire 從map里刪除但隊列里還未刪除
  • dead map和隊列里都刪除

**Node **實現(xiàn)了鏈接表中Linked Node,便于LinkedDeque的雙向索引,是Map以及evictionDeque存儲對象。
**Task **是針對存儲對象LRU順序操作的抽象類,繼承自Task的有ReadTask、AddTask、UpdateTask、RemoveTask, 每一個Task有一個根據(jù)創(chuàng)建順序分配的order。

2.2 ConcurrentLinkedHashMap主要屬性

// 存儲數(shù)據(jù)
final ConcurrentMap<K, Node<K, V>> data; 
// 實際存儲大小
final AtomicLong weightedSize;
// 維護(hù)對象換入換出
final LinkedDeque<Node<K, V>> evictionDeque;
// 限制存儲大小
final AtomicLong capacity;
// 回調(diào)
final EvictionListener<K, V> listener;

2.3 主要操作過程
get 操作,首先從Map中讀取,再添加一個addTask用于調(diào)整queue中LRU order

@Override
  public V get(Object key) {
    final Node<K, V> node = data.get(key);
    if (node == null) {
      return null;
    }
    afterRead(node); // 處理LRU 異步調(diào)整隊列順序 注意不是立刻調(diào)整,當(dāng)滿足32條時才調(diào)整
    return node.getValue();
  }

put 操作稍微復(fù)雜,需要判斷是否只在缺少才插入(putIfAbsent),如果不存在,直接插入,如果存在, 而不是只在不存在的情況下插入,則更新。

 V put(K key, V value, boolean onlyIfAbsent) {
    checkNotNull(key);
    checkNotNull(value);

    final int weight = weigher.weightOf(key, value);
    final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
    final Node<K, V> node = new Node<K, V>(key, weightedValue);

    for (;;) {
      final Node<K, V> prior = data.putIfAbsent(node.key, node);
      if (prior == null) {
          // 處理LRU 異步添加到隊列中
        afterWrite(new AddTask(node, weight));
        return null;
      } else if (onlyIfAbsent) {
        afterRead(prior);
        return prior.getValue();
      }
       // 以下代碼是更新時更新權(quán)重大小
      for (;;) {
        final WeightedValue<V> oldWeightedValue = prior.get();
        if (!oldWeightedValue.isAlive()) {
          break;
        }

        if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
          final int weightedDifference = weight - oldWeightedValue.weight;
          if (weightedDifference == 0) {
            afterRead(prior);
          } else {
            afterWrite(new UpdateTask(prior, weightedDifference));
          }
          return oldWeightedValue.value;
        }
      }
    }
  }

remove

 @Override
  public V remove(Object key) {
    final Node<K, V> node = data.remove(key);
    if (node == null) {
      return null;
    }
    // 設(shè)置節(jié)點狀態(tài) 改為 retire
    makeRetired(node);
    // 處理LRU 隊列刪除節(jié)點
    afterWrite(new RemovalTask(node));
    return node.getValue();
  }

2.4 LRU 處理過程

void afterWrite(Runnable task) {
    // 添加寫任務(wù)到緩存區(qū)
    writeBuffer.add(task);
    // 設(shè)置處理狀態(tài)為必須
    drainStatus.lazySet(REQUIRED);
    // 嘗試去添加并調(diào)整順序
    tryToDrainBuffers();
    // 通知
    notifyListener();
  }

void afterRead(Node<K, V> node) {
    final int bufferIndex = readBufferIndex();
    // 把此次查詢節(jié)點放入緩存區(qū) 并返回待調(diào)整的數(shù)量
    final long writeCount = recordRead(bufferIndex, node);
    // 嘗試調(diào)整
    drainOnReadIfNeeded(bufferIndex, writeCount);
    notifyListener();
  }

void drainOnReadIfNeeded(int bufferIndex, long writeCount) {
    final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get());
    // READ_BUFFER_THRESHOLD 為32  當(dāng)待調(diào)整數(shù)量大于32時進(jìn)入調(diào)整
    final boolean delayable = (pending < READ_BUFFER_THRESHOLD);
    final DrainStatus status = drainStatus.get();
    if (status.shouldDrainBuffers(delayable)) {
      tryToDrainBuffers();
    }
  }

void tryToDrainBuffers() {
    if (evictionLock.tryLock()) {
      try {
        drainStatus.lazySet(PROCESSING);
        drainBuffers();
      } finally {
        drainStatus.compareAndSet(PROCESSING, IDLE);
        evictionLock.unlock();
      }
    }
  }

  @GuardedBy("evictionLock")
  void drainBuffers() {
    // 處理讀緩存區(qū)
    drainReadBuffers();
    // 處理寫緩存區(qū)
    drainWriteBuffer();
  }

@GuardedBy("evictionLock")
  void drainReadBuffers() {
    final int start = (int) Thread.currentThread().getId();
    final int end = start + NUMBER_OF_READ_BUFFERS;
    for (int i = start; i < end; i++) {
      drainReadBuffer(i & READ_BUFFERS_MASK);
    }
  }

  @GuardedBy("evictionLock")
  void drainReadBuffer(int bufferIndex) {
    final long writeCount = readBufferWriteCount[bufferIndex].get();
    for (int i = 0; i < READ_BUFFER_DRAIN_THRESHOLD; i++) {
      final int index = (int) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK);
      final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index];
      final Node<K, V> node = slot.get();
      if (node == null) {
        break;
      }
      slot.lazySet(null);
      applyRead(node);
      readBufferReadCount[bufferIndex]++;
    }
    readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount);
  }

2.5 任務(wù)實現(xiàn)過程
AddTask

final class AddTask implements Runnable {
    final Node<K, V> node;
    final int weight;

    AddTask(Node<K, V> node, int weight) {
      this.weight = weight;
      this.node = node;
    }

    @Override
    @GuardedBy("evictionLock")
    public void run() {
      weightedSize.lazySet(weightedSize.get() + weight);

      // ignore out-of-order write operations
      if (node.get().isAlive()) {
        evictionDeque.add(node);
        // 踢出多余的
        evict();
      }
    }
  }
@GuardedBy("evictionLock")
  void evict() {
    // Attempts to evict entries from the map if it exceeds the maximum
    // capacity. If the eviction fails due to a concurrent removal of the
    // victim, that removal may cancel out the addition that triggered this
    // eviction. The victim is eagerly unlinked before the removal task so
    // that if an eviction is still required then a new victim will be chosen
    // for removal.
    while (hasOverflowed()) {
      final Node<K, V> node = evictionDeque.poll();

      // If weighted values are used, then the pending operations will adjust
      // the size to reflect the correct weight
      if (node == null) {
        return;
      }

      // Notify the listener only if the entry was evicted
      if (data.remove(node.key, node)) {
        pendingNotifications.add(node);
      }

      makeDead(node);
    }
  }

RemovalTask

final class RemovalTask implements Runnable {
    final Node<K, V> node;

    RemovalTask(Node<K, V> node) {
      this.node = node;
    }

    @Override
    @GuardedBy("evictionLock")
    public void run() {
      // add may not have been processed yet
      evictionDeque.remove(node);
      makeDead(node);
    }
  }

UpdateTask

final class UpdateTask implements Runnable {
    final int weightDifference;
    final Node<K, V> node;

    public UpdateTask(Node<K, V> node, int weightDifference) {
      this.weightDifference = weightDifference;
      this.node = node;
    }

    @Override
    @GuardedBy("evictionLock")
    public void run() {
      weightedSize.lazySet(weightedSize.get() + weightDifference);
      applyRead(node);
      evict();
    }
  }

三、示例

  1. 限制內(nèi)存大小
EntryWeigher<K, V> memoryUsageWeigher = new EntryWeigher<K, V>() {
  final MemoryMeter meter = new MemoryMeter();

  @Override public int weightOf(K key, V value) {
    long bytes = meter.measure(key) + meter.measure(value);
    return (int) Math.min(bytes, Integer.MAX_VALUE);
  }
};
ConcurrentMap<K, V> cache = new ConcurrentLinkedHashMap.Builder<K, V>()
    .maximumWeightedCapacity(1024 * 1024) // 1 MB
    .weigher(memoryUsageWeigher)
    .build();
  1. 限制條數(shù)
ConcurrentLinkedHashMap<String, String> map = new ConcurrentLinkedHashMap.Builder<String, String>()
            .maximumWeightedCapacity(2).weigher(Weighers.singleton()).build();
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 操作系統(tǒng)基本概念 操作系統(tǒng)是計算機(jī)科學(xué)研究基石之一。 功能 管理硬件(如設(shè)備驅(qū)動:實現(xiàn)用戶提出的I/O操作請求,完...
    Hengtao24閱讀 4,671評論 2 14
  • 本文以32位機(jī)器為準(zhǔn),串講一些內(nèi)存管理的知識點。 1. 虛擬地址、物理地址、邏輯地址、線性地址 虛擬地址又叫線性地...
    linux服務(wù)器開發(fā)閱讀 2,179評論 0 0
  • 1. 基礎(chǔ)知識 1.1、 基本概念、 功能 馮諾伊曼體系結(jié)構(gòu)1、計算機(jī)處理的數(shù)據(jù)和指令一律用二進(jìn)制數(shù)表示2、順序執(zhí)...
    yunpiao閱讀 5,780評論 1 22
  • 第一章.計算機(jī)系統(tǒng)概述1.基本構(gòu)成2.指令的執(zhí)行3.中斷3.1 目的3.2 類型3.3 中斷控制流3.4 中斷處理...
    某WAP閱讀 932評論 0 0
  • 內(nèi)存是用于存放數(shù)據(jù)的硬件。程序執(zhí)行前需要先放到內(nèi)存中才能被CPU處理。 若計算機(jī)按字節(jié)編址,則每個存儲單元大小為1...
    dev_winner閱讀 1,124評論 0 2

友情鏈接更多精彩內(nèi)容