很久前參加過今日頭條的面試,遇到一個(gè)題,目前半部分是如何實(shí)現(xiàn) LRU,后半部分是 Redis 中如何實(shí)現(xiàn) LRU。
我的第一反應(yīng)應(yīng)該是內(nèi)存不夠的場景下,淘汰舊內(nèi)容的策略。LRU ... Least Recent Used,淘汰掉最不經(jīng)常使用的??梢陨晕⒍嘌a(bǔ)充兩句,因?yàn)橛?jì)算機(jī)體系結(jié)構(gòu)中,最大的最可靠的存儲(chǔ)是硬盤,它容量很大,并且內(nèi)容可以固化,但是訪問速度很慢,所以需要把使用的內(nèi)容載入內(nèi)存中;內(nèi)存速度很快,但是容量有限,并且斷電后內(nèi)容會(huì)丟失,并且為了進(jìn)一步提升性能,還有CPU內(nèi)部的 L1 Cache,L2 Cache等概念。因?yàn)樗俣仍娇斓牡胤?,它的單位成本越高,容量越小,新的?nèi)容不斷被載入,舊的內(nèi)容肯定要被淘汰,所以就有這樣的使用背景。
LRU原理
在一般標(biāo)準(zhǔn)的操作系統(tǒng)教材里,會(huì)用下面的方式來演示 LRU 原理,假設(shè)內(nèi)存只能容納3個(gè)頁大小,按照 7 0 1 2 0 3 0 4 的次序訪問頁。假設(shè)內(nèi)存按照棧的方式來描述訪問時(shí)間,在上面的,是最近訪問的,在下面的是,最遠(yuǎn)時(shí)間訪問的,LRU就是這樣工作的。

但是如果讓我們自己設(shè)計(jì)一個(gè)基于 LRU 的緩存,這樣設(shè)計(jì)可能問題很多,這段內(nèi)存按照訪問時(shí)間進(jìn)行了排序,會(huì)有大量的內(nèi)存拷貝操作,所以性能肯定是不能接受的。
那么如何設(shè)計(jì)一個(gè)LRU緩存,使得放入和移除都是 O(1) 的,我們需要把訪問次序維護(hù)起來,但是不能通過內(nèi)存中的真實(shí)排序來反應(yīng),有一種方案就是使用雙向鏈表。
基于 HashMap 和 雙向鏈表實(shí)現(xiàn) LRU 的
整體的設(shè)計(jì)思路是,可以使用 HashMap 存儲(chǔ) key,這樣可以做到 save 和 get key的時(shí)間都是 O(1),而 HashMap 的 Value 指向雙向鏈表實(shí)現(xiàn)的 LRU 的 Node 節(jié)點(diǎn),如圖所示。

LRU 存儲(chǔ)是基于雙向鏈表實(shí)現(xiàn)的,下面的圖演示了它的原理。其中 head 代表雙向鏈表的表頭,tail 代表尾部。首先預(yù)先設(shè)置 LRU 的容量,如果存儲(chǔ)滿了,可以通過 O(1) 的時(shí)間淘汰掉雙向鏈表的尾部,每次新增和訪問數(shù)據(jù),都可以通過 O(1)的效率把新的節(jié)點(diǎn)增加到對頭,或者把已經(jīng)存在的節(jié)點(diǎn)移動(dòng)到隊(duì)頭。
下面展示了,預(yù)設(shè)大小是 3 的,LRU存儲(chǔ)的在存儲(chǔ)和訪問過程中的變化。為了簡化圖復(fù)雜度,圖中沒有展示 HashMap部分的變化,僅僅演示了上圖 LRU 雙向鏈表的變化。我們對這個(gè)LRU緩存的操作序列如下:
save("key1", 7)
save("key2", 0)
save("key3", 1)
save("key4", 2)
get("key2")
save("key5", 3)
get("key2")
save("key6", 4)
相應(yīng)的 LRU 雙向鏈表部分變化如下:

總結(jié)一下核心操作的步驟:
save(key, value),首先在 HashMap 找到 Key 對應(yīng)的節(jié)點(diǎn),如果節(jié)點(diǎn)存在,更新節(jié)點(diǎn)的值,并把這個(gè)節(jié)點(diǎn)移動(dòng)隊(duì)頭。如果不存在,需要構(gòu)造新的節(jié)點(diǎn),并且嘗試把節(jié)點(diǎn)塞到隊(duì)頭,如果LRU空間不足,則通過 tail 淘汰掉隊(duì)尾的節(jié)點(diǎn),同時(shí)在 HashMap 中移除 Key。
get(key),通過 HashMap 找到 LRU 鏈表節(jié)點(diǎn),因?yàn)楦鶕?jù)LRU 原理,這個(gè)節(jié)點(diǎn)是最新訪問的,所以要把節(jié)點(diǎn)插入到隊(duì)頭,然后返回緩存的值。
完整基于 Java 的代碼參考如下
```
class?DLinkedNode {
????String key;
????int?value;
????DLinkedNode pre;
????DLinkedNode post;
}
```
LRU Cache
```
public?class?LRUCache {
????private?Hashtable
????????????cache =?new?Hashtable();
????private?int?count;
????private?int?capacity;
????private?DLinkedNode head, tail;
????public?LRUCache(int?capacity) {
????????this.count = 0;
????????this.capacity = capacity;
????????head =?new?DLinkedNode();
????????head.pre =?null;
????????tail =?new?DLinkedNode();
????????tail.post =?null;
????????head.post = tail;
????????tail.pre = head;
????}
????public?int?get(String key) {
????????DLinkedNode node = cache.get(key);
????????if(node ==?null){
????????????return?-1;?// should raise exception here.
????????}
????????// move the accessed node to the head;
????????this.moveToHead(node);
????????return?node.value;
????}
????public?void?set(String key,?int?value) {
????????DLinkedNode node = cache.get(key);
????????if(node ==?null){
????????????DLinkedNode newNode =?new?DLinkedNode();
????????????newNode.key = key;
????????????newNode.value = value;
????????????this.cache.put(key, newNode);
????????????this.addNode(newNode);
????????????++count;
????????????if(count > capacity){
????????????????// pop the tail
????????????????DLinkedNode tail =?this.popTail();
????????????????this.cache.remove(tail.key);
????????????????--count;
????????????}
????????}else{
????????????// update the value.
????????????node.value = value;
????????????this.moveToHead(node);
????????}
????}
????/**
?????* Always add the new node right after head;
?????*/
????private?void?addNode(DLinkedNode node){
????????node.pre = head;
????????node.post = head.post;
????????head.post.pre = node;
????????head.post = node;
????}
????/**
?????* Remove an existing node from the linked list.
?????*/
????private?void?removeNode(DLinkedNode node){
????????DLinkedNode pre = node.pre;
????????DLinkedNode post = node.post;
????????pre.post = post;
????????post.pre = pre;
????}
????/**
?????* Move certain node in between to the head.
?????*/
????private?void?moveToHead(DLinkedNode node){
????????this.removeNode(node);
????????this.addNode(node);
????}
????// pop the current tail.
????private?DLinkedNode popTail(){
????????DLinkedNode res = tail.pre;
????????this.removeNode(res);
????????return?res;
????}
}
```
那么問題的后半部分,是 Redis 如何實(shí)現(xiàn),這個(gè)問題這么問肯定是有坑的,那就是redis肯定不是這樣實(shí)現(xiàn)的。
Redis的LRU實(shí)現(xiàn)
如果按照HashMap和雙向鏈表實(shí)現(xiàn),需要額外的存儲(chǔ)存放 next 和 prev 指針,犧牲比較大的存儲(chǔ)空間,顯然是不劃算的。所以Redis采用了一個(gè)近似的做法,就是隨機(jī)取出若干個(gè)key,然后按照訪問時(shí)間排序后,淘汰掉最不經(jīng)常使用的,具體分析如下:
為了支持LRU,Redis 2.8.19中使用了一個(gè)全局的LRU時(shí)鐘,server.lruclock,定義如下,
```
#define REDIS_LRU_BITS 24
unsigned lruclock:REDIS_LRU_BITS;?/* Clock for LRU eviction */
```
默認(rèn)的LRU時(shí)鐘的分辨率是1秒,可以通過改變REDIS_LRU_CLOCK_RESOLUTION宏的值來改變,Redis會(huì)在serverCron()中調(diào)用updateLRUClock定期的更新LRU時(shí)鐘,更新的頻率和hz參數(shù)有關(guān),默認(rèn)為100ms一次,如下,
```
#define REDIS_LRU_CLOCK_MAX ((1<lru */
#define REDIS_LRU_CLOCK_RESOLUTION 1 /* LRU clock resolution in seconds */
void?updateLRUClock(void) {
????server.lruclock = (server.unixtime / REDIS_LRU_CLOCK_RESOLUTION) &
????????????????????????????????????????????????REDIS_LRU_CLOCK_MAX;
}
```
server.unixtime是系統(tǒng)當(dāng)前的unix時(shí)間戳,當(dāng) lruclock 的值超出REDIS_LRU_CLOCK_MAX時(shí),會(huì)從頭開始計(jì)算,所以在計(jì)算一個(gè)key的最長沒有訪問時(shí)間時(shí),可能key本身保存的lru訪問時(shí)間會(huì)比當(dāng)前的lrulock還要大,這個(gè)時(shí)候需要計(jì)算額外時(shí)間,如下,
```
/* Given an object returns the min number of seconds the object was never
?* requested, using an approximated LRU algorithm. */
unsigned?long?estimateObjectIdleTime(robj *o) {
????if?(server.lruclock >= o->lru) {
????????return?(server.lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
????}?else?{
????????return?((REDIS_LRU_CLOCK_MAX - o->lru) + server.lruclock) *
????????????????????REDIS_LRU_CLOCK_RESOLUTION;
????}
}
```
Redis支持和LRU相關(guān)淘汰策略包括,
volatile-lru設(shè)置了過期時(shí)間的key參與近似的lru淘汰策略
allkeys-lru所有的key均參與近似的lru淘汰策略
當(dāng)進(jìn)行LRU淘汰時(shí),Redis按如下方式進(jìn)行的,
```
......
????????????/* volatile-lru and allkeys-lru policy */
????????????else?if?(server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
????????????????server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
????????????{
????????????????for?(k = 0; k < server.maxmemory_samples; k++) {
????????????????????sds thiskey;
????????????????????long?thisval;
????????????????????robj *o;
????????????????????de = dictGetRandomKey(dict);
????????????????????thiskey = dictGetKey(de);
????????????????????/* When policy is volatile-lru we need an additional lookup
?????????????????????* to locate the real key, as dict is set to db->expires. */
????????????????????if?(server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
????????????????????????de = dictFind(db->dict, thiskey);
????????????????????o = dictGetVal(de);
????????????????????thisval = estimateObjectIdleTime(o);
????????????????????/* Higher idle time is better candidate for deletion */
????????????????????if?(bestkey == NULL || thisval > bestval) {
????????????????????????bestkey = thiskey;
????????????????????????bestval = thisval;
????????????????????}
????????????????}
????????????}
????????????......
```
Redis會(huì)基于server.maxmemory_samples配置選取固定數(shù)目的key,然后比較它們的lru訪問時(shí)間,然后淘汰最近最久沒有訪問的key,maxmemory_samples的值越大,Redis的近似LRU算法就越接近于嚴(yán)格LRU算法,但是相應(yīng)消耗也變高,對性能有一定影響,樣本值默認(rèn)為5。
總結(jié)
看來,雖然一個(gè)簡單的概念,在工業(yè)界的產(chǎn)品中,為了追求空間的利用率,也會(huì)采用權(quán)衡的實(shí)現(xiàn)方案。
對Java架構(gòu)技術(shù)感興趣的同學(xué),歡迎加QQ群619881427,一起學(xué)習(xí),相互討論。
群內(nèi)已經(jīng)有小伙伴將知識(shí)體系整理好(源碼,筆記,PPT,學(xué)習(xí)視頻),歡迎加群免費(fèi)領(lǐng)取。
分享給喜歡Java的,喜歡編程,有夢想成為架構(gòu)師的程序員們,希望能夠幫助到你們。
不是Java的程序員也沒關(guān)系,幫忙轉(zhuǎn)發(fā)給更多朋友!謝謝。
一個(gè)分享小技巧點(diǎn)擊閱讀原文也。可以輕松獲取學(xué)習(xí)資料哦!