memcached源碼分析-哈希表(hashtable)模塊


導(dǎo)航

memcached源碼分析
memcached源碼分析-網(wǎng)絡(luò)模塊
memcached源碼分析-指令解析模塊
memcached源碼分析-哈希表(hashtable)模塊
memcached源碼分析-slab存儲機(jī)制

1.前言

Memcached是一個(gè)高性能的開源分布式內(nèi)存對象緩存系統(tǒng),說到這里的"高性能"、"緩存"那么不得不提memcached內(nèi)存是如何管理的,這也是memecached核心內(nèi)容,接下來主要分hashtable模塊、slabs內(nèi)存分配管理機(jī)制、LRU淘汰策略。這一章節(jié)先行分析hashtable模塊。memcached的哈希表是用來保存item*數(shù)據(jù)結(jié)構(gòu)的一段固定大小的內(nèi)存區(qū)域,這里主要分析的功能點(diǎn)有hash表的段鎖、hash表的增刪改查、hash表的動態(tài)拓容。

2.hash表段鎖

memcached是多線程的實(shí)現(xiàn)的,為了確保多線程操作hash表的安全性,代碼中使用了鎖。什么是段鎖,就是有可能會多個(gè)key對應(yīng)一把鎖,而不是每一個(gè)key都對應(yīng)一把鎖,因?yàn)椴煌膋ey可能hash出來的值一樣,那么就都會對應(yīng)這一把鎖。


段鎖示意圖
程序中在memcached_thread_init函數(shù)中初始化了鎖
void memcached_thread_init(int nthreads, void *arg) {
    //....

    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else if (nthreads <= 10) {
        power = 13;
    } else if (nthreads <= 20) {
        power = 14;
    } else {
        /* 32k buckets. just under the hashpower default. */
        power = 15;
    }
    //段鎖的槽大小要小于hash槽的大小
    //個(gè)人理解power等于hashpower是可取的,可以減少鎖沖突,如果power大于hashpower是一種資源的浪費(fèi)
    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    //哈希鎖的數(shù)量計(jì)算
    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    //申請hash鎖資源
    //哈希鎖不會和hash表一樣進(jìn)行拓容,如果后期hash拓容,那么會導(dǎo)致越多的hash槽位對應(yīng)同一把鎖
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    //初始化
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
}

鎖的使用實(shí)例
//鎖的使用
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;
    //根據(jù)item計(jì)算一個(gè)對應(yīng)的hash value
    hv = hash(ITEM_key(item), item->nkey);
    //根據(jù)hash value 獲取對應(yīng)的鎖資源,鎖hash槽
    item_lock(hv);
    //存儲item到hash槽鏈表上
    ret = do_store_item(item, comm, c, hv);
    //釋放鎖資源
    item_unlock(hv);
    return ret;
}

3.哈希表的增刪改查

hash表的增刪改查,說到hash表,我們先介紹一個(gè)memcached中的一個(gè)重要結(jié)構(gòu)體item,item結(jié)構(gòu)是用于存儲緩存數(shù)據(jù)的節(jié)點(diǎn),item內(nèi)存分配于slab(后面會做詳細(xì)分析),而hash表主要作用是記錄item節(jié)點(diǎn),我們在后面可通過key快速查取到緩存數(shù)據(jù)

item節(jié)點(diǎn)結(jié)構(gòu)
/**
 * Structure for storing items within memcached.
 */
typedef struct _stritem {
    /* Protected by LRU locks */
    struct _stritem *next;
    struct _stritem *prev;

    /* Rest are protected by an item lock */
    //h_next用于記錄哈希表槽中下一個(gè)item節(jié)點(diǎn)的地址
    //多線程訪問hash槽節(jié)點(diǎn),是通過item lock保證線程安全的
    struct _stritem *h_next;    /* hash chain next */

    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

哈希表的初始化

main函數(shù)中調(diào)用assoc_init實(shí)現(xiàn)hashtable的初始化工作

void assoc_init(const int hashtable_init) {
    //根據(jù)hashtable_init計(jì)算哈希表的大小
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    //根據(jù)hashpower計(jì)算hash表一共有多少個(gè)桶槽
    //例如hashpower = 16,那么通過hashsize計(jì)算可得桶槽大小為1<<16 = 65536
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();
    stats_state.hash_power_level = hashpower;
    stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();
}

哈希表查找

查找過程:
1.根據(jù)key計(jì)算一個(gè)hash值hv
2.根據(jù)hv獲取段鎖(保證多線程安全訪問)
3.根據(jù)hv到哈希表中找到對應(yīng)的桶槽
4.遍歷桶槽的的單向鏈表,比較key值,找到具體的item節(jié)點(diǎn)

item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    //獲取鎖,保證多線程訪問hash表的安全性
    item_lock(hv);
    //根據(jù)key值,取hash表中查找item
    it = do_item_get(key, nkey, hv, c, do_update);
    //釋放鎖資源
    item_unlock(hv);
    return it;
}

item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
    //assoc_find就是具體的hash表查找
    item *it = assoc_find(key, nkey, hv);
    //...
}

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    unsigned int oldbucket;

    //expanding是用來判斷hash表是否正在拓容
    //expanding == true說明hash表正在拓容
    //先簡要說明一下拓容:hash表拓容就是分配一張更大的hash表,然后將數(shù)據(jù)從頭到尾將舊的hash內(nèi)容復(fù)制一份到新的hash表
    //expand_bucket是標(biāo)記hash表已經(jīng)拓容到哪個(gè)桶槽了
    //比如舊的hash表old_hashtble一共65536個(gè)桶槽位,已經(jīng)復(fù)制了10000個(gè)桶槽數(shù)據(jù)到新哈希表primary_hashtable,那么此時(shí)
    //expand_bucket == 10000
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        //1.哈希表正在拓容
        //2.hv & hashmask(hashpower - 1) 大于expand_bucket說明,要查詢的桶槽在old_hashtable處
        it = old_hashtable[oldbucket];
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

    item *ret = NULL;
    int depth = 0;
    //遍歷桶槽的單向鏈表
    while (it) {
        //比較具體的key值
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}

向hash表中插入一個(gè)新增的item
int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;
    //...
    //這里的解析參考我們的assoc_find分析
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    { 
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;      
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)] 
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }
}

從hash表中刪除一個(gè)item節(jié)點(diǎn)
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    //根據(jù)key,nkey,hv查找節(jié)點(diǎn)
    //我們要查找key對應(yīng)的item節(jié)點(diǎn)pnode
    //prev->h_next指向pnode
    //這里的before就是prev->h_next的變量地址,也就是pnode前驅(qū)item節(jié)點(diǎn)h_next指針變量的地址
    item **before = _hashitem_before(key, nkey, hv);

    if (*before) {
        item *nxt;
        /* The DTrace probe cannot be triggered as the last instruction
         * due to possible tail-optimization by the compiler
         */
        MEMCACHED_ASSOC_DELETE(key, nkey);
        nxt = (*before)->h_next;
        (*before)->h_next = 0;   /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don't delete things
       they can't find. */
    assert(*before != 0);
}

/* returns the address of the item pointer before the key.  if *item == 0,
   the item wasn't found */
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
    item **pos;
    unsigned int oldbucket;

    //這里的解析參考我們的assoc_find分析
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        pos = &old_hashtable[oldbucket];
    } else {
        pos = &primary_hashtable[hv & hashmask(hashpower)];
    }

    //比較key
    while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
        pos = &(*pos)->h_next;
    }
    return pos;
}

3.哈希表的拓容

哈希表何時(shí)進(jìn)行拓容呢?memcached做法是向libevent中注冊了一個(gè)時(shí)鐘回調(diào),定時(shí)的去檢測是否需要對哈希表進(jìn)行拓容操作。hash表的拓容過程則是由memcached單獨(dú)申請了一個(gè)線程進(jìn)行拓容操作。

檢測拓容回調(diào)函數(shù)
int main(){
    //...
    /* initialise clock event */
    clock_handler(0, 0, 0);
    //....
}

/* libevent uses a monotonic clock when available for event scheduling. Aside
 * from jitter, simply ticking our internal timer here is accurate enough.
 * Note that users who are setting explicit dates for expiration times *must*
 * ensure their clocks are correct before starting memcached. */
static void clock_handler(const int fd, const short which, void *arg) {
    //...
    // While we're here, check for hash table expansion.
    // This function should be quick to avoid delaying the timer.
    //判斷是否需要hash拓容
    assoc_start_expand(stats_state.curr_items);

    evtimer_set(&clockevent, clock_handler, 0);
    event_base_set(main_base, &clockevent);
    evtimer_add(&clockevent, &t);
    //...
}

void assoc_start_expand(uint64_t curr_items) {
    //正在拓容,返回
    if (started_expanding)
        return;

    //curr_items是當(dāng)前hash表中記錄的item節(jié)點(diǎn)的個(gè)數(shù)
    //當(dāng)curr_items大于當(dāng)前hash表桶槽個(gè)數(shù)的1.5倍的時(shí)候,則需要拓容當(dāng)前的hash表
    if (curr_items > (hashsize(hashpower) * 3) / 2 &&
          hashpower < HASHPOWER_MAX) {
        started_expanding = true;
        //信號量觸發(fā)
        pthread_cond_signal(&maintenance_cond);
    }
}

拓容線程
//hash的拓容memcached獨(dú)立的啟動了一個(gè)線程進(jìn)行拓容操作
static void *assoc_maintenance_thread(void *arg) {

    mutex_lock(&maintenance_lock);
    while (do_run_maintenance_thread) {
        int ii = 0;

        /* There is only one expansion thread, so no need to global lock. */
        //expanding初始化為false,當(dāng)assoc_start_expand中信號量觸發(fā)將會觸發(fā)assoc_expand的執(zhí)行
        //assoc_expand中對expanding置為true
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            unsigned int bucket;
            void *item_lock = NULL;

            /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
             * is the lowest N bits of the hv, and the bucket of item_locks is
             *  also the lowest M bits of hv, and N is greater than M.
             *  So we can process expanding with only one item_lock. cool! */
             //expand_bucket變量含義解析我們可以參考assoc_find中的解釋
             //當(dāng)前桶槽嘗試加鎖
            if ((item_lock = item_trylock(expand_bucket))) {
                //for循環(huán)遍歷桶槽下的鏈表,將每個(gè)節(jié)點(diǎn)從old_hashtable遷移到primary_hashtable
                for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                    next = it->h_next;
                    bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                    it->h_next = primary_hashtable[bucket];
                    primary_hashtable[bucket] = it;
                }

                old_hashtable[expand_bucket] = NULL;
                //下一個(gè)桶槽
                expand_bucket++;
                //當(dāng)expand_bucket == old_hashtable尺寸大小,則節(jié)點(diǎn)轉(zhuǎn)移完畢
                if (expand_bucket == hashsize(hashpower - 1)) {
                    //停止拓容
                    expanding = false;
                    free(old_hashtable);
                    STATS_LOCK();
                    stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                    stats_state.hash_is_expanding = false;
                    STATS_UNLOCK();
                    if (settings.verbose > 1)
                        fprintf(stderr, "Hash table expansion done\n");
                }

            } else {
                usleep(10*1000);
            }

            if (item_lock) {
                item_trylock_unlock(item_lock);
                item_lock = NULL;
            }
        }

        if (!expanding) {
            /* We are done expanding.. just wait for next invocation */
            started_expanding = false;
            //信號量未觸發(fā),當(dāng)前線程將阻塞在這里
            //當(dāng)assoc_start_expand中執(zhí)行pthread_cond_signal,當(dāng)前線程不在阻塞
            pthread_cond_wait(&maintenance_cond, &maintenance_lock);
            /* assoc_expand() swaps out the hash table entirely, so we need
             * all threads to not hold any references related to the hash
             * table while this happens.
             * This is instead of a more complex, possibly slower algorithm to
             * allow dynamic hash table expansion without causing significant
             * wait times.
             */
            pause_threads(PAUSE_ALL_THREADS);
            //拓容前的準(zhǔn)備
            assoc_expand();
            pause_threads(RESUME_ALL_THREADS);
        }
    }
    return NULL;
}

/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
    //副本
    old_hashtable = primary_hashtable;
    //新拓容的hash表
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++;
        //assoc_maintenance_thread函數(shù)將依據(jù)該變量,開始進(jìn)行拓容操作
        expanding = true;
        expand_bucket = 0;
        STATS_LOCK();
        stats_state.hash_power_level = hashpower;
        stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats_state.hash_is_expanding = true;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容