Redis 源碼簡(jiǎn)潔剖析 16 - 客戶端

整體概述

Redis 一個(gè)服務(wù)器可以和多個(gè)客戶端建立網(wǎng)絡(luò)連接,每個(gè)客戶端都可以向服務(wù)器發(fā)送命令請(qǐng)求,服務(wù)器接收客戶端的命令,處理后將結(jié)果返回給客戶端。

Redis 的文件事件處理器使用 I/O 多路復(fù)用,Redis 使用單線程單進(jìn)程處理命令請(qǐng)求,與多個(gè)客戶端進(jìn)行網(wǎng)絡(luò)通信。

image

每個(gè)連接了 Redis 服務(wù)器的客戶端,服務(wù)器都建立了一個(gè) redisClient 結(jié)構(gòu)的客戶端狀態(tài),保存了客戶端當(dāng)前的狀態(tài)信息,以及執(zhí)行相關(guān)功能時(shí)用到的數(shù)據(jù)結(jié)構(gòu)。

Redis 服務(wù)器狀態(tài)結(jié)構(gòu)的 clients 屬性是一個(gè)鏈表,保存了所有與服務(wù)器連接的客戶端狀態(tài)。

struct redisServer {
    ……
    // 保存了所有客戶端狀態(tài)的鏈表
    list *clients;
    ……
};
image

客戶端屬性

先貼一下 client 完整的數(shù)據(jù)結(jié)構(gòu):

typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    connection *conn;
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    int original_argc;      /* Num of arguments of original command if arguments were rewritten. */
    robj **original_argv;   /* Arguments of original command if arguments were rewritten. */
    size_t argv_len_sum;    /* Sum of lengths of objects in argv list. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    user *user;             /* User associated with this connection. If the
                               user is set to NULL the connection can do
                               anything (admin). */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    long duration;          /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    int authenticated;      /* Needed when the default user requires auth. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica  */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: REPLCONF listening-port */
    char *slave_addr;       /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    sds sockname;           /* Cached connection target address. */
    listNode *client_list_node; /* list node in client list */
    listNode *paused_list_node; /* list node within the pause list */
    RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
                                               * when the authenticated user
                                               * changes. */
    void *auth_callback_privdata; /* Private data that is passed when the auth
                                   * changed callback is executed. Opaque for
                                   * Redis Core. */
    void *auth_module;      /* The module that owns the callback, which is used
                             * to disconnect the client if the module is
                             * unloaded for cleanup. Opaque for Redis Core.*/

    /* If this client is in tracking mode and this field is non zero,
     * invalidation messages for keys fetched by this client will be send to
     * the specified client ID. */
    uint64_t client_tracking_redirection;
    rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
                                      subscribed to in BCAST mode, in the
                                      context of client side caching. */
    /* In clientsCronTrackClientsMemUsage() we track the memory usage of
     * each client and add it to the sum of all the clients of a given type,
     * however we need to remember what was the old contribution of each
     * client, and in which categoty the client was, in order to remove it
     * before adding it the new value. */
    uint64_t client_cron_last_memory_usage;
    int      client_cron_last_memory_type;
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

套接字描述符

typedef struct client {
    ……
    // 記錄客戶端正在使用的套接字描述符
    int fd;
    ……
}
image

標(biāo)志

客戶端的標(biāo)志屬性 flags 記錄了客戶端的角色(role),以及客戶端目前所處的狀態(tài):

typedef struct redisClient {
    // ...
    int flags;
    // ...
} redisClient;

具體值可參考:《Redis 設(shè)計(jì)與實(shí)現(xiàn)-客戶端屬性》,flag 例子:

# 客戶端是一個(gè)主服務(wù)器
REDIS_MASTER

# 客戶端正在被列表命令阻塞
REDIS_BLOCKED

# 客戶端正在執(zhí)行事務(wù),但事務(wù)的安全性已被破壞
REDIS_MULTI | REDIS_DIRTY_CAS

# 客戶端是一個(gè)從服務(wù)器,并且版本低于 Redis 2.8
REDIS_SLAVE | REDIS_PRE_PSYNC

# 這是專門用于執(zhí)行 Lua 腳本包含的 Redis 命令的偽客戶端
# 它強(qiáng)制服務(wù)器將當(dāng)前執(zhí)行的命令寫入 AOF 文件,并復(fù)制給從服務(wù)器
REDIS_LUA_CLIENT | REDIS_FORCE_AOF | REDIS_FORCE_REPL

輸入緩沖區(qū)

客戶端狀態(tài)的輸入緩沖區(qū)用于保存客戶端發(fā)送的命令請(qǐng)求

typedef struct redisClient {
    // ...
    sds querybuf;
    // ...
} redisClient;

如果客戶端向服務(wù)器發(fā)送了以下命令請(qǐng)求:

SET key value

客戶端狀態(tài)的 querybuf 屬性將是一個(gè)包含以下內(nèi)容的 SDS 值:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

展示了這個(gè) SDS 值以及 querybuf 屬性的樣子:

image

命名及命令參數(shù)

在服務(wù)器將客戶端發(fā)送的命令請(qǐng)求保存到客戶端狀態(tài)的 querybuf 后,服務(wù)器會(huì)分析該命令,將得到的命令參數(shù)、命令參數(shù)的個(gè)數(shù)分別保存到客戶端狀態(tài)的 argv 屬性和 argc 屬性中:

typedef struct redisClient {
    // ...
    robj **argv;
    int argc;
    // ...
} redisClient;
image

命令的實(shí)現(xiàn)函數(shù)

當(dāng)服務(wù)器從協(xié)議內(nèi)容中分析并得出 argv 屬性和 argc 屬性的值之后, 服務(wù)器將根據(jù)項(xiàng) argv[0] 的值,在命令表中查找命令所對(duì)應(yīng)的命令實(shí)現(xiàn)函數(shù)。

當(dāng)程序在命令表中成功找到 argv[0] 所對(duì)應(yīng)的 redisCommand 結(jié)構(gòu)時(shí), 它會(huì)將客戶端狀態(tài)的 cmd 指針指向這個(gè)結(jié)構(gòu):

typedef struct redisClient {
    // ...
    struct redisCommand *cmd;
    // ...
} redisClient;
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;   /* Flags as string representation, one char per flag. */
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls, rejected_calls, failed_calls;
    int id;     /* Command ID. This is a progressive ID starting from 0 that
                   is assigned at runtime, and is used in order to check
                   ACLs. A connection is able to execute a given command if
                   the user associated to the connection has this command
                   bit set in the bitmap of allowed commands. */
};

每個(gè)命令所對(duì)應(yīng)的處理函數(shù)在是下面的 table:

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},

    {"getex",getexCommand,-2,
     "write fast @string",
     0,NULL,1,1,1,0,0,0},

     ……
}

輸出緩沖區(qū)

保存執(zhí)行命令所得的命令回復(fù)。

image

客戶端的固定大小緩沖區(qū)bufbufpos 兩個(gè)屬性組成:

typedef struct redisClient {
    // ...
    char buf[REDIS_REPLY_CHUNK_BYTES];
    // 記錄了 buf 數(shù)組目前已使用的字節(jié)數(shù)量
    int bufpos;
    // ...
} redisClient;

可變大小緩沖區(qū)reply 鏈表和一個(gè)或多個(gè)字符串對(duì)象組成:

typedef struct redisClient {
    // ...

    list *reply;
    // ...
} redisClient;

通過(guò)使用鏈表來(lái)連接多個(gè)字符串對(duì)象, 服務(wù)器可以為客戶端保存一個(gè)非常長(zhǎng)的命令回復(fù), 而不必受到固定大小緩沖區(qū) 16 KB 大小的限制。展示了一個(gè)包含三個(gè)字符串對(duì)象的 reply 鏈表。

image

客戶端的創(chuàng)建與關(guān)閉

image

創(chuàng)建普通客戶端

使用 connect 函數(shù)連接到服務(wù)器,服務(wù)器調(diào)用連接事件處理器,為客戶端創(chuàng)建對(duì)應(yīng)的客戶端狀態(tài),并將其添加到服務(wù)器狀態(tài)結(jié)構(gòu) clients 鏈表的末尾。

image

關(guān)閉普通客戶端

關(guān)閉普通客戶端的原因:

  • 客戶端進(jìn)程退出或被殺死,客戶端與服務(wù)端的網(wǎng)絡(luò)連接被關(guān)閉
  • 客戶端向服務(wù)端發(fā)送了不符合協(xié)議格式的命令請(qǐng)求
  • 客戶端成為了 CLIENT KILL 命令的目標(biāo)
  • 客戶端的空轉(zhuǎn)時(shí)間超過(guò) timeout 配置選項(xiàng)的值
  • 客戶端發(fā)送的命令請(qǐng)求大小,超過(guò)了深入緩沖區(qū)的限制大?。J(rèn)為 1GB)
  • 服務(wù)端返回給客戶端的數(shù)據(jù)超過(guò)了輸出緩沖區(qū)的限制大小

參考鏈接

Redis 源碼簡(jiǎn)潔剖析系列

最簡(jiǎn)潔的 Redis 源碼剖析系列文章

Java 編程思想-最全思維導(dǎo)圖-GitHub 下載鏈接,需要的小伙伴可以自取~

原創(chuàng)不易,希望大家轉(zhuǎn)載時(shí)請(qǐng)先聯(lián)系我,并標(biāo)注原文鏈接。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容