Memcached源碼分析 - 數(shù)據(jù)存儲(3)

Memcached源碼分析 - 網(wǎng)絡(luò)模型(1)
Memcached源碼分析 - 命令解析(2)
Memcached源碼分析 - 數(shù)據(jù)存儲(3)
Memcached源碼分析 - 增刪改查操作(4)
Memcached源碼分析 - 內(nèi)存存儲機制Slabs(5)
Memcached源碼分析 - LRU淘汰算法(6)
Memcached源碼分析 - 消息回應(yīng)(7)

開篇

?這篇文章的目的主要是為了講清楚Memcached內(nèi)部數(shù)據(jù)存儲結(jié)構(gòu),以及基于該存儲結(jié)構(gòu)的增刪改查操作過程。
?基于一貫的風(fēng)格,很多內(nèi)容參考了大神前輩的文章,都一一在參考文章當(dāng)中列出來了。


數(shù)據(jù)存儲結(jié)構(gòu)

Memcached存儲結(jié)構(gòu)圖

存儲結(jié)構(gòu)圖.png

說明:

    1. Memcached在啟動的時候,會默認(rèn)初始化一個HashTable,這個table的默認(rèn)長度為65536。
    1. 我們將這個HashTable中的每一個元素稱為桶,每個桶就是一個item結(jié)構(gòu)的單向鏈表。
    1. Memcached會將key值hash成一個變量名稱為hv的uint32_t類型的值。
    1. 通過hv與桶的個數(shù)之間的按位與計算,hv & hashmask(hashpower),就可以得到當(dāng)前的key會落在哪個桶上面。
    1. 然后會將item掛到這個桶的鏈表上面。鏈表主要是通過item結(jié)構(gòu)中的h_next實現(xiàn)。


Item的存儲結(jié)構(gòu)

Item的存儲結(jié)構(gòu).png

說明:

  • 1.item的結(jié)構(gòu)分兩部分, 第一部分定義 item 結(jié)構(gòu)的屬性, 包括連接其它 item 的指針 (next, prev),還有最近訪問時間(time), 過期的時間(exptime), 以及數(shù)據(jù)部分的大小, 標(biāo)志位, key的長度, 引用次數(shù), 以及 item 是從哪個 slabclass 分配而來。

  • 2.第二部分是 item 的數(shù)據(jù), 由 CAS, key, suffix, value 組成。 item 結(jié)構(gòu)體的定義使用了一個常用的技巧: 定義空數(shù)組 data, 用來指向 item 數(shù)據(jù)部分的首地址, 使用空數(shù)組的好處是 data 指針本身不占用任何存儲空間, 為 item 分配存儲空間后, data 自然而然就指向數(shù)據(jù)部分的首地址.


Memcached存儲結(jié)構(gòu)源碼分析

?assoc_init負(fù)責(zé)初始化hashtable數(shù)據(jù)結(jié)構(gòu),通過初始化hashsize(hashpower)大小的數(shù)組指針,默認(rèn)應(yīng)該是2*16次方大小的數(shù)組。

#define HASHPOWER_DEFAULT 16
unsigned int hashpower = HASHPOWER_DEFAULT;
#define hashsize(n) ((ub4)1<<(n))
#define hashmask(n) (hashsize(n)-1)

void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();
    stats_state.hash_power_level = hashpower;
    stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();
}

?Memcached存儲數(shù)據(jù)結(jié)構(gòu)item定義,item的結(jié)構(gòu)分兩部分, 第一部分定義 item 結(jié)構(gòu)的屬性,第二部分是 item 的數(shù)據(jù)。

/**
 * Structure for storing items within memcached.
 */
typedef struct _stritem {
    /* Protected by LRU locks */
    struct _stritem *next;
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;


數(shù)據(jù)增刪改查過程

數(shù)據(jù)查找過程

  1. 首先通過key的hash值hv找到對應(yīng)的桶,區(qū)分是否在擴容。 primary_hashtable[hv & hashmask(hashpower)];
  2. 然后遍歷桶的單鏈表,比較key值并找到對應(yīng)item。
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket];
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

    item *ret = NULL;
    int depth = 0;
    while (it) {
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}


數(shù)據(jù)插入過程

  1. 首先通過key的hash值hv找到對應(yīng)的桶。
  2. 然后將item放到對應(yīng)桶的單鏈表的頭部
int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
    return 1;
}


數(shù)據(jù)刪除過程

  1. 首先通過key的hash值hv找到對應(yīng)的桶。
  2. 找到桶對應(yīng)的鏈表,遍歷單鏈表,刪除對應(yīng)的Item。
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
    item **pos;
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        pos = &old_hashtable[oldbucket];
    } else {
        pos = &primary_hashtable[hv & hashmask(hashpower)];
    }

    while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
        pos = &(*pos)->h_next;
    }
    return pos;
}



void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    item **before = _hashitem_before(key, nkey, hv);

    if (*before) {
        item *nxt;

        //因為before是一個二級指針,其值為所查找item的前驅(qū)item的h_next成員地址.
        //所以*before指向的是所查找的item.因為before是一個二級指針,所以
        //*before作為左值時,可以給h_next成員變量賦值。所以下面三行代碼是
        //使得刪除中間的item后,前后的item還能連得起來。

        MEMCACHED_ASSOC_DELETE(key, nkey);
        nxt = (*before)->h_next;
        (*before)->h_next = 0;   /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don't delete things
       they can't find. */
    assert(*before != 0);
}


數(shù)據(jù)擴容過程

1.數(shù)據(jù)擴容過程是由一個單獨線程在檢測是否需要擴容,擴容的前提條件是curr_items > (hashsize(hashpower) * 3) / 2,也就是說數(shù)據(jù)量是原來的1.5倍。
2.檢測需要擴容后通過信號通知pthread_cond_signal(&maintenance_cond)開始執(zhí)行擴容。
3、以2倍的擴容速度進行擴容,primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *))。
4、遷移過程是一個逐步遷移過程,每次都只遷移一個桶里面的Item數(shù)據(jù)。

/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;

    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++;
        expanding = true;
        expand_bucket = 0;
        STATS_LOCK();
        stats_state.hash_power_level = hashpower;
        stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats_state.hash_is_expanding = true;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}


int start_assoc_maintenance_thread() {
    int ret;
    char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
    if (env != NULL) {
        hash_bulk_move = atoi(env);
        if (hash_bulk_move == 0) {
            hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
        }
    }
    pthread_mutex_init(&maintenance_lock, NULL);
    if ((ret = pthread_create(&maintenance_tid, NULL,
                              assoc_maintenance_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        return -1;
    }
    return 0;
}


static void *assoc_maintenance_thread(void *arg) {

    mutex_lock(&maintenance_lock);
    while (do_run_maintenance_thread) {
        int ii = 0;

        //hash_bulk_move用來控制每次遷移,移動多少個桶的item。默認(rèn)是一個.
        //如果expanding為true才會進入循環(huán)體,所以遷移線程剛創(chuàng)建的時候,并不會進入循環(huán)體
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            unsigned int bucket;
            void *item_lock = NULL;

           
            if ((item_lock = item_trylock(expand_bucket))) {
                    for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                        next = it->h_next;
                        bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                        it->h_next = primary_hashtable[bucket];
                        primary_hashtable[bucket] = it;
                    }

                    old_hashtable[expand_bucket] = NULL;

                    expand_bucket++;
                    if (expand_bucket == hashsize(hashpower - 1)) {
                        expanding = false;
                        free(old_hashtable);
                        STATS_LOCK();
                        stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                        stats_state.hash_is_expanding = false;
                        STATS_UNLOCK();
                        if (settings.verbose > 1)
                            fprintf(stderr, "Hash table expansion done\n");
                    }

            } else {
                usleep(10*1000);
            }

            if (item_lock) {
                item_trylock_unlock(item_lock);
                item_lock = NULL;
            }
        }

        if (!expanding) {

            started_expanding = false;
            pthread_cond_wait(&maintenance_cond, &maintenance_lock);

            pause_threads(PAUSE_ALL_THREADS);
            assoc_expand();
            pause_threads(RESUME_ALL_THREADS);
        }
    }
    return NULL;
}



/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;

    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++;
        expanding = true;
        expand_bucket = 0;
        STATS_LOCK();
        stats_state.hash_power_level = hashpower;
        stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats_state.hash_is_expanding = true;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}


void assoc_start_expand(uint64_t curr_items) {
    if (started_expanding)
        return;

    if (curr_items > (hashsize(hashpower) * 3) / 2 &&
          hashpower < HASHPOWER_MAX) {
        started_expanding = true;
        pthread_cond_signal(&maintenance_cond);
    }
}


參考文章

Memcached源碼分析之內(nèi)存管理篇之item結(jié)構(gòu)圖及slab結(jié)構(gòu)圖
Memcached源碼分析 - Memcached源碼分析之HashTable(4)
memcached源碼分析-----哈希表基本操作以及擴容過程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 分布式緩存技術(shù)PK:選擇Redis還是Memcached? 經(jīng)平臺同意授權(quán)轉(zhuǎn)載 作者:田京昆(騰訊后臺研發(fā)工程師)...
    meng_philip123閱讀 69,044評論 7 60
  • HashMap 是 Java 面試必考的知識點,面試官從這個小知識點就可以了解我們對 Java 基礎(chǔ)的掌握程度。網(wǎng)...
    野狗子嗷嗷嗷閱讀 6,810評論 9 107
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,303評論 2 89
  • 媽媽推門進來 端來一杯茶水 挨坐在我床頭 輕聲的問了一句 “怎么不來看節(jié)目?” 又轉(zhuǎn)眼看到我手中的書卷 “在嚼面包...
    月亭閱讀 412評論 4 5
  • 又見到那扇門、那扇窗 草衫的屋頂、泥胚的墻 路上沒有車輛 空氣中裝滿了青草的香 書包里有幾個煎餅和幾塊糖 我穿著打...
    紅塵游記閱讀 178評論 0 0

友情鏈接更多精彩內(nèi)容