Redis Cluster Gossip 協(xié)議
今天來講一下 Reids Cluster 的 Gossip 協(xié)議和集群操作,文章的思維導(dǎo)圖如下所示。
集群模式和 Gossip 簡介
對于數(shù)據(jù)存儲領(lǐng)域,當數(shù)據(jù)量或者請求流量大到一定程度后,就必然會引入分布式。比如 Redis,雖然其單機性能十分優(yōu)秀,但是因為下列原因時,也不得不引入集群。
單機無法保證高可用,需要引入多實例來提供高可用性
單機能夠提供高達 8W 左右的QPS,再高的QPS則需要引入多實例
單機能夠支持的數(shù)據(jù)量有限,處理更多的數(shù)據(jù)需要引入多實例;
單機所處理的網(wǎng)絡(luò)流量已經(jīng)超過服務(wù)器的網(wǎng)卡的上限值,需要引入多實例來分流。
有集群,集群往往需要維護一定的元數(shù)據(jù),比如實例的ip地址,緩存分片的 slots 信息等,所以需要一套分布式機制來維護元數(shù)據(jù)的一致性。這類機制一般有兩個模式:分散式和集中式
分散式機制將元數(shù)據(jù)存儲在部分或者所有節(jié)點上,不同節(jié)點之間進行不斷的通信來維護元數(shù)據(jù)的變更和一致性。Redis Cluster,Consul 等都是該模式。
而集中式是將集群元數(shù)據(jù)集中存儲在外部節(jié)點或者中間件上,比如 zookeeper。舊版本的 kafka 和 storm 等都是使用該模式。
兩種模式各有優(yōu)劣,具體如下表所示:
模式優(yōu)點缺點集中式數(shù)據(jù)更新及時,時效好,元數(shù)據(jù)的更新和讀取,時效性非常好,一旦元數(shù)據(jù)出現(xiàn)了變更,立即就更新到集中式的外部節(jié)點中,其他節(jié)點讀取的時候立即就可以感知到;較大數(shù)據(jù)更新壓力,更新壓力全部集中在外部節(jié)點,作為單點影響整個系統(tǒng)分散式數(shù)據(jù)更新壓力分散,元數(shù)據(jù)的更新比較分散,不是集中某一個節(jié)點,更新請求比較分散,而且有不同節(jié)點處理,有一定的延時,降低了并發(fā)壓力數(shù)據(jù)更新延遲,可能導(dǎo)致集群的感知有一定的滯后
分散式的元數(shù)據(jù)模式有多種可選的算法進行元數(shù)據(jù)的同步,比如說 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都需要全部節(jié)點或者大多數(shù)節(jié)點(超過一半)正常運行,整個集群才能穩(wěn)定運行,而 Gossip 則不需要半數(shù)以上的節(jié)點運行。
Gossip 協(xié)議,顧名思義,就像流言蜚語一樣,利用一種隨機、帶有傳染性的方式,將信息傳播到整個網(wǎng)絡(luò)中,并在一定時間內(nèi),使得系統(tǒng)內(nèi)的所有節(jié)點數(shù)據(jù)一致。對你來說,掌握這個協(xié)議不僅能很好地理解這種最常用的,實現(xiàn)最終一致性的算法,也能在后續(xù)工作中得心應(yīng)手地實現(xiàn)數(shù)據(jù)的最終一致性。
Gossip 協(xié)議又稱 epidemic 協(xié)議(epidemic protocol),是基于流行病傳播方式的節(jié)點或者進程之間信息交換的協(xié)議,在P2P網(wǎng)絡(luò)和分布式系統(tǒng)中應(yīng)用廣泛,它的方法論也特別簡單:
在一個處于有界網(wǎng)絡(luò)的集群里,如果每個節(jié)點都隨機與其他節(jié)點交換特定信息,經(jīng)過足夠長的時間后,集群各個節(jié)點對該份信息的認知終將收斂到一致。
這里的“特定信息”一般就是指集群狀態(tài)、各節(jié)點的狀態(tài)以及其他元數(shù)據(jù)等。Gossip協(xié)議是完全符合 BASE 原則,可以用在任何要求最終一致性的領(lǐng)域,比如分布式存儲和注冊中心。另外,它可以很方便地實現(xiàn)彈性集群,允許節(jié)點隨時上下線,提供快捷的失敗檢測和動態(tài)負載均衡等。
此外,Gossip 協(xié)議的最大的好處是,即使集群節(jié)點的數(shù)量增加,每個節(jié)點的負載也不會增加很多,幾乎是恒定的。這就允許 Redis Cluster 或者 Consul 集群管理的節(jié)點規(guī)模能橫向擴展到數(shù)千個。
Redis Cluster 的 Gossip 通信機制
Redis Cluster 是在 3.0 版本引入集群功能。為了讓讓集群中的每個實例都知道其他所有實例的狀態(tài)信息,Redis 集群規(guī)定各個實例之間按照 Gossip 協(xié)議來通信傳遞信息。
上圖展示了主從架構(gòu)的 Redis Cluster 示意圖,其中實線表示節(jié)點間的主從復(fù)制關(guān)系,而虛線表示各個節(jié)點之間的 Gossip 通信。
Redis Cluster 中的每個節(jié)點都維護一份自己視角下的當前整個集群的狀態(tài),主要包括:
當前集群狀態(tài)
集群中各節(jié)點所負責(zé)的 slots信息,及其migrate狀態(tài)
集群中各節(jié)點的master-slave狀態(tài)
集群中各節(jié)點的存活狀態(tài)及懷疑Fail狀態(tài)
也就是說上面的信息,就是集群中Node相互八卦傳播流言蜚語的內(nèi)容主題,而且比較全面,既有自己的更有別人的,這么一來大家都相互傳,最終信息就全面而且一致了。
Redis Cluster 的節(jié)點之間會相互發(fā)送多種消息,較為重要的如下所示:
MEET:通過「cluster meet ip port」命令,已有集群的節(jié)點會向新的節(jié)點發(fā)送邀請,加入現(xiàn)有集群,然后新節(jié)點就會開始與其他節(jié)點進行通信;
PING:節(jié)點按照配置的時間間隔向集群中其他節(jié)點發(fā)送 ping 消息,消息中帶有自己的狀態(tài),還有自己維護的集群元數(shù)據(jù),和部分其他節(jié)點的元數(shù)據(jù);
PONG: 節(jié)點用于回應(yīng) PING 和 MEET 的消息,結(jié)構(gòu)和 PING 消息類似,也包含自己的狀態(tài)和其他信息,也可以用于信息廣播和更新;
FAIL: 節(jié)點 PING 不通某節(jié)點后,會向集群所有節(jié)點廣播該節(jié)點掛掉的消息。其他節(jié)點收到消息后標記已下線。
Redis 的源碼中 cluster.h 文件定義了全部的消息類型,代碼為 redis 4.0版本。
// 注意,PING 、 PONG 和 MEET 實際上是同一種消息。
// PONG 是對 PING 的回復(fù),它的實際格式也為 PING 消息,
// 而 MEET 則是一種特殊的 PING 消息,用于強制消息的接收者將消息的發(fā)送者添加到集群中(如果節(jié)點尚未在節(jié)點列表中的話)
#define CLUSTERMSG_TYPE_PING 0? ? ? ? ? /* Ping 消息 */
#define CLUSTERMSG_TYPE_PONG 1? ? ? ? ? /* Pong 用于回復(fù)Ping */
#define CLUSTERMSG_TYPE_MEET 2? ? ? ? ? /* Meet 請求將某個節(jié)點添加到集群中 */
#define CLUSTERMSG_TYPE_FAIL 3? ? ? ? ? /* Fail 將某個節(jié)點標記為 FAIL */
#define CLUSTERMSG_TYPE_PUBLISH 4? ? ? /* 通過發(fā)布與訂閱功能廣播消息 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* 請求進行故障轉(zhuǎn)移操作,要求消息的接收者通過投票來支持消息的發(fā)送者 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6? ? /* 消息的接收者同意向消息的發(fā)送者投票 */
#define CLUSTERMSG_TYPE_UPDATE 7? ? ? ? /* slots 已經(jīng)發(fā)生變化,消息發(fā)送者要求消息接收者進行相應(yīng)的更新 */
#define CLUSTERMSG_TYPE_MFSTART 8? ? ? /* 為了進行手動故障轉(zhuǎn)移,暫停各個客戶端 */
#define CLUSTERMSG_TYPE_COUNT 9? ? ? ? /* 消息總數(shù) */
復(fù)制代碼
通過上述這些消息,集群中的每一個實例都能獲得其它所有實例的狀態(tài)信息。這樣一來,即使有新節(jié)點加入、節(jié)點故障、Slot 變更等事件發(fā)生,實例間也可以通過 PING、PONG 消息的傳遞,完成集群狀態(tài)在每個實例上的同步。下面,我們依次來看看幾種常見的場景。
定時 PING/PONG 消息
Redis Cluster 中的節(jié)點都會定時地向其他節(jié)點發(fā)送 PING 消息,來交換各個節(jié)點狀態(tài)信息,檢查各個節(jié)點狀態(tài),包括在線狀態(tài)、疑似下線狀態(tài) PFAIL 和已下線狀態(tài) FAIL。
Redis 集群的定時 PING/PONG 的工作原理可以概括成兩點:
一是,每個實例之間會按照一定的頻率,從集群中隨機挑選一些實例,把 PING 消息發(fā)送給挑選出來的實例,用來檢測這些實例是否在線,并交換彼此的狀態(tài)信息。PING 消息中封裝了發(fā)送消息的實例自身的狀態(tài)信息、部分其它實例的狀態(tài)信息,以及 Slot 映射表。
二是,一個實例在接收到 PING 消息后,會給發(fā)送 PING 消息的實例,發(fā)送一個 PONG 消息。PONG 消息包含的內(nèi)容和 PING 消息一樣。
下圖顯示了兩個實例間進行 PING、PONG 消息傳遞的情況,其中實例一為發(fā)送節(jié)點,實例二是接收節(jié)點
新節(jié)點上線
Redis Cluster 加入新節(jié)點時,客戶端需要執(zhí)行 CLUSTER MEET 命令,如下圖所示。
節(jié)點一在執(zhí)行 CLUSTER MEET 命令時會首先為新節(jié)點創(chuàng)建一個 clusterNode 數(shù)據(jù),并將其添加到自己維護的 clusterState 的 nodes 字典中。有關(guān) clusterState 和 clusterNode 關(guān)系,我們在最后一節(jié)會有詳盡的示意圖和源碼來講解。
然后節(jié)點一會根據(jù)據(jù) CLUSTER MEET 命令中的 IP 地址和端口號,向新節(jié)點發(fā)送一條 MEET 消息。新節(jié)點接收到節(jié)點一發(fā)送的MEET消息后,新節(jié)點也會為節(jié)點一創(chuàng)建一個 clusterNode 結(jié)構(gòu),并將該結(jié)構(gòu)添加到自己維護的 clusterState 的 nodes 字典中。
接著,新節(jié)點向節(jié)點一返回一條PONG消息。節(jié)點一接收到節(jié)點B返回的PONG消息后,得知新節(jié)點已經(jīng)成功的接收了自己發(fā)送的MEET消息。
最后,節(jié)點一還會向新節(jié)點發(fā)送一條 PING 消息。新節(jié)點接收到該條 PING 消息后,可以知道節(jié)點A已經(jīng)成功的接收到了自己返回的P ONG消息,從而完成了新節(jié)點接入的握手操作。
MEET 操作成功之后,節(jié)點一會通過稍早時講的定時 PING 機制將新節(jié)點的信息發(fā)送給集群中的其他節(jié)點,讓其他節(jié)點也與新節(jié)點進行握手,最終,經(jīng)過一段時間后,新節(jié)點會被集群中的所有節(jié)點認識。
節(jié)點疑似下線和真正下線
Redis Cluster 中的節(jié)點會定期檢查已經(jīng)發(fā)送 PING 消息的接收方節(jié)點是否在規(guī)定時間 ( cluster-node-timeout ) 內(nèi)返回了 PONG 消息,如果沒有則會將其標記為疑似下線狀態(tài),也就是 PFAIL 狀態(tài),如下圖所示。
然后,節(jié)點一會通過 PING 消息,將節(jié)點二處于疑似下線狀態(tài)的信息傳遞給其他節(jié)點,例如節(jié)點三。節(jié)點三接收到節(jié)點一的 PING 消息得知節(jié)點二進入 PFAIL 狀態(tài)后,會在自己維護的 clusterState 的 nodes 字典中找到節(jié)點二所對應(yīng)的 clusterNode 結(jié)構(gòu),并將主節(jié)點一的下線報告添加到 clusterNode 結(jié)構(gòu)的 fail_reports 鏈表中。
隨著時間的推移,如果節(jié)點十 (舉個例子) 也因為 PONG 超時而認為節(jié)點二疑似下線了,并且發(fā)現(xiàn)自己維護的節(jié)點二的 clusterNode 的 fail_reports 中有半數(shù)以上的主節(jié)點數(shù)量的未過時的將節(jié)點二標記為 PFAIL 狀態(tài)報告日志,那么節(jié)點十將會把節(jié)點二將被標記為已下線 FAIL 狀態(tài),并且節(jié)點十會立刻向集群其他節(jié)點廣播主節(jié)點二已經(jīng)下線的 FAIL 消息,所有收到 FAIL 消息的節(jié)點都會立即將節(jié)點二狀態(tài)標記為已下線。如下圖所示。
需要注意的是,報告疑似下線記錄是由時效性的,如果超過 cluster-node-timeout *2 的時間,這個報告就會被忽略掉,讓節(jié)點二又恢復(fù)成正常狀態(tài)。
Redis Cluster 通信源碼實現(xiàn)
綜上,我們了解了 Redis Cluster 在定時 PING/PONG、新節(jié)點上線、節(jié)點疑似下線和真正下線等環(huán)節(jié)的原理和操作流程,下面我們來真正看一下 Redis 在這些環(huán)節(jié)的源碼實現(xiàn)和具體操作。
涉及的數(shù)據(jù)結(jié)構(gòu)體
首先,我們先來講解一下其中涉及的數(shù)據(jù)結(jié)構(gòu),也就是上文提到的 ClusterNode 等結(jié)構(gòu)。
每個節(jié)點都會維護一個 clusterState 結(jié)構(gòu),表示當前集群的整體狀態(tài),它的定義如下所示。
typedef struct clusterState {
? clusterNode *myself;? /* 當前節(jié)點的clusterNode信息 */
? ....
? dict *nodes;? ? ? ? ? /* name到clusterNode的字典 */
? ....
? clusterNode *slots[CLUSTER_SLOTS]; /* slot 和節(jié)點的對應(yīng)關(guān)系*/
? ....
} clusterState;
它有三個比較關(guān)鍵的字段,具體示意圖如下所示:
myself 字段,是一個 clusterNode 結(jié)構(gòu),用來記錄自己的狀態(tài);
nodes 字典,記錄一個 name 到 clusterNode 結(jié)構(gòu)的映射,以此來記錄其他節(jié)點的狀態(tài);
slot 數(shù)組,記錄slot 對應(yīng)的節(jié)點 clusterNode結(jié)構(gòu)。
clusterNode 結(jié)構(gòu)保存了一個節(jié)點的當前狀態(tài),比如節(jié)點的創(chuàng)建時間、節(jié)點的名字、節(jié)點 當前的配置紀元、節(jié)點的IP地址和端口號等等。除此之外,clusterNode結(jié)構(gòu)的 link 屬性是一個clusterLink結(jié)構(gòu),該結(jié)構(gòu)保存了連接節(jié)點所需的有關(guān)信息**,比如**套接字描述符,輸入緩沖區(qū)和輸出緩沖區(qū)。clusterNode 還有一個 fail_report 的列表,用來記錄疑似下線報告。具體定義如下所示。
typedef struct clusterNode {
? ? mstime_t ctime; /* 創(chuàng)建節(jié)點的時間 */
? ? char name[CLUSTER_NAMELEN]; /* 節(jié)點的名字 */
? ? int flags;? ? ? /* 節(jié)點標識,標記節(jié)點角色或者狀態(tài),比如主節(jié)點從節(jié)點或者在線和下線 */
? ? uint64_t configEpoch; /* 當前節(jié)點已知的集群統(tǒng)一epoch */
? ? unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
? ? int numslots;? /* Number of slots handled by this node */
? ? int numslaves;? /* Number of slave nodes, if this is a master */
? ? struct clusterNode **slaves; /* pointers to slave nodes */
? ? struct clusterNode *slaveof; /* pointer to the master node. Note that it
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? may be NULL even if the node is a slave
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if we don't have the master node in our
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? tables. */
? ? mstime_t ping_sent;? ? ? /* 當前節(jié)點最后一次向該節(jié)點發(fā)送 PING 消息的時間 */
? ? mstime_t pong_received;? /* 當前節(jié)點最后一次收到該節(jié)點 PONG 消息的時間 */
? ? mstime_t fail_time;? ? ? /* FAIL 標志位被設(shè)置的時間 */
? ? mstime_t voted_time;? ? /* Last time we voted for a slave of this master */
? ? mstime_t repl_offset_time;? /* Unix time we received offset for this node */
? ? mstime_t orphaned_time;? ? /* Starting time of orphaned master condition */
? ? long long repl_offset;? ? ? /* 當前節(jié)點的repl便宜 */
? ? char ip[NET_IP_STR_LEN];? /* 節(jié)點的IP 地址 */
? ? int port;? ? ? ? ? ? ? ? ? /* 端口 */
? ? int cport;? ? ? ? ? ? ? ? ? /* 通信端口,一般是端口+1000 */
? ? clusterLink *link;? ? ? ? ? /* 和該節(jié)點的 tcp 連接 */
? ? list *fail_reports;? ? ? ? /* 下線記錄列表 */
} clusterNode;
復(fù)制代碼
clusterNodeFailReport 是記錄節(jié)點下線報告的結(jié)構(gòu)體, node 是報告節(jié)點的信息,而 time 則代表著報告時間。
typedef struct clusterNodeFailReport {
? ? struct clusterNode *node;? /* 報告當前節(jié)點已經(jīng)下線的節(jié)點 */
? ? mstime_t time;? ? ? ? ? ? /* 報告時間 */
} clusterNodeFailReport;
消息結(jié)構(gòu)體
了解了 Reids 節(jié)點維護的數(shù)據(jù)結(jié)構(gòu)體后,我們再來看節(jié)點進行通信的消息結(jié)構(gòu)體。 通信消息最外側(cè)的結(jié)構(gòu)體為 clusterMsg,它包括了很多消息記錄信息,包括 RCmb 標志位,消息總長度,消息協(xié)議版本,消息類型;它還包括了發(fā)送該消息節(jié)點的記錄信息,比如節(jié)點名稱,節(jié)點負責(zé)的slot信息,節(jié)點ip和端口等;最后它包含了一個 clusterMsgData 來攜帶具體類型的消息。
typedef struct {
? ? char sig[4];? ? ? ? /* 標志位,"RCmb" (Redis Cluster message bus). */
? ? uint32_t totlen;? ? /* 消息總長度 */
? ? uint16_t ver;? ? ? /* 消息協(xié)議版本 */
? ? uint16_t port;? ? ? /* 端口 */
? ? uint16_t type;? ? ? /* 消息類型 */
? ? uint16_t count;? ? /*? */
? ? uint64_t currentEpoch;? /* 表示本節(jié)點當前記錄的整個集群的統(tǒng)一的epoch,用來決策選舉投票等,與configEpoch不同的是:configEpoch表示的是master節(jié)點的唯一標志,currentEpoch是集群的唯一標志。 */
? ? uint64_t configEpoch;? /* 每個master節(jié)點都有一個唯一的configEpoch做標志,如果和其他master節(jié)點沖突,會強制自增使本節(jié)點在集群中唯一 */
? ? uint64_t offset;? ? /* 主從復(fù)制偏移相關(guān)信息,主節(jié)點和從節(jié)點含義不同 */
? ? char sender[CLUSTER_NAMELEN]; /* 發(fā)送節(jié)點的名稱 */
? ? unsigned char myslots[CLUSTER_SLOTS/8]; /* 本節(jié)點負責(zé)的slots信息,16384/8個char數(shù)組,一共為16384bit */
? ? char slaveof[CLUSTER_NAMELEN]; /* master信息,假如本節(jié)點是slave節(jié)點的話,協(xié)議帶有master信息 */
? ? char myip[NET_IP_STR_LEN];? ? /* IP */
? ? char notused1[34];? /* 保留字段 */
? ? uint16_t cport;? ? ? /* 集群的通信端口 */
? ? uint16_t flags;? ? ? /* 本節(jié)點當前的狀態(tài),比如 CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET */
? ? unsigned char state; /* Cluster state from the POV of the sender */
? ? unsigned char mflags[3]; /* 本條消息的類型,目前只有兩類:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */
? ? union clusterMsgData data;
} clusterMsg;
clusterMsgData 是一個 union 結(jié)構(gòu)體,它可以為 PING,MEET,PONG 或者 FAIL 等消息體。其中當消息為 PING、MEET 和 PONG 類型時,ping 字段是被賦值的,而是 FAIL 類型時,fail 字段是被賦值的。
// 注意這是 union 關(guān)鍵字
union clusterMsgData {
? ? /* PING, MEET 或者 PONG 消息時,ping 字段被賦值 */
? ? struct {
? ? ? ? /* Array of N clusterMsgDataGossip structures */
? ? ? ? clusterMsgDataGossip gossip[1];
? ? } ping;
? ? /*? FAIL 消息時,fail 被賦值 */
? ? struct {
? ? ? ? clusterMsgDataFail about;
? ? } fail;
? ? // .... 省略 publish 和 update 消息的字段
};
clusterMsgDataGossip 是 PING、PONG 和 MEET 消息的結(jié)構(gòu)體,它會包括發(fā)送消息節(jié)點維護的其他節(jié)點信息,也就是上文中 clusterState 中 nodes 字段包含的信息,具體代碼如下所示,你也會發(fā)現(xiàn)二者的字段是類似的。
typedef struct {
/* 節(jié)點的名字,默認是隨機的,MEET消息發(fā)送并得到回復(fù)后,集群會為該節(jié)點設(shè)置正式的名稱*/
? ? char nodename[CLUSTER_NAMELEN];
? ? uint32_t ping_sent; /* 發(fā)送節(jié)點最后一次給接收節(jié)點發(fā)送 PING 消息的時間戳,收到對應(yīng) PONG 回復(fù)后會被賦值為0 */
? ? uint32_t pong_received; /* 發(fā)送節(jié)點最后一次收到接收節(jié)點發(fā)送 PONG 消息的時間戳 */
? ? char ip[NET_IP_STR_LEN];? /* IP address last time it was seen */
? ? uint16_t port;? ? ? /* IP*/? ? ?
? ? uint16_t cport;? ? ? /* 端口*/?
? ? uint16_t flags;? ? ? /* 標識*/
? ? uint32_t notused1;? /* 對齊字符*/
} clusterMsgDataGossip;
typedef struct {
? ? char nodename[CLUSTER_NAMELEN]; /* 下線節(jié)點的名字 */
} clusterMsgDataFail;
看完了節(jié)點維護的數(shù)據(jù)結(jié)構(gòu)體和發(fā)送的消息結(jié)構(gòu)體后,我們就來看看 Redis 的具體行為源碼了。
隨機周期性發(fā)送PING消息
Redis 的 clusterCron 函數(shù)會被定時調(diào)用,每被執(zhí)行10次,就會準備向隨機的一個節(jié)點發(fā)送 PING 消息。
它會先隨機的選出 5 個節(jié)點,然后從中選擇最久沒有與之通信的節(jié)點,調(diào)用 clusterSendPing 函數(shù)發(fā)送類型為 CLUSTERMSG_TYPE_PING 的消息
// cluster.c 文件
// clusterCron() 每執(zhí)行 10 次(至少間隔一秒鐘),就向一個隨機節(jié)點發(fā)送 gossip 信息
if (!(iteration % 10)) {
? ? int j;
? ? /* 隨機 5 個節(jié)點,選出其中一個 */
? ? for (j = 0; j < 5; j++) {
? ? ? ? de = dictGetRandomKey(server.cluster->nodes);
? ? ? ? clusterNode *this = dictGetVal(de);
? ? ? ? /* 不要 PING 連接斷開的節(jié)點,也不要 PING 最近已經(jīng) PING 過的節(jié)點 */
? ? ? ? if (this->link == NULL || this->ping_sent != 0) continue;
? ? ? ? if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
? ? ? ? ? ? continue;
? ? ? ? /* 對比 pong_received 字段,選出更長時間未收到其 PONG 消息的節(jié)點(表示好久沒有接受到該節(jié)點的PONG消息了) */
? ? ? ? if (min_pong_node == NULL || min_pong > this->pong_received) {
? ? ? ? ? ? min_pong_node = this;
? ? ? ? ? ? min_pong = this->pong_received;
? ? ? ? }
? ? }
? ? /* 向最久沒有收到 PONG 回復(fù)的節(jié)點發(fā)送 PING 命令 */
? ? if (min_pong_node) {
? ? ? ? serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
? ? ? ? clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
? ? }
}
clusterSendPing 函數(shù)的具體行為我們后續(xù)再了解,因為該函數(shù)在其他環(huán)節(jié)也會經(jīng)常用到
節(jié)點加入集群
當節(jié)點執(zhí)行 CLUSTER MEET 命令后,會在自身給新節(jié)點維護一個 clusterNode 結(jié)構(gòu)體,該結(jié)構(gòu)體的 link 也就是TCP連接字段是 null,表示是新節(jié)點尚未建立連接。
clusterCron 函數(shù)中也會處理這些未建立連接的新節(jié)點,調(diào)用 createClusterLink 創(chuàng)立連接,然后調(diào)用 clusterSendPing 函數(shù)來發(fā)送 MEET 消息
/* cluster.c clusterCron 函數(shù)部分,為未創(chuàng)建連接的節(jié)點創(chuàng)建連接 */
if (node->link == NULL) {
? ? int fd;
? ? mstime_t old_ping_sent;
? ? clusterLink *link;
? ? /* 和該節(jié)點建立連接 */
? ? fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
? ? ? ? node->cport, NET_FIRST_BIND_ADDR);
? ? /* .... fd 為-1時的異常處理 */
? ? /* 建立 link */
? ? link = createClusterLink(node);
? ? link->fd = fd;
? ? node->link = link;
? ? aeCreateFileEvent(server.el,link->fd,AE_READABLE,
? ? ? ? ? ? clusterReadHandler,link);
? ? /* 向新連接的節(jié)點發(fā)送 PING 命令,防止節(jié)點被識進入下線 */
? ? /* 如果節(jié)點被標記為 MEET ,那么發(fā)送 MEET 命令,否則發(fā)送 PING 命令 */
? ? old_ping_sent = node->ping_sent;
? ? clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
? ? ? ? ? ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
? ? /* .... */
? ? /* 如果當前節(jié)點(發(fā)送者)沒能收到 MEET 信息的回復(fù),那么它將不再向目標節(jié)點發(fā)送命令。*/
? ? /* 如果接收到回復(fù)的話,那么節(jié)點將不再處于 HANDSHAKE 狀態(tài),并繼續(xù)向目標節(jié)點發(fā)送普通 PING 命令*/
? ? node->flags &= ~CLUSTER_NODE_MEET;
}
防止節(jié)點假超時及狀態(tài)過期
防止節(jié)點假超時和標記疑似下線標記也是在 clusterCron 函數(shù)中,具體如下所示。它會檢查當前所有的 nodes 節(jié)點列表,如果發(fā)現(xiàn)某個節(jié)點與自己的最后一個 PONG 通信時間超過了預(yù)定的閾值的一半時,為了防止節(jié)點是假超時,會主動釋放掉與之的 link 連接,然后會主動向它發(fā)送一個 PING 消息。
/* cluster.c clusterCron 函數(shù)部分,遍歷節(jié)點來檢查 fail 的節(jié)點*/
while((de = dictNext(di)) != NULL) {
? ? clusterNode *node = dictGetVal(de);
? ? now = mstime(); /* Use an updated time at every iteration. */
? ? mstime_t delay;
? ? /* 如果等到 PONG 到達的時間超過了 node timeout 一半的連接 */
? ? /* 因為盡管節(jié)點依然正常,但連接可能已經(jīng)出問題了 */
? ? if (node->link && /* is connected */
? ? ? ? now - node->link->ctime >
? ? ? ? server.cluster_node_timeout && /* 還未重連 */
? ? ? ? node->ping_sent && /* 已經(jīng)發(fā)過ping消息 */
? ? ? ? node->pong_received < node->ping_sent && /* 還在等待pong消息 */
? ? ? ? /* 等待pong消息超過了 timeout/2 */
? ? ? ? now - node->ping_sent > server.cluster_node_timeout/2)
? ? {
? ? ? ? /* 釋放連接,下次 clusterCron() 會自動重連 */
? ? ? ? freeClusterLink(node->link);
? ? }
? ? /* 如果目前沒有在 PING 節(jié)點*/
? ? /* 并且已經(jīng)有 node timeout 一半的時間沒有從節(jié)點那里收到 PONG 回復(fù) */
? ? /* 那么向節(jié)點發(fā)送一個 PING ,確保節(jié)點的信息不會太舊,有可能一直沒有隨機中 */
? ? if (node->link &&
? ? ? ? node->ping_sent == 0 &&
? ? ? ? (now - node->pong_received) > server.cluster_node_timeout/2)
? ? {
? ? ? ? clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
? ? ? ? continue;
? ? }
? ? /* .... 處理failover和標記遺失下線 */
}
復(fù)制代碼
處理failover和標記疑似下線
如果防止節(jié)點假超時處理后,節(jié)點依舊未收到目標節(jié)點的 PONG 消息,并且時間已經(jīng)超過了 cluster_node_timeout,那么就將該節(jié)點標記為疑似下線狀態(tài)。
/* 如果這是一個主節(jié)點,并且有一個從服務(wù)器請求進行手動故障轉(zhuǎn)移,那么向從服務(wù)器發(fā)送 PING*/
if (server.cluster->mf_end &&
? ? nodeIsMaster(myself) &&
? ? server.cluster->mf_slave == node &&
? ? node->link)
{
? ? clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
? ? continue;
}
/* 后續(xù)代碼只在節(jié)點發(fā)送了 PING 命令的情況下執(zhí)行*/
if (node->ping_sent == 0) continue;
/* 計算等待 PONG 回復(fù)的時長 */
delay = now - node->ping_sent;
/* 等待 PONG 回復(fù)的時長超過了限制值,將目標節(jié)點標記為 PFAIL (疑似下線)*/
if (delay > server.cluster_node_timeout) {
? ? /* 超時了,標記為疑似下線 */
? ? if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
? ? ? ? redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
? ? ? ? ? ? node->name);
? ? ? ? // 打開疑似下線標記
? ? ? ? node->flags |= REDIS_NODE_PFAIL;
? ? ? ? update_state = 1;
? ? }
}
實際發(fā)送Gossip消息
以下是前方多次調(diào)用過的clusterSendPing()方法的源碼,代碼中有詳細的注釋,大家可以自行閱讀。主要的操作就是將節(jié)點自身維護的 clusterState 轉(zhuǎn)換為對應(yīng)的消息結(jié)構(gòu)體,。
/* 向指定節(jié)點發(fā)送一條 MEET 、 PING 或者 PONG 消息 */
void clusterSendPing(clusterLink *link, int type) {
? ? unsigned char *buf;
? ? clusterMsg *hdr;
? ? int gossipcount = 0; /* Number of gossip sections added so far. */
? ? int wanted; /* Number of gossip sections we want to append if possible. */
? ? int totlen; /* Total packet length. */
? ? // freshnodes 是用于發(fā)送 gossip 信息的計數(shù)器
? ? // 每次發(fā)送一條信息時,程序?qū)?freshnodes 的值減一
? ? // 當 freshnodes 的數(shù)值小于等于 0 時,程序停止發(fā)送 gossip 信息
? ? // freshnodes 的數(shù)量是節(jié)點目前的 nodes 表中的節(jié)點數(shù)量減去 2
? ? // 這里的 2 指兩個節(jié)點,一個是 myself 節(jié)點(也即是發(fā)送信息的這個節(jié)點)
? ? // 另一個是接受 gossip 信息的節(jié)點
? ? int freshnodes = dictSize(server.cluster->nodes)-2;
? ? /* 計算要攜帶多少節(jié)點的信息,最少3個,最多 1/10 集群總節(jié)點數(shù)量*/
? ? wanted = floor(dictSize(server.cluster->nodes)/10);
? ? if (wanted < 3) wanted = 3;
? ? if (wanted > freshnodes) wanted = freshnodes;
? ? /* .... 省略 totlen 的計算等*/
? ? /* 如果發(fā)送的信息是 PING ,那么更新最后一次發(fā)送 PING 命令的時間戳 */
? ? if (link->node && type == CLUSTERMSG_TYPE_PING)
? ? ? ? link->node->ping_sent = mstime();
? ? /* 將當前節(jié)點的信息(比如名字、地址、端口號、負責(zé)處理的槽)記錄到消息里面 */
? ? clusterBuildMessageHdr(hdr,type);
? ? /* Populate the gossip fields */
? ? int maxiterations = wanted*3;
? ? /* 每個節(jié)點有 freshnodes 次發(fā)送 gossip 信息的機會
? ? ? 每次向目標節(jié)點發(fā)送 2 個被選中節(jié)點的 gossip 信息(gossipcount 計數(shù)) */
? ? while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
? ? ? ? /* 從 nodes 字典中隨機選出一個節(jié)點(被選中節(jié)點) */
? ? ? ? dictEntry *de = dictGetRandomKey(server.cluster->nodes);
? ? ? ? clusterNode *this = dictGetVal(de);
? ? ? ? /* 以下節(jié)點不能作為被選中節(jié)點:
? ? ? ? * Myself:節(jié)點本身。
? ? ? ? * PFAIL狀態(tài)的節(jié)點
? ? ? ? * 處于 HANDSHAKE 狀態(tài)的節(jié)點。
? ? ? ? * 帶有 NOADDR 標識的節(jié)點
? ? ? ? * 因為不處理任何 Slot 而被斷開連接的節(jié)點
? ? ? ? */
? ? ? ? if (this == myself) continue;
? ? ? ? if (this->flags & CLUSTER_NODE_PFAIL) continue;
? ? ? ? if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
? ? ? ? ? ? (this->link == NULL && this->numslots == 0))
? ? ? ? {
? ? ? ? ? ? freshnodes--; /* Tecnically not correct, but saves CPU. */
? ? ? ? ? ? continue;
? ? ? ? }
? ? ? ? // 檢查被選中節(jié)點是否已經(jīng)在 hdr->data.ping.gossip 數(shù)組里面
? ? ? ? // 如果是的話說明這個節(jié)點之前已經(jīng)被選中了
? ? ? ? // 不要再選中它(否則就會出現(xiàn)重復(fù))
? ? ? ? if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;
? ? ? ? /* 這個被選中節(jié)點有效,計數(shù)器減一 */
? ? ? ? clusterSetGossipEntry(hdr,gossipcount,this);
? ? ? ? freshnodes--;
? ? ? ? gossipcount++;
? ? }
? ? /* .... 如果有 PFAIL 節(jié)點,最后添加 */
? ? /* 計算信息長度 */
? ? totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
? ? totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
? ? /* 將被選中節(jié)點的數(shù)量(gossip 信息中包含了多少個節(jié)點的信息)記錄在 count 屬性里面*/
? ? hdr->count = htons(gossipcount);
? ? /* 將信息的長度記錄到信息里面 */
? ? hdr->totlen = htonl(totlen);
? ? /* 發(fā)送網(wǎng)絡(luò)請求 */
? ? clusterSendMessage(link,buf,totlen);
? ? zfree(buf);
}
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
? ? clusterMsgDataGossip *gossip;
? ? /* 指向 gossip 信息結(jié)構(gòu) */
? ? gossip = &(hdr->data.ping.gossip[i]);
? ? /* 將被選中節(jié)點的名字記錄到 gossip 信息 */?
? ? memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
? ? /* 將被選中節(jié)點的 PING 命令發(fā)送時間戳記錄到 gossip 信息 */
? ? gossip->ping_sent = htonl(n->ping_sent/1000);
? ? /* 將被選中節(jié)點的 PONG 命令回復(fù)的時間戳記錄到 gossip 信息 */
? ? gossip->pong_received = htonl(n->pong_received/1000);
? ? /* 將被選中節(jié)點的 IP 記錄到 gossip 信息 */
? ? memcpy(gossip->ip,n->ip,sizeof(n->ip));
? ? /* 將被選中節(jié)點的端口號記錄到 gossip 信息 */
? ? gossip->port = htons(n->port);
? ? gossip->cport = htons(n->cport);
? ? /* 將被選中節(jié)點的標識值記錄到 gossip 信息 */
? ? gossip->flags = htons(n->flags);
? ? gossip->notused1 = 0;
}
下面是 clusterBuildMessageHdr 函數(shù),它主要負責(zé)填充消息結(jié)構(gòu)體中的基礎(chǔ)信息和當前節(jié)點的狀態(tài)信息。
/* 構(gòu)建消息的 header */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
? ? int totlen = 0;
? ? uint64_t offset;
? ? clusterNode *master;
? ? /* 如果當前節(jié)點是salve,則master為其主節(jié)點,如果當前節(jié)點是master節(jié)點,則master就是當前節(jié)點 */
? ? master = (nodeIsSlave(myself) && myself->slaveof) ?
? ? ? ? ? ? ? myself->slaveof : myself;
? ? memset(hdr,0,sizeof(*hdr));
? ? /* 初始化協(xié)議版本、標識、及類型, */
? ? hdr->ver = htons(CLUSTER_PROTO_VER);
? ? hdr->sig[0] = 'R';
? ? hdr->sig[1] = 'C';
? ? hdr->sig[2] = 'm';
? ? hdr->sig[3] = 'b';
? ? hdr->type = htons(type);
? ? /* 消息頭設(shè)置當前節(jié)點id */
? ? memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
? ? /* 消息頭設(shè)置當前節(jié)點ip */
? ? memset(hdr->myip,0,NET_IP_STR_LEN);
? ? if (server.cluster_announce_ip) {
? ? ? ? strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
? ? ? ? hdr->myip[NET_IP_STR_LEN-1] = '\0';
? ? }
? ? /* 基礎(chǔ)端口及集群內(nèi)節(jié)點通信端口 */
? ? int announced_port = server.cluster_announce_port ?
? ? ? ? ? ? ? ? ? ? ? ? server.cluster_announce_port : server.port;
? ? int announced_cport = server.cluster_announce_bus_port ?
? ? ? ? ? ? ? ? ? ? ? ? ? server.cluster_announce_bus_port :
? ? ? ? ? ? ? ? ? ? ? ? ? (server.port + CLUSTER_PORT_INCR);
? ? /* 設(shè)置當前節(jié)點的槽信息 */
? ? memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
? ? memset(hdr->slaveof,0,CLUSTER_NAMELEN);
? ? if (myself->slaveof != NULL)
? ? ? ? memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
? ? hdr->port = htons(announced_port);
? ? hdr->cport = htons(announced_cport);
? ? hdr->flags = htons(myself->flags);
? ? hdr->state = server.cluster->state;
? ? /* 設(shè)置 currentEpoch and configEpochs. */
? ? hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
? ? hdr->configEpoch = htonu64(master->configEpoch);
? ? /* 設(shè)置復(fù)制偏移量 */
? ? if (nodeIsSlave(myself))
? ? ? ? offset = replicationGetSlaveOffset();
? ? else
? ? ? ? offset = server.master_repl_offset;
? ? hdr->offset = htonu64(offset);
? ? /* Set the message flags. */
? ? if (nodeIsMaster(myself) && server.cluster->mf_end)
? ? ? ? hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
? ? /* 計算并設(shè)置消息的總長度 */
? ? if (type == CLUSTERMSG_TYPE_FAIL) {
? ? ? ? totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
? ? ? ? totlen += sizeof(clusterMsgDataFail);
? ? } else if (type == CLUSTERMSG_TYPE_UPDATE) {
? ? ? ? totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
? ? ? ? totlen += sizeof(clusterMsgDataUpdate);
? ? }
? ? hdr->totlen = htonl(totlen);
}
后記
本來只想寫一下 Redis Cluster 的 Gossip 協(xié)議,沒想到文章越寫,內(nèi)容越多,最后源碼分析也是有點虎頭蛇尾,大家就湊合看一下,也希望大家繼續(xù)關(guān)注我后續(xù)的問題。
作者:用戶836357178299
鏈接:https://juejin.cn/post/6902004920543952909