Memcache-網(wǎng)絡(luò)線程模型-源碼分析

memcached-version-1.4.25

介紹

memcache 網(wǎng)絡(luò)模型是典型的單進(jìn)程多線程模型,采用libevent處理網(wǎng)絡(luò)請求,主進(jìn)程負(fù)責(zé)將新來的連接分配給work線程,work線程負(fù)責(zé)處理連接,有點(diǎn)類似于負(fù)載均衡,通過主進(jìn)程分發(fā)到對應(yīng)的工作線程.

                                   主進(jìn)程(master)
              
                 |                |                 |                  |
             pthread1(work)    pthread2(work)     pthread3(work)      pthread4(work) 

數(shù)據(jù)結(jié)構(gòu)

memcache 會(huì)給每個(gè)線程都創(chuàng)建一個(gè) LIBEVENT_THREAD 線程結(jié)構(gòu)體

/* Per-worker-thread state: memcached creates one of these for every worker. */
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for the notify pipe below */
    int notify_receive_fd;      /* receiving (read) end of the notify pipe */
    int notify_send_fd;         /* sending (write) end of the notify pipe */
    struct thread_stats stats;  /* per-thread statistics; every worker keeps its own */
    struct conn_queue *new_conn_queue; /* queue of connections waiting for this thread; the dispatcher (main) thread pushes new connections onto it */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

memcache 給每一個(gè)網(wǎng)絡(luò)連接都會(huì)創(chuàng)建一個(gè) conn 結(jié)構(gòu)體

/* Per-connection state: one conn is created for every network connection. */
typedef struct conn conn;
struct conn {
    int    sfd; // socket file descriptor of this connection
    sasl_conn_t *sasl_conn;
    bool authenticated;
    enum conn_states  state; // current state in the connection state machine
    enum bin_substates substate; // sub-state for binary-protocol key/value handling
    rel_time_t last_cmd_time; // time of the last command on this connection
    struct event event; // libevent event registered for this connection's fd
    short  ev_flags;    // event flags this connection listens for
    short  which;   /** which events were just triggered */

    char   *rbuf;   /* buffer the socket data is read into */
    char   *rcurr;  /* current parse position inside rbuf */
    int    rsize;   /* allocated size of rbuf (how much one read may fetch) */
    int    rbytes;  /* bytes in the buffer still waiting to be processed */
    
    /* same as the read side above, but for writing back to the client */
    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    enum conn_states  write_and_go; /* state to switch to once the reply has been written; NOTE(review): the article's author observed the source never appears to test this state — confirm against the full source */
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /* points at the unprocessed data position in the buffer */
    int    rlbytes; /* bytes the current operation still needs; taken from the buffer first, then read from the socket if the buffer runs dry */

    void   *item;     /* pointer to the in-memory item being worked on */

    //.......... (fields elided in the article)

    enum protocol protocol;   /* wire protocol of the request (ASCII or binary) */
    enum network_transport transport; /* transport type (TCP or UDP) */

   //.......... (fields elided in the article)

    bool   noreply;   /* whether a reply should be sent to the client */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* for binary packets, the saved binary request header */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    short cmd; /* current command being processed (get, set, add, ...) */
    int opaque;
    int keylen; /* length of the key */
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* back-pointer to the worker thread that owns this connection */
};

新連接如何分配處理?

主進(jìn)程負(fù)責(zé)監(jiān)聽端口,如果有新的連接過來,會(huì)先分配這個(gè)連接由哪個(gè)work線程處理,確定一個(gè)work線程之后會(huì)把這個(gè)連接打包成一個(gè) CQ_ITEM 結(jié)構(gòu)體,然后丟給對應(yīng)work線程的conn_queue隊(duì)列(上面線程結(jié)構(gòu)體有這個(gè)屬性),work線程從隊(duì)列取出該結(jié)構(gòu)體,獲取一些參數(shù)值,然后創(chuàng)建一個(gè) conn 結(jié)構(gòu)體,監(jiān)聽并開始處理.

CQ_ITEM 結(jié)構(gòu)體

/* An item in a worker's connection queue: carries the parameters the
 * dispatcher thread hands over for a freshly accepted connection. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;         /* socket file descriptor */
    enum conn_states  init_state;  /* initial connection state */
    int               event_flags; /* libevent flags, e.g. EV_READ | EV_PERSIST */
    int               read_buffer_size; /* size of the per-read buffer */
    enum network_transport     transport; /* TCP or UDP */
    CQ_ITEM          *next; /* next item in the queue / freelist */
};  /* fixed: the excerpt was missing this terminating semicolon */

conn_queue 隊(duì)列結(jié)構(gòu)體,用于指向分配給自己 CQ_ITEM

/* Connection queue: linked list of CQ_ITEMs with head/tail pointers,
 * protected by a mutex (pushed by the dispatcher, popped by the worker). */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;        /* first queued item */
    CQ_ITEM *tail;        /* last queued item */
    pthread_mutex_t lock; /* guards head/tail */
};

memcache 網(wǎng)絡(luò)線程模型

memcache 網(wǎng)絡(luò)線程模型

memcache 線程模型初始化

網(wǎng)絡(luò)連接數(shù)初始化函數(shù) conn_init ,就是設(shè)定最大連接數(shù)

main_base = event_init(); // initialize the main (dispatcher) thread's libevent base

/* Size and allocate the global fd-indexed table of conn pointers. */
static void conn_init(void) {
    /* We're unlikely to see an FD much higher than maxconns. */
    int next_fd = dup(1);
    int headroom = 10;      /* account for extra unexpected open FDs */
    struct rlimit rl;
    
    // default maximum connection count;
    // settings.maxconns is given on the memcached command line
    max_fds = settings.maxconns + headroom + next_fd;
    
    // try to query the per-process open-fd limit; if that succeeds, size
    // the table from the hard limit instead.
    // NOTE(review): this unconditionally overwrites the maxconns-based
    // value computed above whenever getrlimit succeeds
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
        max_fds = rl.rlim_max;
    } else {
        fprintf(stderr, "Failed to query maximum file descriptor; "
                       "falling back to maxconns\n");
    }

   close(next_fd);
    
   // allocate the array of conn pointers, indexed by file descriptor
   if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
       /* This is unrecoverable so bail out early. */
        exit(1);
   }
}

初始化線程函數(shù) memcached_thread_init

// nthreads: number of worker threads (settings.num_threads)
// main_base: the main (dispatcher) thread's event base
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;
    
    // initialize the LRU and global locks
    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);
    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* pick the granularity of the hash-segment (item) lock table from the
       worker-thread count */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
    // the item-lock table must stay strictly smaller than the hash table
    // (hashpower, default 16)
    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }
    
    // number of item locks for the hash table
    item_lock_count = hashsize(power); //#define hashsize(n) ((ub4)1<<(n))
    item_lock_hashpower = power;
    // allocate the item locks
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    // initialize the item locks
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    // allocate one LIBEVENT_THREAD structure per worker thread
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }
    // record the dispatcher (main) thread's event base and thread id
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();
    
    // create one notify pipe per worker; each worker listens on the
    // receive end of its own pipe
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
        // read / write ends of the notify pipe
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];
        // register the pipe event and create this worker's connection queue
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* start the worker threads */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* wait for all the threads to finish initializing before returning */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

設(shè)置線程監(jiān)聽事件及創(chuàng)建該線程連接隊(duì)列函數(shù) setup_thread

/* Set up one worker: its event base, notify-pipe event, connection queue,
 * stats mutex and suffix cache.  Runs in the main thread before workers
 * are spawned. */
static void setup_thread(LIBEVENT_THREAD *me) {
    // create this worker's own libevent base
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen on the notify pipe: when the dispatcher assigns a new
       connection to this worker it writes to the pipe, which triggers
       me->notify_receive_fd and invokes thread_libevent_process with
       `me` (this worker's structure) as the callback argument. */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    
    // allocate the new-connection queue
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    // initialize the queue (head/tail/lock)
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }
    
    // create the per-thread suffix cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

創(chuàng)建線程函數(shù) create_worker

/* Spawn one worker thread running `func` (worker_libevent) with `arg`
 * (the worker's LIBEVENT_THREAD); stores the new thread's id in it. */
static void create_worker(void *(*func)(void *), void *arg) {
   pthread_attr_t  attr;
   int             ret;

   pthread_attr_init(&attr);
   // create the thread; func == worker_libevent, arg is its LIBEVENT_THREAD
   if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
       fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
   }
}

work 線程執(zhí)行函數(shù)入口

/* Entry point of every worker thread. */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    register_thread_initialized();
    
    // enter the event loop: the worker now waits on its registered events
    event_base_loop(me->base, 0);
    
    return NULL;
}

現(xiàn)在工作線程初始化完畢,開始初始化主進(jìn)程(線程),主線程初始化就是正常socket模式監(jiān)聽端口,然后設(shè)置event監(jiān)聽事件

(1) sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)
(2) setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
(3) bind(sfd, next->ai_addr, next->ai_addrlen)
(4) listen(sfd, settings.backlog)

創(chuàng)建一個(gè)連接 conn 結(jié)構(gòu)體,因?yàn)楸緳C(jī)打開了一個(gè)端口產(chǎn)生了一個(gè)網(wǎng)絡(luò)文件描述符,所以給該文件描述符創(chuàng)建一個(gè) conn,并加入主線程 main_base 事件里面,進(jìn)行監(jiān)聽,處理新的連接分配工作
conn_new (sfd, conn_listening, EV_READ | EV_PERSIST, 1,transport, main_base)

創(chuàng)建連接函數(shù) conn_new

// sfd: socket file descriptor
// init_state: initial connection state (listening sockets created by the
//   main thread always start as conn_listening, meaning they only dispatch
//   new connections to workers and do nothing else)
// event_flags: libevent flags to listen for
// read_buffer_size: size of the read buffer
// transport: TCP or UDP
// base: the main thread's event base, or a worker thread's event base

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    if (NULL == c) {
        // no conn cached for this fd yet: allocate and initialize one
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);
        
        // initialize every buffer pointer / size field
        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;
        // allocate the read/write buffers and auxiliary lists
        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            conn_free(c);
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate buffers for connection\n");
            return NULL;
        }
        
        // bump the allocated-conn-structures statistic
        STATS_LOCK();
        stats.conn_structs++;
        STATS_UNLOCK();
        
        // remember the socket fd on the conn
        c->sfd = sfd;
        // cache the pointer in conns[] for later reuse
        conns[sfd] = c;
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out.  but why
     * is this done for every command?  presumably for UDP
     * mode.  */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    //....................... (code elided in the article)

    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;
    c->authenticated = false;

    c->write_and_go = init_state;
    c->write_and_free = 0;
    c->item = 0;

    c->noreply = false;
    
    // register the event for this fd; the callback is event_handler
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}

創(chuàng)建完主線程 conn 之后,主線程開始進(jìn)入事件監(jiān)聽環(huán)節(jié) event_base_loop(main_base, 0)

說明

至此 (work線程 及 主線程) 全部初始化完畢,并設(shè)置完成一些自己的監(jiān)聽事件

(1) 網(wǎng)絡(luò)連接分配 - 回調(diào)函數(shù) -> thread_libevent_process
(2) 網(wǎng)絡(luò)連接處理 - 回調(diào)函數(shù) -> event_handler

目前工作線程只監(jiān)聽自己的管道文件描述符,當(dāng)管道文件描述符有活動(dòng)時(shí)執(zhí)行回調(diào) thread_libevent_process ,然后 conn_new() 創(chuàng)建一個(gè)網(wǎng)絡(luò)連接并加入到當(dāng)前工作線程的監(jiān)聽事件集合里面,并設(shè)置回調(diào)函數(shù)為 event_handler,這樣的話工作線程除了監(jiān)聽管道文件描述符,還會(huì)監(jiān)聽網(wǎng)絡(luò)連接文件描述符,哪個(gè)文件描述符有活動(dòng),就執(zhí)行那個(gè)文件描述符所綁定的回調(diào)函數(shù)。

1、event_handler 函數(shù)

/* libevent callback for connection fds: record which events fired, sanity
 * check the fd, then run the connection state machine. */
void event_handler(const int fd, const short which, void *arg) {
    conn *c = (conn *)arg;

    assert(c != NULL);

    c->which = which;

    /* sanity: the fd libevent reported must match the conn's own fd */
    if (fd != c->sfd) {
        if (settings.verbose > 0) {
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        }
        conn_close(c);
        return;
    }

    /* drive the state machine for this connection */
    drive_machine(c);

    /* wait for next event */
}

drive_machine 狀態(tài)機(jī)函數(shù),就是根據(jù)連接的狀態(tài)進(jìn)行對應(yīng)的處理

// listening sockets start in conn_listening by default, so only that state
// is shown in detail here
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
#ifdef HAVE_ACCEPT4
    static int  use_accept4 = 1;
#else
    static int  use_accept4 = 0;
#endif

    assert(c != NULL);
    
    // keep processing this conn until a state handler asks us to stop
    while (!stop) {

        switch(c->state) {
        case conn_listening: // dispatch a newly accepted connection
            addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
            if (use_accept4) {
                sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
            } else {
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            }
#else
            // accept a client fd on the listening socket
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
            if (sfd == -1) {
                //..... (code elided in the article)
                perror(use_accept4 ? "accept4()" : "accept()");
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    // out of file descriptors: temporarily refuse new
                    // connections; accepting resumes once fds free up
                    // (see accept_new_conns below for the details)
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if (!use_accept4) {
                if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
                    perror("setting O_NONBLOCK");
                    close(sfd);
                    break;
                }
            }
            // reject if we are already at the configured connection limit
            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                // dispatch: pick the worker thread for this connection;
                // the worker builds the conn with initial state
                // conn_new_cmd and keeps listening on it; later events
                // then walk through the states below
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }
            stop = true;
            break;
        // the states below handle a connection after it was dispatched
        case conn_waiting:
            //......
        case conn_read:
            //......
        case conn_parse_cmd :
            //......
        case conn_new_cmd:
            //......
        case conn_nread:
            //......
        case conn_swallow:
            //......
        case conn_write:
            //......
        case conn_mwrite:
            //......
        case conn_closing:
            //......
        case conn_closed:
            //......
        case conn_max_state:
            //......
        }
    }
    return;
}

調(diào)度分配連接到 -> work線程函數(shù) dispatch_conn_new

/* Runs in the dispatcher (main) thread: hand a freshly accepted fd to a
 * worker by queueing a CQ_ITEM and poking the worker's notify pipe. */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    
    // grab a CQ_ITEM from the freelist (allocates when empty)
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }
    
    // round-robin: pick the next worker thread id
    int tid = (last_thread + 1) % settings.num_threads;
    
    // locate that worker's structure
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;
    
    // fill the CQ_ITEM with the connection parameters
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    
    // push it onto the chosen worker's connection queue
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    // write the character 'c' to the worker's notify pipe
    // (thread->notify_send_fd); this fires the worker's pipe event and
    // runs thread_libevent_process, completing the hand-off
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

2、thread_libevent_process 函數(shù)

/* Worker-side notify-pipe callback: pops a queued CQ_ITEM and turns it
 * into a live conn on this worker's event base. */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg; // the worker this notification was addressed to
    CQ_ITEM *item;
    char buf[1];
    
    // read one byte: the dispatcher writes a single 'c' when it queues a
    // new connection for this worker
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0]) {
    case 'c':
    item = cq_pop(me->new_conn_queue); // pop the CQ_ITEM that dispatch_conn_new() pushed

    if (NULL != item) {
        // build the conn and register its event on this worker's me->base;
        // the callback is event_handler -> drive_machine, and the initial
        // state is item->init_state (conn_new_cmd)
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            // remember the owning worker thread on the conn
            c->thread = me;
        }
        // release the item back onto the CQ_ITEM freelist
        cqi_free(item);
    }
        break;
    /* we were told to pause and report in */
    case 'p':
    register_thread_initialized();
        break;
    }
}

memcache 如果沒有可用的文件描述符該怎么辦?

因?yàn)橛锌赡懿l(fā)量過大一瞬間導(dǎo)致大量的連接,超過了系統(tǒng)設(shè)置的最大文件描述符數(shù)量,這個(gè)時(shí)候 memcache 實(shí)際上會(huì)拒絕連接的,這個(gè)拒絕連接是指拒絕TCP三次握手(減輕服務(wù)器負(fù)擔(dān)),然后內(nèi)部開啟定時(shí)器不斷的檢查是否有可用的文件描述,當(dāng)有可用的文件描述符時(shí),會(huì)在打開 socket

上面在 accept 獲取一個(gè)文件描述符如果返回 EMFILE 這個(gè)錯(cuò)誤代表文件描述符耗盡,然后執(zhí)行 accept_new_conns(false)

accept_new_conns 函數(shù)

/* Thread-safe wrapper: toggle accepting of new connections while holding
 * conn_lock (serializes against conn_close, which also takes conn_lock
 * when it flips allow_new_conns). */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}

do_accept_new_conns 函數(shù)

/* Enable (do_accept == true) or disable (false) accepting on every
 * listening socket, and keep the related statistics.  Caller must hold
 * conn_lock. */
void do_accept_new_conns(const bool do_accept) {
    conn *next;
    
    // static conn* listen_conn: listening sockets also use conn structures
    for (next = listen_conn; next; next = next->next) {
        if (do_accept) {
            // do_accept == true:
            // re-enable the read event on the listening fd
            update_event(next, EV_READ | EV_PERSIST);
            // restore the accept backlog to settings.backlog
            if (listen(next->sfd, settings.backlog) != 0) {
                perror("listen");
            }
        }
        else {
            // do_accept == false:
            // temporarily disable the event on the listening fd
            update_event(next, 0);
            // drop the accept backlog to 0, so no new TCP three-way
            // handshakes are completed
            if (listen(next->sfd, 0) != 0) {
                perror("listen");
            }
        }
    }

    if (do_accept) {
        struct timeval maxconns_exited;
        uint64_t elapsed_us;
        gettimeofday(&maxconns_exited,NULL);
        // account the time spent with listening disabled
        STATS_LOCK();
        elapsed_us =
            (maxconns_exited.tv_sec - stats.maxconns_entered.tv_sec) * 1000000
            + (maxconns_exited.tv_usec - stats.maxconns_entered.tv_usec);
        stats.time_in_listen_disabled_us += elapsed_us;
        stats.accepting_conns = true;
        STATS_UNLOCK();
    } else {
        // record when listening was disabled
        STATS_LOCK();
        stats.accepting_conns = false;
        gettimeofday(&stats.maxconns_entered,NULL);
        stats.listen_disabled_num++;
        STATS_UNLOCK();
        // allow_new_conns == false means no new connections may be created
        allow_new_conns = false;
        // start polling for freed descriptors
        maxconns_handler(-42, 0, 0);
    }
}

maxconns_handler 函數(shù)

/* Timer callback: polls every 10ms until a file descriptor has been freed
 * (allow_new_conns == true), then re-enables accepting.  fd == -42 is the
 * sentinel used for the initial, non-timer invocation. */
static void maxconns_handler(const int fd, const short which, void *arg) {
    struct timeval t = {.tv_sec = 0, .tv_usec = 10000};

    if (fd == -42 || allow_new_conns == false) {
        /* reschedule in 10ms if we need to keep polling */
        // arm a timer that re-invokes this handler every 10ms until
        // allow_new_conns becomes true, i.e. until a descriptor has been
        // freed and connections can be accepted again
        evtimer_set(&maxconnsevent, maxconns_handler, 0);
        event_base_set(main_base, &maxconnsevent);
        evtimer_add(&maxconnsevent, &t);
    } else {
        // stop the timer
        evtimer_del(&maxconnsevent);
        // re-enable accepting: call accept_new_conns with do_accept == true
        accept_new_conns(true);
    }
}

當(dāng) memcache 處理完一個(gè)連接,關(guān)閉的時(shí)候會(huì) allow_new_conns = true

memcache 關(guān)閉連接函數(shù) conn_close

/* Tear down a connection: unregister its event, release its resources,
 * close the socket, and signal that an fd is available again. */
static void conn_close(conn *c) {
    assert(c != NULL);

    /* delete the event, the socket and the conn */
    event_del(&c->event);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closed.\n", c->sfd);
    
    // release the resources this connection acquired
    conn_cleanup(c);

    MEMCACHED_CONN_RELEASE(c->sfd);
    conn_set_state(c, conn_closed);
    // close the socket
    close(c->sfd);
    
    // take conn_lock so we don't race with accept_new_conns over
    // allow_new_conns
    pthread_mutex_lock(&conn_lock);
    // an fd was just freed, so new connections are possible again
    allow_new_conns = true;
    pthread_mutex_unlock(&conn_lock);
        
    // Why must allow_new_conns not be modified concurrently?
    // do_accept_new_conns sets it to false and then arms the retry timer
    // based on that value; without the lock this true-store could land in
    // between, so the timer check would see true, skip rescheduling, and
    // re-enable accepting immediately instead of backing off.
    
    STATS_LOCK();
    stats.curr_conns--;
    STATS_UNLOCK();

    return;
}

結(jié)束

以上介紹的大致就是 memcache 內(nèi)部如何實(shí)現(xiàn) <b>單進(jìn)程多線程</b> 處理網(wǎng)絡(luò)請求的具體實(shí)現(xiàn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容