聊聊緩存

本地緩存:mybatis實(shí)現(xiàn):裝飾器模式實(shí)踐

  • PerpetualCache:永久緩存:通過(guò)HashMap實(shí)現(xiàn)最大容量為Integer.MaxValue 的不過(guò)期緩存
  • LruCache:固定緩存大小,實(shí)現(xiàn)最近最少使用的Key過(guò)期:通過(guò)LinkedHashMap實(shí)現(xiàn)LRU算法的緩存
    • 覆寫(xiě)removeEldestEntry方法(達(dá)到最大容量返回true)
    • accessOrder 設(shè)為true
public void setSize(final int size) {
    keyMap = new LinkedHashMap<Object, Object>(size, .75F, true) {
      private static final long serialVersionUID = 4267176411845948333L;

      @Override
      protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
        boolean tooBig = size() > size;
        if (tooBig) {
          eldestKey = eldest.getKey();
        }
        return tooBig;
      }
    };
  }
  • FifoCache:固定緩存大小,實(shí)現(xiàn)按添加順序,最老的key過(guò)期:利用LinkedList 實(shí)現(xiàn)FIFO算法的緩存,使用Deque 的addLast removeFirst 方法
private void cycleKeyList(Object key) {
    keyList.addLast(key);
    if (keyList.size() > size) {
      Object oldestKey = keyList.removeFirst();
      delegate.removeObject(oldestKey);
    }
  }
  • BlockingCache:通過(guò)ReentrantLock + ConcurrentHashMap 實(shí)現(xiàn)阻塞式緩存,鎖粒度為單個(gè)緩存Key,以達(dá)到單機(jī)多線(xiàn)程環(huán)境下,只觸發(fā)一次緩存數(shù)據(jù)load(業(yè)務(wù)邏輯為,get加鎖,先get為null,從db load出來(lái)后put進(jìn)緩存,并釋放鎖)
public class BlockingCache implements Cache {
  private long timeout;
  private final Cache delegate;
  private final ConcurrentHashMap<Object, ReentrantLock> locks;
}
  • TransactionalCache:實(shí)現(xiàn)類(lèi)似不可重復(fù)讀的事務(wù)緩存

集中式緩存:Memcached

業(yè)務(wù)邏輯:查詢(xún)緩存,若未命中,觸發(fā)加載db數(shù)據(jù)進(jìn)緩存

//查詢(xún)緩存
Object value = cache.get(key);
if (value != null) return value;
//查詢(xún)db數(shù)據(jù)放入緩存中再返回。
value = loadingFromDb(key);
cache.put(key, value)
return value;

考慮有一下幾個(gè)問(wèn)題:

  • 緩存穿透:若一直在查詢(xún)一條不存在的數(shù)據(jù),每次訪(fǎng)問(wèn)緩存拿到的結(jié)果都是null,那么每次都會(huì)觸發(fā)一次db加載數(shù)據(jù)的操作:改進(jìn),維護(hù)一個(gè)黑名單,直接返回空或者默認(rèn)數(shù)據(jù)。
  • 緩存雪崩:若某一時(shí)間點(diǎn),有大量請(qǐng)求需要查詢(xún)緩存,這時(shí)緩存沒(méi)數(shù)據(jù)或者緩存集中過(guò)期,會(huì)觸發(fā)大量從數(shù)據(jù)庫(kù) 加載數(shù)據(jù)的請(qǐng)求,給數(shù)據(jù)庫(kù)造成很大壓力,嚴(yán)重時(shí)可能產(chǎn)生數(shù)據(jù)庫(kù)雪崩:
  • 改進(jìn)思路:
    • 首先要避免緩存集中過(guò)期的情況,設(shè)置過(guò)期時(shí)間,給個(gè)浮動(dòng)范圍,讓其可以在幾分鐘內(nèi)平滑過(guò)期;
    • 大量請(qǐng)求進(jìn)來(lái),還是可能會(huì)出現(xiàn)緩存沒(méi)值的場(chǎng)景,觸發(fā)大量db loading,分析可知,只要有一次db loading成功,刷新過(guò)緩存,后續(xù)請(qǐng)求進(jìn)來(lái)后就可以直接走緩存了,加個(gè)ReentrantLock,然后tryLock等待個(gè)幾秒鐘就可以解決這個(gè)問(wèn)題,實(shí)現(xiàn)預(yù)熱緩存
//查詢(xún)緩存
Object value = cache.get(key);
if (value != null) return value;
//多線(xiàn)程環(huán)境下加鎖防止加載邏輯被觸發(fā)多次,或者加分布式鎖
reentrantLock.tryLock(ms);
//查詢(xún)db數(shù)據(jù)放入緩存中再返回。
value = loadingFromDb(key);
cache.put(key, value);
//解鎖
reentrantLock.unlock();
return value;
  • 緩存一致性問(wèn)題:存在場(chǎng)景需要緩存多個(gè)不同Key的訂單數(shù)據(jù),但由于訂單數(shù)據(jù)會(huì)在多個(gè)環(huán)節(jié)被修改,一旦數(shù)據(jù)被修改了,就會(huì)出現(xiàn)數(shù)據(jù)不一致,即緩存的是臟數(shù)據(jù)
    • 改進(jìn)一(偏向時(shí)效性):利用MQ來(lái)保證一致性,約定一個(gè)topic,在數(shù)據(jù)被修改時(shí),發(fā)消息觸發(fā)刷新邏輯從數(shù)據(jù)庫(kù)加載最新改動(dòng)的數(shù)據(jù)進(jìn)緩存。
    • 改進(jìn)二(業(yè)務(wù)容忍一定的延遲):為緩存記錄設(shè)定過(guò)期時(shí)間,在緩存過(guò)期后到來(lái)的請(qǐng)求,會(huì)自動(dòng)觸發(fā)緩存刷新邏輯,這里面帶來(lái)個(gè)小問(wèn)題,一般對(duì)熱點(diǎn)數(shù)據(jù)的訪(fǎng)問(wèn)請(qǐng)求都會(huì)持續(xù)的進(jìn)行。在熱點(diǎn)數(shù)據(jù)過(guò)期后,再請(qǐng)求,會(huì)進(jìn)入緩存刷新邏輯,表現(xiàn)出來(lái)會(huì)有周期性的接口請(qǐng)求慢的場(chǎng)景,對(duì)用戶(hù)體驗(yàn)不友好。對(duì)此在進(jìn)行優(yōu)化,設(shè)計(jì)二階段緩存實(shí)現(xiàn)+異步刷新邏輯:即通過(guò)給緩存對(duì)象設(shè)定 軟過(guò)期和硬過(guò)期時(shí)間(軟過(guò)期<硬過(guò)期),每次查詢(xún)緩存對(duì)象時(shí),判斷下軟過(guò)期是否小于當(dāng)前時(shí)間,如果是,先返回緩存對(duì)象,提交一個(gè)Runable,通過(guò)線(xiàn)程池觸發(fā)異步刷新緩存的邏輯:考慮到微服務(wù)下有多個(gè)實(shí)例部署,加一個(gè)redis鎖,同一時(shí)刻,保證數(shù)據(jù)加載邏輯只會(huì)被觸發(fā)一次
/**
     * 異步緩存重刷
     *
     * @param cachedObject          緩存數(shù)據(jù)實(shí)體
     * @param realKey               真實(shí)的緩存KEY
     * @param logicalExpireSeconds  邏輯失效
     * @param physicalExpireSeconds 物理失效
     * @param cacheNullable         是否允許cacheNull
     * @param func                  異步reload業(yè)務(wù)回調(diào)
     * @param clazz                 數(shù)據(jù)返回類(lèi)型
     * @param <T>                   數(shù)據(jù)加載回調(diào)返回類(lèi)型
     */
    private <T> T asyncReloadAndGetCachedResult(CacheObject<T> cachedObject, String realKey, int logicalExpireSeconds,
                                                int physicalExpireSeconds, boolean cacheNullable, Func<T> func,
                                                Class<T> clazz, Lock lock) {
        T result = null;
        if (cachedObject != null) {
            if (cachedObject.value != null) {
                if (clazz == null || clazz.isAssignableFrom(cachedObject.value.getClass())) {
                    //clazz沒(méi)設(shè)定,類(lèi)型轉(zhuǎn)換異常只能再業(yè)務(wù)層發(fā)現(xiàn)了
                    result = cachedObject.value;
                    if (cachedObject.isLogicExpire()) {
                        //軟過(guò)期,觸發(fā)異步重刷機(jī)制,返回緩存中數(shù)據(jù)
                        asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable, func, clazz,
                            lock);
                    }
                } else {
                    //如果緩存中的數(shù)據(jù)會(huì)導(dǎo)致類(lèi)型轉(zhuǎn)換異常
                    LOGGER.error("cached result class type is not match! expect:{}, actual:{}", clazz,
                        cachedObject.value.getClass());
                    result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable,
                        func, clazz);
                }
            } else {
                if (cacheNullable) {
                    if (cachedObject.isLogicExpire()) {
                        //軟過(guò)期,觸發(fā)異步重刷機(jī)制,返回緩存中數(shù)據(jù)
                        asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, true, func, clazz, lock);
                    }
                } else {
                    //不允許cache緩存卻拿到了null(可能修改了nullable參數(shù)),這里強(qiáng)制進(jìn)行一次緩存刷新,避免業(yè)務(wù)層發(fā)生錯(cuò)誤
                    result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, false,
                        func, clazz);
                }
            }
        }
        return result;
    }
/**
     * 異步緩存重刷
     *
     * @param realKey               真實(shí)的緩存KEY
     * @param logicalExpireSeconds  邏輯失效
     * @param physicalExpireSeconds 物理失效
     * @param cacheNullable         是否允許cacheNull
     * @param func                  異步reload業(yè)務(wù)回調(diào)
     * @param tClass                緩存的對(duì)象類(lèi)型
     * @param <T>                   數(shù)據(jù)加載回調(diào)返回類(lèi)型
     */
    private <T> void asyncReload(String realKey, int logicalExpireSeconds, int physicalExpireSeconds,
                                 boolean cacheNullable, Func<T> func, Class<T> tClass, Lock lock) {
        try {
            //if (!LOCAL_QUEUED_WORKING_TASK_SET.contains(realKey)) {
            JedisLock jedisLock = cacheRefreshLockFactory.buildLock(realKey);
            if (jedisLock.tryLock()) {
                try {
                    //更新待處理任務(wù)列表,避免重入
                    //LOCAL_QUEUED_WORKING_TASK_SET.add(realKey);
                    CACHE_REFRESH_EXECUTOR.execute(new CacheRefreshTask(realKey, () -> {
                        try {
                            if (lock != null) {
                                //阻塞等待主線(xiàn)程完成剩余操作
                                lock.lock();
                            }
                            try {
                                T value = func.invoke();
                                if (cacheNullable) {
                                    refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds, tClass);
                                } else {
                                    if (value != null) {
                                        refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds,
                                            tClass);
                                    }
                                }
                            } finally {
                                if (lock != null) {
                                    lock.unlock();
                                }
                            }
                        } catch (Exception ex) {
                            LOGGER.error("reload cache of {} failed!", realKey, ex);
                        } finally {
                            //移除任務(wù)
                            //LOCAL_QUEUED_WORKING_TASK_SET.remove(realKey);
                        }
                    }));
                } finally {
                    //do nothing
                }
            } else {
                //若任務(wù)正在處理中,不做任何處理。
                //此處不能更新LOCAL_QUEUED_WORKING_TASK_SET,因?yàn)槿蝿?wù)不再當(dāng)前實(shí)例上得到運(yùn)行,任務(wù)不會(huì)得到更新。
            }
            //} else {
            //    LOGGER.warn("task is in queue waiting for being executed!");
            //}
        } catch (Exception e) {
            LOGGER.error("asyncReload failed! realKey:{}", realKey, e);
        }
    }

分布式緩存

參考:https://www.cnblogs.com/moonandstar08/p/5405991.html
當(dāng)緩存的數(shù)據(jù)量超過(guò)單機(jī)緩存的上線(xiàn),就需要引入多臺(tái)緩存服務(wù)器 組成緩存集群,對(duì)緩存請(qǐng)求做負(fù)載均衡,每臺(tái)機(jī)器負(fù)責(zé) 1/n的 請(qǐng)求,常用算法有:輪循算法(Round Robin)、哈希算法(HASH)、最少連接算法(Least Connection)、響應(yīng)速度算法(Response Time)、加權(quán)法(Weighted ),但考慮到緩存服務(wù)的特殊性,他需要在機(jī)器上存儲(chǔ)數(shù)據(jù),常見(jiàn)的這些算法(除hash算法外)會(huì)造成緩存數(shù)據(jù)冗余,并且命中率不高,造成集群利用率降低,而且隨著系統(tǒng)訪(fǎng)問(wèn)量上升,壓力越來(lái)越大,實(shí)際壓力超過(guò)服務(wù)器的能承受的上限,出現(xiàn)節(jié)點(diǎn)宕機(jī)的情況,后續(xù)會(huì)通過(guò)加節(jié)點(diǎn)機(jī)器的形式來(lái)緩解壓力,在分布式緩存中,經(jīng)常會(huì)出現(xiàn)增刪機(jī)器的情況,引發(fā)緩存的重分布,在重分布過(guò)程中,大量請(qǐng)求到達(dá)db,極端情況下引起數(shù)據(jù)庫(kù)宕機(jī)

哈希算法:i = hash(key) mod N,對(duì)將要存入的key做hash計(jì)算,然后對(duì)可用的機(jī)器總數(shù)取模,得出的編號(hào)i,就是實(shí)際緩存需要存放的目標(biāo)機(jī)器編號(hào),在增刪節(jié)點(diǎn)機(jī)器的情況下,N發(fā)生變化,導(dǎo)致現(xiàn)有緩存失效,引發(fā)大量的緩存重分布,造成db壓力過(guò)大,極端情況下引起數(shù)據(jù)庫(kù)宕機(jī)

一致性哈希算法:Consistent Hashing,思路是將所有緩存機(jī)器編號(hào),計(jì)算hash值,放入一個(gè)擁有2的31次方的hash環(huán)中,對(duì)將要存入的緩存key做hash計(jì)算,得出hash值,取hash環(huán)按順時(shí)針尋找,找到的第一個(gè)可用的hash值,它所對(duì)應(yīng)的機(jī)器,就是目標(biāo)緩存機(jī)器。

分析:

  • 場(chǎng)景1:假如集群中原先有N個(gè)節(jié)點(diǎn),第N個(gè)節(jié)點(diǎn)宕機(jī),重新發(fā)布業(yè)務(wù)系統(tǒng)后,那按照一致性哈希算法,原先分步在第N個(gè)節(jié)點(diǎn)上的數(shù)據(jù)(hash(cacheKey) > hash(N-1))會(huì)重分步到第一個(gè)節(jié)點(diǎn)上,前N-1個(gè)節(jié)點(diǎn)的緩存數(shù)據(jù)依然有效可用,
  • 假如集群中原先有N個(gè)節(jié)點(diǎn),現(xiàn)增加一個(gè)節(jié)點(diǎn),重新發(fā)布業(yè)務(wù)系統(tǒng)后,那按照一致性哈希算法,原先分步在第N個(gè)節(jié)點(diǎn)上的一部分?jǐn)?shù)據(jù)(hash(N)< hash(cacheKey)< hash(N + 1))會(huì)重分布到新增節(jié)點(diǎn)上
    結(jié)論:由此可見(jiàn),一致性哈希算法,在增減集群節(jié)點(diǎn)時(shí),只會(huì)引起一部分緩存數(shù)據(jù)重分布,相比普通的hash算法,減少了緩存失效的范圍,減少了系統(tǒng)顛簸

缺點(diǎn):一致性哈希算法的使用,因?yàn)榧簷C(jī)器的hash值在hash環(huán)中的位置問(wèn)題,可能會(huì)引發(fā)數(shù)據(jù)傾斜,出現(xiàn)集群中某些節(jié)點(diǎn)量很大,其他節(jié)點(diǎn)存儲(chǔ)量很小,可以引入虛擬節(jié)點(diǎn)(即為集群中每臺(tái)機(jī)器計(jì)算多次hash值)hash環(huán)中放置虛擬節(jié)點(diǎn),多個(gè)虛擬節(jié)點(diǎn)和機(jī)器之間維護(hù)一份映射,存儲(chǔ)數(shù)據(jù)時(shí),通過(guò)虛擬節(jié)點(diǎn)找到機(jī)器,通過(guò)這總方式,讓集群機(jī)器在hash環(huán)中的分布更分散,已達(dá)到均勻分布的效率。

舉例:dubbo消費(fèi)方對(duì)提供方的調(diào)用,采用一致性hash算法來(lái)做的話(huà),會(huì)為每個(gè)提供方Invoker生成160份虛擬節(jié)點(diǎn),以此讓請(qǐng)求的分發(fā)更加均勻。

實(shí)現(xiàn):TreeMap(存儲(chǔ)hash環(huán)) + hashMap(維護(hù)虛擬節(jié)點(diǎn)和機(jī)器的映射),參考dubbo的一致性hash算法實(shí)現(xiàn)

private static final class ConsistentHashSelector<T> {
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
        private final int                       replicaNumber;
        private final int                       identityHashCode;  
        private final int[]                     argumentIndex;
        //初始化hash環(huán)
        public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = System.identityHashCode(invokers);
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i ++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
        //順時(shí)針查找滿(mǎn)足條件的機(jī)器
        private Invoker<T> sekectForKey(long hash) {
            Invoker<T> invoker;
            Long key = hash;
            if (!virtualInvokers.containsKey(key)) {
                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
                if (tailMap.isEmpty()) {
                    key = virtualInvokers.firstKey();
                } else {
                    key = tailMap.firstKey();
                }
            }
            invoker = virtualInvokers.get(key);
            return invoker;
        }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容