Memcache-哈希表-源碼分析

memcached-version-1.4.25

介紹

memcache 的 hashtable 就是一塊保存 item* 固定大小的內(nèi)存區(qū)域,也就是一個(gè)固定大小的指針數(shù)組,在程序啟動(dòng)的時(shí)候會(huì)初始化這個(gè) hashtable 數(shù)組的大小,主要核心涉及到的點(diǎn)就是 hashtable動(dòng)態(tài)擴(kuò)容、hashtable段鎖等,對(duì)應(yīng)的 hashtable 處理的文件為 hash.chash.h 、 assoc.c、 assoc.h

Memcache 哈希表

memcache 哈希表

源碼實(shí)現(xiàn)

初始化設(shè)置 hash 算法,hash_init ()

// 目前只提供2種hash算法 jenkins 和 murmur3
enum hashfunc_type {
    JENKINS_HASH=0, MURMUR3_HASH  
};
//啟動(dòng)memcache的時(shí)候調(diào)用
int hash_init(enum hashfunc_type type) {
    switch(type) {
        case JENKINS_HASH:
            hash = jenkins_hash; // jenkins_hash 函數(shù)指針
            settings.hash_algorithm = "jenkins";
            break;
        case MURMUR3_HASH:
            hash = MurmurHash3_x86_32;  // murmur3_hash 函數(shù)指針
            settings.hash_algorithm = "murmur3"; 
            break;
        default:
            return -1;
    }
    return 0;
}

初始化 hashtable 表,assoc_init()

settings.hashpower_init //hashtable的基準(zhǔn)值,啟動(dòng)memcache的時(shí)候設(shè)定

void assoc_init(const int hashtable_init) {
    // 如果存在則賦值 hashpower 沒(méi)有的話就用默認(rèn)的 hashpower = 16
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    // hashsize 根據(jù)傳入的 hashpower 算出最終的 hashtable 長(zhǎng)度
    // hashsize(n) -> #define hashsize(n) ((ub4)1<<(n)) 
    // 實(shí)際上就是進(jìn)行右移運(yùn)算,右移 hashpower 位 ( 1 << 16 = 65536 )
    // 申請(qǐng)內(nèi)存 item** primary_hashtable -> hashtable頭指針 
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *); // hashtable 占了多少字節(jié)內(nèi)存
    STATS_UNLOCK();
}

hashtable 段鎖

在 memcache 初始化線程那部分,還會(huì)初始化一些互斥鎖, 其中就包括了 hashtable 的段鎖,什么是段鎖,就是有可能會(huì)多個(gè)key對(duì)應(yīng)一把鎖,而不是每一個(gè)key都對(duì)應(yīng)一把鎖,因?yàn)椴煌膋ey可能hash出來(lái)的值一樣,那么就都會(huì)對(duì)應(yīng)這一把鎖。

初始化 hashtable 鎖

// memcached_thread_init 函數(shù)其中一段代碼
  
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;
    
    //.........
    
    //根據(jù)線程數(shù)設(shè)置 hashtable 鎖的啟動(dòng)值 power 跟上面的 hashpower 同理
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
    // power 小于 hashpower 因?yàn)?hashtable鎖的數(shù)量 和 hashtable的數(shù)量 并不是一一對(duì)應(yīng)的
    // 也就是說(shuō)并不是 每一個(gè)hashtable的key都對(duì)應(yīng)一把鎖,memcache為了省內(nèi)存,采用多個(gè)key
    // 對(duì)應(yīng)一把鎖,也就是段鎖
    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    //計(jì)算hashtable鎖的長(zhǎng)度, 默認(rèn)4個(gè)線程 power = 12 、 1 << 12 = 4096/鎖
    item_lock_count = hashsize(power); 
    item_lock_hashpower = power;
    
    //申請(qǐng)鎖
    //之后這個(gè)item_locks鎖并不會(huì)隨著hashtable的擴(kuò)容而擴(kuò)容
    //也就是說(shuō)無(wú)論之后hashtable變成多大,都會(huì)對(duì)應(yīng)這 hashsize(power) -> 4096/鎖 
    //這也會(huì)導(dǎo)致越來(lái)越多的key對(duì)應(yīng)一把鎖
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    //循環(huán)初始化item_locks鎖
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    
    //.........
}

至此通過(guò)上面的代碼已經(jīng)將 memcache hashtable 初始化完畢

插入 、 擴(kuò)容 hashtable

memcache 的 hashtable 擴(kuò)容處理的方式是,在程序啟動(dòng)階段的時(shí)候開(kāi)一個(gè)線程處于待命狀態(tài),當(dāng)需要擴(kuò)容的時(shí)候觸發(fā)該線程,動(dòng)態(tài)的進(jìn)行擴(kuò)容處理,而且在擴(kuò)容操作的時(shí)候也不是表鎖,而是利用上面說(shuō)的段鎖

擴(kuò)容流程:

在 hashtable 擴(kuò)容的時(shí)候 memcache 會(huì)把當(dāng)前 primary_hashtable (哈希表)復(fù)制一份給 old_hashtable(哈希表),然后對(duì) primary_hashtable 進(jìn)行擴(kuò)容 (1 << hashpower + 1),如果當(dāng)前正處于 hashtable 擴(kuò)容階段, 同時(shí)有請(qǐng)求要訪問(wèn) key,則會(huì)判斷當(dāng)前的 key - hash 之后的索引位置是否小于當(dāng)前遷移的索引位置,如果小于則代表已經(jīng)遷移到新的 primary_hashtable 的索引位置了,如果大于則代表還未遷移到則在新的 primary_hashtable,所以就還在老的 old_hashtable 位置操作,擴(kuò)容完成之后會(huì)釋放 old_hashtable,然后就全部都在 primary_hashtable 操作了。

說(shuō)明:

在遷移 hashtable 的時(shí)候,會(huì)從小到大一個(gè)一個(gè)索引位置進(jìn)行遷移,而這個(gè)索引位置就是hash之后的值,所以在遷移每一個(gè)索引位置的時(shí)候都會(huì)對(duì)當(dāng)前的索引位置加鎖,這個(gè)鎖用的就是上面說(shuō)的段鎖,但可能鎖的數(shù)量有限,會(huì)出現(xiàn)很多索引位置共用同一個(gè)鎖的情況。
例如:
0 & (4096-1) = 0 、4096 & (4096-1) = 0 、8192 & (4096-1) = 0 這幾個(gè)hash索引位置都會(huì)用到 0 這把鎖
如果這個(gè)時(shí)候一個(gè)key正好要訪問(wèn),同時(shí)這個(gè)key-hash之后的值正好跟我們遷移的這個(gè)索引位置對(duì)應(yīng)則會(huì)堵塞,因?yàn)橐呀?jīng)加鎖,其他的key如果不命中我們正在遷移的這個(gè)位置則正常訪問(wèn),至于訪問(wèn) primary_hashtable 還是 old_hashtable 則依據(jù)上面(擴(kuò)容流程)說(shuō)的條件。

插入 hashtable 函數(shù), assoc_insert()

// hv = hash(key, nkey); 哈希值

int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;
    
    //判斷是否正在擴(kuò)容
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        //插入到hashtable
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
       //插入到hashtable
       it->h_next = primary_hashtable[hv & hashmask(hashpower)];
       primary_hashtable[hv & hashmask(hashpower)] = it;
    }
    
    // 加鎖,防止多個(gè)線程同時(shí)觸發(fā)擴(kuò)容操作
    pthread_mutex_lock(&hash_items_counter_lock);
    hash_items++;
    // 判斷是否需要擴(kuò)容
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        //觸發(fā)擴(kuò)容操作
        assoc_start_expand();
    }
    pthread_mutex_unlock(&hash_items_counter_lock);

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}

觸發(fā)擴(kuò)容操作 , assoc_start_expand()

static void assoc_start_expand(void) {
    if (started_expanding)
        return;

    started_expanding = true;
    // 喚醒 hashtable 擴(kuò)容線程
    pthread_cond_signal(&maintenance_cond);
}

hashtable 擴(kuò)容線程函數(shù),assoc_maintenance_thread()

// 默認(rèn)會(huì)堵塞在這個(gè)位置 pthread_cond_wait(&maintenance_cond, &maintenance_lock);
// 直到上面 pthread_cond_signal 喚醒操作, 然后往下執(zhí)行.

static void *assoc_maintenance_thread(void *arg) {

    mutex_lock(&maintenance_lock);
    
    // 死循環(huán)
    while (do_run_maintenance_thread) {
        int ii = 0;

        /* 循環(huán)遷移 old_hashtable ->  primary_hashtable */
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            int bucket;
            void *item_lock = NULL;
            
            // 把當(dāng)前正在遷移的索引位置加鎖
            if ((item_lock = item_trylock(expand_bucket))) {
                    // 循環(huán)處理當(dāng)前索引位置的所有item
                    // 因?yàn)樵诋?dāng)前索引位置可能會(huì)存在多個(gè)item
                    // 就是hash沖突鏈?zhǔn)浇鉀Q
                    for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                        // 獲取下一個(gè)item
                        next = it->h_next;
                        // 按照新的hashtable長(zhǎng)度重新定位一個(gè)索引位置
                        bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                        // 賦值保存
                        it->h_next = primary_hashtable[bucket];
                        primary_hashtable[bucket] = it;
                    }
                    // 當(dāng)前的索引位置item都遷移完畢之后,將之前的hashtable索引位置至空
                    old_hashtable[expand_bucket] = NULL;
                    // 更新當(dāng)前索引位置
                    expand_bucket++;
                    // 判斷是否全部遷移完畢
                    if (expand_bucket == hashsize(hashpower - 1)) {
                        expanding = false; // 關(guān)閉正在擴(kuò)容hashtable狀態(tài)
                        free(old_hashtable); // 釋放 old_hashtable
                        STATS_LOCK();
                        stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                        stats.hash_is_expanding = 0;
                        STATS_UNLOCK();
                        if (settings.verbose > 1)
                            fprintf(stderr, "Hash table expansion done\n");
                    }

            } else {
                // 如果加鎖失敗,可能存在別的線程正在操作hashtable當(dāng)前索引位置的item
                // 所以延時(shí)等待,直到搶到鎖為止
                usleep(10*1000);
            }
            // 釋放鎖
            if (item_lock) {
                item_trylock_unlock(item_lock);
                item_lock = NULL;
            }
        }

        if (!expanding) {
            /* We are done expanding.. just wait for next invocation */
            started_expanding = false;
            // 等待喚醒
            pthread_cond_wait(&maintenance_cond, &maintenance_lock);
            pause_threads(PAUSE_ALL_THREADS);
            // 擴(kuò)容 hashtable 操作
            assoc_expand();
            //往下執(zhí)行while循環(huán)
            pause_threads(RESUME_ALL_THREADS);
        }
    }
    return NULL;
}

擴(kuò)容 hashtable 函數(shù),assoc_expand()

static void assoc_expand(void) {
    // 保存一份當(dāng)前的 hashtable
    old_hashtable = primary_hashtable;
    
    // 擴(kuò)容,每次擴(kuò)容 hashpower + 1 
    // (1 << 16) = 65536、(1 << 17) = 131072
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++; // hashpower++
        expanding = true; // 表示當(dāng)前正在擴(kuò)容
        expand_bucket = 0; // 當(dāng)前遷移到primary_hashtable索引位置
        STATS_LOCK();
        stats.hash_power_level = hashpower;
        stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats.hash_is_expanding = 1;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}

查找 hashtable 函數(shù),assoc_find()

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    unsigned int oldbucket;
    
    // 是否正處于擴(kuò)容操作
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        // 獲取item
        it = old_hashtable[oldbucket];
    } else {
        // 獲取item
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

    item *ret = NULL;
    int depth = 0;
    //循環(huán)判斷,因?yàn)閔ash沖突采用鏈?zhǔn)浇鉀Q法,所以需要遍歷這個(gè)鏈,找到key相等的
    while (it) {
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}

結(jié)束

以上就是 Memcache 的哈希表以及對(duì)應(yīng)的操作,由于是多線程模型,所以無(wú)論是 (插入 、查找、 擴(kuò)容) 都會(huì)對(duì)當(dāng)前操作的key對(duì)應(yīng)的索引位置加鎖 item_locks,所以隨著 hashtable 不斷的擴(kuò)大,鎖的爭(zhēng)搶也會(huì)越來(lái)越大,性能可能也會(huì)存在一些影響.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容