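This chapter follows on from the previous one, "[redis source walkthrough] sentinel - principles". Here we use the strace command to capture sentinel's activity at the system-call level, get familiar with how the nodes communicate, and read the relevant source code.
1. Workflow
1.1. Commands
- Either of the following two commands starts a sentinel process.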
redis-sentinel /path/to/your/sentinel.conf
redis-server /path/to/your/sentinel.conf --sentinel
- Communication commands.
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
...
};
1.2. Node layout
| node | port |
|---|---|
| master | 6379 |
| slave | 6378 |
| sentinel A | 26379 |
| sentinel B | 26377 |
| sentinel C | 26378 |
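1.3. Connections
Nodes talk to each other over TCP. The connection diagram for this section shows how sentinel A relates to the other nodes.
Each arrow points in the direction of the connect call, and the number on an arrow is the fd, so you can match it against the strace log. Fds are allocated in increasing order, so they also show the order in which the connections were created.
1.3.1. Configuration
- Connecting to the master.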
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2
- Saving information about connected nodes.
Once sentinel has started and made contact with the other nodes in the cluster, it writes their details back into its configuration file.
# sentinel.conf
# Slave information.
sentinel known-replica mymaster 127.0.0.1 6378
# Sentinel B information.
sentinel known-sentinel mymaster 127.0.0.1 26377 de0ffb0d63f77605db3fccb959f67b65b8fdb529
# Sentinel C information.
sentinel known-sentinel mymaster 127.0.0.1 26378 989f0e00789a0b41cff738704ce8b04bad306714
1.3.2. Runtime log
16259:X 17 Sep 2020 14:17:51.097 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
16259:X 17 Sep 2020 14:17:51.097 # Redis version=5.9.104, bits=64, commit=00000000, modified=0, pid=16259, just started
16259:X 17 Sep 2020 14:17:51.098 # Configuration loaded
16259:X 17 Sep 2020 14:17:51.104 * Running mode=sentinel, port=26379.
16259:X 17 Sep 2020 14:17:51.106 # Sentinel ID is 0400c9170654ecbaeaf98fedb1630486e5f8f5b6
16259:X 17 Sep 2020 14:17:51.107 # +monitor master mymaster 127.0.0.1 6379 quorum 2
16259:X 17 Sep 2020 14:17:51.113 * +slave slave 127.0.0.1:6378 127.0.0.1 6378 @ mymaster 127.0.0.1 6379
16259:X 17 Sep 2020 14:17:52.168 * +sentinel sentinel de0ffb0d63f77605db3fccb959f67b65b8fdb529 127.0.0.1 26377 @ mymaster 127.0.0.1 6379
16259:X 17 Sep 2020 14:17:52.370 * +sentinel sentinel 989f0e00789a0b41cff738704ce8b04bad306714 127.0.0.1 26378 @ mymaster 127.0.0.1 6379
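1.4. Communication flow
Watching the data sent and received on each socket with strace gives a good picture of how the three roles (sentinel, master, slave) find each other.
- sentinel reads the master's address from its configuration file, connects to the master, and sends PING.
- sentinel sends the INFO command to the master to get the list of slaves attached to it.
- sentinel subscribes to the __sentinel__:hello channel on the master and the slave. Other sentinels periodically publish to that channel on the master and slave, and every subscriber is notified, so the current sentinel learns about the other sentinels and connects to them.
So sentinel only needs the master's address in its configuration. The INFO command and the __sentinel__:hello channel let it discover every other node in the cluster.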
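| Command | Description |
|---|---|
| PING | All three roles exchange PING as a heartbeat to confirm that the peer is online. |
| INFO | sentinel sends this to the master / slave to get detailed node information, including the master's slave list. |
| PUBLISH / SUBSCRIBE | sentinel subscribes (SUBSCRIBE) to the __sentinel__:hello channel only on the master / slave, but publishes (PUBLISH) to all three roles, pushing its own details to the other sentinels so that they can connect to it. |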
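1.5. Detailed log walkthrough
Read the strace log below alongside the connection relationships described above.
In the log, several commands often go out in a single sendto. sentinel uses hiredis as its client, bound to redis's multiplexed asynchronous event loop. A command written through the API is asynchronous: it first lands in the send buffer, and the buffer is flushed only when a write event fires. That is why commands are not sent one at a time. The same holds for recvfrom: hiredis reads only after a read event fires, so several replies are frequently read in one call. This is pipelined batch processing; for details see "[hiredis source walkthrough] asynchronous callback mechanism".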
# strace -s 512 -o /tmp/sentinel.log ./redis-sentinel sentinel.conf
# sentinel A starts and binds port 26379.
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 7
setsockopt(7, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(7, {sa_family=AF_INET, sin_port=htons(26379), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(7, 511) = 0
# sentinel A talks to the master over two links: a command link and a pub/sub link.
connect(8, {sa_family=AF_INET, sin_port=htons(6379), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(8, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nINFO\r\n", 85, 0, NULL, 0) = 85
connect(9, {sa_family=AF_INET, sin_port=htons(6379), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(9, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
# sentinel A gets slave information from the master.
recvfrom(8, "+OK\r\n+PONG\r\n$3757\r\n# Server\r\nredis_version:5.9.104\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:995c39fc3a59d30e\r\nredis_mode:standalone\r\nos:Linux 3.10.0-693.2.2.el7.x86_64 x86_64\r\narch_bits:64\r\nmultiplexing_api:epoll\r\natomicvar_api:atomic-builtin\r\ngcc_version:8.3.1\r\nprocess_id:7676\r\nrun_id:93843ea6e3ddb2a0c0bc0688a62470b578ef9489\r\ntcp_port:6379\r\nuptime_in_seconds:8271174\r\nuptime_in_days:95\r\nhz:1000\r\nconfigured_hz:1000\r\nlru_clock:6487951\r\nexecutable:/home/other/redis-test/maser/./redis-server\r"..., 16384, 0, NULL, NULL) = 3778
recvfrom(9, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53
# sentinel A talks to the slave over two links: a command link and a pub/sub link.
connect(10, {sa_family=AF_INET, sin_port=htons(6378), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(10, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n*1\r\n$4\r\nINFO\r\n", 85, 0, NULL, 0) = 85
connect(11, {sa_family=AF_INET, sin_port=htons(6378), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(11, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
recvfrom(10, "+OK\r\n+PONG\r\n$3828\r\n# Server\r\nredis_version:5.9.104\r\nredis_git_sha1:00000000\r\nredis_git_dirty:0\r\nredis_build_id:995c39fc3a59d30e\r\nredis_mode:standalone\r\nos:Linux 3.10.0-693.2.2.el7.x86_64 x86_64\r\narch_bits:64\r\nmultiplexing_api:epoll\r\natomicvar_api:atomic-builtin\r\ngcc_version:8.3.1\r\nprocess_id:15605\r\nrun_id:c945db01b8ff34ffaa529dcfb8f24c7f3a600573\r\ntcp_port:6378\r\nuptime_in_seconds:408\r\nuptime_in_days:0\r\nhz:1000\r\nconfigured_hz:1000\r\nlru_clock:6487951\r\nexecutable:/home/other/redis-test/slave/./redis-server\r\ncon"..., 16384, 0, NULL, NULL) = 3849
recvfrom(11, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53
# sentinel A talks to sentinel B. sentinel A learns sentinel B's address from a message that B published via the master.
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
connect(12, {sa_family=AF_INET, sin_port=htons(26377), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(12, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n", 71, 0, NULL, 0) = 71
recvfrom(8, "+PONG\r\n", 16384, 0, NULL, NULL) = 7
recvfrom(10, "+PONG\r\n", 16384, 0, NULL, NULL) = 7
recvfrom(12, "+OK\r\n+PONG\r\n", 16384, 0, NULL, NULL) = 12
# sentinel A talks to sentinel C. sentinel A learns sentinel C's address from a message relayed by the slave.
recvfrom(11, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 266
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
connect(13, {sa_family=AF_INET, sin_port=htons(26378), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
sendto(13, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-0400c917-cmd\r\n*1\r\n$4\r\nPING\r\n", 71, 0, NULL, 0) = 71
recvfrom(13, "+OK\r\n+PONG\r\n", 16384, 0, NULL, NULL) = 12
# sentinel A publishes its own address and the master it monitors to the master / slave.
sendto(8, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
sendto(10, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
recvfrom(8, ":3\r\n", 16384, 0, NULL, NULL) = 4
recvfrom(10, ":3\r\n", 16384, 0, NULL, NULL) = 4
# sentinel C connects to sentinel A.
accept(7, {sa_family=AF_INET, sin_port=htons(62448), sin_addr=inet_addr("127.0.0.1")}, [16]) = 14
read(14, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-989f0e00-cmd\r\n*1\r\n$4\r\nPING\r\n", 16384) = 71
write(14, "+OK\r\n+PONG\r\n", 12) = 12
# sentinel B connects to sentinel A.
accept(7, {sa_family=AF_INET, sin_port=htons(62450), sin_addr=inet_addr("127.0.0.1")}, [16]) = 15
read(15, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$21\r\nsentinel-de0ffb0d-cmd\r\n*1\r\n$4\r\nPING\r\n", 16384) = 71
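The byte counts in the trace above can be reproduced by hand. The following is a minimal sketch, not code from redis or hiredis: the helper names `resp_append` and `build_first_batch` are hypothetical. It encodes each command as a RESP array of bulk strings and queues CLIENT SETNAME, PING and INFO into one buffer, the way the first sendto on fd 8 carries all three.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Append one command, encoded as a RESP array of bulk strings, at
 * offset len of buf. Returns the new length, or 0 if it does not fit.
 * Hypothetical helper for illustration only. */
static size_t resp_append(char *buf, size_t cap, size_t len,
                          int argc, const char **argv) {
    assert(buf != NULL && len <= cap);
    int n = snprintf(buf + len, cap - len, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap - len) return 0;
    len += (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(buf + len, cap - len, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - len) return 0;
        len += (size_t)n;
    }
    return len;
}

/* Queue the three commands seen in the first sendto on a command link.
 * Returns the total number of bytes, or 0 if buf is too small. */
static size_t build_first_batch(char *buf, size_t cap) {
    const char *setname[] = {"CLIENT", "SETNAME", "sentinel-0400c917-cmd"};
    const char *ping[] = {"PING"};
    const char *info[] = {"INFO"};
    size_t len = resp_append(buf, cap, 0, 3, setname);
    if (len == 0) return 0;
    len = resp_append(buf, cap, len, 1, ping);
    if (len == 0) return 0;
    return resp_append(buf, cap, len, 1, info);
}
```

Calling `build_first_batch` with a 256-byte buffer yields 85 bytes, the same size strace reports for the sendto on fd 8. Dropping the INFO command gives the 71 bytes sent to the other sentinels.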
2. Source code walkthrough
With the communication sequence between nodes now clear, let's look at the source code.
2.1. Data structures
The sentinel process manages all three roles (sentinel / master / slave) with one data structure, sentinelRedisInstance.
// Per-role instance data structure.
typedef struct sentinelRedisInstance {
int flags; /* See SRI_... defines */
char *name; /* Master name from the point of view of this sentinel. */
char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
uint64_t config_epoch; /* Configuration epoch. */
sentinelAddr *addr; /* Master host. */
instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
...
/* Master specific. */
dict *sentinels; /* Other sentinels monitoring the same master. */
dict *slaves; /* Slaves for this master instance. */
unsigned int quorum;/* Number of sentinels that need to agree on failure. */
...
} sentinelRedisInstance;
// Sentinel global state.
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
dict *masters; /* Dictionary of master sentinelRedisInstances. */
...
} sentinel;
2.2. Initialization
When the sentinel process starts, it loads its configuration and creates a sentinelRedisInstance for each configured node.
While running, sentinel writes newly discovered sentinel / master / slave nodes back into sentinel.conf.
# Create the sentinel management instance.
createSentinelRedisInstance(char* name, int flags, char* hostname, int port, int quorum, sentinelRedisInstance* master) (/Users/wenfh2020/src/redis/src/sentinel.c:1192)
sentinelHandleConfiguration(char** argv, int argc) (/Users/wenfh2020/src/redis/src/sentinel.c:1636)
loadServerConfigFromString(char* config) (/Users/wenfh2020/src/redis/src/config.c:504)
# Load the configuration.
loadServerConfig(char* filename, char* options) (/Users/wenfh2020/src/redis/src/config.c:566)
main(int argc, char** argv) (/Users/wenfh2020/src/redis/src/server.c:5101)
// Load and process configuration directives.
char *sentinelHandleConfiguration(char **argv, int argc) {
...
if (!strcasecmp(argv[0],"monitor") && argc == 5) {
// Load the master's information.
/* monitor <name> <host> <port> <quorum> */
int quorum = atoi(argv[4]);
if (quorum <= 0) return "Quorum must be 1 or greater.";
// Create the monitoring instance for the master.
if (createSentinelRedisInstance(
argv[1], SRI_MASTER, argv[2], atoi(argv[3]), quorum, NULL) == NULL) {
switch(errno) {
case EBUSY: return "Duplicated master name.";
case ENOENT: return "Can't resolve master instance hostname.";
case EINVAL: return "Invalid port number";
}
}
} else if ((!strcasecmp(argv[0],"known-slave") ||
!strcasecmp(argv[0],"known-replica")) && argc == 4) {
// Load slave information.
sentinelRedisInstance *slave;
/* known-replica <name> <ip> <port> */
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
atoi(argv[3]), ri->quorum, ri)) == NULL) {
return "Wrong hostname or port for replica.";
}
} else if (!strcasecmp(argv[0],"known-sentinel") && (argc == 4 || argc == 5)) {
// Load information about other sentinel nodes.
sentinelRedisInstance *si;
if (argc == 5) { /* Ignore the old form without runid. */
/* known-sentinel <name> <ip> <port> [runid] */
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
atoi(argv[3]), ri->quorum, ri)) == NULL) {
return "Wrong hostname or port for sentinel.";
}
si->runid = sdsnew(argv[4]);
sentinelTryConnectionSharing(si);
}
}
...
}
// Create an instance object for a role. Relationships between roles are tracked with hash tables.
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
sentinelRedisInstance *ri;
sentinelAddr *addr;
dict *table = NULL;
char slavename[NET_PEER_ID_LEN], *sdsname;
serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
serverAssert((flags & SRI_MASTER) || master != NULL);
// Resolve the hostname.
addr = createSentinelAddr(hostname,port);
if (addr == NULL) return NULL;
/* Management is centered on the master: only a master has a configured name. A slave is keyed by a name built from ip:port. */
if (flags & SRI_SLAVE) {
anetFormatAddr(slavename, sizeof(slavename), hostname, port);
name = slavename;
}
// Pick the hash table that matches the role.
if (flags & SRI_MASTER) table = sentinel.masters;
else if (flags & SRI_SLAVE) table = master->slaves;
else if (flags & SRI_SENTINEL) table = master->sentinels;
sdsname = sdsnew(name);
// Reject duplicates.
if (dictFind(table,sdsname)) {
releaseSentinelAddr(addr);
sdsfree(sdsname);
errno = EBUSY;
return NULL;
}
// Create the sentinelRedisInstance object.
ri = zmalloc(sizeof(*ri));
ri->flags = flags;
ri->name = sdsname;
ri->runid = NULL;
ri->config_epoch = 0;
ri->addr = addr;
...
ri->sentinels = dictCreate(&instancesDictType,NULL);
ri->quorum = quorum;
ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
ri->master = master;
ri->slaves = dictCreate(&instancesDictType,NULL);
...
// Add the new instance to the matching hash table.
dictAdd(table, ri->name, ri);
return ri;
}
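The slave-naming step in createSentinelRedisInstance can be tried in isolation. The sketch below is a stand-in written for this article, not the redis function itself: `format_instance_name` is a hypothetical name, and the bracketed IPv6 form is an assumption about what anetFormatAddr produces.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build the dictionary key used for a slave: "ip:port".
 * Assumption: addresses containing ':' (IPv6) are wrapped in brackets.
 * Returns the number of characters snprintf would have written. */
static int format_instance_name(char *buf, size_t cap,
                                const char *ip, int port) {
    assert(buf != NULL && ip != NULL);
    if (strchr(ip, ':') != NULL)
        return snprintf(buf, cap, "[%s]:%d", ip, port);
    return snprintf(buf, cap, "%s:%d", ip, port);
}
```

For the slave in this setup the result is `127.0.0.1:6378`, which is the name that appears after `+slave slave` in the runtime log.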
2.3. Connections
A timer periodically monitors and manages the other nodes. sentinel uses hiredis as its redis client to connect to and communicate with them.
2.3.1. Data structures
// Link structure: two hiredis connections, one for pub/sub and one for commands.
typedef struct instanceLink {
int refcount; /* Number of sentinelRedisInstance owners. */
int disconnected; /* Non-zero if we need to reconnect cc or pc. */
int pending_commands; /* Number of commands sent waiting for a reply. */
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
...
} instanceLink;
| params | desc |
|---|---|
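| disconnected | TCP link state; non-zero means cc or pc needs to be reconnected. |
| pending_commands | Number of commands awaiting a reply. Because communication is asynchronous, replies do not arrive immediately; this counter drives policies such as pausing periodic commands. |
| cc | hiredis connection used to send commands. |
| pc | hiredis connection used for pub/sub with the master / slave. |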
2.3.2. Periodic node management
// Timer.
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
if (server.sentinel_mode) sentinelTimer();
...
}
void sentinelTimer(void) {
...
// Manage the nodes.
sentinelHandleDictOfRedisInstances(sentinel.masters);
...
}
void sentinelHandleDictOfRedisInstances(dict *instances) {
...
/* There are a number of things we need to perform against every master. */
/* Walk the topology under the master hash table and manage each node. */
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// Manage this node.
sentinelHandleRedisInstance(ri);
...
}
...
}
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
...
/* Asynchronously (re)connect to node ri. */
sentinelReconnectInstance(ri);
/* Monitor node ri by sending it periodic commands:
* PING/PUBLISH go to every type of node, INFO goes to master/slave nodes only. */
sentinelSendPeriodicCommands(ri);
...
}
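2.3.3. Asynchronous connection
sentinel reconnects to other nodes asynchronously.
- Once per second it checks whether the command link and the pub/sub link are healthy.
- A broken link is re-established.
- After the command link is reconnected, sentinel sends PING.
- After the pub/sub link is reconnected, sentinel SUBSCRIBEs to the hello channel.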
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return;
if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
instanceLink *link = ri->link;
mstime_t now = mstime();
/* Check at most once per second. */
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) {
return;
}
ri->link->last_reconn_time = now;
/* Connect the command link. */
if (link->cc == NULL) {
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
...
/* Once the link is set up, send PING. */
sentinelSendPing(ri);
...
}
/* Connect the pub/sub link. */
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
/* Create an asynchronous, non-blocking connection. */
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
...
/* Once the link is set up, subscribe to the hello channel. */
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
...
}
...
}
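The link names in the strace log, `sentinel-0400c917-cmd` and `sentinel-0400c917-pubsub`, look like the first eight characters of the Sentinel ID followed by a link-type suffix. The sketch below reproduces that pattern as an assumption drawn from the trace. The helper name `format_client_name` is hypothetical, and this is not presented as the exact redis routine.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Build a CLIENT SETNAME value of the form "sentinel-<id8>-<type>",
 * where <id8> is the first 8 characters of the Sentinel ID.
 * Assumed format, inferred from the names visible in the trace.
 * Returns the number of characters snprintf would have written. */
static int format_client_name(char *buf, size_t cap,
                              const char *myid, const char *type) {
    assert(buf != NULL && myid != NULL && type != NULL);
    return snprintf(buf, cap, "sentinel-%.8s-%s", myid, type);
}
```

With the Sentinel ID from the runtime log, the `cmd` name is 21 characters and the `pubsub` name is 24, matching the `$21` and `$24` bulk lengths in the trace.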
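2.3.4. Periodic messages
sentinel periodically sends three commands: PING, INFO, and PUBLISH. Each has its own interval, and the interval for a given command can change depending on the situation.
Command targets:
| Command | Target node types |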
|---|---|
| PING | master / slave / sentinel |
| PUBLISH | master / slave / sentinel |
| INFO | master / slave |
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
if (ri->link->disconnected) return;
/* Communication is asynchronous: once too many sent commands are still waiting for a reply, stop sending periodic commands. */
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
/* If this node is a slave and its master is objectively down or in failover,
* or the slave's link to its master is down, send INFO more often.
* During failover sentinel relies on INFO replies to drive each step,
* for example the slave's role and whether it has connected to the new master ("master_link_status"). */
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0))) {
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}
/* 'down-after-milliseconds' sets how long the master may stay silent before it is considered down.
* The PING interval must not exceed that window, otherwise PING could not serve as a keepalive. */
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) {
ping_period = SENTINEL_PING_PERIOD;
}
/* Send INFO to master / slave. */
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) {
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++;
}
/* Send PING. */
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri);
}
/* Publish this sentinel's address to the hello channel. */
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
}
2.3.5. INFO replies
sentinel uses the INFO replies from the master / slave mainly to do the following (failover is covered in detail in the next section):
- Detect changes in a node's information and sync the new attributes.
- If the master's reply lists a new slave, connect to it.
- If a node's role has changed, run failover or other related logic.
- master
# Server
run_id:93843ea6e3ddb2a0c0bc0688a62470b578ef9489
...
# Replication
role:master
slave0:ip=127.0.0.1,port=6378,state=online,offset=1554692663,lag=1
...
- slave
# Server
run_id:c945db01b8ff34ffaa529dcfb8f24c7f3a600573
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
slave_priority:100
slave_repl_offset:1563634631
- Update the monitored node's attributes from the INFO reply.
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
...
sentinelRefreshInstanceInfo(ri,r->str);
}
// Parse the text of an INFO reply.
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
sds *lines;
int numlines, j;
int role = 0;
/* cache full INFO output for instance */
sdsfree(ri->info);
ri->info = sdsnew(info);
ri->master_link_down_time = 0;
/* The INFO reply is multi-line text; parse it line by line. */
lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
for (j = 0; j < numlines; j++) {
sentinelRedisInstance *slave;
sds l = lines[j];
...
/* old versions: slave0:<ip>,<port>,<state>
* new versions: slave0:ip=127.0.0.1,port=9999,... */
if ((ri->flags & SRI_MASTER) &&
sdslen(l) >= 7 && !memcmp(l,"slave",5) && isdigit(l[5])) {
char *ip, *port, *end;
...
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
/* If the master's INFO reply lists a slave we do not know yet, add a monitoring instance for it. */
if ((slave = createSentinelRedisInstance(
NULL,SRI_SLAVE,ip, atoi(port), ri->quorum, ri)) != NULL) {
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
}
/* role:<role> */
if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
/* Update the slave's attributes. */
if (role == SRI_SLAVE) {
/* master_host:<host> */
...
/* master_port:<port> */
...
/* master_link_status:<status> */
...
/* slave_priority:<priority> */
...
/* slave_repl_offset:<offset> */
...
}
}
/* If sentinel is in an abnormal state (sentinel.tilt is set), do not take part in failover. */
if (sentinel.tilt) return;
/* Failover. */
...
}
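The listing above elides how the ip and port are pulled out of a `slaveN:` line. The following is a minimal sketch of one way to do it for the newer `ip=...,port=...` layout only. `parse_slave_line` is a hypothetical helper, not the redis code, and it ignores the old `slave0:<ip>,<port>,<state>` form.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Parse "slaveN:ip=<ip>,port=<port>,..." from an INFO reply line.
 * Returns 1 and fills ip/port on success, 0 for any other line. */
static int parse_slave_line(const char *line, char *ip, size_t ipcap,
                            int *port) {
    assert(line != NULL && ip != NULL && port != NULL);
    /* Same guard as the listing: "slave" followed by a digit. */
    if (strlen(line) < 7 || memcmp(line, "slave", 5) != 0 ||
        !isdigit((unsigned char)line[5]))
        return 0;
    const char *p = strstr(line, "ip=");
    const char *q = strstr(line, "port=");
    if (p == NULL || q == NULL) return 0;
    p += 3;
    size_t n = strcspn(p, ",");          /* ip runs up to the next comma */
    if (n == 0 || n >= ipcap) return 0;
    memcpy(ip, p, n);
    ip[n] = '\0';
    *port = atoi(q + 5);
    return *port > 0;
}
```

The digit check matters: lines such as `slave_priority:100` also start with `slave` and must not be treated as slave entries.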
2.3.6. Publishing and subscribing on the hello channel
- Text published by sentinel.
<ip>,<port>,<runid>,<current_epoch>,<master_name>,<master_ip>,<master_port>,<master_config_epoch>
- strace log of sentinel publishing and subscribing on the __sentinel__:hello channel.
# sentinel A subscribes to the hello channel on the master.
sendto(9, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
recvfrom(9, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53
# sentinel A subscribes to the hello channel on the slave.
sendto(11, "*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$24\r\nsentinel-0400c917-pubsub\r\n*2\r\n$9\r\nSUBSCRIBE\r\n$18\r\n__sentinel__:hello\r\n", 104, 0, NULL, 0) = 104
recvfrom(11, "+OK\r\n*3\r\n$9\r\nsubscribe\r\n$18\r\n__sentinel__:hello\r\n:1\r\n", 16384, 0, NULL, NULL) = 53
# sentinel A receives sentinel C's message from the master / slave.
recvfrom(11, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 266
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26378,989f0e00789a0b41cff738704ce8b04bad306714,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
# sentinel A receives sentinel B's message from the master / slave.
recvfrom(9, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 133
recvfrom(11, "*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n*3\r\n$7\r\nmessage\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26377,de0ffb0d63f77605db3fccb959f67b65b8fdb529,0,mymaster,127.0.0.1,6379,0\r\n", 16384, 0, NULL, NULL) = 266
# sentinel A publishes its own address and the master it monitors to the master / slave.
# __sentinel__:hello
# <ip>,<port>,<runid>,<current_epoch>,<master_name>,<master_ip>,<master_port>,<master_config_epoch>
sendto(8, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
sendto(10, "*3\r\n$7\r\nPUBLISH\r\n$18\r\n__sentinel__:hello\r\n$84\r\n127.0.0.1,26379,0400c9170654ecbaeaf98fedb1630486e5f8f5b6,0,mymaster,127.0.0.1,6379,0\r\n", 133, 0, NULL, 0) = 133
recvfrom(8, ":3\r\n", 16384, 0, NULL, NULL) = 4
recvfrom(10, ":3\r\n", 16384, 0, NULL, NULL) = 4
- Publish.
int sentinelSendHello(sentinelRedisInstance *ri) {
...
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
...
/* Format and send the Hello message. */
snprintf(payload, sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, sentinel.myid,
(unsigned long long)sentinel.current_epoch,
/* --- */
master->name, master_addr->ip, master_addr->port,
(unsigned long long)master->config_epoch);
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri, "PUBLISH"),
SENTINEL_HELLO_CHANNEL, payload);
...
}
- Subscribe.
sentinel subscribes to the hello channel on the master / slave and receives what other sentinels publish through the asynchronous callback sentinelReceiveHelloMessages.
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
...
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
...
// Receive broadcast messages.
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
...
}
...
}
- Handling the received text: sentinelReceiveHelloMessages.
/* to discover other sentinels attached at the same master. */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
...
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}
void sentinelProcessHelloMessage(char *hello, int hello_len) {
/* Format is composed of 8 tokens:
* 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
* 5=master_ip,6=master_port,7=master_config_epoch. */
...
if (numtokens == 8) {
...
si = getSentinelRedisInstanceByAddrAndRunID(
master->sentinels, token[0], port, token[2]);
...
if (!si) {
...
/* Add the new sentinel. */
si = createSentinelRedisInstance(
token[2], SRI_SENTINEL, token[0], port, master->quorum, master);
...
}
...
/* If the master's address has changed, update the master's attributes and reset its slaves. */
if (si && master->config_epoch < master_config_epoch) {
master->config_epoch = master_config_epoch;
if (master_port != master->addr->port || strcmp(master->addr->ip, token[5])) {
...
sentinelResetMasterAndChangeAddress(master, token[5], master_port);
...
}
}
...
}
...
}
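The hello payload is eight comma-separated tokens, so a round trip is straightforward to sketch. The code below is an illustration rather than the redis implementation: the `hello_msg` struct and the `format_hello` / `parse_hello` helpers are hypothetical names, and the buffer sizes are arbitrary choices for the example.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the 8 hello tokens. */
typedef struct {
    char ip[64];
    int port;
    char runid[41];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Format the payload in the same token order sentinelSendHello uses. */
static int format_hello(char *buf, size_t cap, const hello_msg *h) {
    assert(buf != NULL && h != NULL);
    return snprintf(buf, cap, "%s,%d,%s,%llu,%s,%s,%d,%llu",
                    h->ip, h->port, h->runid, h->current_epoch,
                    h->master_name, h->master_ip, h->master_port,
                    h->master_config_epoch);
}

/* Split a payload into exactly 8 tokens. Returns 1 on success, 0 otherwise. */
static int parse_hello(const char *payload, hello_msg *h) {
    char copy[256];
    char *tok[8];
    int n = 0;
    assert(payload != NULL && h != NULL);
    if (strlen(payload) >= sizeof(copy)) return 0;
    strcpy(copy, payload);
    for (char *p = copy;;) {
        if (n == 8) return 0;            /* more than 8 tokens */
        tok[n++] = p;
        char *comma = strchr(p, ',');
        if (comma == NULL) break;
        *comma = '\0';
        p = comma + 1;
    }
    if (n != 8) return 0;                /* fewer than 8 tokens */
    if (strlen(tok[0]) >= sizeof(h->ip) ||
        strlen(tok[2]) >= sizeof(h->runid) ||
        strlen(tok[4]) >= sizeof(h->master_name) ||
        strlen(tok[5]) >= sizeof(h->master_ip))
        return 0;
    strcpy(h->ip, tok[0]);
    h->port = atoi(tok[1]);
    strcpy(h->runid, tok[2]);
    h->current_epoch = strtoull(tok[3], NULL, 10);
    strcpy(h->master_name, tok[4]);
    strcpy(h->master_ip, tok[5]);
    h->master_port = atoi(tok[6]);
    h->master_config_epoch = strtoull(tok[7], NULL, 10);
    return 1;
}
```

Parsing sentinel B's message from the trace and formatting it again gives back the same 84-byte string, which is the `$84` bulk length seen in the PUBLISH and message frames.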
3. References
- Redis Sentinel Documentation
- "Redis Design and Implementation" (《Redis 设计与实现》)