Memcached Source Code Analysis - CRUD Operations (4)

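Memcached Source Code Analysis - Network Model (1)
Memcached Source Code Analysis - Command Parsing (2)
Memcached Source Code Analysis - Data Storage (3)
Memcached Source Code Analysis - CRUD Operations (4)
Memcached Source Code Analysis - Slabs Memory Storage (5)
Memcached Source Code Analysis - LRU Eviction (6)
Memcached Source Code Analysis - Response Messages (7)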

Introduction

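This article bridges the earlier and later parts of the series: it picks up where command parsing left off and leads into data storage.
Once a command has been parsed, it passes through the operations described here, which finally act on the data through the underlying storage covered in the data storage article.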


Source code analysis of Memcached's create/read/update/delete operations

Entry point for create/read/update/delete

Before looking at the individual operations, let's look at process_command. After Memcached has tokenized a command line, process_command dispatches it to a handler according to the command type.

static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);

    /*
     * for commands set/add/replace, we build an item and read the data
     * directly into it, then continue in nread_complete().
     */

    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

        process_get_command(c, tokens, ntokens, false, false);

    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

        process_update_command(c, tokens, ntokens, comm, false);

    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {

        process_get_command(c, tokens, ntokens, true, false);

    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {

        process_arithmetic_command(c, tokens, ntokens, 0);

    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {

        process_delete_command(c, tokens, ntokens);

    } 

    // Many other commands are omitted here (see the source); we only care about the read and write operations
    return;
}
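To make the dispatch concrete, here is a minimal sketch of the same pattern outside memcached: split the line on spaces, then pick an operation from the command name and the token count. `sketch_tokenize`, `sketch_dispatch` and the `OP_*` values are hypothetical names, not memcached's API. Like `tokenize_command` as used above, the sketch counts a terminating empty token, which is why a `get` needs `ntokens >= 3` and a storage command needs 6 or 7 tokens.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_MAX_TOKENS 8

/* Hypothetical token: a pointer into the command buffer plus its length. */
typedef struct {
    char *value;
    size_t length;
} sketch_token;

/* Hypothetical operation codes standing in for memcached's handlers / NREAD_* values. */
typedef enum {
    OP_UNKNOWN, OP_GET, OP_GETS, OP_SET, OP_ADD, OP_REPLACE,
    OP_APPEND, OP_PREPEND, OP_DECR, OP_DELETE
} sketch_op;

/* Splits `command` in place on spaces. The result ends with an empty token
 * (value NULL, length 0) that is included in the returned count.
 * Simplification: words beyond max_tokens - 1 are silently ignored. */
static size_t sketch_tokenize(char *command, sketch_token *tokens, size_t max_tokens) {
    size_t n = 0;
    char *p = command;

    assert(max_tokens > 1);
    while (*p != '\0' && n < max_tokens - 1) {
        while (*p == ' ')
            p++;                                /* skip separators */
        if (*p == '\0')
            break;
        tokens[n].value = p;
        while (*p != '\0' && *p != ' ')
            p++;
        tokens[n].length = (size_t)(p - tokens[n].value);
        if (*p == ' ')
            *p++ = '\0';                        /* NUL-terminate the word */
        n++;
    }
    tokens[n].value = NULL;                     /* terminating empty token */
    tokens[n].length = 0;
    return n + 1;
}

/* Same shape as process_command: command name plus token count pick the handler. */
static sketch_op sketch_dispatch(char *command) {
    sketch_token tokens[SKETCH_MAX_TOKENS];
    size_t ntokens = sketch_tokenize(command, tokens, SKETCH_MAX_TOKENS);
    const char *name;

    if (ntokens < 2)
        return OP_UNKNOWN;                      /* empty line */
    name = tokens[0].value;

    if (ntokens >= 3 && strcmp(name, "get") == 0) return OP_GET;
    if (ntokens >= 3 && strcmp(name, "gets") == 0) return OP_GETS;
    if (ntokens == 6 || ntokens == 7) {         /* cmd key flags exptime vlen [noreply] */
        if (strcmp(name, "set") == 0) return OP_SET;
        if (strcmp(name, "add") == 0) return OP_ADD;
        if (strcmp(name, "replace") == 0) return OP_REPLACE;
        if (strcmp(name, "append") == 0) return OP_APPEND;
        if (strcmp(name, "prepend") == 0) return OP_PREPEND;
    }
    if ((ntokens == 4 || ntokens == 5) && strcmp(name, "decr") == 0) return OP_DECR;
    if (ntokens >= 3 && ntokens <= 5 && strcmp(name, "delete") == 0) return OP_DELETE;
    return OP_UNKNOWN;
}
```

For example, `set username 0 10 9` yields 6 tokens and maps to `OP_SET`, while `set username 0 10` (vlen missing) falls through to `OP_UNKNOWN`: checking the count together with the name rejects malformed storage commands before any item is allocated.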


Create/Update: set, add, replace

set key flags exptime vlen
value

Example
set username 0 10 9
woshishen

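set: the command name
key: the cache key
flags: an opaque flag value stored with the item and returned on get
exptime: expiration time; 0 means the item never expires
vlen: length of the value in bytes
value: the data to cache, normally sent on the second line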
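A minimal sketch of validating the command-line part of a `set`, assuming the header line has already been split off without its `\r\n`. `set_header` and `parse_set_header` are hypothetical; memcached itself uses its own `safe_strtoul`/`safe_strtol` helpers, which the sketch approximates with the standard `strtol` plus end-pointer and `errno` checks. The 250-byte key limit corresponds to memcached's `KEY_MAX_LENGTH`.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_KEY_MAX 250      /* memcached's KEY_MAX_LENGTH */

/* Hypothetical parsed form of "set key flags exptime vlen". */
typedef struct {
    char key[SKETCH_KEY_MAX + 1];
    unsigned int flags;
    long exptime;               /* 0 = never expire */
    long vlen;                  /* value length, excluding the trailing "\r\n" */
} set_header;

/* Parses a whole decimal field; rejects empty input, trailing junk and overflow. */
static int parse_long_field(const char *s, long *out) {
    char *end;
    long v;

    errno = 0;
    v = strtol(s, &end, 10);
    if (errno != 0 || end == s || *end != '\0')
        return 0;
    *out = v;
    return 1;
}

/* Returns 1 on success, 0 for a malformed line (where memcached would answer
 * "CLIENT_ERROR bad command line format"). Tokenizes `line` in place.
 * Simplification: the optional "noreply" token is not accepted. */
static int parse_set_header(char *line, set_header *h) {
    char *fields[6];
    int n = 0;
    long flags;

    for (char *tok = strtok(line, " "); tok != NULL && n < 6; tok = strtok(NULL, " "))
        fields[n++] = tok;
    if (n != 5 || strcmp(fields[0], "set") != 0)
        return 0;
    if (strlen(fields[1]) > SKETCH_KEY_MAX)
        return 0;
    strcpy(h->key, fields[1]);
    if (!parse_long_field(fields[2], &flags) || flags < 0 || (unsigned long)flags > UINT_MAX)
        return 0;
    h->flags = (unsigned int)flags;
    if (!parse_long_field(fields[3], &h->exptime))
        return 0;
    if (!parse_long_field(fields[4], &h->vlen) || h->vlen < 0)
        return 0;
    return 1;
}
```

Parsing `set username 0 10 9` gives key `username`, flags 0, exptime 10 and vlen 9; a non-numeric or negative vlen is rejected before any memory is reserved.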

Notes:
Memcached splits its input into command lines at the \n character (a preceding \r is stripped), then splits each line on spaces into tokens, which are stored in a tokens array.
A set request therefore consists of two parts, the command line and the value:
1. Memcached first parses the command line. Because it carries vlen, the length of the value is known in advance, so Memcached allocates an item structure large enough to hold the cached data.
2. Once the command line has been parsed, Memcached keeps reading the rest of the request from the socket, copying bytes into the item as they arrive, until the amount read matches vlen (plus the trailing \r\n). It then stores the item: if that succeeds, the item is linked into the hash table and an LRU list; if it fails, the item is released.
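The two-phase handling described above can be sketched over a single in-memory buffer: find the end of the command line, take `vlen` from it, then require `vlen + 2` further bytes whose last two are `\r\n`. `frame_set_request` is a hypothetical helper for illustration only; memcached does this incrementally across its connection states rather than on one complete buffer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Result codes for the hypothetical framing helper. */
enum frame_status { FRAME_OK, FRAME_NEED_MORE, FRAME_BAD };

/* Given `buf` holding "set key flags exptime vlen\r\n" followed by "data\r\n",
 * locate the data block. On FRAME_OK, *data points at the value and *dlen is vlen. */
static enum frame_status frame_set_request(const char *buf, size_t len,
                                           const char **data, size_t *dlen) {
    const char *nl = memchr(buf, '\n', len);
    if (nl == NULL)
        return FRAME_NEED_MORE;                  /* command line not complete yet */

    /* vlen is the last space-separated field of the command line. */
    const char *line_end = (nl > buf && nl[-1] == '\r') ? nl - 1 : nl;
    const char *p = line_end;
    while (p > buf && p[-1] != ' ')
        p--;
    if (p == line_end)
        return FRAME_BAD;
    char *end;
    long vlen = strtol(p, &end, 10);
    if (end != line_end || vlen < 0)
        return FRAME_BAD;

    size_t header_len = (size_t)(nl - buf) + 1;
    size_t need = (size_t)vlen + 2;              /* value plus trailing "\r\n" */
    if (len - header_len < need)
        return FRAME_NEED_MORE;                  /* keep reading from the socket */

    const char *value = buf + header_len;
    if (value[vlen] != '\r' || value[vlen + 1] != '\n')
        return FRAME_BAD;                        /* memcached: "CLIENT_ERROR bad data chunk" */
    *data = value;
    *dlen = (size_t)vlen;
    return FRAME_OK;
}
```

Because the length is known from the command line, the receiver never has to scan the value for a delimiter, so values may contain arbitrary bytes, including `\r\n`.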


Analysis of process_update_command

The key steps are reserving storage for the item with item_alloc and entering the value-reading phase via conn_set_state(c, conn_nread).
Allocating item storage is fairly involved and will get its own article later; for now it is enough to know that we reserve space for the item and then read the value into it to complete the item.

/*********************************
Create and update operations
A set command looks like this:
set key flags exptime vlen
value
vlen:    length of the cached data
flags:   flag value
exptime: expiration time, 0 = never expire
value:   the data to cache, normally on the second line
Example:
set username 0 10 9
woshishen
**************************************/

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;
    size_t nkey;
    unsigned int flags;
    int32_t exptime_int = 0;
    time_t exptime;
    int vlen;
    uint64_t req_cas_id=0;
    item *it;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

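    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    // Validate the arguments and parse the numeric fields
    // (flags, exptime and the value length vlen)
    if (!(safe_strtoul(tokens[2].value, (uint32_t *) &flags)
            && safe_strtol(tokens[3].value, &exptime_int)
            && safe_strtol(tokens[4].value, (int32_t *) &vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }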


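    exptime = exptime_int;

    // A negative exptime is mapped to a value that realtime() treats as an
    // absolute timestamp in the past, so the item expires immediately
    if (exptime < 0)
        exptime = REALTIME_MAXDELTA + 1;

    // For the cas command, parse the CAS unique value sent by the client
    if (handle_cas) {
        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
    }

    if (vlen < 0 || vlen > (INT_MAX - 2)) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    // Why add 2 to vlen?
    // The value is stored together with its trailing \r\n: the client sends the
    // data line terminated by \r\n, the server reads those 2 bytes into the item
    // too, and a get response can then send the data as-is, so the client can
    // use \r\n to delimit the data block.
    vlen += 2;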

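    // item_alloc is the core call: it allocates an item structure to hold the
    // data being cached.
    // key:     the cache key
    // nkey:    length of the key
    // flags:   the client flags
    // exptime: expiration time
    // vlen:    length of the value (including the trailing \r\n)
    // Why pass only vlen, the byte length of the value, and not the value itself?
    // 1. set/add/replace and the other storage commands send the command line
    //    and the data line separately.
    // 2. The command line is parsed first. It carries the length of the value,
    //    so memory can be allocated up front.
    // 3. The data line is read next. Values can be large, and TCP may split them
    //    across segments or coalesce them with other data, so several reads may
    //    be needed to receive the whole value. Knowing the length from the
    //    command line lets us allocate the space while parsing it, then simply
    //    fill that space as the value arrives.
    // 4. The last line of this function, conn_set_state(c, conn_nread), switches
    //    the connection to the conn_nread state, which reads the value data.
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

    if (it == 0) {
        // Allocation failed (object too large or out of memory). Reply with an
        // error and switch to conn_swallow to discard the vlen bytes of data the
        // client is still sending. (Simplified; the full source does more here.)
        out_of_memory(c, "SERVER_ERROR out of memory storing object");
        conn_set_state(c, conn_swallow);
        c->sbytes = vlen;
        return;
    }

    ITEM_set_cas(it, req_cas_id);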

    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = it->nbytes;    // bytes still to read: the value length including \r\n
    c->cmd = comm;
    conn_set_state(c, conn_nread);
}
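Two details above depend on realtime(): `exptime` values up to 30 days (`REALTIME_MAXDELTA`) are relative to now, larger values are absolute Unix timestamps, and a negative `exptime` is clamped to `REALTIME_MAXDELTA + 1` so that it becomes a timestamp long before the server started and expires at once. The sketch restates that logic as we understand memcached's realtime(), with the server start time and clock passed in as parameters; `sketch_realtime` is a hypothetical name, and unlike the real function it also applies the negative-value clamp that memcached performs in the caller.

```c
#include <assert.h>
#include <stdint.h>
#include <time.h>

#define SKETCH_REALTIME_MAXDELTA (60 * 60 * 24 * 30)   /* 30 days, as in memcached */

typedef uint32_t rel_time_t;   /* seconds since the server started */

/* Converts a client exptime to server-relative time.
 * process_started: Unix time the server started.
 * current_time:    seconds elapsed since then. */
static rel_time_t sketch_realtime(time_t exptime, time_t process_started,
                                  rel_time_t current_time) {
    if (exptime < 0)                              /* clamp done by process_update_command */
        exptime = SKETCH_REALTIME_MAXDELTA + 1;
    if (exptime == 0)
        return 0;                                 /* 0 = never expire */
    if (exptime > SKETCH_REALTIME_MAXDELTA) {
        /* Large values are absolute Unix timestamps. */
        if (exptime <= process_started)
            return 1;                             /* already in the past: expire at once */
        return (rel_time_t)(exptime - process_started);
    }
    return (rel_time_t)(exptime + current_time);  /* relative seconds from now */
}
```

Returning 1 rather than 0 for past timestamps matters because 0 means "never expire"; the clamp therefore turns a negative exptime into "already expired" instead of "immortal".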


The item_alloc process

[Figure: item data structure]

item_alloc allocates the memory for an item; the work is done in do_item_alloc. Its core steps are computing the required size and obtaining memory for the item:

  • item_make_header computes the size.
  • do_item_alloc_pull obtains the memory for the item.
  • Internally, do_item_alloc_pull first tries to allocate; if that fails, it evicts items from the LRU tail and tries again.
//The layout of an item
typedef struct _stritem {
    // doubly linked LRU list: next item
    struct _stritem *next;
    // doubly linked LRU list: previous item
    struct _stritem *prev;
    struct _stritem *h_next;    /* hash chain next */
    // time of the most recent access
    rel_time_t      time;       /* least recent access */
    // expiration time
    rel_time_t      exptime;    /* expire time */
    // size of the value data
    int             nbytes;     /* size of data */
    // reference count: how many holders (hash table/LRU link, connections) use the item
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    // ID of the slab class the item belongs to
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    // start of the variable-length data area
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;


item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
                    const rel_time_t exptime, const int nbytes) {
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
    // Avoid potential underflows.
    if (nbytes < 2)
        return 0;

    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    unsigned int id = slabs_clsid(ntotal);
    unsigned int hdr_id = 0;
   
    if (ntotal > settings.slab_chunk_size_max) {
        // Related code omitted (large items are stored as chunked items)
    } else {
        it = do_item_alloc_pull(ntotal, id);
    }

    if (it == NULL) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].outofmemory++;
        pthread_mutex_unlock(&lru_locks[id]);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    //assert(it != heads[id]);

    /* Refcount is seeded to 1 by slabs_alloc() */
    it->next = it->prev = 0;

    /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
     * least a note here. Compiler (hopefully?) optimizes this out.
     */
    if (settings.temp_lru &&
            exptime - current_time <= settings.temporary_ttl) {
        id |= TEMP_LRU;
    } else if (settings.lru_segmented) {
        id |= HOT_LRU;
    } else {
        /* There is only COLD in compat-mode */
        id |= COLD_LRU;
    }
    it->slabs_clsid = id;

    DEBUG_REFCNT(it, '*');
    it->it_flags |= settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    if (settings.inline_ascii_response) {
        memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    } else if (nsuffix > 0) {
        memcpy(ITEM_suffix(it), &flags, sizeof(flags));
    }
    it->nsuffix = nsuffix;

    /* Initialize internal chunk. */
    if (it->it_flags & ITEM_CHUNKED) {
        item_chunk *chunk = (item_chunk *) ITEM_data(it);

        chunk->next = 0;
        chunk->prev = 0;
        chunk->used = 0;
        chunk->size = 0;
        chunk->head = it;
        chunk->orig_clsid = hdr_id;
    }
    it->h_next = 0;

    return it;
}
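do_item_alloc records the starting LRU of a new item in the upper two bits of `slabs_clsid`, leaving the lower bits for the slab class. The constants below are the values we believe memcached 1.5.x uses for `HOT_LRU`, `WARM_LRU`, `COLD_LRU` and `TEMP_LRU`; treat them as assumptions and check the headers of your version. The `sk_*` helpers are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed values: the LRU id lives in the top two bits of an 8-bit class id. */
#define SK_HOT_LRU  0
#define SK_WARM_LRU 64
#define SK_COLD_LRU 128
#define SK_TEMP_LRU 192
#define SK_LRU_MASK (3 << 6)

/* Slab class part of the id (what ITEM_clsid extracts). */
static uint8_t sk_clsid(uint8_t id) { return (uint8_t)(id & ~SK_LRU_MASK); }

/* LRU segment part of the id. */
static uint8_t sk_lru(uint8_t id) { return (uint8_t)(id & SK_LRU_MASK); }

/* Mirrors the branch in do_item_alloc that picks the starting LRU. */
static uint8_t sk_initial_id(uint8_t clsid, int temp_lru, int lru_segmented,
                             long ttl, long temporary_ttl) {
    if (temp_lru && ttl <= temporary_ttl)
        return (uint8_t)(clsid | SK_TEMP_LRU);   /* short-lived items */
    if (lru_segmented)
        return (uint8_t)(clsid | SK_HOT_LRU);    /* new items start in HOT */
    return (uint8_t)(clsid | SK_COLD_LRU);       /* compat mode: COLD only */
}
```

Packing both values into one byte is why the code above uses `ITEM_clsid(it)` when indexing per-slab-class statistics but the raw `slabs_clsid` when indexing per-LRU counters such as `lru_hits`.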


/**
 * Generates the variable-sized part of the header for an object.
 *
 * key     - The key
 * nkey    - The length of the key
 * flags   - key flags
 * nbytes  - Number of bytes to hold value and additional CRLF terminator
 * suffix  - Buffer for the "VALUE" line suffix (flags, size).
 * nsuffix - The length of the suffix is stored here.
 *
 * Returns the total size of the header.
 */
static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
                     char *suffix, uint8_t *nsuffix) {
    if (settings.inline_ascii_response) {
        /* suffix is defined at 40 chars elsewhere.. */
        *nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
    } else {
        if (flags == 0) {
            *nsuffix = 0;
        } else {
            *nsuffix = sizeof(flags);
        }
    }
    return sizeof(item) + nkey + *nsuffix + nbytes;
}
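As a worked example of the arithmetic in item_make_header and do_item_alloc, the sketch computes the total size for `set username 0 10 9` with `inline_ascii_response` enabled. It assumes a 48-byte fixed header in place of `sizeof(item)`, which differs across versions and platforms; `sketch_item_total` is a hypothetical helper.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define SKETCH_ITEM_HEADER 48   /* assumption: stands in for sizeof(item) */

/* nkey excludes the terminating NUL; nbytes already includes the trailing "\r\n". */
static size_t sketch_item_total(size_t nkey, unsigned int flags, int nbytes, int use_cas) {
    char suffix[40];
    /* Same suffix format as item_make_header with inline_ascii_response on. */
    int nsuffix = snprintf(suffix, sizeof(suffix), " %u %d\r\n", flags, nbytes - 2);
    size_t total = SKETCH_ITEM_HEADER + (nkey + 1) + (size_t)nsuffix + (size_t)nbytes;
    if (use_cas)
        total += sizeof(uint64_t);   /* 8-byte CAS value */
    return total;
}
```

Under these assumptions the item needs 48 + 9 (key plus NUL) + 6 (`" 0 9\r\n"`) + 11 (value plus `\r\n`) = 74 bytes, or 82 with the CAS field; slabs_clsid(ntotal) then picks the first slab class whose chunk size can hold that many bytes.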


item *do_item_alloc_pull(const size_t ntotal, const unsigned int id) {
    item *it = NULL;
    int i;

    for (i = 0; i < 10; i++) {
        uint64_t total_bytes;
        /* Try to reclaim memory first */
        if (!settings.lru_segmented) {
            lru_pull_tail(id, COLD_LRU, 0, 0, 0, NULL);
        }
        it = slabs_alloc(ntotal, id, &total_bytes, 0);

        if (settings.temp_lru)
            total_bytes -= temp_lru_size(id);

        if (it == NULL) {
            if (lru_pull_tail(id, COLD_LRU, total_bytes, LRU_PULL_EVICT, 0, NULL) <= 0) {
                if (settings.lru_segmented) {
                    lru_pull_tail(id, HOT_LRU, total_bytes, 0, 0, NULL);
                } else {
                    break;
                }
            }
        } else {
            break;
        }
    }

    if (i > 0) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].direct_reclaims += i;
        pthread_mutex_unlock(&lru_locks[id]);
    }

    return it;
}
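The retry loop in do_item_alloc_pull can be modeled with a toy pool: try to allocate, and when the class is full, evict the item at the LRU tail and try again, giving up after a bounded number of attempts or when there is nothing left to evict. Everything here is hypothetical and simplified; it leaves out locking, the segmented HOT/WARM/COLD lists, reclaiming of expired items, and the check that skips items still referenced by a connection.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_CAPACITY 3      /* hypothetical: this "slab class" has room for 3 items */
#define TOY_MAX_TRIES 10    /* same retry bound as do_item_alloc_pull */

typedef struct toy_item {
    struct toy_item *prev, *next;   /* LRU links; head = most recently used */
    int in_use;
    int id;
} toy_item;

typedef struct {
    toy_item slots[TOY_CAPACITY];
    toy_item *head, *tail;
    int evictions;
} toy_pool;

static void toy_pool_init(toy_pool *p) { memset(p, 0, sizeof *p); }

/* Plays the role of slabs_alloc: a free slot, or NULL when the class is full. */
static toy_item *toy_slabs_alloc(toy_pool *p) {
    for (int i = 0; i < TOY_CAPACITY; i++) {
        if (!p->slots[i].in_use) {
            p->slots[i].in_use = 1;
            return &p->slots[i];
        }
    }
    return NULL;
}

static void toy_link_head(toy_pool *p, toy_item *it) {
    it->prev = NULL;
    it->next = p->head;
    if (p->head != NULL)
        p->head->prev = it;
    p->head = it;
    if (p->tail == NULL)
        p->tail = it;
}

/* Plays the role of lru_pull_tail(..., LRU_PULL_EVICT): free the LRU tail.
 * Returns 1 if something was evicted, 0 if the list was empty. */
static int toy_evict_tail(toy_pool *p) {
    toy_item *victim = p->tail;
    if (victim == NULL)
        return 0;
    p->tail = victim->prev;
    if (p->tail != NULL)
        p->tail->next = NULL;
    else
        p->head = NULL;
    victim->in_use = 0;
    p->evictions++;
    return 1;
}

/* Allocate, otherwise evict and retry, at most TOY_MAX_TRIES times. For brevity
 * the new item is linked immediately; memcached does that later, in do_item_link. */
static toy_item *toy_alloc_pull(toy_pool *p, int id) {
    toy_item *it = NULL;
    for (int i = 0; i < TOY_MAX_TRIES; i++) {
        it = toy_slabs_alloc(p);
        if (it != NULL)
            break;                  /* got memory */
        if (toy_evict_tail(p) <= 0)
            break;                  /* nothing left to evict: give up */
    }
    if (it != NULL) {
        it->id = id;
        toy_link_head(p, it);
    }
    return it;
}
```

Filling the three slots and asking for a fourth item evicts the oldest one (id 1) and reuses its slot; in memcached, allocations that needed such retries are what `itemstats[id].direct_reclaims` counts.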


The conn_nread state

conn_nread reads the value in a loop until the expected number of bytes has arrived, then calls complete_nread.

case conn_nread:
            if (c->rlbytes == 0) {
                complete_nread(c);
                break;
            }

            /* Check if rbytes < 0, to prevent crash */
            if (c->rlbytes < 0) {
                if (settings.verbose) {
                    fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
                }
                conn_set_state(c, conn_closing);
                break;
            }

            if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
                /* first check if we have leftovers in the conn_read buffer */
                if (c->rbytes > 0) {
                    int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                    if (c->ritem != c->rcurr) {
                        memmove(c->ritem, c->rcurr, tocopy);
                    }
                    c->ritem += tocopy;
                    c->rlbytes -= tocopy;
                    c->rcurr += tocopy;
                    c->rbytes -= tocopy;
                    if (c->rlbytes == 0) {
                        break;
                    }
                }

                /*  now try reading from the socket */
                res = read(c->sfd, c->ritem, c->rlbytes);
                if (res > 0) {
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.bytes_read += res;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    if (c->rcurr == c->ritem) {
                        c->rcurr += res;
                    }
                    c->ritem += res;
                    c->rlbytes -= res;
                    break;
                }
            } else {
                res = read_into_chunked_item(c);
                if (res > 0)
                    break;
            }

            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }

            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }

            /* Memory allocation failure */
            if (res == -2) {
                out_of_memory(c, "SERVER_ERROR Out of memory during read");
                c->sbytes = c->rlbytes;
                c->write_and_go = conn_swallow;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0) {
                fprintf(stderr, "Failed to read, and not due to blocking:\n"
                        "errno: %d %s \n"
                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
                        errno, strerror(errno),
                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
                        (int)c->rlbytes, (int)c->rsize);
            }
            conn_set_state(c, conn_closing);
            break;
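The core of conn_nread is the bookkeeping between `ritem` (where the next value byte goes) and `rlbytes` (how many bytes are still missing): bytes already sitting in the read buffer are copied first, and the rest arrives through one or more `read()` calls. The sketch replays that bookkeeping with in-memory chunks standing in for buffered data and socket reads; `nread_state` and its functions are hypothetical, and they use `memcpy` because the chunks never overlap the destination (memcached uses `memmove`).

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for the conn fields used by conn_nread. */
typedef struct {
    char *ritem;      /* where the next value byte is written */
    int rlbytes;      /* value bytes (including "\r\n") still missing */
} nread_state;

static void nread_begin(nread_state *s, char *item_data, int vlen_with_crlf) {
    s->ritem = item_data;
    s->rlbytes = vlen_with_crlf;
}

/* Consumes up to `len` bytes from `chunk` (leftover buffer data or a read() result).
 * Returns how many bytes were used; any remainder belongs to the next command. */
static int nread_feed(nread_state *s, const char *chunk, int len) {
    int tocopy = len > s->rlbytes ? s->rlbytes : len;
    memcpy(s->ritem, chunk, (size_t)tocopy);
    s->ritem += tocopy;
    s->rlbytes -= tocopy;
    return tocopy;
}

/* conn_nread calls complete_nread() once rlbytes reaches 0. */
static int nread_done(const nread_state *s) { return s->rlbytes == 0; }
```

Anything past the `rlbytes` boundary (for example a pipelined `get x` right after the value) is left in place for the next command, just as conn_nread copies only the smaller of `rbytes` and `rlbytes` from the read buffer.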


The complete_nread process

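We focus on complete_nread_ascii, which saves the data through store_item:

  • store_item hashes the key; the hash value selects the hash bucket (and the item lock) for the key.
  • On success, do_item_link calls assoc_insert to add the item to the hash table, item_link_q to put it on an LRU list, and refcount_incr to take a reference on behalf of the hash table/LRU.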
static void complete_nread(conn *c) {
    assert(c != NULL);
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);

    if (c->protocol == ascii_prot) {
        complete_nread_ascii(c);
    } else if (c->protocol == binary_prot) {
        complete_nread_binary(c);
    }
}


static void complete_nread_ascii(conn *c) {
    assert(c != NULL);

    item *it = c->item;
    int comm = c->cmd;
    enum store_item_type ret;
    bool is_valid = false;

    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

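    // Much code omitted here. Among other things it checks that the received
    // data ends with "\r\n" and, if so, sets is_valid = true.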

    if (!is_valid) {
        out_string(c, "CLIENT_ERROR bad data chunk");
    } else {
      ret = store_item(it, comm, c);

      switch (ret) {
      case STORED:
          out_string(c, "STORED");
          break;
      case EXISTS:
          out_string(c, "EXISTS");
          break;
      case NOT_FOUND:
          out_string(c, "NOT_FOUND");
          break;
      case NOT_STORED:
          out_string(c, "NOT_STORED");
          break;
      default:
          out_string(c, "SERVER_ERROR Unhandled storage type.");
      }

    }

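    // Deleting the item here? Doesn't that seem odd?
    // Whether an item is actually freed depends on its reference count,
    // item->refcount. When slabs_alloc hands out an item, refcount starts at 1
    // (the reference held through c->item).
    //
    // When store_item succeeds for set/add/replace (or cas), do_item_link is
    // called, which raises refcount to 2. The release below drops it back to 1;
    // since refcount_decr(it) does not reach 0, the item is not freed and stays
    // in the cache. (For append/prepend, a new item holding the combined data is
    // linked instead, so c->item itself is freed here, as intended.)
    //
    // If store_item fails, refcount is still 1, and we really do want to get rid
    // of this item, so the release drops it to 0 and the item is freed.
    //
    // Convoluted, but clever.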
    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
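The reference-count argument in the comments above can be checked with a small model: allocation starts the count at 1, a successful store links the item and adds 1 for the hash table/LRU, and the final `item_remove(c->item)` drops the connection's reference. The `model_*` names are hypothetical, and the model is single-threaded, whereas memcached uses atomic reference counts and item locks.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    unsigned short refcount;
    bool linked;    /* in the hash table and an LRU list */
    bool freed;
} model_item;

/* slabs_alloc seeds the count with the caller's reference. */
static void model_alloc(model_item *it) {
    it->refcount = 1;
    it->linked = false;
    it->freed = false;
}

/* do_item_link: the hash table/LRU link owns one reference. */
static void model_link(model_item *it) {
    it->linked = true;
    it->refcount++;
}

/* do_item_remove: drop one reference; free the memory when none remain. */
static void model_remove(model_item *it) {
    assert(it->refcount > 0);
    if (--it->refcount == 0)
        it->freed = true;   /* item_free */
}

/* One storage request: allocate, try to store, then release c->item. */
static void model_set(model_item *it, bool store_ok) {
    model_alloc(it);
    if (store_ok)
        model_link(it);
    model_remove(it);       /* item_remove(c->item) in complete_nread_ascii */
}
```

A successful store leaves the item at refcount 1, owned only by the hash table/LRU; a failed store reaches 0 and frees it, so the same unconditional release handles both outcomes.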



enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}


enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
    char *key = ITEM_key(it);

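    // Look up the item currently stored under this key, if any.
    // add/set/replace/prepend/append all start by allocating a new item (it);
    // do_item_get also takes a reference on old_it, which is released at the end.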
    item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
    enum store_item_type stored = NOT_STORED;

    item *new_it = NULL;
    uint32_t flags;

    // add only succeeds if the key does not exist yet.
    // If add finds an existing item, the result is NOT_STORED.
    if (old_it != NULL && comm == NREAD_ADD) {
        // Why touch the existing item? add never overwrites, but the key was just
        // requested, so do_item_update refreshes old_it->time and its recency.
        // old_it is not freed by the do_item_remove(old_it) at the end of this
        // function: that call only releases the reference do_item_get took,
        // while the hash table/LRU link still holds its own.
        do_item_update(old_it);
    } else if (!old_it && (comm == NREAD_REPLACE
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        // replace/prepend/append only operate on an existing item;
        // if the item does not exist, the result is NOT_STORED

    } else if (comm == NREAD_CAS) {
        if(old_it == NULL) {
            // LRU expired
            stored = NOT_FOUND;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.cas_misses++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
        }
        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            STORAGE_delete(c->thread->storage, old_it);
            item_replace(old_it, it, hv);
            stored = STORED;
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            if(settings.verbose > 1) {
                fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
                        (unsigned long long)ITEM_get_cas(old_it),
                        (unsigned long long)ITEM_get_cas(it));
            }
            stored = EXISTS;
        }
    } else {
        int failed_alloc = 0;
        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
            if (ITEM_get_cas(it) != 0) {
                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
                    stored = EXISTS;
                }
            }

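            if (stored == NOT_STORED) {
                /* The old and new data are combined into a new item. Recover the
                 * client flags from old_it first, since `flags` is not set
                 * anywhere else in this function. */
                FLAGS_CONV(settings.inline_ascii_response, old_it, flags);
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);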
                if (new_it == NULL || _store_item_copy_data(comm, old_it, new_it, it) == -1) {
                    failed_alloc = 1;
                    stored = NOT_STORED;
                    if (new_it != NULL)
                        item_remove(new_it);
                } else {
                    it = new_it;
                }
            }
        }

        if (stored == NOT_STORED && failed_alloc == 0) {
            if (old_it != NULL) {
                STORAGE_delete(c->thread->storage, old_it);
                item_replace(old_it, it, hv);
            } else {
                do_item_link(it, hv);
            }

            c->cas = ITEM_get_cas(it);

            stored = STORED;
        }

        // Note:
        // Why do these comments keep coming back to it->refcount?
        // 1. it->refcount counts references to the item, so that one thread
        //    cannot free an item that another thread is still using.
        // 2. do_item_remove decrements it->refcount and frees the item only when
        //    the count reaches 0.
        //
        // After do_store_item returns, complete_nread_ascii calls item_remove(c->item).
        // That release frees the freshly allocated item when SET/ADD/... failed.
        // When the store succeeds, do_item_link (directly, or via item_replace)
        // adds 1 to the item's refcount, making it 2, so the later release leaves
        // it at 1 and the item is not freed.
        // This part of the code really is convoluted...

    }

    if (old_it != NULL)
        do_item_remove(old_it);         /* release our reference */
    if (new_it != NULL)
        do_item_remove(new_it);

    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }

    return stored;
}
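Without locking, statistics and memory management, do_store_item boils down to a decision over the command, whether an old item exists and, for `cas`, whether the CAS values match. `sk_store_decision` restates that decision as a hypothetical pure function; the enum names echo memcached's, and the allocation-failure path of append/prepend (which yields NOT_STORED) is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum sk_cmd { SK_SET, SK_ADD, SK_REPLACE, SK_APPEND, SK_PREPEND, SK_CAS };
enum sk_result { SK_STORED, SK_NOT_STORED, SK_EXISTS, SK_NOT_FOUND };

/* old_exists: a live item with this key was found.
 * req_cas:    CAS value sent by the client (0 = none).
 * old_cas:    CAS value of the existing item. */
static enum sk_result sk_store_decision(enum sk_cmd cmd, bool old_exists,
                                        uint64_t req_cas, uint64_t old_cas) {
    if (old_exists && cmd == SK_ADD)
        return SK_NOT_STORED;                      /* add never overwrites */
    if (!old_exists && (cmd == SK_REPLACE || cmd == SK_APPEND || cmd == SK_PREPEND))
        return SK_NOT_STORED;                      /* these need an existing item */
    if (cmd == SK_CAS) {
        if (!old_exists)
            return SK_NOT_FOUND;                   /* expired or evicted */
        return req_cas == old_cas ? SK_STORED : SK_EXISTS;
    }
    if ((cmd == SK_APPEND || cmd == SK_PREPEND) && req_cas != 0 && req_cas != old_cas)
        return SK_EXISTS;                          /* optional CAS check failed */
    return SK_STORED;                              /* link the new item or replace the old one */
}
```

Each result maps directly onto the reply strings in complete_nread_ascii: `STORED`, `NOT_STORED`, `EXISTS` and `NOT_FOUND`.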



int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats_state.curr_bytes += ITEM_ntotal(it);
    stats_state.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
    assoc_insert(it, hv);
    item_link_q(it);
    refcount_incr(it);
    item_stats_sizes_add(it);

    return 1;
}
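item_link_q and item_unlink_q maintain a doubly linked LRU list through `prev`/`next`: new items go to the head, and any item can be removed in constant time from wherever it sits. A minimal single-threaded sketch, assuming one list per LRU; `lru_list` and the functions are hypothetical, and memcached also keeps per-LRU counters and holds the LRU lock while doing this.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical item carrying only the LRU links that item_link_q/item_unlink_q touch. */
typedef struct lru_item {
    struct lru_item *prev, *next;
    const char *key;
} lru_item;

typedef struct {
    lru_item *head;   /* most recently linked */
    lru_item *tail;   /* next eviction candidate */
} lru_list;

/* Insert at the head, as item_link_q does for a newly linked item. */
static void lru_link_q(lru_list *l, lru_item *it) {
    assert(it != l->head);
    it->prev = NULL;
    it->next = l->head;
    if (it->next != NULL)
        it->next->prev = it;
    l->head = it;
    if (l->tail == NULL)
        l->tail = it;
}

/* Remove from anywhere in the list, as item_unlink_q does. */
static void lru_unlink_q(lru_list *l, lru_item *it) {
    if (l->head == it)
        l->head = it->next;
    if (l->tail == it)
        l->tail = it->prev;
    if (it->next != NULL)
        it->next->prev = it->prev;
    if (it->prev != NULL)
        it->prev->next = it->next;
    it->prev = it->next = NULL;
}
```

Unlinking and then relinking an item moves it to the head, which is how an LRU "bump" can be expressed with these two operations.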


Read: the get operation

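For reads we mainly look at process_get_command, which does the following:

  1. Walks the keys of the get command in a do/while loop; if the command has more keys than fit into one tokens array, tokenize_command is called again for the rest.
  2. Looks up each key's item in the hash table; limited_get performs the lookup.
  3. Collects each item found for the response via *(c->ilist + i) = it.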
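static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas, bool should_touch) {
    char *key;
    size_t nkey;
    int i = 0;                  /* number of items collected in c->ilist */
    int si = 0;                 /* number of suffixes (used in the omitted branch) */
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    rel_time_t exptime = 0;     /* only used by the touch variants (should_touch) */

    // ... omitted: exptime parsing for touch, buffer setup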
    do {
        while(key_token->length != 0) {

            key = key_token->value;
            nkey = key_token->length;

            it = limited_get(key, nkey, c, exptime, should_touch);
            
            if (it) {
                if (return_cas || !settings.inline_ascii_response)
                { // Related code omitted
                } else {
                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                        it->nbytes, ITEM_get_cas(it));
                  if (add_iov(c, "VALUE ", 6) != 0 ||
                      add_iov(c, ITEM_key(it), it->nkey) != 0)
                      {
                          item_remove(it);
                          goto stop;
                      }
                }

                pthread_mutex_lock(&c->thread->stats.mutex);
                if (should_touch) {
                    c->thread->stats.touch_cmds++;
                    c->thread->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
                } else {
                    c->thread->stats.lru_hits[it->slabs_clsid]++;
                    c->thread->stats.get_cmds++;
                }
                pthread_mutex_unlock(&c->thread->stats.mutex);

                *(c->ilist + i) = it;
                i++;
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                if (should_touch) {
                    c->thread->stats.touch_cmds++;
                    c->thread->stats.touch_misses++;
                } else {
                    c->thread->stats.get_misses++;
                    c->thread->stats.get_cmds++;
                }
                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
                pthread_mutex_unlock(&c->thread->stats.mutex);
            }

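            key_token++;
        }

        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens (without this step the outer loop would never end).
         */
        if (key_token->value != NULL) {
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }
    } while(key_token->value != NULL);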

stop:
    c->icurr = c->ilist;
    c->ileft = i;
    if (return_cas || !settings.inline_ascii_response) {
        c->suffixcurr = c->suffixlist;
        c->suffixleft = si;
    }

    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
         // Related code omitted
    } else {
        conn_set_state(c, conn_mwrite);
        c->msgcurr = 0;
    }
}
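For each hit, process_get_command queues `"VALUE "`, the key, the stored `" flags bytes\r\n"` suffix (when `inline_ascii_response` is on) and the data, and the response always ends with `END\r\n`, even when nothing was found. memcached gathers those pieces with add_iov to avoid copying; the sketch builds the same text into one flat buffer instead. `cached` and `build_get_response` are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical cached entry: data already includes the trailing "\r\n". */
typedef struct {
    const char *key;
    unsigned int flags;
    const char *data;
    int nbytes;            /* includes the "\r\n", like it->nbytes */
} cached;

/* Writes "VALUE key flags bytes\r\n" plus the data for each hit, then "END\r\n".
 * Returns the response length, or -1 if `out` is too small. */
static int build_get_response(const cached *hits, int nhits, char *out, size_t cap) {
    size_t used = 0;
    for (int i = 0; i < nhits; i++) {
        int n = snprintf(out + used, cap - used, "VALUE %s %u %d\r\n",
                         hits[i].key, hits[i].flags, hits[i].nbytes - 2);
        if (n < 0 || (size_t)n >= cap - used)
            return -1;
        used += (size_t)n;
        if (cap - used < (size_t)hits[i].nbytes)
            return -1;
        memcpy(out + used, hits[i].data, (size_t)hits[i].nbytes);   /* data + "\r\n" */
        used += (size_t)hits[i].nbytes;
    }
    if (cap - used < 6)
        return -1;                       /* room for "END\r\n" plus NUL */
    memcpy(out + used, "END\r\n", 6);    /* also copies the terminating NUL */
    return (int)(used + 5);
}
```

Because the stored value keeps its trailing `\r\n` (the reason for `vlen += 2` during set), the data can be emitted verbatim without any per-request framing work.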


------------------------------------------------------------

static inline item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch) {
    item *it;
    if (should_touch) {
        it = item_touch(key, nkey, exptime, c);
    } else {
        it = item_get(key, nkey, c, DO_UPDATE);
    }
    if (it && it->refcount > IT_REFCOUNT_LIMIT) {
        item_remove(it);
        it = NULL;
    }
    return it;
}

item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c, do_update);
    item_unlock(hv);
    return it;
}

item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
    item *it = assoc_find(key, nkey, hv);

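    // ... omitted: if the item is found, do_item_get takes a reference on it
    // (refcount_incr), then checks for expired or flushed items and, when
    // do_update is set, updates its recency. Here we only need to focus on
    // assoc_find.
    return it;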
}
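assoc_find resolves a key by masking its hash down to a bucket and walking that bucket's `h_next` chain, comparing lengths and bytes; do_item_get then takes a reference on a hit. A minimal sketch with a fixed 16-bucket table and the FNV-1a hash, both assumptions (memcached uses its own configurable hash function and a table that grows); all `sk_*` names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SK_HASHPOWER 4                         /* assumption: 16 buckets */
#define SK_HASHSIZE (1u << SK_HASHPOWER)
#define SK_HASHMASK (SK_HASHSIZE - 1)

typedef struct hitem {
    struct hitem *h_next;   /* next item in the same bucket */
    const char *key;
    size_t nkey;
    unsigned refcount;
} hitem;

static hitem *sk_table[SK_HASHSIZE];

/* FNV-1a, standing in for memcached's configurable hash function. */
static uint32_t sk_hash(const char *key, size_t nkey) {
    uint32_t h = 2166136261u;
    for (size_t i = 0; i < nkey; i++) {
        h ^= (uint8_t)key[i];
        h *= 16777619u;
    }
    return h;
}

/* Like assoc_insert: push onto the front of the bucket's chain. */
static void sk_assoc_insert(hitem *it, uint32_t hv) {
    hitem **bucket = &sk_table[hv & SK_HASHMASK];
    it->h_next = *bucket;
    *bucket = it;
}

/* Like assoc_find: walk the chain, matching length first, then bytes. */
static hitem *sk_assoc_find(const char *key, size_t nkey, uint32_t hv) {
    for (hitem *it = sk_table[hv & SK_HASHMASK]; it != NULL; it = it->h_next)
        if (it->nkey == nkey && memcmp(it->key, key, nkey) == 0)
            return it;
    return NULL;
}

/* do_item_get-style lookup: a hit takes a reference the caller must release. */
static hitem *sk_item_get(const char *key, size_t nkey) {
    hitem *it = sk_assoc_find(key, nkey, sk_hash(key, nkey));
    if (it != NULL)
        it->refcount++;
    return it;
}
```

The reference taken on a hit is why every successful lookup in this article is paired with an item_remove, for example in process_delete_command and in the `limited_get` refcount-limit check.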


Delete: the delete operation

For deletes we look at process_delete_command:

  1. First check whether the item exists.
  2. If it exists, delete it; otherwise return NOT_FOUND.

static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
    char *key;
    size_t nkey;
    item *it;

    assert(c != NULL);

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if(nkey > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    it = item_get(key, nkey, c, DONT_UPDATE);
    if (it) {
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        item_unlink(it);
        item_remove(it);      /* release our reference */
        out_string(c, "DELETED");
    } else {
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.delete_misses++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        out_string(c, "NOT_FOUND");
    }
}

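item_unlink calls do_item_unlink, which does two things:

  • Removes the item from the hash table, via assoc_delete.
  • Removes the item from its LRU list, via item_unlink_q.

It then calls do_item_remove to drop the reference held by the hash table/LRU link.

item_remove calls do_item_remove, which decrements the reference count and frees the item's memory (item_free) once the count reaches 0.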

void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}



void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats_state.curr_bytes -= ITEM_ntotal(it);
        stats_state.curr_items -= 1;
        STATS_UNLOCK();
        item_stats_sizes_remove(it);
        assoc_delete(ITEM_key(it), it->nkey, hv);
        item_unlink_q(it);
        do_item_remove(it);
    }
}



void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}


void do_item_remove(item *it) {
    MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    assert(it->refcount > 0);

    if (refcount_decr(it) == 0) {
        item_free(it);
    }
}
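The delete path combines two releases: do_item_unlink drops the reference owned by the hash table/LRU link, and item_remove drops the one taken by item_get. The model below shows why the memory is only freed once the last holder lets go, for example when another connection is still sending the item's data. It is a hypothetical, single-threaded model; memcached uses atomic counters and item locks.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the reference counts on the delete path. */
typedef struct {
    unsigned short refcount;
    bool linked;    /* in the hash table and an LRU list */
    bool freed;
} ref_item;

/* item_get / do_item_get: a successful lookup takes a reference. */
static void ref_get(ref_item *it) { it->refcount++; }

/* item_remove / do_item_remove: drop a reference; free when none remain. */
static void ref_remove(ref_item *it) {
    assert(it->refcount > 0);
    if (--it->refcount == 0)
        it->freed = true;   /* item_free */
}

/* item_unlink / do_item_unlink */
static void ref_unlink(ref_item *it) {
    if (it->linked) {
        it->linked = false; /* assoc_delete + item_unlink_q would happen here */
        ref_remove(it);     /* drop the reference owned by the hash table/LRU */
    }
}

/* process_delete_command: look up, unlink, release our lookup reference. */
static bool ref_delete(ref_item *it) {
    if (!it->linked)
        return false;       /* NOT_FOUND */
    ref_get(it);
    ref_unlink(it);
    ref_remove(it);
    return true;            /* DELETED */
}
```

After a delete the key is gone from the hash table immediately, so no new lookup can find it, but the memory stays valid for any connection that still holds a reference and is reclaimed by that connection's final item_remove.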


References

Memcached Source Code Analysis - Memcached Source Code Analysis: CRUD Operations
