Redis源碼分析之請求解析過程

本篇來看一下Redis的請求處理過程。

監(jiān)聽過程
void initServer(void) {
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL)
        exit(1);
    if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    if (server.tls_port != 0 && listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
        exit(1);
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
        exit(1);
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic("Unrecoverable error creating server.ipfd file event.");
            }
    }
    for (j = 0; j < server.tlsfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, acceptTLSHandler,NULL) == AE_ERR)
            {
                serverPanic("Unrecoverable error creating server.tlsfd file event.");
            }
    }
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
}

監(jiān)聽端口后得到文件描述符,調(diào)用aeCreateFileEvent將文件描述符注冊到事件循環(huán)中,注冊監(jiān)聽可讀事件。

看一下listenToPort:

int listenToPort(int port, int *fds, int *count) {
    int j;
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
            }
            if (*count == 1 || unsupported) {
                fds[*count] = anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                }
            }
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
        } else {
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
                if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
                    continue;
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

調(diào)用anetTcpServer或anetTcp6Server監(jiān)聽對應(yīng)的地址,并將文件描述符設(shè)置為非阻塞的。

接受連接

在監(jiān)聽的socket對應(yīng)的fd可讀時(有新的連接請求),在事件循環(huán)中,會調(diào)用我們注冊的處理函數(shù)acceptTcpHandler(不考慮TLS)。

我們來看一下acceptTcpHandler的過程:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

調(diào)用anetTcpAccept接受連接請求拿到客戶端連接對應(yīng)的文件描述符,connCreateAcceptedSocket創(chuàng)建對應(yīng)的connection結(jié)構(gòu)體,然后將其作為參數(shù)調(diào)用acceptCommonHandler。參數(shù)fd是產(chǎn)生就緒事件的文件描述符,對我們來說,就是監(jiān)聽tcp地址得到的fd。

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    if (connGetState(conn) != CONN_STATE_ACCEPTING) {
        connClose(conn);
        return;
    }
    if ((c = createClient(conn)) == NULL) {
        connClose(conn);
        return;
    }
    c->flags |= flags;
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        freeClient(connGetPrivateData(conn));
        return;
    }
}

調(diào)用createClient創(chuàng)建對應(yīng)的client結(jié)構(gòu)體,然后調(diào)用connAccept。

來看一下createClient的主要邏輯:

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    selectDb(c,0);
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}
  1. 設(shè)置連接為非阻塞讀寫。
  2. 開啟TCP_NODELAY。
  3. 調(diào)用connSetReadHandler,讀數(shù)據(jù)回調(diào)方法為readQueryFromClient,connSetReadHandler最終會調(diào)用connSocketSetReadHandler方法。

看一下connSocketSetReadHandler的主要邏輯:

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;
    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else if (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
  1. 設(shè)置讀數(shù)據(jù)的回調(diào)方法。
  2. 調(diào)用aeCreateFileEvent將客戶端連接對應(yīng)的文件描述符注冊到事件循環(huán)中,注冊監(jiān)聽可讀事件。

connAccept最終會調(diào)用connSocketAccept,我們來看一下它的邏輯:

static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    int ret = C_OK;
    if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
    conn->state = CONN_STATE_CONNECTED;
    connIncrRefs(conn);
    if (!callHandler(conn, accept_handler)) ret = C_ERR;
    connDecrRefs(conn);
    return ret;
}

參數(shù)accept_handler就是前面調(diào)用connAccept傳遞的clientAcceptHandler。

客戶端命令處理

接受連接后,我們已經(jīng)將客戶端連接對應(yīng)的文件描述符注冊到了事件循環(huán)中,注冊的事件處理方法是connSocketEventHandler。在客戶端連接對應(yīng)的文件描述符有就緒事件時,事件循環(huán)會調(diào)用connSocketEventHandler方法。

來看一下connSocketEventHandler:

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    connection *conn = clientData;
    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}

根據(jù)事件類型調(diào)用具體的處理函數(shù),客戶端發(fā)來命令時,讀事件產(chǎn)生,調(diào)用conn->read_handler,前面我們將其設(shè)置為了readQueryFromClient,也就是讀事件會調(diào)用readQueryFromClient。通常在可讀可寫時,我們先處理讀事件,再處理寫事件,但是在conn->flags設(shè)置了CONN_FLAG_WRITE_BARRIER時,先處理寫事件,再處理讀事件。

看一下readQueryFromClient的主要邏輯:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    if (postponeClientRead(c)) return;
    readlen = PROTO_IOBUF_LEN;
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }
     processInputBuffer(c);
}
  1. postponeClientRead主要是判斷是否開啟了ThreadIO,如果開啟了且readQueryFromClient不是由beforeSleep或者processEventsWhileBlocked調(diào)用的,則設(shè)置client的CLIENT_PENDING_READ標志位,并將client放到server.clients_pending_read鏈表中,在下一次主循環(huán)時,會調(diào)用beforeSleep,beforeSleep會調(diào)用handleClientsWithPendingReadsUsingThreads用多個線程并行的從socket讀取數(shù)據(jù)并解析命令。

    int postponeClientRead(client *c) {
        if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        {
            c->flags |= CLIENT_PENDING_READ;
            listAddNodeHead(server.clients_pending_read,c);
            return 1;
        } else {
            return 0;
        }
    }
    

    ProcessingEventsWhileBlocked標志在進入processEventsWhileBlocked方法時設(shè)置,在走出方法時清空。processEventsWhileBlocked在執(zhí)行長耗時的操作時會被調(diào)用,如加載aof日志或者rdb日志時,每加載一定數(shù)量時,調(diào)用一次processEventsWhileBlocked方法,lua腳本超時也會調(diào)用processEventsWhileBlocked。lua腳本超時會調(diào)用processEventsWhileBlocked的原因是,本質(zhì)上Redis處理命令是單線程的,如果lua腳本一直占用CPU,會導致無法處理客戶端的其它命令,包括來自客戶端的SCRIPT KILL或者SHUTDOWN NOSAVE命令。

  2. 若是從beforeSleep第二次進入方法,或者未開啟ThreadIO,或者從processEventsWhileBlocked進入方法時,也就是postponeClientRead返回0時,繼續(xù)執(zhí)行下面的流程,否則直接返回。

  3. 以set a aaa請求為例,對應(yīng)的RESP(REdis Serialization Protocol)協(xié)議表示為*3\r\n$3\r\nset\r\n$1\r\na\r\n$3\r\naaa\r\n,對應(yīng)的INLINE請求表示為set a aaa\r\n。PROTO_REQ_MULTIBULK表明這是一個符合RESP協(xié)議的請求,PROTO_REQ_INLINE表示這是一個符合INLINE協(xié)議的請求。如果是一個RESP請求,且正在讀一個很大的參數(shù),則增大讀取的長度以便提前分配內(nèi)存。

  4. 讀取請求放到client的querybuf里。

  5. 調(diào)用processInputBuffer嘗試處理整個命令,processInputBuffer在命令遇到?jīng)]有完整讀取時,會直接返回,等待更多數(shù)據(jù)的到來。

接著看一下processInputBuffer的主要邏輯:

void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        if (c->flags & CLIENT_BLOCKED) break;
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        if (c->argc == 0) {
            resetClient(c);
        } else {
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
  1. 首先判斷是否有客戶端執(zhí)行了CLIENT PAUSE且還沒有到達指定時間。如果有,暫停接收所有客戶端(除了來自slave的連接)的數(shù)據(jù)。
  2. 判斷客戶端是否被block了,一些命令如BLPOP在沒有數(shù)據(jù)時會阻塞當前客戶端知道超時或者有數(shù)據(jù)時。如果當前客戶端被阻塞了,暫停接收該客戶端的數(shù)據(jù)。
  3. CLIENT_PENDING_COMMAND會在開啟了ThreadIO的情況下出現(xiàn),如果該標志位被置位,說明該客戶端已經(jīng)有一個未處理的命令,不再繼續(xù)解析下一個命令。
  4. 如果當前調(diào)用是因為lua腳本超時導致processEventsWhileBlocked調(diào)用的,且請求是由master發(fā)來的(數(shù)據(jù)同步請求),則不處理,避免出現(xiàn)錯誤數(shù)據(jù)。
  5. 如果當前客戶端被設(shè)置了CLIENT_CLOSE_AFTER_REPLY或CLIENT_CLOSE_ASAP標志位,此時已不需要從客戶端讀取數(shù)據(jù),該連接會在稍后被斷開。
  6. 判斷請求的格式是RESP格式還是INLINE格式。
  7. 如果是INLINE請求,按照INLINE格式解析請求,如果無法讀取完整的命令則直接返回,等待完整的數(shù)據(jù)到來后再次解析。如果這個INLINE請求是一個Gopher請求,則提前處理它,Gopher請求是一類特殊的請求,以/開頭或者為空,如/foo\r\n代表獲取鍵為/foo的值,\r\n代表獲取鍵為/的值。如果不是Gopher請求則繼續(xù)執(zhí)行,執(zhí)行解析出的命令。
  8. 如果是RESP請求,則按照RESP協(xié)議格式解析,如果無法讀取完整的命令則直接返回,等待完整的數(shù)據(jù)到來后再次解析。
  9. 調(diào)用processCommandAndResetClient(c)執(zhí)行解析出來的命令。

下一篇文章我們分析解析出的命令的執(zhí)行過程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 分析從客戶端發(fā)送命令,到服務(wù)端執(zhí)行命令、返回執(zhí)行結(jié)果經(jīng)歷的整個過程。 建立連接 無論是redis-cli還是Jed...
    yingzong閱讀 3,727評論 3 4
  • 網(wǎng)上分析Redis源碼的文章挺多,如黃健宏的《Redis設(shè)計與實現(xiàn)》就很詳盡的分析了redis源碼,很贊。前不久看...
    達微閱讀 246評論 0 0
  • 單機Redis INCR命令完成執(zhí)行流程 客戶端發(fā)起一個TCP連接到Redis的6379端口,Redis中的AE事...
    一劍光寒十九洲閱讀 1,224評論 0 0
  • 今天我們來了解一下 Redis 命令執(zhí)行的過程。在之前的文章中《當 Redis 發(fā)生高延遲時,到底發(fā)生了什么》我們...
    程序員歷小冰閱讀 456評論 0 0
  • 網(wǎng)上分析Redis源碼的文章挺多,如黃健宏的《Redis設(shè)計與實現(xiàn)》就很詳盡的分析了redis源碼,很贊。前不久看...
    __七把刀__閱讀 4,205評論 0 11

友情鏈接更多精彩內(nèi)容