一、簡(jiǎn)介
二、帶著問(wèn)題
三、源碼
3.1 代碼結(jié)構(gòu)
3.2 核心代碼
源碼學(xué)習(xí) https://github.com/muesli/cache2go
一、簡(jiǎn)介
cache2go是一個(gè)簡(jiǎn)單的緩存庫(kù),大概500行代碼。
- 支持并發(fā)安全
- 支持過(guò)期清除
- 支持訪問(wèn)計(jì)數(shù)
- 配置回調(diào)函數(shù)
二、帶著問(wèn)題
- 并發(fā)的使用
- 過(guò)期清除如何實(shí)現(xiàn)的
- 回掉函數(shù)的配置
三、源碼
3.1 代碼結(jié)構(gòu)
- Cache.go 緩存庫(kù)
- CacheTable.go 緩存表
- CacheItem.go 緩存項(xiàng)
3.2 核心代碼
- CacheItem
type CacheItem struct {
sync.RWMutex
key interface{}
data interface{}
//每個(gè)item都有一個(gè)訪問(wèn)時(shí)間和訪問(wèn)次數(shù),以及存活時(shí)間。
lifeSpan time.Duration
createdOn time.Time
accessedOn time.Time
accessCount int64
aboutToExpire []func(key interface{}) //支持回調(diào)函數(shù)
}
每次訪問(wèn)item時(shí)更新accessedOn和accessCount;
每次清理時(shí),判斷now-accessedOn是否已大于存活時(shí)間,來(lái)決定清理;
// KeepAlive marks an item to be kept for another expireDuration period.
func (item *CacheItem) KeepAlive() {
item.Lock()
defer item.Unlock()
item.accessedOn = time.Now()
item.accessCount++
}
- CacheTable
// CacheTable is a table within the cache
type CacheTable struct {
sync.RWMutex
// The table's name.
name string
// All cached items.
items map[interface{}]*CacheItem
// Timer responsible for triggering cleanup.
cleanupTimer *time.Timer //以表的維度,來(lái)清理過(guò)期數(shù)據(jù)
// Current timer duration.
cleanupInterval time.Duration
// Callback method triggered when trying to load a non-existing key.
loadData func(key interface{}, args ...interface{}) *CacheItem
// Callback method triggered when adding a new item to the cache.
addedItem []func(item *CacheItem)
// Callback method triggered before deleting an item from the cache.
aboutToDeleteItem []func(item *CacheItem)
}
Add
回調(diào)函數(shù)的調(diào)用;
每次添加,如果存活時(shí)間<清理周期,會(huì)觸發(fā)一次清理;
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
item := NewCacheItem(key, lifeSpan, data)
// Add item to cache.
table.Lock()
table.items[item.key] = item
// Cache values so we don't keep blocking the mutex.
expDur := table.cleanupInterval
addedItem := table.addedItem
table.Unlock()
// Trigger callback after adding an item to cache. 回掉函數(shù)的觸發(fā)
if addedItem != nil {
for _, callback := range addedItem {
callback(item)
}
}
// If we haven't set up any expiration check timer or found a more imminent item.
if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
table.expirationCheck()
}
return item
}
Delete
// Delete an item from the cache.
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
table.Lock()
defer table.Unlock()
r, ok := table.items[key]
if !ok {
return nil, ErrKeyNotFound
}
// Cache value so we don't keep blocking the mutex.
aboutToDeleteItem := table.aboutToDeleteItem
table.Unlock()
// Trigger callbacks before deleting an item from cache.
if aboutToDeleteItem != nil {
for _, callback := range aboutToDeleteItem {
callback(r)
}
}
r.RLock()
defer r.RUnlock()
if r.aboutToExpire != nil {
for _, callback := range r.aboutToExpire {
callback(key)
}
}
table.Lock()
delete(table.items, key)
return r, nil
}
Value
如果存在,更新item的訪問(wèn);
如果不存在,支持自定義加載函數(shù);
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
table.RLock()
r, ok := table.items[key]
loadData := table.loadData
table.RUnlock()
if ok {
// Update access counter and timestamp.
r.KeepAlive()
return r, nil
}
// Item doesn't exist in cache. Try and fetch it with a data-loader.
if loadData != nil {
item := loadData(key, args...)
if item != nil {
table.Add(key, item.lifeSpan, item.data)
return item, nil
}
return nil, ErrKeyNotFoundOrLoadable
}
return nil, ErrKeyNotFound
}
定期清理
清理實(shí)際上是遍歷每個(gè)item,判斷它的過(guò)期時(shí)間,過(guò)期就刪除;
清理的觸發(fā):一是每次添加時(shí),二是每次清理之后創(chuàng)建一個(gè)計(jì)時(shí)器;
非固定周期的清理,基于item的存活時(shí)間來(lái)調(diào)整;
// Expiration check loop, triggered by a self-adjusting timer.
func (table *CacheTable) expirationCheck() {
table.Lock()
if table.cleanupTimer != nil {
table.cleanupTimer.Stop()
}
if table.cleanupInterval > 0 {
table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
} else {
table.log("Expiration check installed for table", table.name)
}
// To be more accurate with timers, we would need to update 'now' on every
// loop iteration. Not sure it's really efficient though.
now := time.Now()
smallestDuration := 0 * time.Second
for key, item := range table.items {
// Cache values so we don't keep blocking the mutex.
item.RLock()
lifeSpan := item.lifeSpan
accessedOn := item.accessedOn
item.RUnlock()
if lifeSpan == 0 {
continue
}
if now.Sub(accessedOn) >= lifeSpan {
// Item has excessed its lifespan.
table.deleteInternal(key)
} else {
// Find the item chronologically closest to its end-of-lifespan.
if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
smallestDuration = lifeSpan - now.Sub(accessedOn)
}
}
}
// Setup the interval for the next cleanup run.
table.cleanupInterval = smallestDuration
if smallestDuration > 0 {
table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
go table.expirationCheck()
})
}
table.Unlock()
}
四、思考
- 鎖的類型:創(chuàng)建后一成不變的可以不加鎖,只讀加讀鎖,寫入加寫鎖;
- 鎖的范圍:加鎖獲取數(shù)據(jù)后,后續(xù)操作緩存數(shù)據(jù)不再加鎖;
- 鎖的粒度:對(duì)數(shù)據(jù)表的操作,獲取表鎖。對(duì)于某個(gè)item的操作,獲取item的鎖;