今天我們來了解一下 Redis 命令執(zhí)行的過程。在之前的文章中《當(dāng) Redis 發(fā)生高延遲時(shí),到底發(fā)生了什么》我們曾簡單的描述了一條命令的執(zhí)行過程,本篇文章展示深入說明一下,加深讀者對 Redis 的了解。
如下圖所示,一條命令執(zhí)行完成并且返回?cái)?shù)據(jù)一共涉及三部分,第一步是建立連接階段,響應(yīng)了socket的建立,并且創(chuàng)建了client對象;第二步是處理階段,從socket讀取數(shù)據(jù)到輸入緩沖區(qū),然后解析并獲得命令,執(zhí)行命令并將返回值存儲到輸出緩沖區(qū)中;第三步是數(shù)據(jù)返回階段,將返回值從輸出緩沖區(qū)寫到socket中,返回給客戶端,最后關(guān)閉client。

這三個階段之間是通過事件機(jī)制串聯(lián)了,在 Redis 啟動階段首先要注冊socket連接建立事件處理器:
- 當(dāng)客戶端發(fā)來建立socket的連接的請求時(shí),對應(yīng)的處理器方法會被執(zhí)行,建立連接階段的相關(guān)處理就會進(jìn)行,然后注冊socket讀取事件處理器
- 當(dāng)客戶端發(fā)來命令時(shí),讀取事件處理器方法會被執(zhí)行,對應(yīng)處理階段的相關(guān)邏輯都會被執(zhí)行,然后注冊socket寫事件處理器
- 當(dāng)寫事件處理器被執(zhí)行時(shí),就是將返回值寫回到socket中。

接下來,我們分別來看一下各個步驟的具體原理和代碼實(shí)現(xiàn)。
啟動時(shí)監(jiān)聽socket
Redis 服務(wù)器啟動時(shí),會調(diào)用 initServer 方法,首先會建立 Redis 自己的事件機(jī)制 eventLoop,然后在其上注冊周期時(shí)間事件處理器,最后在所監(jiān)聽的 socket 上
創(chuàng)建文件事件處理器,監(jiān)聽 socket 建立連接的事件,其處理函數(shù)為 acceptTcpHandler。
void initServer(void) { // server.c
....
/**
* 創(chuàng)建eventLoop
*/
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/**
* 注冊周期時(shí)間事件,處理后臺操作,比如說客戶端操作、過期鍵等
*/
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/**
* 為所有監(jiān)聽的socket創(chuàng)建文件事件,監(jiān)聽可讀事件;事件處理函數(shù)為acceptTcpHandler
*
*/
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
....
}
在《Redis 事件機(jī)制詳解》一文中,我們曾詳細(xì)介紹過 Redis 的事件機(jī)制,可以說,Redis 命令執(zhí)行過程中都是由事件機(jī)制協(xié)調(diào)管理的,也就是 initServer 方法中生成的 aeEventLoop。當(dāng)socket發(fā)生對應(yīng)的事件時(shí),aeEventLoop 對調(diào)用已經(jīng)注冊的對應(yīng)的事件處理器。

建立連接和Client
當(dāng)客戶端向 Redis 建立 socket時(shí),aeEventLoop 會調(diào)用 acceptTcpHandler 處理函數(shù),服務(wù)器會為每個鏈接創(chuàng)建一個 Client 對象,并創(chuàng)建相應(yīng)文件事件來監(jiān)聽socket的可讀事件,并指定事件處理函數(shù)。
acceptTcpHandler 函數(shù)會首先調(diào)用 anetTcpAccept方法,它底層會調(diào)用 socket 的 accept 方法,也就是接受客戶端來的建立連接請求,然后調(diào)用 acceptCommonHandler方法,繼續(xù)后續(xù)的邏輯處理。
// 當(dāng)客戶端建立鏈接時(shí)進(jìn)行的eventloop處理函數(shù) networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
....
// 層層調(diào)用,最后在anet.c 中 anetGenericAccept 方法中調(diào)用 socket 的 accept 方法
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
/**
* 進(jìn)行socket 建立連接后的處理
*/
acceptCommonHandler(cfd,0,cip);
}
acceptCommonHandler 則首先調(diào)用 createClient 創(chuàng)建 client,接著判斷當(dāng)前 client 的數(shù)量是否超出了配置的 maxclients,如果超過,則給客戶端發(fā)送錯誤信息,并且釋放 client。
static void acceptCommonHandler(int fd, int flags, char *ip) { //networking.c
client *c;
// 創(chuàng)建redisClient
c = createClient(fd)
// 當(dāng) maxClient 屬性被設(shè)置,并且client數(shù)量已經(jīng)超出時(shí),給client發(fā)送error,然后釋放連接
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
.... // 處理為設(shè)置密碼時(shí)默認(rèn)保護(hù)狀態(tài)的客戶端連接
// 統(tǒng)計(jì)連接數(shù)
server.stat_numconnections++;
c->flags |= flags;
}
createClient 方法用于創(chuàng)建 client,它代表著連接到 Redis 客戶端,每個客戶端都有各自的輸入緩沖區(qū)和輸出緩沖區(qū),輸入緩沖區(qū)存儲客戶端通過 socket 發(fā)送過來的數(shù)據(jù),輸出緩沖區(qū)則存儲著 Redis 對客戶端的響應(yīng)數(shù)據(jù)。client一共有三種類型,不同類型的對應(yīng)緩沖區(qū)的大小都不同。
- 普通客戶端是除了復(fù)制和訂閱的客戶端之外的所有連接
- 從客戶端用于主從復(fù)制,主節(jié)點(diǎn)會為每個從節(jié)點(diǎn)單獨(dú)建立一條連接用于命令復(fù)制
- 訂閱客戶端用于發(fā)布訂閱功能

createClient 方法除了創(chuàng)建 client 結(jié)構(gòu)體并設(shè)置其屬性值外,還會對 socket進(jìn)行配置并注冊讀事件處理器
設(shè)置 socket 為 非阻塞 socket、設(shè)置 NO_DELAY 和 SO_KEEPALIVE標(biāo)志位來關(guān)閉 Nagle 算法并且啟動 socket 存活檢查機(jī)制。
設(shè)置讀事件處理器,當(dāng)客戶端通過 socket 發(fā)送來數(shù)據(jù)后,Redis 會調(diào)用 readQueryFromClient 方法。
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// fd 為 -1,表示其他特殊情況創(chuàng)建的client,redis在進(jìn)行比如lua腳本執(zhí)行之類的情況下也會創(chuàng)建client
if (fd != -1) {
// 配置socket為非阻塞、NO_DELAY不開啟Nagle算法和SO_KEEPALIVE
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
/**
* 向 eventLoop 中注冊了 readQueryFromClient。
* readQueryFromClient 的作用就是從client中讀取客戶端的查詢緩沖區(qū)內(nèi)容。
* 綁定讀事件到事件 loop (開始接收命令請求)
*/
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// 默認(rèn)選擇數(shù)據(jù)庫
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
c->fd = fd;
.... // 設(shè)置client的屬性
return c;
}
client 的屬性中有很多屬性,比如后邊會看到的輸入緩沖區(qū) querybuf 和輸出緩沖區(qū) buf,這里因?yàn)榇a過長做了省略,感興趣的同學(xué)可以自行閱讀源碼。
讀取socket數(shù)據(jù)到輸入緩沖區(qū)
readQueryFromClient 方法會調(diào)用 read 方法從 socket 中讀取數(shù)據(jù)到輸入緩沖區(qū)中,然后判斷其大小是否大于系統(tǒng)設(shè)置的 client_max_querybuf_len,如果大于,則向 Redis返回錯誤信息,并關(guān)閉 client。
將數(shù)據(jù)讀取到輸入緩沖區(qū)后,readQueryFromClient 方法會根據(jù) client 的類型來做不同的處理,如果是普通類型,則直接調(diào)用 processInputBuffer 來處理;如果是主從客戶端,還需要將命令同步到自己的從服務(wù)器中。也就是說,Redis實(shí)例將主實(shí)例傳來的命令執(zhí)行后,繼續(xù)將命令同步給自己的從實(shí)例。

// 處理從client中讀取客戶端的輸入緩沖區(qū)內(nèi)容。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
....
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 從 fd 對應(yīng)的socket中讀取到 client 中的 querybuf 輸入緩沖區(qū)
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
.... // 出錯釋放 client
} else if (nread == 0) {
// 客戶端主動關(guān)閉 connection
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/*
* 當(dāng)這個client代表主從的master節(jié)點(diǎn)時(shí),將query buffer和 pending_querybuf結(jié)合
* 用于主從復(fù)制中的命令傳播????
*/
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// 增加已經(jīng)讀取的字節(jié)數(shù)
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
// 如果大于系統(tǒng)配置的最大客戶端緩存區(qū)大小,也就是配置文件中的client-query-buffer-limit
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
// 返回錯誤信息,并且關(guān)閉client
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
if (!(c->flags & CLIENT_MASTER)) {
// processInputBuffer 處理輸入緩沖區(qū)
processInputBuffer(c);
} else {
// 如果client是master的連接
size_t prev_offset = c->reploff;
processInputBuffer(c);
// 判斷是否同步偏移量發(fā)生變化,則通知到后續(xù)的slave
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
解析獲取命令
processInputBuffer 主要是將輸入緩沖區(qū)中的數(shù)據(jù)解析成對應(yīng)的命令,根據(jù)命令類型是 PROTO_REQ_MULTIBULK 還是 PROTO_REQ_INLINE,來分別調(diào)用 processInlineBuffer 和 processMultibulkBuffer 方法來解析命令。
然后調(diào)用 processCommand 方法來執(zhí)行命令。執(zhí)行成功后,如果是主從客戶端,還需要更新同步偏移量 reploff 屬性,然后重置 client,讓client可以接收一條命令。
void processInputBuffer(client *c) { // networking.c
server.current_client = c;
/* 當(dāng)緩沖區(qū)中還有數(shù)據(jù)時(shí)就一直處理 */
while(sdslen(c->querybuf)) {
.... // 處理 client 的各種狀態(tài)
/* 判斷命令請求類型 telnet發(fā)送的命令和redis-cli發(fā)送的命令請求格式不同 */
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
/**
* 從緩沖區(qū)解析命令
*/
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* 參數(shù)個數(shù)為0時(shí)重置client,可以接受下一個命令 */
if (c->argc == 0) {
resetClient(c);
} else {
// 執(zhí)行命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
// 如果是master的client發(fā)來的命令,則 更新 reploff
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
// 如果不是阻塞狀態(tài),則重置client,可以接受下一個命令
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
}
}
server.current_client = NULL;
}
解析命令暫時(shí)不看,就是將 redis 命令文本信息,記錄到client的argv/argc屬性中
執(zhí)行命令
processCommand 方法會處理很多邏輯,不過大致可以分為三個部分:首先是調(diào)用 lookupCommand 方法獲得對應(yīng)的 redisCommand;接著是檢測當(dāng)前 Redis 是否可以執(zhí)行該命令;最后是調(diào)用 call 方法真正執(zhí)行命令。
processCommand會做如下邏輯處理:
- 1 如果命令名稱為 quit,則直接返回,并且設(shè)置客戶端標(biāo)志位。
- 2 根據(jù) argv[0] 查找對應(yīng)的 redisCommand,所有的命令都存儲在命令字典 redisCommandTable 中,根據(jù)命令名稱可以獲取對應(yīng)的命令。
- 3 進(jìn)行用戶權(quán)限校驗(yàn)。
- 4 如果是集群模式,處理集群重定向。當(dāng)命令發(fā)送者是 master 或者 命令沒有任何 key 的參數(shù)時(shí)可以不重定向。
- 5 預(yù)防 maxmemory 情況,先嘗試回收一下,如果不行,則返回異常。
- 6 當(dāng)此服務(wù)器是 master 時(shí):aof 持久化失敗時(shí),或上一次 bgsave 執(zhí)行錯誤,且配置 bgsave 參數(shù)和 stop_writes_on_bgsave_err;禁止執(zhí)行寫命令。
- 7 當(dāng)此服務(wù)器時(shí)master時(shí):如果配置了 repl_min_slaves_to_write,當(dāng)slave數(shù)目小于時(shí),禁止執(zhí)行寫命令。
- 8 當(dāng)時(shí)只讀slave時(shí),除了 master 的不接受其他寫命令。
- 9 當(dāng)客戶端正在訂閱頻道時(shí),只會執(zhí)行部分命令。
- 10 服務(wù)器為slave,但是沒有連接 master 時(shí),只會執(zhí)行帶有 CMD_STALE 標(biāo)志的命令,如 info 等
- 11 正在加載數(shù)據(jù)庫時(shí),只會執(zhí)行帶有 CMD_LOADING 標(biāo)志的命令,其余都會被拒絕。
- 12 當(dāng)服務(wù)器因?yàn)閳?zhí)行l(wèi)ua腳本阻塞時(shí),只會執(zhí)行部分命令,其余都會拒絕
- 13 如果是事務(wù)命令,則開啟事務(wù),命令進(jìn)入等待隊(duì)列;否則直接執(zhí)行命令。
int processCommand(client *c) {
// 1 處理 quit 命令
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/**
* 根據(jù) argv[0] 查找對應(yīng)的 command
* 2 命令字典查找指定命令;所有的命令都存儲在命令字典中 struct redisCommand redisCommandTable[]={}
*/
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
// 處理未知命令
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// 處理參數(shù)錯誤
}
// 3 檢查用戶驗(yàn)證
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return C_OK;
}
/**
* 4 如果是集群模式,處理集群重定向。當(dāng)命令發(fā)送者是master或者 命令沒有任何key的參數(shù)時(shí)可以不重定向
*/
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
// 查詢可以執(zhí)行的node信息
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
// 5 處理maxmemory請求,先嘗試回收一下,如果不行,則返回異常
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
....
}
/**
* 6 當(dāng)此服務(wù)器是master時(shí):aof持久化失敗時(shí),或上一次bgsave執(zhí)行錯誤,
* 且配置bgsave參數(shù)和stop_writes_on_bgsave_err;禁止執(zhí)行寫命令
*/
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand)) { .... }
/**
* 7 當(dāng)此服務(wù)器時(shí)master時(shí):如果配置了repl_min_slaves_to_write,
* 當(dāng)slave數(shù)目小于時(shí),禁止執(zhí)行寫命令
*/
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write) { .... }
/**
* 8 當(dāng)時(shí)只讀slave時(shí),除了master的不接受其他寫命令
*/
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE) { .... }
/**
* 9 當(dāng)客戶端正在訂閱頻道時(shí),只會執(zhí)行以下命令
*/
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) { .... }
/**
* 10 服務(wù)器為slave,但沒有正確連接master時(shí),只會執(zhí)行帶有CMD_STALE標(biāo)志的命令,如info等
*/
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE)) {...}
/**
* 11 正在加載數(shù)據(jù)庫時(shí),只會執(zhí)行帶有CMD_LOADING標(biāo)志的命令,其余都會被拒絕
*/
if (server.loading && !(c->cmd->flags & CMD_LOADING)) { .... }
/**
* 12 當(dāng)服務(wù)器因?yàn)閳?zhí)行l(wèi)ua腳本阻塞時(shí),只會執(zhí)行以下幾個命令,其余都會拒絕
*/
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) {....}
/**
* 13 開始執(zhí)行命令
*/
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
/**
* 開啟了事務(wù),命令只會入隊(duì)列
*/
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
/**
* 直接執(zhí)行命令
*/
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
.... // 所有的 redis 命令都有
}
call 方法是 Redis 中執(zhí)行命令的通用方法,它會處理通用的執(zhí)行命令的前置和后續(xù)操作。

- 如果有監(jiān)視器 monitor,則需要將命令發(fā)送給監(jiān)視器。
- 調(diào)用 redisCommand 的proc 方法,執(zhí)行對應(yīng)具體的命令邏輯。
- 如果開啟了 CMD_CALL_SLOWLOG,則需要記錄慢查詢?nèi)罩?/li>
- 如果開啟了 CMD_CALL_STATS,則需要記錄一些統(tǒng)計(jì)信息
- 如果開啟了 CMD_CALL_PROPAGATE,則當(dāng) dirty大于0時(shí),需要調(diào)用 propagate 方法來進(jìn)行命令傳播。

命令傳播就是將命令寫入 repl-backlog-buffer 緩沖中,并發(fā)送給各個從服務(wù)器中。
// 執(zhí)行client中持有的 redisCommand 命令
void call(client *c, int flags) {
/**
* dirty記錄數(shù)據(jù)庫修改次數(shù);start記錄命令開始執(zhí)行時(shí)間us;duration記錄命令執(zhí)行花費(fèi)時(shí)間
*/
long long dirty, start, duration;
int client_old_flags = c->flags;
/**
* 有監(jiān)視器的話,需要將不是從AOF獲取的命令會發(fā)送給監(jiān)視器。當(dāng)然,這里會消耗時(shí)間
*/
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
....
/* Call the command. */
dirty = server.dirty;
start = ustime();
// 處理命令,調(diào)用命令處理函數(shù)
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
.... // Lua 腳本的一些特殊處理
/**
* CMD_CALL_SLOWLOG 表示要記錄慢查詢?nèi)罩? */
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
/**
* CMD_CALL_STATS 表示要統(tǒng)計(jì)
*/
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}
/**
* CMD_CALL_PROPAGATE表示要進(jìn)行廣播命令
*/
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/**
* dirty大于0時(shí),需要廣播命令給slave和aof
*/
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
....
/**
* 廣播命令,寫如aof,發(fā)送命令到slave
* 也就是傳說中的傳播命令
*/
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
....
}
由于文章篇幅問題,本篇文章就先講到這里,后半部分在接下來的文章中進(jìn)行講解,歡迎大家繼續(xù)關(guān)注。
