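Memcached Source Code Analysis - Network Model (1)
Memcached Source Code Analysis - Command Parsing (2)
Memcached Source Code Analysis - Data Storage (3)
Memcached Source Code Analysis - Create, Read, Update, and Delete Operations (4)
Memcached Source Code Analysis - Memory Storage Mechanism: Slabs (5)
Memcached Source Code Analysis - LRU Eviction Algorithm (6)
Memcached Source Code Analysis - Response Messages (7)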
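Introduction
This article bridges two others in the series. It picks up where command parsing leaves off, and it leads into data storage.
Once a command has been parsed, the operations described here carry it out. They ultimately rely on the low-level storage covered in the data storage article.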
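Source Code Analysis of Memcached's Create, Read, Update, and Delete Operations
Entry Point for Create/Read/Update/Delete
Before looking at the individual operations, let's look at process_command. After Memcached parses a command line, process_command dispatches it to a handler according to the command type.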
static void process_command(conn *c, char *command) {
    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);

    /*
     * for commands set/add/replace, we build an item and read the data
     * directly into it, then continue in nread_complete().
     */

    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    if (add_msghdr(c) != 0) {
        out_of_memory(c, "SERVER_ERROR out of memory preparing response");
        return;
    }

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
        process_get_command(c, tokens, ntokens, false, false);
    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
        process_update_command(c, tokens, ntokens, comm, false);
    } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
        process_get_command(c, tokens, ntokens, true, false);
    } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
        process_arithmetic_command(c, tokens, ntokens, 0);
    } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
        process_delete_command(c, tokens, ntokens);
    }
    // Many other commands are omitted here (see the full source); this article only follows the read and write paths.
    return;
}
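The token-count checks above (ntokens >= 3 for get, 6 or 7 for set) count one extra token. tokenize_command appends a terminating token of length 0 after the last word. That token's value is NULL when the whole line was consumed, and otherwise points at the unprocessed rest of the line.

Below is a minimal standalone sketch of that convention and of the dispatch, assuming a simplified space-only tokenizer. The names toy_tokenize, toy_dispatch, toy_token and toy_cmd are hypothetical and are not memcached's API. Only a few commands are modeled.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_TOKENS 8

typedef struct {
    char *value;     /* token start; in the terminal token, NULL if the line was fully consumed */
    size_t length;   /* 0 marks the terminal token */
} toy_token;

typedef enum { CMD_UNKNOWN, CMD_GET, CMD_UPDATE, CMD_DELETE } toy_cmd;

/* Splits command in place on spaces and always appends one terminal token of
 * length 0. If the line did not fit, the terminal token points at the rest of
 * the line. Returns the number of tokens including the terminal one. */
static size_t toy_tokenize(char *command, toy_token *tokens, size_t max_tokens) {
    size_t ntokens = 0;
    char *p = command;
    while (*p != '\0' && ntokens < max_tokens - 1) {
        while (*p == ' ')
            p++;                                /* skip separators */
        if (*p == '\0')
            break;
        char *start = p;
        while (*p != ' ' && *p != '\0')
            p++;
        tokens[ntokens].value = start;
        tokens[ntokens].length = (size_t)(p - start);
        ntokens++;
        if (*p == ' ')
            *p++ = '\0';                        /* terminate the token in place */
    }
    tokens[ntokens].value = (*p == '\0') ? NULL : p;
    tokens[ntokens].length = 0;
    return ntokens + 1;
}

/* Chooses a handler from the command name and the token count. */
static toy_cmd toy_dispatch(char *line) {
    toy_token tokens[TOY_MAX_TOKENS];
    size_t ntokens = toy_tokenize(line, tokens, TOY_MAX_TOKENS);
    const char *cmd = tokens[0].value;
    if (cmd == NULL)
        return CMD_UNKNOWN;                     /* empty line */
    if (ntokens >= 3 && (strcmp(cmd, "get") == 0 || strcmp(cmd, "gets") == 0))
        return CMD_GET;
    if ((ntokens == 6 || ntokens == 7) &&
        (strcmp(cmd, "set") == 0 || strcmp(cmd, "add") == 0 ||
         strcmp(cmd, "replace") == 0 || strcmp(cmd, "append") == 0 ||
         strcmp(cmd, "prepend") == 0))
        return CMD_UPDATE;
    if (ntokens >= 3 && ntokens <= 5 && strcmp(cmd, "delete") == 0)
        return CMD_DELETE;
    return CMD_UNKNOWN;
}
```

With this convention, `set username 0 10 9` yields 6 tokens and adding `noreply` yields 7. That matches the `ntokens == 6 || ntokens == 7` test in process_command.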
Create/Update: set, add, replace
set key flags exptime vlen
value
Example:
set username 0 10 9
woshishen
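set: the command name
key: the cache key
flags: an opaque flags value stored alongside the data
exptime: expiration time in seconds; 0 means the item never expires
vlen: length of the value in bytes
value: the data to cache, sent on its own line after the command line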
Notes:
Memcached splits the incoming data into command lines at "\n", stripping a preceding "\r". It then splits each line on spaces into tokens, which are stored in a tokens array.
A set command therefore arrives in two parts, the command line and the value:
1. Memcached parses the command line first. Because the command line carries vlen, the length of the value is known in advance, so Memcached allocates an item structure to hold the cached data.
2. Memcached then keeps reading the remaining bytes from the socket and copies them into the item as they arrive. It stops once the number of bytes read matches vlen plus the trailing "\r\n". It then stores the item. On success the item is linked into the hash table and the LRU list; on failure the item is released.
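The two-phase handling of set can be illustrated by parsing just the command line. This sketch makes several assumptions:

- It handles only a plain `set <key> <flags> <exptime> <vlen>` line, without `noreply`.
- toy_parse_set and toy_set_cmd are hypothetical names.
- The 250-byte key limit is assumed to mirror memcached's KEY_MAX_LENGTH.

The important output is rlbytes: the number of bytes still to be read from the socket, which is vlen plus the two bytes of the trailing \r\n.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define TOY_KEY_MAX 250   /* assumption: mirrors memcached's KEY_MAX_LENGTH */

typedef struct {
    char key[TOY_KEY_MAX + 1];
    unsigned long flags;
    long exptime;
    long vlen;
    long rlbytes;   /* bytes still to read from the socket: vlen + "\r\n" */
} toy_set_cmd;

/* Parses one decimal field; rejects empty input, junk, and overflow. */
static bool parse_long(const char *s, long *out) {
    char *end;
    errno = 0;
    long v = strtol(s, &end, 10);
    if (errno != 0 || end == s || *end != '\0')
        return false;
    *out = v;
    return true;
}

static bool toy_parse_set(const char *line, toy_set_cmd *cmd) {
    char buf[512];
    char *tok[6];
    int n = 0;
    if (strlen(line) >= sizeof(buf))
        return false;
    strcpy(buf, line);
    for (char *t = strtok(buf, " "); t != NULL && n < 6; t = strtok(NULL, " "))
        tok[n++] = t;
    if (n != 5 || strcmp(tok[0], "set") != 0)
        return false;                       /* exactly five words expected */
    if (strlen(tok[1]) > TOY_KEY_MAX)
        return false;
    long flags, exptime, vlen;
    if (!parse_long(tok[2], &flags) || flags < 0 ||
        !parse_long(tok[3], &exptime) ||
        !parse_long(tok[4], &vlen))
        return false;
    if (vlen < 0 || vlen > INT_MAX - 2)
        return false;                       /* same range check as vlen += 2 needs */
    strcpy(cmd->key, tok[1]);
    cmd->flags = (unsigned long)flags;
    cmd->exptime = exptime;
    cmd->vlen = vlen;
    cmd->rlbytes = vlen + 2;                /* value plus its \r\n terminator */
    return true;
}
```

For the article's example, `set username 0 10 9` yields rlbytes = 11: "woshishen" plus "\r\n".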
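Analysis of process_update_command
There are two key steps:
- item_alloc allocates storage for the item.
- conn_set_state(c, conn_nread) moves the connection into the state that reads the value.
Allocating the item's storage is fairly involved and is covered in a separate article. Here it is enough to know that we allocate space for the item and then read the value into it to build the item.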
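/*********************************
 Create and update operations
 A set command looks like this:
 Command:
 set key flags exptime vlen
 value
 vlen is the length of the data to cache
 flags is an opaque flags value
 exptime is the expiration time; 0 means never expire
 value is the data to cache, usually on the second line
 Example:
 set username 0 10 9
 woshishen
**************************************/
static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;
    size_t nkey;
    unsigned int flags;
    int32_t exptime_int = 0;
    time_t exptime;
    int vlen;
    uint64_t req_cas_id = 0;
    item *it;

    assert(c != NULL);

    set_noreply_maybe(c, tokens, ntokens);

    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    // Validate the arguments and parse the numeric fields, such as the value length vlen
    if (!(safe_strtoul(tokens[2].value, (uint32_t *) &flags)
          && safe_strtol(tokens[3].value, &exptime_int)
          && safe_strtol(tokens[4].value, (int32_t *) &vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    exptime = exptime_int;

    // A negative exptime is mapped to REALTIME_MAXDELTA + 1; realtime() treats that as an
    // absolute time before the server started, so the item expires immediately
    if (exptime < 0)
        exptime = REALTIME_MAXDELTA + 1;

    // For the cas command, the CAS unique value is in tokens[5]
    if (handle_cas) {
        if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }
    }

    // Reject negative lengths and lengths that would overflow vlen + 2
    if (vlen < 0 || vlen > (INT_MAX - 2)) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    // Why add 2 to vlen?
    // The client terminates the data block with "\r\n", which is not counted in vlen.
    // Memcached reads those two bytes into the item together with the data (and checks
    // them later in complete_nread_ascii). The stored value can therefore be sent back to a
    // client verbatim, and the client can use "\r\n" to find the end of the data.
    vlen += 2;

    // item_alloc is the core call: it allocates an item structure to hold the data to cache
    // key: the cache key
    // nkey: length of the key
    // flags: the flags value
    // exptime: expiration time
    // vlen: length of the value, including the trailing \r\n
    // You may wonder why only vlen (the byte length of the value) is passed, and not the value itself.
    // 1. set/add/replace and similar commands send the command line and the data line separately.
    // 2. The command line is parsed first. It contains the length of the value, so memory
    //    can be allocated up front based on that length.
    // 3. Then the data line is read. TCP is a byte stream, so a large value may arrive over
    //    several reads. Sending the length in the command line lets Memcached allocate the space
    //    while parsing the command line, then copy the value into it as the bytes arrive.
    // 4. The last statement, conn_set_state(c, conn_nread), switches the connection to the
    //    conn_nread state, which reads the value data.
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

    if (it == 0) {
        // Allocation failed (value too large or out of memory): report the error, then
        // discard ("swallow") the vlen bytes the client is about to send. Details omitted.
        out_of_memory(c, "SERVER_ERROR out of memory storing object");
        c->sbytes = vlen;
        c->write_and_go = conn_swallow;
        return;
    }

    ITEM_set_cas(it, req_cas_id);

    c->item = it;
    c->ritem = ITEM_data(it);
    c->rlbytes = it->nbytes; // number of value bytes to read, including \r\n
    c->cmd = comm;
    conn_set_state(c, conn_nread);
}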
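process_update_command passes exptime through realtime() before calling item_alloc. Below is a sketch of that conversion, modeled on memcached's realtime() as I understand it. The globals process_started and current_time have been turned into parameters, and toy_realtime is a hypothetical name.

The conversion works like this:
- Values up to 30 days are relative seconds.
- Larger values are absolute Unix timestamps.

This is also why a negative exptime, mapped to REALTIME_MAXDELTA + 1, ends up already expired.

```c
#include <assert.h>
#include <time.h>

typedef unsigned int rel_time_t;                  /* seconds since the server started */
#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)     /* 30 days */

/* process_started: Unix time at which the server started (hypothetical parameter)
 * current_time:    seconds elapsed since then (hypothetical parameter) */
static rel_time_t toy_realtime(time_t exptime, time_t process_started, rel_time_t current_time) {
    if (exptime == 0)
        return 0;                                 /* 0 means never expire */
    if (exptime > REALTIME_MAXDELTA) {            /* absolute Unix timestamp */
        if (exptime <= process_started)
            return (rel_time_t)1;                 /* already in the past: expire at once */
        return (rel_time_t)(exptime - process_started);
    }
    return (rel_time_t)(exptime + current_time);  /* relative: N seconds from now */
}
```

In the article's example, exptime 10 becomes current_time + 10 on the server's relative clock.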
The item_alloc Process

item_alloc allocates the memory for an item. The work is done in do_item_alloc, whose core steps are computing the required size and allocating the item's memory:
- item_make_header computes the size.
- do_item_alloc_pull allocates the memory for the item.
- Inside do_item_alloc_pull, Memcached first tries to allocate memory. If that fails, it evicts items from the tail of the LRU and tries again, for at most 10 rounds.
// The structure of an item
typedef struct _stritem {
    // LRU list: address of the next item
    struct _stritem *next;
    // LRU list: address of the previous item
    struct _stritem *prev;
    // next item in the same hash-table bucket
    struct _stritem *h_next;    /* hash chain next */
    // time of the most recent access
    rel_time_t      time;       /* least recent access */
    // expiration time
    rel_time_t      exptime;    /* expire time */
    // size of the value data
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    // ID of the slab class this item belongs to
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    // holds the variable-length data that follows the header
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
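The comments at the end of the struct describe what follows the fixed header in the data[] area:
1. an optional 8-byte CAS value;
2. the null-terminated key;
3. the suffix;
4. the value with its \r\n.

Below is a standalone sketch of the pointer arithmetic behind ITEM_key, ITEM_suffix and ITEM_data. It uses a cut-down hypothetical toy_item and a made-up TOY_ITEM_CAS flag bit instead of memcached's real definitions.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define TOY_ITEM_CAS 2   /* hypothetical flag bit: an 8-byte CAS precedes the key */

typedef struct {
    int nbytes;          /* length of the value including its trailing \r\n */
    uint8_t nsuffix;     /* length of the suffix, e.g. " 0 9\r\n" */
    uint8_t it_flags;
    uint8_t nkey;        /* key length without the terminating null */
    union {
        uint64_t cas;
        char end;
    } data[];            /* [cas][key\0][suffix][value\r\n] */
} toy_item;

static char *toy_item_key(toy_item *it) {
    return (char *)it->data + ((it->it_flags & TOY_ITEM_CAS) ? sizeof(uint64_t) : 0);
}

static char *toy_item_suffix(toy_item *it) {
    return toy_item_key(it) + it->nkey + 1;   /* skip the key and its null */
}

static char *toy_item_data(toy_item *it) {
    return toy_item_suffix(it) + it->nsuffix;
}

/* Allocates header and variable part as one block and fills it in order. */
static toy_item *toy_item_new(const char *key, const char *suffix, const char *value, int use_cas) {
    size_t nkey = strlen(key), nsuffix = strlen(suffix), nbytes = strlen(value);
    size_t total = sizeof(toy_item) + (use_cas ? sizeof(uint64_t) : 0)
                   + nkey + 1 + nsuffix + nbytes;
    toy_item *it = calloc(1, total);
    if (it == NULL)
        return NULL;
    it->it_flags = use_cas ? TOY_ITEM_CAS : 0;
    it->nkey = (uint8_t)nkey;
    it->nsuffix = (uint8_t)nsuffix;
    it->nbytes = (int)nbytes;
    memcpy(toy_item_key(it), key, nkey + 1);   /* copies the '\0' too */
    memcpy(toy_item_suffix(it), suffix, nsuffix);
    memcpy(toy_item_data(it), value, nbytes);
    return it;
}
```

Because the key, suffix and value sit back to back, one allocation holds the whole item. The suffix and value can also be sent to a client as one contiguous range.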
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
                    const rel_time_t exptime, const int nbytes) {
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
    // Avoid potential underflows.
    if (nbytes < 2)
        return 0;

    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    unsigned int id = slabs_clsid(ntotal);
    unsigned int hdr_id = 0;
    if (ntotal > settings.slab_chunk_size_max) {
        // Related code omitted: a value larger than slab_chunk_size_max gets a small header
        // item (from slab class hdr_id) flagged ITEM_CHUNKED, and its data is stored in chunks.
    } else {
        it = do_item_alloc_pull(ntotal, id);
    }

    if (it == NULL) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].outofmemory++;
        pthread_mutex_unlock(&lru_locks[id]);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    //assert(it != heads[id]);

    /* Refcount is seeded to 1 by slabs_alloc() */
    it->next = it->prev = 0;

    /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
     * least a note here. Compiler (hopefully?) optimizes this out.
     */
    if (settings.temp_lru &&
            exptime - current_time <= settings.temporary_ttl) {
        id |= TEMP_LRU;
    } else if (settings.lru_segmented) {
        id |= HOT_LRU;
    } else {
        /* There is only COLD in compat-mode */
        id |= COLD_LRU;
    }
    it->slabs_clsid = id;

    DEBUG_REFCNT(it, '*');
    it->it_flags |= settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    if (settings.inline_ascii_response) {
        memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    } else if (nsuffix > 0) {
        memcpy(ITEM_suffix(it), &flags, sizeof(flags));
    }
    it->nsuffix = nsuffix;

    /* Initialize internal chunk. */
    if (it->it_flags & ITEM_CHUNKED) {
        item_chunk *chunk = (item_chunk *) ITEM_data(it);

        chunk->next = 0;
        chunk->prev = 0;
        chunk->used = 0;
        chunk->size = 0;
        chunk->head = it;
        chunk->orig_clsid = hdr_id;
    }
    it->h_next = 0;

    return it;
}
/**
 * Generates the variable-sized part of the header for an object.
 *
 * key     - The key
 * nkey    - The length of the key
 * flags   - key flags
 * nbytes  - Number of bytes to hold value and addition CRLF terminator
 * suffix  - Buffer for the "VALUE" line suffix (flags, size).
 * nsuffix - The length of the suffix is stored here.
 *
 * Returns the total size of the header.
 */
static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
                               char *suffix, uint8_t *nsuffix) {
    if (settings.inline_ascii_response) {
        /* suffix is defined at 40 chars elsewhere.. */
        *nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
    } else {
        if (flags == 0) {
            *nsuffix = 0;
        } else {
            *nsuffix = sizeof(flags);
        }
    }
    return sizeof(item) + nkey + *nsuffix + nbytes;
}
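Here is the size computation for the article's example, `set username 0 10 9`:
- nkey is 8, plus 1 for the null terminator.
- nbytes is 11 after the +2 for \r\n.

The sketch below repeats the arithmetic of item_make_header plus the CAS adjustment in do_item_alloc. header_size is a hypothetical parameter standing in for sizeof(item), which varies by platform. toy_item_ntotal is likewise a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static size_t toy_item_ntotal(size_t header_size, size_t nkey, unsigned int flags,
                              int nbytes, bool inline_ascii, bool use_cas) {
    char suffix[40];
    size_t nsuffix;
    if (inline_ascii) {
        /* " <flags> <length>\r\n", where length excludes the trailing \r\n */
        nsuffix = (size_t)snprintf(suffix, sizeof(suffix), " %u %d\r\n", flags, nbytes - 2);
    } else {
        nsuffix = (flags == 0) ? 0 : sizeof(flags);   /* raw flags, only if non-zero */
    }
    size_t ntotal = header_size + (nkey + 1) + nsuffix + (size_t)nbytes;
    if (use_cas)
        ntotal += sizeof(uint64_t);                   /* 8-byte CAS before the key */
    return ntotal;
}
```

In inline_ascii_response mode the suffix for this example is " 0 9\r\n", which is 6 bytes. With a hypothetical 48-byte header and CAS enabled, the item therefore needs 48 + 9 + 6 + 11 + 8 = 82 bytes. slabs_clsid(ntotal) then picks a slab class whose chunks are large enough.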
item *do_item_alloc_pull(const size_t ntotal, const unsigned int id) {
    item *it = NULL;
    int i;
    for (i = 0; i < 10; i++) {
        uint64_t total_bytes;
        /* Try to reclaim memory first */
        if (!settings.lru_segmented) {
            lru_pull_tail(id, COLD_LRU, 0, 0, 0, NULL);
        }
        it = slabs_alloc(ntotal, id, &total_bytes, 0);

        if (settings.temp_lru)
            total_bytes -= temp_lru_size(id);

        if (it == NULL) {
            if (lru_pull_tail(id, COLD_LRU, total_bytes, LRU_PULL_EVICT, 0, NULL) <= 0) {
                if (settings.lru_segmented) {
                    lru_pull_tail(id, HOT_LRU, total_bytes, 0, 0, NULL);
                } else {
                    break;
                }
            }
        } else {
            break;
        }
    }

    if (i > 0) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].direct_reclaims += i;
        pthread_mutex_unlock(&lru_locks[id]);
    }

    return it;
}
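The retry loop in do_item_alloc_pull can be reduced to a small model:
1. Try to allocate a chunk.
2. If the slab class is full, evict one item from the tail of the LRU and try again.
3. Give up after 10 rounds, or when nothing can be evicted.

This is a simplified sketch. A hypothetical toy_class counter stands in for a slab class. It models only the compat (non-segmented) path, which breaks out when eviction fails. It ignores locking, the HOT/WARM/COLD LRUs and temporary items.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_TRIES 10   /* do_item_alloc_pull also stops after 10 rounds */

typedef struct {
    int capacity;    /* chunks in this hypothetical slab class */
    int used;        /* chunks currently allocated */
    int evictable;   /* items at the LRU tail that nobody else references */
    int evictions;
} toy_class;

static bool toy_slabs_alloc(toy_class *c) {
    if (c->used < c->capacity) {
        c->used++;
        return true;
    }
    return false;
}

static bool toy_evict_tail(toy_class *c) {
    if (c->evictable == 0)
        return false;
    c->evictable--;        /* the tail item is unlinked ... */
    c->used--;             /* ... and its chunk goes back to the class */
    c->evictions++;
    return true;
}

/* Returns true on success. *reclaims receives the number of failed rounds,
 * comparable to what memcached adds to itemstats[id].direct_reclaims. */
static bool toy_alloc_pull(toy_class *c, int *reclaims) {
    bool ok = false;
    int i;
    for (i = 0; i < TOY_MAX_TRIES; i++) {
        if (toy_slabs_alloc(c)) {
            ok = true;
            break;
        }
        if (!toy_evict_tail(c))
            break;         /* nothing left to evict: give up */
    }
    *reclaims = i;
    return ok;
}
```

A full class with evictable items succeeds after one eviction. If every item is still referenced, allocation fails, and do_item_alloc then counts an outofmemory event.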
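The conn_nread Process
conn_nread is one state of the connection state machine in drive_machine. In the excerpt below, res and stop are local variables of drive_machine.

Each time the state runs, it does two things:
1. It copies any value bytes already buffered from the command-line read.
2. It reads more from the socket.

Once c->rlbytes reaches 0, it calls complete_nread.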
    case conn_nread:
        if (c->rlbytes == 0) {
            complete_nread(c);
            break;
        }

        /* Check if rbytes < 0, to prevent crash */
        if (c->rlbytes < 0) {
            if (settings.verbose) {
                fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
            }
            conn_set_state(c, conn_closing);
            break;
        }

        if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy);
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {
                    break;
                }
            }

            /* now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
        } else {
            res = read_into_chunked_item(c);
            if (res > 0)
                break;
        }

        if (res == 0) { /* end of stream */
            conn_set_state(c, conn_closing);
            break;
        }

        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }
            stop = true;
            break;
        }

        /* Memory allocation failure */
        if (res == -2) {
            out_of_memory(c, "SERVER_ERROR Out of memory during read");
            c->sbytes = c->rlbytes;
            c->write_and_go = conn_swallow;
            break;
        }
        /* otherwise we have a real error, on which we close the connection */
        if (settings.verbose > 0) {
            fprintf(stderr, "Failed to read, and not due to blocking:\n"
                    "errno: %d %s \n"
                    "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
                    errno, strerror(errno),
                    (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
                    (int)c->rlbytes, (int)c->rsize);
        }
        conn_set_state(c, conn_closing);
        break;
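Here is the copy-then-read logic of conn_nread in isolation:
1. Drain bytes already sitting in the read buffer.
2. read() the rest straight into the item.

The sketch uses a hypothetical toy_conn holding only the four fields involved. One behavior differs from memcached. When read() fails with EAGAIN, memcached returns to the event loop and waits for the socket to become readable again. This sketch instead loops inside one call and returns 0 in that case.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

typedef struct {
    char *rcurr;   /* next unconsumed byte in the connection's read buffer */
    int rbytes;    /* number of unconsumed bytes in that buffer */
    char *ritem;   /* where the next value byte goes inside the item */
    int rlbytes;   /* value bytes (including \r\n) still expected */
} toy_conn;

/* Returns 1 when the whole value has arrived, 0 if no more data is available
 * right now (EAGAIN/EWOULDBLOCK), and -1 on end of stream or another error. */
static int toy_nread(toy_conn *c, int fd) {
    while (c->rlbytes > 0) {
        if (c->rbytes > 0) {
            /* leftovers from the command-line read come first */
            int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
            memmove(c->ritem, c->rcurr, (size_t)tocopy);
            c->ritem += tocopy;
            c->rlbytes -= tocopy;
            c->rcurr += tocopy;
            c->rbytes -= tocopy;
            continue;
        }
        ssize_t res = read(fd, c->ritem, (size_t)c->rlbytes);
        if (res > 0) {
            c->ritem += res;
            c->rlbytes -= (int)res;
            continue;
        }
        if (res == 0)
            return -1;                 /* peer closed the connection */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return 0;                  /* wait for the next readable event */
        return -1;                     /* real read error */
    }
    return 1;
}
```

In the usage below, "wo" was already buffered together with the command line, and "shishen\r\n" is still in the pipe standing in for the socket.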
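The complete_nread Process
We focus on complete_nread_ascii, which saves the data through store_item:
- store_item computes the key's hash value. The hash value determines the hash bucket and which item lock to take.
- do_item_link calls three functions:
  - assoc_insert, to add the item to the hash table;
  - item_link_q, to put it on the LRU list;
  - refcount_incr, to increment its reference count.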
static void complete_nread(conn *c) {
    assert(c != NULL);
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);

    if (c->protocol == ascii_prot) {
        complete_nread_ascii(c);
    } else if (c->protocol == binary_prot) {
        complete_nread_binary(c);
    }
}
static void complete_nread_ascii(conn *c) {
    assert(c != NULL);

    item *it = c->item;
    int comm = c->cmd;
    enum store_item_type ret;
    bool is_valid = false;

    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    // The data block must end with "\r\n" (this is why vlen was increased by 2)
    if ((it->it_flags & ITEM_CHUNKED) == 0) {
        if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
            is_valid = true;
        }
    } else {
        // validation for chunked (large) items omitted
    }

    if (!is_valid) {
        out_string(c, "CLIENT_ERROR bad data chunk");
    } else {
        ret = store_item(it, comm, c);
        switch (ret) {
        case STORED:
            out_string(c, "STORED");
            break;
        case EXISTS:
            out_string(c, "EXISTS");
            break;
        case NOT_FOUND:
            out_string(c, "NOT_FOUND");
            break;
        case NOT_STORED:
            out_string(c, "NOT_STORED");
            break;
        default:
            out_string(c, "SERVER_ERROR Unhandled storage type.");
        }
    }

    // Removing the item here? Doesn't that seem strange?
    // Whether an item is really freed depends on item->refcount, its reference count.
    // When the item was allocated (slabs_alloc), refcount started at 1.
    //
    // When store_item succeeds for set/add/replace, do_item_link (called directly or via
    // item_replace) links the item and raises refcount to 2. The removal below then checks
    // if (refcount_decr(it) == 0), which is false, so the item is not freed; only this
    // connection's reference is released.
    // (For append/prepend a new combined item is linked instead, so the original c->item
    // is still at 1 and is freed here.)
    //
    // When store_item fails, refcount is still 1, and since the item really has to be
    // discarded, this call frees it.
    //
    // Convoluted logic, but clever.
    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}
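The reference-counting trick at the end of complete_nread_ascii can be modeled for the set/add/replace path:
1. Allocation starts the count at 1.
2. A successful store links the item and adds 1.
3. The final release subtracts 1.

The hypothetical toy_item keeps only a counter and two flags, and the freed flag stands in for item_free().

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    unsigned short refcount;
    bool linked;   /* reachable through the hash table and LRU */
    bool freed;    /* stands in for item_free() */
} toy_item;

static void toy_alloc(toy_item *it) {        /* slabs_alloc seeds refcount to 1 */
    it->refcount = 1;
    it->linked = false;
    it->freed = false;
}

static void toy_link(toy_item *it) {         /* like do_item_link */
    it->linked = true;
    it->refcount++;
}

static void toy_remove(toy_item *it) {       /* like do_item_remove */
    assert(it->refcount > 0);
    if (--it->refcount == 0)
        it->freed = true;
}

/* Models complete_nread_ascii: link on a successful store, then release c->item. */
static void toy_complete_nread(toy_item *it, bool store_ok) {
    if (store_ok)
        toy_link(it);
    toy_remove(it);
}
```

After a successful store, the item survives with exactly one reference, the one owned by the hash table link. After a failed store, the same release call frees it.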
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
    char *key = ITEM_key(it);
    // Look up the existing item by key.
    // add/set/replace/prepend/append all build a new item first, then compare it with the old one.
    item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
    enum store_item_type stored = NOT_STORED;

    item *new_it = NULL;
    uint32_t flags;

    // ADD succeeds only when the item does not exist yet;
    // if the item already exists, ADD returns NOT_STORED.
    if (old_it != NULL && comm == NREAD_ADD) {
        // Why update the existing item here? Two reasons:
        // 1. An add against an existing key counts as an access: do_item_update refreshes
        //    old_it's access time (it->time) and its position in the LRU.
        // 2. old_it is not freed by the do_item_remove(old_it) at the end of this function:
        //    do_item_get already incremented old_it->refcount, so that call only releases
        //    this temporary reference, while the hash table's link keeps the item alive.
        do_item_update(old_it);
    } else if (!old_it && (comm == NREAD_REPLACE
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        // replace/prepend/append only operate on an item that already exists;
        // if the item does not exist, NOT_STORED is returned.
    } else if (comm == NREAD_CAS) {
        if (old_it == NULL) {
            // LRU expired
            stored = NOT_FOUND;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.cas_misses++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
        }
        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            STORAGE_delete(c->thread->storage, old_it);
            item_replace(old_it, it, hv);
            stored = STORED;
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            if (settings.verbose > 1) {
                fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
                        (unsigned long long)ITEM_get_cas(old_it),
                        (unsigned long long)ITEM_get_cas(it));
            }
            stored = EXISTS;
        }
    } else {
        int failed_alloc = 0;
        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
            // If the client supplied a CAS value, it must match the existing item's
            if (ITEM_get_cas(it) != 0) {
                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
                    stored = EXISTS;
                }
            }
            if (stored == NOT_STORED) {
                // Recover the flags from the old item, then allocate an item big enough for
                // the old and new data combined (minus one of the two \r\n terminators)
                FLAGS_CONV(settings.inline_ascii_response, old_it, flags);
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

                if (new_it == NULL || _store_item_copy_data(comm, old_it, new_it, it) == -1) {
                    failed_alloc = 1;
                    stored = NOT_STORED;
                    // The item lock for hv is already held here, so release with
                    // do_item_remove (not item_remove), and clear new_it so it is not
                    // released a second time at the end of the function.
                    if (new_it != NULL) {
                        do_item_remove(new_it);
                        new_it = NULL;
                    }
                } else {
                    it = new_it;
                }
            }
        }

        if (stored == NOT_STORED && failed_alloc == 0) {
            if (old_it != NULL) {
                STORAGE_delete(c->thread->storage, old_it);
                item_replace(old_it, it, hv);
            } else {
                do_item_link(it, hv);
            }

            c->cas = ITEM_get_cas(it);

            stored = STORED;
        }
        // Notes:
        // Why do these comments keep bringing up it->refcount?
        // 1. it->refcount is the item's reference count; it keeps an item that one thread
        //    is still using from being freed by another thread.
        // 2. do_item_remove decrements it->refcount and deletes the item only if the result is 0.
        //
        // After do_store_item returns, Memcached releases the connection's reference with
        // item_remove(c->item), which calls do_item_remove.
        // That call deletes an item that was allocated for a SET/ADD/... operation that then
        // failed. If SET or ADD succeeds, do_item_link raises the item's refcount to 2, so the
        // later do_item_remove leaves it at 1 and the item is not deleted.
        // This block really is convoluted...
    }

    if (old_it != NULL)
        do_item_remove(old_it);         /* release our reference */
    if (new_it != NULL)
        do_item_remove(new_it);

    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }

    return stored;
}
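The branches of do_store_item boil down to a decision table on three inputs:
- the command;
- whether the key already exists;
- the CAS values.

A pure-function sketch of that table follows. The names are hypothetical, and allocation failures for append/prepend are ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum { T_SET, T_ADD, T_REPLACE, T_APPEND, T_PREPEND, T_CAS } toy_comm;
typedef enum { T_STORED, T_NOT_STORED, T_EXISTS, T_NOT_FOUND } toy_result;

/* req_cas: CAS value sent by the client (0 if none); old_cas: CAS of the existing item */
static toy_result toy_store_outcome(toy_comm comm, bool old_exists,
                                    uint64_t req_cas, uint64_t old_cas) {
    if (old_exists && comm == T_ADD)
        return T_NOT_STORED;                    /* add requires a missing key */
    if (!old_exists && (comm == T_REPLACE || comm == T_APPEND || comm == T_PREPEND))
        return T_NOT_STORED;                    /* these require an existing key */
    if (comm == T_CAS) {
        if (!old_exists)
            return T_NOT_FOUND;
        return req_cas == old_cas ? T_STORED : T_EXISTS;
    }
    if ((comm == T_APPEND || comm == T_PREPEND) && req_cas != 0 && req_cas != old_cas)
        return T_EXISTS;                        /* optional CAS check on append/prepend */
    return T_STORED;                            /* set, or a valid add/replace/append/prepend */
}
```

Each result maps one-to-one onto the reply strings that complete_nread_ascii sends: STORED, NOT_STORED, EXISTS and NOT_FOUND.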
int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats_state.curr_bytes += ITEM_ntotal(it);
    stats_state.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
    assoc_insert(it, hv);
    item_link_q(it);
    refcount_incr(it);
    item_stats_sizes_add(it);

    return 1;
}
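item_link_q puts a newly linked item at the head of its LRU list. item_unlink_q removes an item from wherever it is in the list. Eviction candidates are taken from the tail.

Memcached keeps head and tail pointers per LRU. This sketch uses a single hypothetical toy_lru with no locking.

```c
#include <assert.h>
#include <stddef.h>

typedef struct toy_item {
    struct toy_item *prev, *next;
    int id;
} toy_item;

typedef struct {
    toy_item *head;   /* most recently linked */
    toy_item *tail;   /* next eviction candidate */
} toy_lru;

/* Insert at the head, as item_link_q does for a newly linked item. */
static void toy_link_q(toy_lru *l, toy_item *it) {
    it->prev = NULL;
    it->next = l->head;
    if (l->head != NULL)
        l->head->prev = it;
    l->head = it;
    if (l->tail == NULL)
        l->tail = it;
}

/* Remove from any position, as item_unlink_q does. */
static void toy_unlink_q(toy_lru *l, toy_item *it) {
    if (l->head == it)
        l->head = it->next;
    if (l->tail == it)
        l->tail = it->prev;
    if (it->next != NULL)
        it->next->prev = it->prev;
    if (it->prev != NULL)
        it->prev->next = it->next;
    it->prev = it->next = NULL;
}
```

Both operations are O(1), which matters because every store, delete and eviction touches this list.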
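Read: get
For reads, the main function is process_get_command, which does three things:
- It iterates over the keys of the get command. The outer do/while loop calls tokenize_command again whenever the command has more keys than fit into one tokens array.
- It looks up each key in the hash table to find the item's address. The lookup happens inside limited_get.
- It collects each item found for the response with *(c->ilist + i) = it.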
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas, bool should_touch) {
    char *key;
    size_t nkey;
    int i = 0;
    int si = 0;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    rel_time_t exptime = 0; // only used by gat/gats (should_touch); parsing it is omitted here

    assert(c != NULL);

    do {
        while(key_token->length != 0) {
            key = key_token->value;
            nkey = key_token->length;

            it = limited_get(key, nkey, c, exptime, should_touch);
            if (it) {
                if (return_cas || !settings.inline_ascii_response)
                {
                    // related code omitted
                } else {
                    MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                          it->nbytes, ITEM_get_cas(it));
                    // queues "VALUE " and the key; appending the suffix
                    // (" flags bytes\r\n") and the data is omitted here
                    if (add_iov(c, "VALUE ", 6) != 0 ||
                        add_iov(c, ITEM_key(it), it->nkey) != 0)
                    {
                        item_remove(it);
                        goto stop;
                    }
                }

                pthread_mutex_lock(&c->thread->stats.mutex);
                if (should_touch) {
                    c->thread->stats.touch_cmds++;
                    c->thread->stats.slab_stats[ITEM_clsid(it)].touch_hits++;
                } else {
                    c->thread->stats.lru_hits[it->slabs_clsid]++;
                    c->thread->stats.get_cmds++;
                }
                pthread_mutex_unlock(&c->thread->stats.mutex);

                *(c->ilist + i) = it;
                i++;
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                if (should_touch) {
                    c->thread->stats.touch_cmds++;
                    c->thread->stats.touch_misses++;
                } else {
                    c->thread->stats.get_misses++;
                    c->thread->stats.get_cmds++;
                }
                MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
                pthread_mutex_unlock(&c->thread->stats.mutex);
            }

            key_token++;
        }

        /*
         * If the command string hasn't been fully processed, get the next set
         * of tokens.
         */
        if (key_token->value != NULL) {
            ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
            key_token = tokens;
        }
    } while(key_token->value != NULL);
stop:
    c->icurr = c->ilist;
    c->ileft = i;
    if (return_cas || !settings.inline_ascii_response) {
        c->suffixcurr = c->suffixlist;
        c->suffixleft = si;
    }

    if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
        || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
        // related code omitted
    } else {
        conn_set_state(c, conn_mwrite);
        c->msgcurr = 0;
    }
}
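For each hit, process_get_command uses add_iov to queue the pieces of a `VALUE <key> <flags> <bytes>\r\n<data>\r\n` entry. The iovecs point directly at the item's memory. The response finishes with `END\r\n`, and misses simply produce no entry.

The sketch below builds the same wire format by copying into a buffer instead of using iovecs. toy_hit and toy_build_get_response are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef struct {
    const char *key;
    unsigned int flags;
    const char *value;   /* without the trailing \r\n */
} toy_hit;

/* Writes one "VALUE <key> <flags> <bytes>\r\n<data>\r\n" entry per hit, then
 * "END\r\n". Returns the length written, or -1 if buf is too small. */
static int toy_build_get_response(const toy_hit *hits, int nhits, char *buf, size_t size) {
    size_t off = 0;
    for (int i = 0; i < nhits; i++) {
        int n = snprintf(buf + off, size - off, "VALUE %s %u %zu\r\n%s\r\n",
                         hits[i].key, hits[i].flags, strlen(hits[i].value), hits[i].value);
        if (n < 0 || (size_t)n >= size - off)
            return -1;
        off += (size_t)n;
    }
    int n = snprintf(buf + off, size - off, "END\r\n");
    if (n < 0 || (size_t)n >= size - off)
        return -1;
    return (int)(off + (size_t)n);
}
```

With inline_ascii_response enabled, memcached already keeps " 0 9\r\n" as the item's stored suffix. Its real response path therefore only adds "VALUE ", the key, and pointers into the item.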
static inline item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch) {
    item *it;
    if (should_touch) {
        it = item_touch(key, nkey, exptime, c);
    } else {
        it = item_get(key, nkey, c, DO_UPDATE);
    }
    if (it && it->refcount > IT_REFCOUNT_LIMIT) {
        item_remove(it);
        it = NULL;
    }
    return it;
}

item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c, do_update);
    item_unlock(hv);
    return it;
}
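item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
    item *it = assoc_find(key, nkey, hv);
    // Other code omitted; focus on assoc_find. The omitted part increments it->refcount
    // when the item is found, treats flushed or expired items as misses, and, when
    // do_update is true, updates the item's LRU bookkeeping.
    return it;
}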
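do_item_get starts with assoc_find. assoc_find picks a bucket from the hash value (hv masked by the table size), then walks the chain through the h_next pointers, comparing key length first and then key bytes.

Below is a minimal sketch of a chained table with insert, find and delete. The table size, toy_item and the toy_assoc_* names are hypothetical. hv is passed in directly so the example can force two keys into the same bucket.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_HASHPOWER 4
#define TOY_HASHMASK ((1u << TOY_HASHPOWER) - 1)

typedef struct toy_item {
    struct toy_item *h_next;   /* next item in the same bucket */
    const char *key;
    size_t nkey;
} toy_item;

static toy_item *toy_table[1u << TOY_HASHPOWER];

/* Push onto the head of the bucket's chain. */
static void toy_assoc_insert(toy_item *it, uint32_t hv) {
    toy_item **bucket = &toy_table[hv & TOY_HASHMASK];
    it->h_next = *bucket;
    *bucket = it;
}

/* Walk the chain, comparing the length first, then the bytes. */
static toy_item *toy_assoc_find(const char *key, size_t nkey, uint32_t hv) {
    for (toy_item *it = toy_table[hv & TOY_HASHMASK]; it != NULL; it = it->h_next) {
        if (it->nkey == nkey && memcmp(it->key, key, nkey) == 0)
            return it;
    }
    return NULL;
}

/* Find the link that points at the item and splice the item out. */
static void toy_assoc_delete(const char *key, size_t nkey, uint32_t hv) {
    toy_item **pos = &toy_table[hv & TOY_HASHMASK];
    while (*pos != NULL && !((*pos)->nkey == nkey && memcmp((*pos)->key, key, nkey) == 0))
        pos = &(*pos)->h_next;
    if (*pos != NULL) {
        toy_item *found = *pos;
        *pos = found->h_next;
        found->h_next = NULL;
    }
}
```

Walking with a pointer to the previous link lets the delete handle the bucket head and the middle of a chain with the same code.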
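Delete: delete
For deletes, the main function is process_delete_command:
- It first looks up the item.
- If the item exists, it is deleted and DELETED is returned; otherwise NOT_FOUND is returned.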
static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
    char *key;
    size_t nkey;
    item *it;

    assert(c != NULL);

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

    if (nkey > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    it = item_get(key, nkey, c, DONT_UPDATE);
    if (it) {
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        item_unlink(it);
        item_remove(it);      /* release our reference */
        out_string(c, "DELETED");
    } else {
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.delete_misses++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        out_string(c, "NOT_FOUND");
    }
}
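item_unlink calls do_item_unlink, which does two main things:
- It removes the item from the hash table (assoc_delete).
- It removes the item from the LRU list (item_unlink_q).
It then calls do_item_remove to drop the reference that the link held.
item_remove calls do_item_remove. do_item_remove decrements the item's refcount and frees the item's memory (item_free) only when the count reaches 0.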
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats_state.curr_bytes -= ITEM_ntotal(it);
        stats_state.curr_items -= 1;
        STATS_UNLOCK();
        item_stats_sizes_remove(it);
        assoc_delete(ITEM_key(it), it->nkey, hv);
        item_unlink_q(it);
        do_item_remove(it);
    }
}

void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

void do_item_remove(item *it) {
    MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    assert(it->refcount > 0);

    if (refcount_decr(it) == 0) {
        item_free(it);
    }
}
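Because deletion goes through the reference count, an item that another connection is still sending stays valid after delete. Here is that sequence, step by step:
1. A concurrent get holds one reference.
2. delete takes its own reference with item_get.
3. Unlinking drops the link's reference.
4. item_remove drops the delete's reference.
5. The memory is released only when the reader finishes.

The sketch uses a hypothetical toy_item, and the freed flag stands in for item_free().

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    unsigned short refcount;
    bool linked;   /* reachable through the hash table and LRU */
    bool freed;    /* stands in for item_free() */
} toy_item;

static void toy_decr(toy_item *it) {            /* like do_item_remove */
    assert(it->refcount > 0);
    if (--it->refcount == 0)
        it->freed = true;
}

static void toy_unlink(toy_item *it) {          /* like do_item_unlink */
    if (it->linked) {                           /* guarded like the ITEM_LINKED check */
        it->linked = false;                     /* assoc_delete + item_unlink_q happen here */
        toy_decr(it);                           /* drop the reference held by the link */
    }
}
```

The usage below walks through a delete that races with a get that is still sending the value.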