本篇來看一下Redis的請求處理過程。
監(jiān)聽過程
void initServer(void) {
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL)
exit(1);
if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
if (server.tls_port != 0 && listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
exit(1);
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
exit(1);
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
for (j = 0; j < server.tlsfd_count; j++) {
if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, acceptTLSHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.tlsfd file event.");
}
}
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
}
監(jiān)聽端口后得到文件描述符,調(diào)用aeCreateFileEvent將文件描述符注冊到事件循環(huán)中,注冊監(jiān)聽可讀事件。
看一下listenToPort:
int listenToPort(int port, int *fds, int *count) {
int j;
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
if (server.bindaddr[j] == NULL) {
int unsupported = 0;
fds[*count] = anetTcp6Server(server.neterr,port,NULL,server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
}
if (*count == 1 || unsupported) {
fds[*count] = anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
}
}
if (*count + unsupported == 2) break;
} else if (strchr(server.bindaddr[j],':')) {
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
} else {
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
}
if (fds[*count] == ANET_ERR) {
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
continue;
return C_ERR;
}
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return C_OK;
}
調(diào)用anetTcpServer或anetTcp6Server監(jiān)聽對應(yīng)的地址,并將文件描述符設(shè)置為非阻塞的。
接受連接
在監(jiān)聽的socket對應(yīng)的fd可讀時(有新的連接請求),在事件循環(huán)中,會調(diào)用我們注冊的處理函數(shù)acceptTcpHandler(不考慮TLS)。
我們來看一下acceptTcpHandler的過程:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
調(diào)用anetTcpAccept接受連接請求拿到客戶端連接對應(yīng)的文件描述符,connCreateAcceptedSocket創(chuàng)建對應(yīng)的connection結(jié)構(gòu)體,然后將其作為參數(shù)調(diào)用acceptCommonHandler。參數(shù)fd是產(chǎn)生就緒事件的文件描述符,對我們來說,就是監(jiān)聽tcp地址得到的fd。
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
char conninfo[100];
if (connGetState(conn) != CONN_STATE_ACCEPTING) {
connClose(conn);
return;
}
if ((c = createClient(conn)) == NULL) {
connClose(conn);
return;
}
c->flags |= flags;
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
freeClient(connGetPrivateData(conn));
return;
}
}
調(diào)用createClient創(chuàng)建對應(yīng)的client結(jié)構(gòu)體,然后調(diào)用connAccept。
來看一下createClient的主要邏輯:
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
selectDb(c,0);
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (conn) linkClient(c);
initClientMultiState(c);
return c;
}
- 設(shè)置連接為非阻塞讀寫。
- 開啟TCP_NODELAY。
- 調(diào)用connSetReadHandler,讀數(shù)據(jù)回調(diào)方法為readQueryFromClient,connSetReadHandler最終會調(diào)用connSocketSetReadHandler方法。
看一下connSocketSetReadHandler的主要邏輯:
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else if (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
- 設(shè)置讀數(shù)據(jù)的回調(diào)方法。
- 調(diào)用aeCreateFileEvent將客戶端連接對應(yīng)的文件描述符注冊到事件循環(huán)中,注冊監(jiān)聽可讀事件。
connAccept最終會調(diào)用connSocketAccept,我們來看一下它的邏輯:
static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
int ret = C_OK;
if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
conn->state = CONN_STATE_CONNECTED;
connIncrRefs(conn);
if (!callHandler(conn, accept_handler)) ret = C_ERR;
connDecrRefs(conn);
return ret;
}
參數(shù)accept_handler就是前面調(diào)用connAccept傳遞的clientAcceptHandler。
客戶端命令處理
接受連接后,我們已經(jīng)將客戶端連接對應(yīng)的文件描述符注冊到了事件循環(huán)中,注冊的事件處理方法是connSocketEventHandler。在客戶端連接對應(yīng)的文件描述符有就緒事件時,事件循環(huán)會調(diào)用connSocketEventHandler方法。
來看一下connSocketEventHandler:
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
connection *conn = clientData;
if (conn->state == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {
int conn_error = connGetSocketError(conn);
if (conn_error) {
conn->last_errno = conn_error;
conn->state = CONN_STATE_ERROR;
} else {
conn->state = CONN_STATE_CONNECTED;
}
if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
if (!callHandler(conn, conn->conn_handler)) return;
conn->conn_handler = NULL;
}
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}
根據(jù)事件類型調(diào)用具體的處理函數(shù),客戶端發(fā)來命令時,讀事件產(chǎn)生,調(diào)用conn->read_handler,前面我們將其設(shè)置為了readQueryFromClient,也就是讀事件會調(diào)用readQueryFromClient。通常在可讀可寫時,我們先處理讀事件,再處理寫事件,但是在conn->flags設(shè)置了CONN_FLAG_WRITE_BARRIER時,先處理寫事件,再處理讀事件。
看一下readQueryFromClient的主要邏輯:
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
if (postponeClientRead(c)) return;
readlen = PROTO_IOBUF_LEN;
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
freeClientAsync(c);
return;
}
} else if (nread == 0) {
freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
return;
}
processInputBuffer(c);
}
-
postponeClientRead主要是判斷是否開啟了ThreadIO,如果開啟了且readQueryFromClient不是由beforeSleep或者processEventsWhileBlocked調(diào)用的,則設(shè)置client的CLIENT_PENDING_READ標志位,并將client放到server.clients_pending_read鏈表中,在下一次主循環(huán)時,會調(diào)用beforeSleep,beforeSleep會調(diào)用handleClientsWithPendingReadsUsingThreads用多個線程并行的從socket讀取數(shù)據(jù)并解析命令。
int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) { c->flags |= CLIENT_PENDING_READ; listAddNodeHead(server.clients_pending_read,c); return 1; } else { return 0; } }ProcessingEventsWhileBlocked標志在進入processEventsWhileBlocked方法時設(shè)置,在走出方法時清空。processEventsWhileBlocked在執(zhí)行長耗時的操作時會被調(diào)用,如加載aof日志或者rdb日志時,每加載一定數(shù)量時,調(diào)用一次processEventsWhileBlocked方法,lua腳本超時也會調(diào)用processEventsWhileBlocked。lua腳本超時會調(diào)用processEventsWhileBlocked的原因是,本質(zhì)上Redis處理命令是單線程的,如果lua腳本一直占用CPU,會導致無法處理客戶端的其它命令,包括來自客戶端的SCRIPT KILL或者SHUTDOWN NOSAVE命令。
若是從beforeSleep第二次進入方法,或者未開啟ThreadIO,或者從processEventsWhileBlocked進入方法時,也就是postponeClientRead返回0時,繼續(xù)執(zhí)行下面的流程,否則直接返回。
以set a aaa請求為例,對應(yīng)的RESP(REdis Serialization Protocol)協(xié)議表示為*3\r\n$3\r\nset\r\n$1\r\na\r\n$3\r\naaa\r\n,對應(yīng)的INLINE請求表示為set a aaa\r\n。PROTO_REQ_MULTIBULK表明這是一個符合RESP協(xié)議的請求,PROTO_REQ_INLINE表示這是一個符合INLINE協(xié)議的請求。如果是一個RESP請求,且正在讀一個很大的參數(shù),則增大讀取的長度以便提前分配內(nèi)存。
讀取請求放到client的querybuf里。
調(diào)用processInputBuffer嘗試處理整個命令,processInputBuffer在命令遇到?jīng)]有完整讀取時,會直接返回,等待更多數(shù)據(jù)的到來。
接著看一下processInputBuffer的主要邏輯:
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (c->flags & CLIENT_PENDING_COMMAND) break;
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
if (server.gopher_enabled && !server.io_threads_do_reads &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}
- 首先判斷是否有客戶端執(zhí)行了CLIENT PAUSE且還沒有到達指定時間。如果有,暫停接收所有客戶端(除了來自slave的連接)的數(shù)據(jù)。
- 判斷客戶端是否被block了,一些命令如BLPOP在沒有數(shù)據(jù)時會阻塞當前客戶端知道超時或者有數(shù)據(jù)時。如果當前客戶端被阻塞了,暫停接收該客戶端的數(shù)據(jù)。
- CLIENT_PENDING_COMMAND會在開啟了ThreadIO的情況下出現(xiàn),如果該標志位被置位,說明該客戶端已經(jīng)有一個未處理的命令,不再繼續(xù)解析下一個命令。
- 如果當前調(diào)用是因為lua腳本超時導致processEventsWhileBlocked調(diào)用的,且請求是由master發(fā)來的(數(shù)據(jù)同步請求),則不處理,避免出現(xiàn)錯誤數(shù)據(jù)。
- 如果當前客戶端被設(shè)置了CLIENT_CLOSE_AFTER_REPLY或CLIENT_CLOSE_ASAP標志位,此時已不需要從客戶端讀取數(shù)據(jù),該連接會在稍后被斷開。
- 判斷請求的格式是RESP格式還是INLINE格式。
- 如果是INLINE請求,按照INLINE格式解析請求,如果無法讀取完整的命令則直接返回,等待完整的數(shù)據(jù)到來后再次解析。如果這個INLINE請求是一個Gopher請求,則提前處理它,Gopher請求是一類特殊的請求,以/開頭或者為空,如/foo\r\n代表獲取鍵為/foo的值,\r\n代表獲取鍵為/的值。如果不是Gopher請求則繼續(xù)執(zhí)行,執(zhí)行解析出的命令。
- 如果是RESP請求,則按照RESP協(xié)議格式解析,如果無法讀取完整的命令則直接返回,等待完整的數(shù)據(jù)到來后再次解析。
- 調(diào)用processCommandAndResetClient(c)執(zhí)行解析出來的命令。
下一篇文章我們分析解析出的命令的執(zhí)行過程。