[redis 源碼走讀] sentinel 哨兵 - 節(jié)點發(fā)現(xiàn)流程

承接上一章 《[redis 源碼走讀] sentinel 哨兵 - 原理》,本章通過 strace 命令從底層抓取 sentinel 工作日志,熟悉節(jié)點通信流程,閱讀相關(guān)源碼。

?? 文章來源 《[redis 源碼走讀] sentinel 哨兵 - 節(jié)點發(fā)現(xiàn)流程》


1. 工作流程

1.1. 命令

  • 下面兩個命令都可以啟動 sentinel 進(jìn)程。
redis-sentinel /path/to/your/sentinel.conf
redis-server /path/to/your/sentinel.conf --sentinel
  • 通信命令。
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    ...
};

1.2. 節(jié)點關(guān)系

node port
master 6379
slave 6378
sentinel A 26379
sentinel B 26377
sentinel C 26378
在角色關(guān)系

1.3. 連接關(guān)系

節(jié)點之間通過 TCP 建立聯(lián)系,下圖展示了 sentinel A 節(jié)點與其它節(jié)點的關(guān)系。

箭頭代表節(jié)點 connect 的方向,箭頭上面的數(shù)字是 fd,可以根據(jù) strace 日志,對號入座。fd 從小到大,展示了創(chuàng)建鏈接的時序。


1.3.1. 配置

  • 鏈接 master。
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2
  • 保存已建立鏈接的節(jié)點信息。

    當(dāng) sentinel 啟動后,它與集群中其它節(jié)點建立了聯(lián)系,它會將這些節(jié)點信息保存在配置文件里。

# sentinel.conf

# slave 信息。
sentinel known-replica mymaster 127.0.0.1 6378
# sentinel B 信息。
sentinel known-sentinel mymaster 127.0.0.1 26377 de0ffb0d63f77605db3fccb959f67b65b8fdb529
# sentinel C 信息。
sentinel known-sentinel mymaster 127.0.0.1 26378 989f0e00789a0b41cff738704ce8b04bad306714

1.3.2. 工作日志

16259:X 17 Sep 2020 14:17:51.097 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
16259:X 17 Sep 2020 14:17:51.097 # Redis version=5.9.104, bits=64, commit=00000000, modified=0, pid=16259, just started
16259:X 17 Sep 2020 14:17:51.098 # Configuration loaded
16259:X 17 Sep 2020 14:17:51.104 * Running mode=sentinel, port=26379.
16259:X 17 Sep 2020 14:17:51.106 # Sentinel ID is 0400c9170654ecbaeaf98fedb1630486e5f8f5b6
16259:X 17 Sep 2020 14:17:51.107 # +monitor master mymaster 127.0.0.1 6379 quorum 2
16259:X 17 Sep 2020 14:17:51.113 * +slave slave 127.0.0.1:6378 127.0.0.1 6378 @ mymaster 127.0.0.1 6379
16259:X 17 Sep 2020 14:17:52.168 * +sentinel sentinel de0ffb0d63f77605db3fccb959f67b65b8fdb529 127.0.0.1 26377 @ mymaster 127.0.0.1 6379
16259:X 17 Sep 2020 14:17:52.370 * +sentinel sentinel 989f0e00789a0b41cff738704ce8b04bad306714 127.0.0.1 26378 @ mymaster 127.0.0.1 6379
抓包工作流程

1.4. 通信流程

通過 strace 命令查看 socket 的發(fā)送和接收數(shù)據(jù)日志內(nèi)容,我們基本可以掌握 sentinel/master/slave 這三個角色是怎么聯(lián)系起來的。

  1. sentinel 通過配置文件 master 的鏈接信息,鏈接 master,發(fā)送 PING。
  2. sentinel 向 master 發(fā)送 INFO 命令,獲取 master 上的 slave 名單。
  3. sentinel 向 master/slave 訂閱了 __sentinel__:hello 頻道,當(dāng)其它節(jié)點定時向 master/slave 發(fā)布消息時,訂閱者也能被通知,所以當(dāng)前 sentinel 也能收到其它 sentinel 的信息,并進(jìn)行鏈接。

這樣 sentinel 只需要配置 master 的信息,通過 INFO 命令和訂閱頻道 __sentinel__:hello 就能將集群中所有角色的節(jié)點緊密聯(lián)系在一起。


命令 描述
PING 三個角色之間通過發(fā)送 PING 作為心跳,確認(rèn)對方是否在線。
INFO sentinel 向 master/slave 發(fā)送該命令,獲取 slave 節(jié)點的詳細(xì)信息。
PUBLISH / SUBSCRIBE sentinel 向 master / slave 訂閱(SUBSCRIBE) __sentinel__:hello 了頻道,但是會向三個角色都發(fā)布(PUBLISH)消息,推送相關(guān)信息給其它 sentinel 節(jié)點,從而與其它節(jié)點建立聯(lián)系。

1.5. 具體日志流程

根據(jù) strace 日志參考上述對應(yīng)連接關(guān)系圖。

從日志中看,有幾個命令是一起 sendto 發(fā)送出去的,因為 sentinel 通過 hiredis 作為連接的 client,綁定了 redis 的多路復(fù)用異步通信。通過接口寫入的命令是異步操作,會先寫入發(fā)送緩沖區(qū),當(dāng)觸發(fā)寫事件,才會將發(fā)送緩沖區(qū)數(shù)據(jù)發(fā)送出去。所以你看到很多命令不是一條一條發(fā)出去的,同理,recvfrom 收到的回復(fù)包,hiredis 觸發(fā)讀事件后,才去讀數(shù)據(jù),所以很多時候接收的命令也是幾條一起讀出來。

這是 pipline 批量處理,詳細(xì)原理請參考 《[hiredis 源碼走讀] 異步回調(diào)機制剖析》。

# strace -s 512 -o /tmp/sentinel.log ./redis-sentinel sentinel.conf

# sentinel A 啟動綁定 26379 端口。
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 7
setsockopt(7, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(7, {sa_family=AF_INET, sin_port=htons(26379), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(7, 511)                          = 0

# sentinel A 與 master 通信。兩條鏈接,一條是命令鏈接,一條是發(fā)布訂閱鏈接。
connect(8, {sa_family=AF_INET, sin_port=htons(6379), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(8, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nINFO\r\n", 85, 0, NULL, 0) = 85
connect(9, {sa_family=AF_INET, sin_port=htons(6379), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(9, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
# sentinl A 從 master 獲取 slave 節(jié)點信息。
recvfrom(8, "+OK\r\n+PONG\r\n$3757\r\n# Server\r\nredis_version:5.9.104\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:995c39fc3a59d30e\r\nredis_mode:standalone\r\nos:Linux 3.10.0-693.2.2.el7.x86_64 x86_64\r\narch_bits:64\r\nmultiplexing_api:epoll\r\natomicvar_api:atomic-builtin\r\ngcc_version:8.3.1\r\nprocess_id:7676\r\nrun_id:93843ea6e3ddb2a0c0bc0688a62470b578ef9489\r\ntcp_port:6379\r\nuptime_in_seconds:8271174\r\nuptime_in_days:95\r\nhz:1000\r\nconfigured_hz:1000\r\nlru_clock:6487951\r\nexecutable:/home/other/redis-test/maser/./redis-server\r"..., 16384, 0, NULL, NULL) = 3778
recvfrom(9, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53

# sentinel A 與 slave 通信。兩條鏈接,一條是命令鏈接,一條是發(fā)布訂閱鏈接。
connect(10, {sa_family=AF_INET, sin_port=htons(6378), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(10, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nINFO\r\n", 85, 0, NULL, 0) = 85
connect(11, {sa_family=AF_INET, sin_port=htons(6378), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(11, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
recvfrom(10, "+OK\r\n+PONG\r\n$3828\r\n# Server\r\nredis_version:5.9.104\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:995c39fc3a59d30e\r\nredis_mode:standalone\r\nos:Linux 3.10.0-693.2.2.el7.x86_64 x86_64\r\narch_bits:64\r\nmultiplexing_api:epoll\r\natomicvar_api:atomic-builtin\r\ngcc_version:8.3.1\r\nprocess_id:15605\r\nrun_id:c945db01b8ff34ffaa529dcfb8f24c7f3a600573\r\ntcp_port:6378\r\nuptime_in_seconds:408\r\nuptime_in_days:0\r\nhz:1000\r\nconfigured_hz:1000\r\nlru_clock:6487951\r\nexecutable:/home/other/redis-test/slave/./redis-server\r\ncon"..., 16384, 0, NULL, NULL) = 3849
recvfrom(11, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53

# sentinel A 與 sentinel B 通信。sentinel A 從 master 獲得 sentinel B 發(fā)布的鏈接信息。
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
connect(12, {sa_family=AF_INET, sin_port=htons(26377), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(12, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n", 71, 0, NULL, 0) = 71
recvfrom(8, "+PONG\r\n", 16384, 0, NULL, NULL) = 7
recvfrom(10, "+PONG\r\n", 16384, 0, NULL, NULL) = 7
recvfrom(12, "+OK\r\n+PONG\r\n", 16384, 0, NULL, NULL) = 12

# sentinel A 與 sentinel C 通信。sentinel A 從 slave 獲得 sentinel C 的鏈接信息。
recvfrom(11, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 266
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
connect(13, {sa_family=AF_INET, sin_port=htons(26378), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(13, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n", 71, 0, NULL, 0) = 71
recvfrom(13, "+OK\r\n+PONG\r\n", 16384, 0, NULL, NULL) = 12

# sentinel A 向 master / slave 發(fā)布自己的鏈接信息和對應(yīng)的 master 信息。
sendto(8, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
sendto(10, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
recvfrom(8, ":3\r\n", 16384, 0, NULL, NULL) = 4
recvfrom(10, ":3\r\n", 16384, 0, NULL, NULL) = 4

# sentinel C 鏈接 sentinel A。
accept(7, {sa_family=AF_INET, sin_port=htons(62448), sin_addr=inet_addr("127.0.0.1")}, [16]) = 14
read(14, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-989f0e00-cmd\r\n*1\r\n$4\r\nPING\r\n", 16384) = 71
write(14, "+OK\r\n+PONG\r\n", 12)       = 12

# sentinle B 鏈接 sentinel A。
accept(7, {sa_family=AF_INET, sin_port=htons(62450), sin_addr=inet_addr("127.0.0.1")}, [16]) = 15
read(15, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-de0ffb0d-cmd\r\n*1\r\n$4\r\nPING\r\n", 16384) = 71

2. 源碼理解

通過上述分析,我們基本了解了節(jié)點之間的通信流程時序,下面來分析一下源碼。


2.1. 結(jié)構(gòu)

sentinel 進(jìn)程對 sentinel / master / slave 三個角色用數(shù)據(jù)結(jié)構(gòu) sentinelRedisInstance 進(jìn)行管理。

sentinelRedisInstance 節(jié)點保存關(guān)系

// 角色數(shù)據(jù)結(jié)構(gòu)。
typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    uint64_t config_epoch;  /* Configuration epoch. */
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    ...
    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    ...
} sentinelRedisInstance;

// sentinel 數(shù)據(jù)結(jié)構(gòu)。
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    uint64_t current_epoch;         /* Current epoch. */
    dict *masters;      /* Dictionary of master sentinelRedisInstances. */
    ...
} sentinel;

2.2. 初始化

sentinel 進(jìn)程啟動,加載配置,創(chuàng)建對應(yīng)節(jié)點的管理實例 sentinelRedisInstance。

sentinel 運行過程中,會把新發(fā)現(xiàn)的 sentinel / master / slave 節(jié)點信息保存 sentinel.conf 文件里。

# 創(chuàng)建 sentinel 管理實例。
createSentinelRedisInstance(char* name, int flags, char* hostname, int port, int quorum, sentinelRedisInstance* master) (/Users/wenfh2020/src/redis/src/sentinel.c:1192)
sentinelHandleConfiguration(char** argv, int argc) (/Users/wenfh2020/src/redis/src/sentinel.c:1636)
loadServerConfigFromString(char* config) (/Users/wenfh2020/src/redis/src/config.c:504)
# 加載配置。
loadServerConfig(char* filename, char* options) (/Users/wenfh2020/src/redis/src/config.c:566)
main(int argc, char** argv) (/Users/wenfh2020/src/redis/src/server.c:5101)
// 加載處理配置信息。
char *sentinelHandleConfiguration(char **argv, int argc) {
   ...
   if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        // 加載 master 信息。
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        // 創(chuàng)建 master 的監(jiān)控實例。
        if (createSentinelRedisInstance(
           argv[1], SRI_MASTER, argv[2], atoi(argv[3]), quorum, NULL) == NULL) {
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
    } else if ((!strcasecmp(argv[0],"known-slave") ||
                !strcasecmp(argv[0],"known-replica")) && argc == 4) {
        // 加載 slave 信息。
        sentinelRedisInstance *slave;

        /* known-replica <name> <ip> <port> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
                    atoi(argv[3]), ri->quorum, ri)) == NULL) {
            return "Wrong hostname or port for replica.";
        }
    } else if (!strcasecmp(argv[0],"known-sentinel") && (argc == 4 || argc == 5)) {
        // 加載其它 sentinel 節(jié)點信息。
        sentinelRedisInstance *si;
        if (argc == 5) { /* Ignore the old form without runid. */
            /* known-sentinel <name> <ip> <port> [runid] */
            ri = sentinelGetMasterByName(argv[1]);
            if (!ri) return "No such master with specified name.";
            if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
                        atoi(argv[3]), ri->quorum, ri)) == NULL) {
                return "Wrong hostname or port for sentinel.";
            }
            si->runid = sdsnew(argv[4]);
            sentinelTryConnectionSharing(si);
        }
    }
   ...
}

// 創(chuàng)建角色實例對象。角色間關(guān)系,通過哈希表進(jìn)行管理。
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    char slavename[NET_PEER_ID_LEN], *sdsname;

    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);

    // 域名解析
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

    /* 一般以 master 為核心管理。只有 master 才配置名稱。slave 通過 ip:port 組合成名稱進(jìn)行管理。*/
    if (flags & SRI_SLAVE) {
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
        name = slavename;
    }

    // 創(chuàng)建不同角色的哈希表。
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    sdsname = sdsnew(name);
    // 去重。
    if (dictFind(table,sdsname)) {
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    // 創(chuàng)建 sentinelRedisInstance 實例對象。
    ri = zmalloc(sizeof(*ri));
    ri->flags = flags;
    ri->name = sdsname;
    ri->runid = NULL;
    ri->config_epoch = 0;
    ri->addr = addr;
    ...
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ...
    // 將新實例關(guān)聯(lián)到對應(yīng)的哈希表進(jìn)行管理。
    dictAdd(table, ri->name, ri);
    return ri;
}

2.3. 鏈接

定時器定期對其它節(jié)點進(jìn)行監(jiān)控管理。sentinel 利用 hiredis 作為 redis client,鏈接其它節(jié)點進(jìn)行相互通信。


2.3.1. 數(shù)據(jù)結(jié)構(gòu)

// 鏈接結(jié)構(gòu),兩條 hiredis 封裝的鏈接,一條用來發(fā)布/訂閱。一條用來處理命令。
typedef struct instanceLink {
    int refcount;          /* Number of sentinelRedisInstance owners. */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    ...
} instanceLink;
params desc
disconnected tcp 鏈接狀態(tài)。
pending_commands 等待回復(fù)命令個數(shù),因為異步通信,命令并非實時回復(fù),通過統(tǒng)計等待命令回復(fù)個數(shù),實現(xiàn)一些策略。
cc 發(fā)送的 hiredis 鏈接。
pc 對 master/slave 發(fā)布訂閱的 hiredis 鏈接。

2.3.2. 定時管理節(jié)點

// 定時器。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    if (server.sentinel_mode) sentinelTimer();
    ...
}

void sentinelTimer(void) {
    ...
    // 管理節(jié)點。
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    ...
}

void sentinelHandleDictOfRedisInstances(dict *instances) {
    ...
    /* There are a number of things we need to perform against every master. */
    /* 遍歷 master 哈希表下的拓?fù)鋽?shù)據(jù)結(jié)構(gòu),管理對應(yīng)節(jié)點。*/
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // 節(jié)點管理。
        sentinelHandleRedisInstance(ri);
        ...
    }
    ...
}

void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    ...
    /* 異步鏈接,鏈接節(jié)點 ri。*/
    sentinelReconnectInstance(ri);
    /* 監(jiān)控節(jié)點 ri,定時向其它節(jié)點發(fā)送信息。
     * 定期給所有類型節(jié)點 ri 發(fā)送命令 PING/PUBLISH,給 master/slave ri 發(fā)送 INFO。*/
    sentinelSendPeriodicCommands(ri);
    ...
}

2.3.3. 異步鏈接

sentinel 異步重連其它節(jié)點。

  1. 每秒檢查一次命令鏈接和發(fā)布訂閱鏈接是否正常。
  2. 鏈接斷開需要重新鏈接。
  3. 命令鏈接重連成功,發(fā)送 PING 命令。
  4. 發(fā)布訂閱重連成功,SUBSCRIBE 訂閱 hello 頻道。
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    /* 每秒檢查一次。*/
    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) {
        return;
    }
    ri->link->last_reconn_time = now;

    /* 鏈接命令通道。*/
    if (link->cc == NULL) {
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        ...
        /* 鏈接成功后,發(fā)送 PING 命令。*/
        sentinelSendPing(ri);
        ...
    }

    /* 鏈接發(fā)布訂閱通道。*/
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        /* 創(chuàng)建異步非阻塞鏈接。*/
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        ...
        /* 鏈接成功后訂閱 hello 頻道。*/
        retval = redisAsyncCommand(link->pc,
            sentinelReceiveHelloMessages, ri, "%s %s",
            sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
            SENTINEL_HELLO_CHANNEL);
        ...
    }
    ...
}

2.3.4. 定時發(fā)送消息

sentinel 定期發(fā)送命令:PING / INFO / PUBLISH。每種命令發(fā)送的時間間隔不一樣;不同場景下,同一個命令發(fā)送時間間隔可能會改變。


命令發(fā)送對象

命令 發(fā)送節(jié)點類型
PING master / slave / sentinel
PUBLISH master / slave / sentinel
INFO master / slave
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    if (ri->link->disconnected) return;

    /* 因為是異步通信,如果發(fā)出去的命令還沒有收到回復(fù),當(dāng)?shù)竭_(dá)一定的量,暫停發(fā)送定時命令。*/
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* 如果當(dāng)前節(jié)點是 slave,它對應(yīng)的 master 已經(jīng)客觀下線,并且進(jìn)入了故障轉(zhuǎn)移狀態(tài)。
     * 那么提高發(fā)命令(INFO)頻率,因為故障轉(zhuǎn)移過程中,sentinel 需要通過 "info" 命令
     * 獲得節(jié)點的信息來完成故障轉(zhuǎn)移環(huán)節(jié),例如:slave 的 role 角色信息,
     * 還有當(dāng) slave 是否已經(jīng)成功連接新的 master("master_link_status"),等等。*/
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* 監(jiān)控 master,掉線時長可以通過 'down-after-milliseconds' 配置。
     * 但 PING 命令發(fā)送間隔不能長于 master 掉線時間,否則不能?;?。*/
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) {
        ping_period = SENTINEL_PING_PERIOD;
    }

    /* 給 master / slave 發(fā)送 INFO。*/
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* 發(fā)送 PING。*/
    if ((now - ri->link->last_pong_time) > ping_period &&
        (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }

    /* 發(fā)布 sentinel 鏈接信息到 hello 頻道。 */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}

2.3.5. INFO 回復(fù)

sentinel 通過 master / slave 的 INFO 回復(fù),主要下面幾件事:

故障轉(zhuǎn)移下一節(jié)詳細(xì)介紹。

  1. 發(fā)現(xiàn)節(jié)點信息變更,同步新的節(jié)點屬性信息。
  2. 如果在 master 的回復(fù)文本中發(fā)現(xiàn)新的 slave,進(jìn)行鏈接建立聯(lián)系。
  3. 節(jié)點角色改變,進(jìn)行故障轉(zhuǎn)移或其它相關(guān)的邏輯。
  • master
# Server
run_id:93843ea6e3ddb2a0c0bc0688a62470b578ef9489
...

# Replication
role:master
slave0:ip=127.0.0.1,port=6378,state=online,offset=1554692663,lag=1
...
  • slave
# Server
run_id:c945db01b8ff34ffaa529dcfb8f24c7f3a600573

# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
slave_priority:100
slave_repl_offset:1563634631
  • 根據(jù) INFO 回復(fù)信息,更新當(dāng)前監(jiān)控節(jié)點屬性信息。
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    ...
    sentinelRefreshInstanceInfo(ri,r->str);
}

// 分析 INFO 回復(fù)的文本信息。
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    sds *lines;
    int numlines, j;
    int role = 0;

    /* cache full INFO output for instance */
    sdsfree(ri->info);
    ri->info = sdsnew(info);

    ri->master_link_down_time = 0;

    /* info 命令回復(fù)內(nèi)容是多行文本,分析每行文本內(nèi)容。*/
    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
    for (j = 0; j < numlines; j++) {
        sentinelRedisInstance *slave;
        sds l = lines[j];
        ...
        /* old versions: slave0:<ip>,<port>,<state>
         * new versions: slave0:ip=127.0.0.1,port=9999,... */
        if ((ri->flags & SRI_MASTER) &&
            sdslen(l) >= 7 && !memcmp(l,"slave",5) && isdigit(l[5])) {
            char *ip, *port, *end;
            ...
            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
                /* 如果從 master 回復(fù)的 INFO 信息中發(fā)現(xiàn)新的 slave 就添加監(jiān)控實例。 */
                if ((slave = createSentinelRedisInstance(
                    NULL,SRI_SLAVE,ip, atoi(port), ri->quorum, ri)) != NULL) {
                    sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
                    sentinelFlushConfig();
                }
            }
        }

        /* role:<role> */
        if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
        else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;

        /* 更新 slave 對應(yīng)的屬性信息。 */
        if (role == SRI_SLAVE) {
            /* master_host:<host> */
            ...
            /* master_port:<port> */
            ...
            /* master_link_status:<status> */
            ...
            /* slave_priority:<priority> */
            ...
            /* slave_repl_offset:<offset> */
            ...
        }
    }

    /* 如果 sentinel 正處在異常狀態(tài),不參與故障轉(zhuǎn)移。 */
    if (sentinel.tilt) return;

    /* 故障轉(zhuǎn)移 */
    ...
}

2.3.6. 發(fā)布訂閱 hello 頻道

抓包工作流程
  • sentinel 發(fā)布的文本內(nèi)容。
<ip>,<port>,<runid>,<current_epoch>,<master_name>,<master_ip>,<master_port>,<master_config_epoch>
  • sentinel 向 __sentinel__:hello 頻道發(fā)布訂閱的 strace 日志。
# sentinel A 向 master 訂閱 hello 頻道。
sendto(9, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
recvfrom(9, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53

# sentinel A 向 slave 訂閱 hello 頻道。
sendto(11, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
recvfrom(11, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53

# sentinel A 從 master / slave 收到 sentinel C 發(fā)布的信息。
recvfrom(11, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 266
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133

# sentinel A 從 master / slave 收到 sentinel B 發(fā)布的信息。
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
recvfrom(11, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 266

# sentinel A 向 master / slave 發(fā)布自己的鏈接信息和對應(yīng)的 master 信息。
# __sentinel__:hello
# <ip>,<port>,<runid>,<current_epoch>,<master_name>,<master_ip>,<master_port>,<master_config_epoch>
sendto(8, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
sendto(10, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
recvfrom(8, ":3\r\n", 16384, 0, NULL, NULL) = 4
recvfrom(10, ":3\r\n", 16384, 0, NULL, NULL) = 4
  • 發(fā)布。
int sentinelSendHello(sentinelRedisInstance *ri) {
    ...
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
    ...
    /* Format and send the Hello message. */
    snprintf(payload, sizeof(payload),
             "%s,%d,%s,%llu," /* Info about this sentinel. */
             "%s,%s,%d,%llu", /* Info about current master. */
             announce_ip, announce_port, sentinel.myid,
             (unsigned long long)sentinel.current_epoch,
             /* --- */
             master->name, master_addr->ip, master_addr->port,
             (unsigned long long)master->config_epoch);
    retval = redisAsyncCommand(ri->link->cc,
                               sentinelPublishReplyCallback, ri, "%s %s %s",
                               sentinelInstanceMapCommand(ri, "PUBLISH"),
                               SENTINEL_HELLO_CHANNEL, payload);
    ...
}
  • 訂閱。
    sentinel 向 master / slave 訂閱 hello 頻道,通過異步函數(shù) sentinelReceiveHelloMessages 接收其它 sentinel 發(fā)布的信息。
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    ...
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        ...
        // 接收廣播消息。
        retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
        ...
    }
    ...
}
  • 接收文本回復(fù) sentinelReceiveHelloMessages。
/* to discover other sentinels attached at the same master. */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
    ...
    sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}

void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Format is composed of 8 tokens:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    ...

    if (numtokens == 8) {
        ...
        si = getSentinelRedisInstanceByAddrAndRunID(
            master->sentinels, token[0], port, token[2]);
        ...
        if (!si) {
            ...
            /* Add the new sentinel. */
            si = createSentinelRedisInstance(
                token[2], SRI_SENTINEL, token[0], port, master->quorum, master);
            ...
        }
        ...
        /* 如果 master 鏈接信息改變,那么修改 master 的屬性信息,以及重置 master 對應(yīng)的 slave 信息。 */
        if (si && master->config_epoch < master_config_epoch) {
            master->config_epoch = master_config_epoch;
            if (master_port != master->addr->port || strcmp(master->addr->ip, token[5])) {
                ...
                sentinelResetMasterAndChangeAddress(master, token[5], master_port);
                ...
            }
        }
        ...
    }
    ...
}

3. 參考


如果文章不錯,給個點贊唄 ~ 謝謝。??

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容