
介紹
memcache 網(wǎng)絡(luò)模型是典型的單進(jìn)程多線程模型,采用libevent處理網(wǎng)絡(luò)請求,主進(jìn)程負(fù)責(zé)將新來的連接分配給work線程,work線程負(fù)責(zé)處理連接,有點(diǎn)類似于負(fù)載均衡,通過主進(jìn)程分發(fā)到對應(yīng)的工作線程.
主進(jìn)程(master)
| | | |
pthread1(work) pthread2(work) pthread3(work) pthread4(work)
數(shù)據(jù)結(jié)構(gòu)
memcache 會(huì)給每個(gè)線程都會(huì)創(chuàng)建一個(gè) LIBEVENT_THREAD 線程結(jié)構(gòu)體
typedef struct {
pthread_t thread_id; /* 線程id */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* 注冊事件 */
int notify_receive_fd; /* 讀pipe管道文件描述符 */
int notify_send_fd; /* 寫pipe管道文件描述符 */
struct thread_stats stats; /* 線程統(tǒng)計(jì)信息結(jié)構(gòu)體,每個(gè)線程都會(huì)有自己的統(tǒng)計(jì)信息 */
struct conn_queue *new_conn_queue; /* 當(dāng)前線程待處理的連接隊(duì)列,主線程負(fù)責(zé)將新的連接插入到該隊(duì)列 */
cache_t *suffix_cache; /* suffix cache */
} LIBEVENT_THREAD;
memcache 給每一個(gè)網(wǎng)絡(luò)連接都會(huì)創(chuàng)建一個(gè) conn 結(jié)構(gòu)體
typedef struct conn conn;
struct conn {
int sfd; // 當(dāng)前連接文件描述符
sasl_conn_t *sasl_conn;
bool authenticated;
enum conn_states state; // 當(dāng)前連接狀態(tài)
enum bin_substates substate; // key、value 處理類型
rel_time_t last_cmd_time; // 最后一次訪問時(shí)間
struct event event; // 注冊連接監(jiān)聽事件
short ev_flags; //監(jiān)聽事件類型
short which; /** which events were just triggered */
char *rbuf; /* 網(wǎng)絡(luò)連接讀取的數(shù)據(jù)存放緩沖區(qū)地址 */
char *rcurr; /* 當(dāng)前讀取緩沖區(qū)的位置 */
int rsize; /* 每次從網(wǎng)絡(luò)連接讀取多少數(shù)據(jù)到緩沖區(qū) */
int rbytes; /* 緩沖區(qū)剩余待處理的字節(jié)數(shù) */
/* 跟上面讀取一樣,只不過這個(gè)是往客戶端寫 */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
enum conn_states write_and_go; /* 完成回寫客戶端之后賦值的狀態(tài),但是我看源碼里并沒有判斷此狀態(tài) */
void *write_and_free; /** free this memory after finishing writing */
char *ritem; /* 指向緩沖區(qū)待處理的數(shù)據(jù)位置 */
int rlbytes; /* 每次程序處理需要讀取的字節(jié)數(shù), 先從緩沖區(qū)buf讀取,如果不夠或則為空,則從網(wǎng)絡(luò)連接里在讀取 */
void *item; /* 指向內(nèi)存item指針 */
//..........
enum protocol protocol; /* 協(xié)議包類型 (字符串、二進(jìn)制) */
enum network_transport transport; /* 網(wǎng)絡(luò)連接類型 (TCP、UDP) */
//..........
bool noreply; /* 是否給客戶端答復(fù)狀態(tài) */
/* current stats command */
struct {
char *buffer;
size_t size;
size_t offset;
} stats;
/* 如果是二進(jìn)制包, 則保存二進(jìn)制包頭 */
protocol_binary_request_header binary_header;
uint64_t cas; /* the cas to return */
short cmd; /* 當(dāng)前的命令類型(get、set、add) */
int opaque;
int keylen; /* key長度 */
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* 當(dāng)前連接屬于那個(gè)線程的,保存對應(yīng)線程的指針 */
};
新連接如何分配處理?
主進(jìn)程負(fù)責(zé)監(jiān)聽端口,如果有新的連接過來會(huì)先分配這個(gè)連接由哪個(gè)work線程處理,確定一個(gè)work線程之后會(huì)把這個(gè)連接打包成一個(gè) CQ_ITEM 結(jié)構(gòu)體,然后丟給對應(yīng)的work線程的conn_queue隊(duì)列(上面線程結(jié)構(gòu)體有這個(gè)屬性),work線程從隊(duì)列取出該結(jié)構(gòu)體,獲取一些參數(shù)值,然后創(chuàng)建一個(gè) conn 結(jié)構(gòu)體,監(jiān)聽并開始處理.
CQ_ITEM 結(jié)構(gòu)體
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd; /* 文件描述符 */
enum conn_states init_state; /* 連接狀態(tài) */
int event_flags; /* 監(jiān)聽事件類型 EV_READ | EV_PERSIST */
int read_buffer_size; /* 每次緩沖區(qū)讀取size */
enum network_transport transport; /* TCP 或 UDP */
CQ_ITEM *next; /* 下一個(gè) cq_item */
}
conn_queue 隊(duì)列結(jié)構(gòu)體,用于指向分配給自己的 CQ_ITEM
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
};
memcache 網(wǎng)絡(luò)線程模型

memcache 線程模型初始化
網(wǎng)絡(luò)連接數(shù)初始化函數(shù) conn_init ,就是設(shè)定最大連接數(shù)
main_base = event_init(); //主進(jìn)程event事件初始化
static void conn_init(void) {
/* We're unlikely to see an FD much higher than maxconns. */
int next_fd = dup(1);
int headroom = 10; /* account for extra unexpected open FDs */
struct rlimit rl;
// 默認(rèn)最大連接數(shù)
// settings.maxconns 啟動(dòng)memcache的時(shí)候指定
max_fds = settings.maxconns + headroom + next_fd;
// 先嘗試獲取系統(tǒng)進(jìn)程最大打開文件描述符數(shù)
// 如果獲取成功則按進(jìn)程最大打開文件描述符
// 設(shè)置最大連接數(shù)
if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
max_fds = rl.rlim_max;
} else {
fprintf(stderr, "Failed to query maximum file descriptor; "
"falling back to maxconns\n");
}
close(next_fd);
// 根據(jù)最大文件描述符數(shù)量,創(chuàng)建conn結(jié)構(gòu)體指針數(shù)組
if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {
fprintf(stderr, "Failed to allocate connection structures\n");
/* This is unrecoverable so bail out early. */
exit(1);
}
}
初始化線程函數(shù) memcached_thread_init
// settings.num_threads 線程數(shù)
// main_base 主進(jìn)程事件
void memcached_thread_init(int nthreads, struct event_base *main_base) {
int i;
int power;
// 初始化鎖
for (i = 0; i < POWER_LARGEST; i++) {
pthread_mutex_init(&lru_locks[i], NULL);
}
pthread_mutex_init(&worker_hang_lock, NULL);
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
/* 根據(jù)線程數(shù),設(shè)定hash表段鎖的顆粒度 */
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else {
/* 8192 buckets, and central locks don't scale much past 5 threads */
power = 13;
}
// 不能超過最大值 hashpower = 16
if (power >= hashpower) {
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
exit(1);
}
//hash表item鎖數(shù)量
item_lock_count = hashsize(power); //#define hashsize(n) ((ub4)1<<(n))
item_lock_hashpower = power;
//申請item鎖
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror("Can't allocate item locks");
exit(1);
}
//初始化item鎖
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
// 根據(jù)線程數(shù),創(chuàng)建線程結(jié)構(gòu)體
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror("Can't allocate thread descriptors");
exit(1);
}
// 保存主進(jìn)程的事件及線程id
dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();
// 根據(jù)線程數(shù)創(chuàng)建pipe管道,每個(gè)線程都監(jiān)聽自己管道的文件描述符
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
//讀寫 pipe fd
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
//設(shè)置線程監(jiān)聽事件及創(chuàng)建該線程連接隊(duì)列等.
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}
/* 開始創(chuàng)建線程 */
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
/* 等待所有線程創(chuàng)建完畢之后,再返回 */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}
設(shè)置線程監(jiān)聽事件及創(chuàng)建該線程連接隊(duì)列函數(shù) setup_thread
static void setup_thread(LIBEVENT_THREAD *me) {
//初始化當(dāng)前線程的event事件
me->base = event_init();
if (! me->base) {
fprintf(stderr, "Can't allocate event base\n");
exit(1);
}
/* 設(shè)置一個(gè)pipe管道監(jiān)聽事件,這就是上面說的,當(dāng)有一個(gè)新的連接分配給
當(dāng)前線程時(shí),就會(huì)通知該文件描述符 me->notify_receive_fd 調(diào)用回調(diào)
函數(shù) thread_libevent_process 參數(shù)就是 me 當(dāng)前線程結(jié)構(gòu)體指針 */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
//創(chuàng)建一個(gè)連接隊(duì)列
me->new_conn_queue = malloc(sizeof(struct conn_queue));
if (me->new_conn_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
//初始化連接隊(duì)列
cq_init(me->new_conn_queue);
if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
perror("Failed to initialize mutex");
exit(EXIT_FAILURE);
}
// 創(chuàng)建一塊 cache
me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
NULL, NULL);
if (me->suffix_cache == NULL) {
fprintf(stderr, "Failed to create suffix cache\n");
exit(EXIT_FAILURE);
}
}
創(chuàng)建線程函數(shù) create_worker
static void create_worker(void *(*func)(void *), void *arg) {
pthread_attr_t attr;
int ret;
pthread_attr_init(&attr);
// 創(chuàng)建線程,線程函數(shù)指針 func = worker_libevent
if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}
work 線程執(zhí)行函數(shù)入口
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
/* Any per-thread setup can happen here; memcached_thread_init() will block until
* all threads have finished initializing.
*/
register_thread_initialized();
// 實(shí)際上就是進(jìn)行 event_loop 開始監(jiān)聽每個(gè)線程事件
event_base_loop(me->base, 0);
return NULL;
}
現(xiàn)在工作線程初始化完畢,開始初始化主進(jìn)程(線程),主線程初始化就是正常socket模式監(jiān)聽端口,然后設(shè)置event監(jiān)聽事件
(1) sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)
(2) setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
(3) bind(sfd, next->ai_addr, next->ai_addrlen)
(4) listen(sfd, settings.backlog)
創(chuàng)建一個(gè)連接 conn 結(jié)構(gòu)體,因?yàn)楸緳C(jī)打開了一個(gè)端口產(chǎn)生了一個(gè)網(wǎng)絡(luò)文件描述符,所以給該文件描述符創(chuàng)建一個(gè) conn,并加入主線程 main_base 事件里面,進(jìn)行監(jiān)聽,處理新的連接分配工作
conn_new (sfd, conn_listening, EV_READ | EV_PERSIST, 1,transport, main_base)
創(chuàng)建連接函數(shù) conn_new
// sfd 網(wǎng)絡(luò)文件描述符
// init_state 連接狀態(tài) (主線程創(chuàng)建的本機(jī)端口的默認(rèn)連接狀態(tài)都會(huì)是 conn_listening ) 就是代表只把新的連接分配到work線程,不作其它處理
// event_flags 事件監(jiān)聽類型
// read_buffer_size 讀到緩沖區(qū)size
// transport TCP 、 UDP
// base 主線程的 event 或者 work線程的 event
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c;
assert(sfd >= 0 && sfd < max_fds);
c = conns[sfd];
if (NULL == c) {
// 創(chuàng)建一個(gè)conn結(jié)構(gòu)體,并指向
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
fprintf(stderr, "Failed to allocate connection object\n");
return NULL;
}
MEMCACHED_CONN_CREATE(c);
// 初始化每個(gè)字段
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->suffixlist = 0;
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
c->suffixsize = SUFFIX_LIST_INITIAL;
c->iovsize = IOV_LIST_INITIAL;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
// 創(chuàng)建讀寫緩沖區(qū)
c->rbuf = (char *)malloc((size_t)c->rsize);
c->wbuf = (char *)malloc((size_t)c->wsize);
c->ilist = (item **)malloc(sizeof(item *) * c->isize);
c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
c->msglist == 0 || c->suffixlist == 0) {
conn_free(c);
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
fprintf(stderr, "Failed to allocate buffers for connection\n");
return NULL;
}
// 統(tǒng)計(jì)增加
STATS_LOCK();
stats.conn_structs++;
STATS_UNLOCK();
// 保存當(dāng)前網(wǎng)絡(luò)文件描述符
c->sfd = sfd;
// 指針保存到conns
conns[sfd] = c;
}
c->transport = transport;
c->protocol = settings.binding_protocol;
/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
* mode. */
if (!settings.socketpath) {
c->request_addr_size = sizeof(c->request_addr);
} else {
c->request_addr_size = 0;
}
//.......................
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
c->ritem = 0;
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
c->ileft = 0;
c->suffixleft = 0;
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;
c->authenticated = false;
c->write_and_go = init_state;
c->write_and_free = 0;
c->item = 0;
c->noreply = false;
// 設(shè)置監(jiān)聽事件,回調(diào)函數(shù) event_handler
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
STATS_LOCK();
stats.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();
MEMCACHED_CONN_ALLOCATE(c->sfd);
return c;
}
創(chuàng)建完主線程 conn 之后,主線程開始進(jìn)入事件監(jiān)聽環(huán)節(jié) event_base_loop(main_base, 0)
說明
至此 (work線程 及 主線程) 全部初始化完畢,并設(shè)置完成一些自己的監(jiān)聽事件
(1) 網(wǎng)絡(luò)連接分配 - 回調(diào)函數(shù) -> thread_libevent_process
(2) 網(wǎng)絡(luò)連接處理 - 回調(diào)函數(shù) -> event_handler
目前工作線程只監(jiān)聽自己的管道文件描述符,當(dāng)管道文件描述符有活動(dòng)時(shí)執(zhí)行回調(diào) thread_libevent_process ,然后 conn_new() 創(chuàng)建一個(gè)網(wǎng)絡(luò)連接并加入到當(dāng)前工作線程的監(jiān)聽事件集合里面,并設(shè)置回調(diào)函數(shù)為 event_handler,這樣的話工作線程除了監(jiān)聽管道文件描述符,還會(huì)監(jiān)聽網(wǎng)絡(luò)連接文件描述符,哪個(gè)文件描述符有活動(dòng),就執(zhí)行那個(gè)文件描述符所綁定的回調(diào)函數(shù)。
1、event_handler 函數(shù)
void event_handler(const int fd, const short which, void *arg) {
conn *c;
c = (conn *)arg;
assert(c != NULL);
c->which = which;
/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
//調(diào)用狀態(tài)機(jī)函數(shù)
drive_machine(c);
/* wait for next event */
return;
}
drive_machine 狀態(tài)機(jī)函數(shù),就是根據(jù)連接的狀態(tài)進(jìn)行對應(yīng)的處理
// 新連接創(chuàng)建默認(rèn)的狀態(tài)是 conn_listening 所以這里只看這個(gè)狀態(tài)的處理
static void drive_machine(conn *c) {
bool stop = false;
int sfd;
socklen_t addrlen;
struct sockaddr_storage addr;
int nreqs = settings.reqs_per_event;
int res;
const char *str;
#ifdef HAVE_ACCEPT4
static int use_accept4 = 1;
#else
static int use_accept4 = 0;
#endif
assert(c != NULL);
//循環(huán)處理conn連接
while (!stop) {
switch(c->state) {
case conn_listening: //新連接分配狀態(tài)
addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
if (use_accept4) {
sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
} else {
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
}
#else
//accept一個(gè)當(dāng)前連接文件描述符
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
if (sfd == -1) {
//.....
perror(use_accept4 ? "accept4()" : "accept()");
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* these are transient, so don't log anything */
stop = true;
} else if (errno == EMFILE) {
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
//如果連接太多且文件描述符不夠用,會(huì)先暫時(shí)拒絕連接請求
//當(dāng)有可用文件描述符時(shí)會(huì)打開,下面會(huì)說明此函數(shù)內(nèi)部流程
accept_new_conns(false);
stop = true;
} else {
perror("accept()");
stop = true;
}
break;
}
if (!use_accept4) {
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
close(sfd);
break;
}
}
//如果當(dāng)前連接數(shù)大于啟動(dòng)時(shí)設(shè)定的最大連接數(shù)則報(bào)錯(cuò)
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
} else {
//調(diào)度連接,選擇那個(gè)work線程處理該連接
//選擇完成之后,創(chuàng)建該連接的conn結(jié)構(gòu)體
//默認(rèn)狀態(tài)為 conn_new_cmd 新命令然后繼續(xù)監(jiān)聽
//再觸發(fā)事件之后就會(huì)根據(jù)狀態(tài)走下面的流程
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
//這些狀態(tài)就是分配完連接之后,在處理該連接時(shí)的狀態(tài)流程。
case conn_waiting:
//......
case conn_read:
//......
case conn_parse_cmd :
//......
case conn_new_cmd:
//......
case conn_nread:
//......
case conn_swallow:
//......
case conn_write:
//......
case conn_mwrite:
//......
case conn_closing:
//......
case conn_closed:
//......
case conn_max_state:
//......
}
}
return;
}
調(diào)度分配連接到 -> work線程函數(shù) dispatch_conn_new
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
//從 cq_item 空閑鏈表獲取一個(gè)
CQ_ITEM *item = cqi_new();
char buf[1];
if (item == NULL) {
close(sfd);
/* given that malloc failed this may also fail, but let's try */
fprintf(stderr, "Failed to allocate memory for connection object\n");
return ;
}
//按順序獲取一個(gè)線程id
int tid = (last_thread + 1) % settings.num_threads;
//定位到該線程地址
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
//將一些參數(shù)賦值給 cq_item 結(jié)構(gòu)體
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//寫入當(dāng)前獲取到的線程cq_item隊(duì)列里面去
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
//寫入一個(gè)字符 'c' 到當(dāng)前獲取的線程 (thread->notify_send_fd) 管道文件描述符里
//觸發(fā)該線程監(jiān)聽事件并回調(diào) thread_libevent_process 函數(shù),最終完成新連接分配工作
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
2、thread_libevent_process 函數(shù)
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg; //對應(yīng)的線程結(jié)構(gòu)體指針,分配給那個(gè)線程,就是那個(gè)線程的指針
CQ_ITEM *item;
char buf[1];
//讀取一個(gè)字節(jié),因?yàn)橹骶€程有新連接分配給工作線程的時(shí)候,會(huì)往該工作線程的管道寫入一個(gè)字符 'c'
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
switch (buf[0]) {
case 'c':
item = cq_pop(me->new_conn_queue); // 從當(dāng)前連接隊(duì)列取出剛才主線程 dispatch_conn_new() 寫入的 cq_item
if (NULL != item) {
// 創(chuàng)建一個(gè)網(wǎng)絡(luò)連接,并往當(dāng)前線程 me->base 加入一個(gè)監(jiān)聽事件 回調(diào)函數(shù)就是 event_handler -> drive_machine
// 狀態(tài)為 item->init_state = conn_new_cmd
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can't listen for events on fd %d\n",
item->sfd);
}
close(item->sfd);
}
} else {
//將當(dāng)前線程指針賦給c->thread字段
c->thread = me;
}
//釋放item, 就是重新加入到空閑cq_item鏈表里面
cqi_free(item);
}
break;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
break;
}
}
memcache 如果沒有可用的文件描述符該怎么辦?
因?yàn)橛锌赡懿l(fā)量過大一瞬間導(dǎo)致大量的連接,超過了系統(tǒng)設(shè)置的最大文件描述符數(shù)量,這個(gè)時(shí)候 memcache 實(shí)際上會(huì)拒絕連接的,這個(gè)拒絕連接是指拒絕TCP三次握手(減輕服務(wù)器負(fù)擔(dān)),然后內(nèi)部開啟定時(shí)器不斷地檢查是否有可用的文件描述符,當(dāng)有可用的文件描述符時(shí),會(huì)再打開 socket
上面在 accept 獲取一個(gè)文件描述符如果返回 EMFILE 這個(gè)錯(cuò)誤代表文件描述符耗盡,然后執(zhí)行 accept_new_conns(false)
accept_new_conns 函數(shù)
void accept_new_conns(const bool do_accept) {
pthread_mutex_lock(&conn_lock);
do_accept_new_conns(do_accept);
pthread_mutex_unlock(&conn_lock);
}
do_accept_new_conns 函數(shù)
void do_accept_new_conns(const bool do_accept) {
conn *next;
//static conn* listen_conn 本機(jī)綁定的端口也會(huì)使用conn結(jié)構(gòu)體
for (next = listen_conn; next; next = next->next) {
if (do_accept) {
//do_accept = true 執(zhí)行這步
//恢復(fù)當(dāng)前綁定端口的文件描述符的事件
update_event(next, EV_READ | EV_PERSIST);
//恢復(fù)當(dāng)前socket連接隊(duì)列最大限額 settings.backlog
if (listen(next->sfd, settings.backlog) != 0) {
perror("listen");
}
}
else {
//do_accept = false 執(zhí)行這步
//先暫時(shí)關(guān)閉當(dāng)前綁定端口的文件描述符的事件
update_event(next, 0);
//把當(dāng)前socket連接隊(duì)列最大限額置0,就是代表不再進(jìn)行TCP三次握手.
if (listen(next->sfd, 0) != 0) {
perror("listen");
}
}
}
if (do_accept) {
struct timeval maxconns_exited;
uint64_t elapsed_us;
gettimeofday(&maxconns_exited,NULL);
//統(tǒng)計(jì)信息
STATS_LOCK();
elapsed_us =
(maxconns_exited.tv_sec - stats.maxconns_entered.tv_sec) * 1000000
+ (maxconns_exited.tv_usec - stats.maxconns_entered.tv_usec);
stats.time_in_listen_disabled_us += elapsed_us;
stats.accepting_conns = true;
STATS_UNLOCK();
} else {
//統(tǒng)計(jì)信息
STATS_LOCK();
stats.accepting_conns = false;
gettimeofday(&stats.maxconns_entered,NULL);
stats.listen_disabled_num++;
STATS_UNLOCK();
//allow_new_conns = false 代表當(dāng)前不能進(jìn)行新連接創(chuàng)建
allow_new_conns = false;
//執(zhí)行
maxconns_handler(-42, 0, 0);
}
}
maxconns_handler 函數(shù)
static void maxconns_handler(const int fd, const short which, void *arg) {
struct timeval t = {.tv_sec = 0, .tv_usec = 10000};
if (fd == -42 || allow_new_conns == false) {
/* reschedule in 10ms if we need to keep polling */
//這里可以看到設(shè)置了一個(gè)定時(shí)器,每 10ms 回調(diào)一次當(dāng)前函數(shù)
//直到 allow_new_conns = true 為止,因?yàn)榈扔?true 就代表
//已經(jīng)有空閑文件描述符了,可以重新建立連接。
evtimer_set(&maxconnsevent, maxconns_handler, 0);
event_base_set(main_base, &maxconnsevent);
evtimer_add(&maxconnsevent, &t);
} else {
//刪除定時(shí)器
evtimer_del(&maxconnsevent);
//重新調(diào)用 accept_new_conns 這次參數(shù) do_accept = true
accept_new_conns(true);
}
}
當(dāng) memcache 處理完一個(gè)連接,關(guān)閉的時(shí)候會(huì) allow_new_conns = true
memcache 關(guān)閉連接函數(shù) conn_close
static void conn_close(conn *c) {
assert(c != NULL);
/* delete the event, the socket and the conn */
event_del(&c->event);
if (settings.verbose > 1)
fprintf(stderr, "<%d connection closed.\n", c->sfd);
//釋放當(dāng)前連接申請的一些資源數(shù)據(jù)
conn_cleanup(c);
MEMCACHED_CONN_RELEASE(c->sfd);
conn_set_state(c, conn_closed);
//關(guān)閉連接
close(c->sfd);
//添加一個(gè)conn_lock互斥鎖,防止跟上面accept_new_conns函數(shù)
//同時(shí)操作 allow_new_conns 變量而沖突
pthread_mutex_lock(&conn_lock);
//這里每次關(guān)閉一個(gè)文件描述符之后都會(huì)allow_new_conns = true
allow_new_conns = true;
pthread_mutex_unlock(&conn_lock);
//為什么不能同時(shí)操作 allow_new_conns 變量?
//因?yàn)樯厦婧瘮?shù)會(huì)把這個(gè)變量置為false,然后根據(jù)false添加定時(shí)器,讓服務(wù)器緩沖休息一下(等待更多的連接釋放)
//但如果不加鎖,可能會(huì)導(dǎo)致上面函數(shù)把這個(gè)變量置為false的同時(shí)這里把這個(gè)變量置為true了
//所以就導(dǎo)致上面函數(shù)判斷這個(gè)變量等于false失敗,所以也不會(huì)添加定時(shí)器了而且馬上恢復(fù)服務(wù)器
//連接功能.
STATS_LOCK();
stats.curr_conns--;
STATS_UNLOCK();
return;
}
結(jié)束
以上介紹的大致就是 memcache 內(nèi)部如何實(shí)現(xiàn) <b>單進(jìn)程多線程</b> 處理網(wǎng)絡(luò)請求的具體實(shí)現(xiàn)。