sentinel是redis的高可用性解決方案:由一個(gè)或多個(gè)sentinel實(shí)例組成sentinel系統(tǒng)監(jiān)視多個(gè)master以及master的slave,并在被監(jiān)視的master進(jìn)入下線狀態(tài)時(shí),自動(dòng)將下線master的某個(gè)slave升級(jí)為master,然后新的master代替下線的master處

知識(shí)點(diǎn)
- 建立連接
- 周期性的操作
- 發(fā)送監(jiān)控命令
- 判斷節(jié)點(diǎn)的主觀下線狀態(tài)
- 判斷主節(jié)點(diǎn)的客觀下線狀態(tài)
- 對(duì)主節(jié)點(diǎn)執(zhí)行故障轉(zhuǎn)移
- 選擇一個(gè)要晉升的從節(jié)點(diǎn)
- 使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)
- 從節(jié)點(diǎn)同步新的主節(jié)點(diǎn)
- 更新主節(jié)點(diǎn)的狀態(tài)
- 處理主從切換
建立連接
首先,執(zhí)行的第一個(gè)函數(shù)就是sentinelReconnectInstance()函數(shù),因?yàn)樵谳d入配置的時(shí)候,我們將創(chuàng)建的主節(jié)點(diǎn)實(shí)例加入到sentinel.masters字典的時(shí)候,該主節(jié)點(diǎn)的連接是關(guān)閉的,所以第一件事就是為主節(jié)點(diǎn)和哨兵節(jié)點(diǎn)建立網(wǎng)絡(luò)連接。
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
// 如果ri實(shí)例沒有連接中斷,則直接返回
if (ri->link->disconnected == 0) return;
// ri實(shí)例地址非法
if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
instanceLink *link = ri->link;
mstime_t now = mstime();
// 如果還沒有最近一次重連的時(shí)間距離現(xiàn)在太短,小于1s,則直接返回
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
// 設(shè)置最近重連的時(shí)間
ri->link->last_reconn_time = now;
/* Commands connection. */
// cc:命令連接
if (link->cc == NULL) {
// 綁定ri實(shí)例的連接地址并建立連接
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
// 命令連接失敗,則事件通知,且斷開cc連接
if (link->cc->err) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
// 命令連接成功
} else {
// 重置cc連接的屬性
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
// 將服務(wù)器的事件循環(huán)關(guān)聯(lián)到cc連接的上下文中
redisAeAttach(server.el,link->cc);
// 設(shè)置確立連接的回調(diào)函數(shù)
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback);
// 設(shè)置斷開連接的回調(diào)處理
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback);
// 發(fā)送AUTH 命令認(rèn)證
sentinelSendAuthIfNeeded(ri,link->cc);
// 發(fā)送連接名字
sentinelSetClientName(ri,link->cc,"cmd");
/* Send a PING ASAP when reconnecting. */
// 立即向ri實(shí)例發(fā)送PING命令
sentinelSendPing(ri);
}
}
/* Pub / Sub */
// pc:發(fā)布訂閱連接
// 只對(duì)主節(jié)點(diǎn)和從節(jié)點(diǎn)如果沒有設(shè)置pc連接則建立一個(gè)
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
// 綁定指定ri的連接地址并建立連接
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
// pc連接失敗,則事件通知,且斷開pc連接
if (link->pc->err) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
// pc連接成功
} else {
int retval;
link->pc_conn_time = mstime();
link->pc->data = link;
// 將服務(wù)器的事件循環(huán)關(guān)聯(lián)到pc連接的上下文中
redisAeAttach(server.el,link->pc);
// 設(shè)置確立連接的回調(diào)函數(shù)
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);
// 設(shè)置斷開連接的回調(diào)處理
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);
// 發(fā)送AUTH 命令認(rèn)證
sentinelSendAuthIfNeeded(ri,link->pc);
// 發(fā)送連接名字
sentinelSetClientName(ri,link->pc,"pubsub");
// 發(fā)送訂閱 __sentinel__:hello 頻道的命令,設(shè)置回調(diào)函數(shù)處理回復(fù)
// sentinelReceiveHelloMessages是處理Pub/Sub的頻道返回信息的回調(diào)函數(shù),可以發(fā)現(xiàn)訂閱同一master的Sentinel節(jié)點(diǎn)
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
// 訂閱頻道出錯(cuò),關(guān)閉
if (retval != C_OK) {
// 關(guān)閉pc連接
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
// 如果已經(jīng)建立了新的連接,則清除斷開連接的狀態(tài)。表示已經(jīng)建立了連接
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}
建立連接的函數(shù)redisAsyncConnectBind()是Redis的官方C語言客戶端hiredis的異步連接函數(shù),當(dāng)連接成功時(shí)需要調(diào)用redisAeAttach()函數(shù)來將服務(wù)器的事件循環(huán)(ae)與連接的上下文相關(guān)聯(lián)起來(因?yàn)閔iredis提供了多種適配器,包括事件ae,libev,libevent,libuv),在關(guān)聯(lián)的時(shí)候,會(huì)設(shè)置了網(wǎng)絡(luò)連接的可寫可讀事件的處理程序。接下來還會(huì)設(shè)置該連接的確立時(shí)和斷開時(shí)的回調(diào)函數(shù)redisAsyncSetConnectCallback()和redisAsyncSetDisconnectCallback(),為什么這么做,就是因?yàn)樵撨B接是異步的。
了解了以上這些,繼續(xù)分析節(jié)點(diǎn)實(shí)例和當(dāng)前哨兵的連接建立。從該函數(shù)中可以很明顯的看出來:
無論是主節(jié)點(diǎn)、從節(jié)點(diǎn)還是哨兵節(jié)點(diǎn),都會(huì)與當(dāng)前哨兵建立命令連接(Commands connection)。
只有主節(jié)點(diǎn)或從節(jié)點(diǎn)才會(huì)建立發(fā)布訂閱連接(Pub / Sub connection)。
當(dāng)建立了命令連接(cc)之后立即執(zhí)行了三個(gè)動(dòng)作:

當(dāng)建立了發(fā)布訂閱連接(pc)之后立即執(zhí)行的動(dòng)作:(前兩個(gè)動(dòng)作與命令連接相同,只列出不相同的第三個(gè))

如果成功建立連接,之后會(huì)清除連接斷開的標(biāo)志,以表示連接已建立。
如果不是第一次執(zhí)行,那么會(huì)判斷連接是否建立,如果斷開,則重新給建立,如果沒有斷開,那么什么都不會(huì)做直接返回。
發(fā)送監(jiān)控命令
執(zhí)行完建立網(wǎng)絡(luò)連接的函數(shù),接下來會(huì)執(zhí)行sentinelSendPeriodicCommands()函數(shù),該函數(shù)就是定期發(fā)送一些監(jiān)控命令到主節(jié)點(diǎn)或從節(jié)點(diǎn)或哨兵節(jié)點(diǎn),這些節(jié)點(diǎn)會(huì)將哨兵節(jié)點(diǎn)作為客戶端來處理,我們接下來仔細(xì)分析
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
// 如果ri實(shí)例連接處于關(guān)閉狀態(tài),直接返回
if (ri->link->disconnected) return;
// 對(duì)于不是發(fā)送關(guān)鍵命令的INFO,PING,PUBLISH,我們也有SENTINEL_MAX_PENDING_COMMANDS的限制。 我們不想使用大量的內(nèi)存,只是因?yàn)檫B接對(duì)象無法正常工作(請(qǐng)注意,無論如何,還有一個(gè)冗余的保護(hù)措施,即如果檢測(cè)到長(zhǎng)時(shí)間的超時(shí)條件,連接將被斷開連接并重新連接
// 每個(gè)實(shí)例的已發(fā)送未回復(fù)的命令個(gè)數(shù)不能超過100個(gè),否則直接返回
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
// 如果主節(jié)點(diǎn)處于O_DOWN狀態(tài)下,那么Sentinel默認(rèn)每秒發(fā)送INFO命令給它的從節(jié)點(diǎn),而不是通常的SENTINEL_INFO_PERIOD(10s)周期。在這種狀態(tài)下,我們想更密切的監(jiān)控從節(jié)點(diǎn),萬一他們被其他的Sentinel晉升為主節(jié)點(diǎn)
// 如果從節(jié)點(diǎn)報(bào)告和主節(jié)點(diǎn)斷開連接,我們同樣也監(jiān)控INFO命令的輸出更加頻繁,以便我們能有一個(gè)更新鮮的斷開連接的時(shí)間
// 如果ri是從節(jié)點(diǎn),且他的主節(jié)點(diǎn)處于故障狀態(tài)的狀態(tài)或者從節(jié)點(diǎn)和主節(jié)點(diǎn)斷開復(fù)制了
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
// 設(shè)置INFO命令的周期時(shí)間為1s
info_period = 1000;
} else {
// 否則就是默認(rèn)的10s
info_period = SENTINEL_INFO_PERIOD;
}
// 每次最后一次接收到的PONG比配置的 'down-after-milliseconds' 時(shí)間更長(zhǎng),但是如果 'down-after-milliseconds'大于1秒,則每秒鐘進(jìn)行一次ping
// 獲取ri設(shè)置的主觀下線的時(shí)間
ping_period = ri->down_after_period;
// 如果大于1秒,則設(shè)置為1秒
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
// 如果實(shí)例不是Sentinel節(jié)點(diǎn)且Sentinel節(jié)點(diǎn)從該數(shù)據(jù)節(jié)點(diǎn)(主節(jié)點(diǎn)或從節(jié)點(diǎn))沒有收到過INFO回復(fù)或者收到INFO回復(fù)超時(shí)
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
// 發(fā)送INFO命令給主節(jié)點(diǎn)和從節(jié)點(diǎn)
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "INFO");
// 已發(fā)送未回復(fù)的命令個(gè)數(shù)加1
if (retval == C_OK) ri->link->pending_commands++;
// 如果發(fā)送和回復(fù)PING命令超時(shí)
} else if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
// 發(fā)送一個(gè)PING命令給ri實(shí)例,并且更新act_ping_time
sentinelSendPing(ri);
// 發(fā)送頻道的定時(shí)命令超時(shí)
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
// 發(fā)布hello信息給ri實(shí)例
sentinelSendHello(ri);
}
}
從這個(gè)函數(shù)我們可以了解到一下信息:
- 一個(gè)連接對(duì)發(fā)送命令的個(gè)數(shù)有限制。因?yàn)檫B接是一個(gè)異步操作,發(fā)送了不一定會(huì)立即接收到,因此會(huì)為了節(jié)約內(nèi)存而有一個(gè)限制,已發(fā)送未回復(fù)的命令個(gè)數(shù)不能超過100個(gè),否則不做操作。
- 當(dāng)該哨兵節(jié)點(diǎn)正在監(jiān)控從節(jié)點(diǎn)時(shí),但是從節(jié)點(diǎn)從屬的主節(jié)點(diǎn)發(fā)送了故障,那么會(huì)設(shè)置發(fā)送INFO命令的頻率為1s,否則就是默認(rèn)的10s發(fā)送一次INFO命令。
- PING命令的頻率是1s發(fā)送一次。
接下來,就逐個(gè)分析所發(fā)送的監(jiān)控命令。
- 第一個(gè)是INFO命令
哨兵節(jié)點(diǎn)只將INFO命令發(fā)送給主節(jié)點(diǎn)或從節(jié)點(diǎn)。并且設(shè)置sentinelInfoReplyCallback()函數(shù)來處理INFO命令的回復(fù)信息。
處理函數(shù)的代碼有300多行,這里就不列出來了,可以上github查看 sentinel.c源碼詳細(xì)注釋。
當(dāng)INFO名的回復(fù)正確時(shí),會(huì)調(diào)用sentinelRefreshInstanceInfo()函數(shù)來處理INFO命令的回復(fù)。處理INFO命令的回復(fù)有兩部分:
1.獲取該連接的節(jié)點(diǎn)實(shí)例最基本的信息,如:run_id,role,如果是發(fā)送給主節(jié)點(diǎn),會(huì)獲取到從節(jié)點(diǎn)信息;如果是發(fā)送給從節(jié)點(diǎn),會(huì)獲取到其主節(jié)點(diǎn)的信息。總之會(huì)獲取當(dāng)前整個(gè)集群網(wǎng)絡(luò)的所有活躍的節(jié)點(diǎn)信息,并將其保存到當(dāng)前哨兵的狀態(tài)中,而且會(huì)刷新配置文件。這就是為什么在配置文件中不需要配置從節(jié)點(diǎn)的信息,因?yàn)橥ㄟ^這一操作會(huì)自動(dòng)發(fā)現(xiàn)從節(jié)點(diǎn)。
2.處理角色變化的情況。當(dāng)接收到INFO命令的回復(fù),有可能發(fā)現(xiàn)當(dāng)前哨兵連接的節(jié)點(diǎn)的角色狀態(tài)發(fā)生變化,因此要處理這些情況。
連接的節(jié)點(diǎn)實(shí)例是主節(jié)點(diǎn),但是INFO命令顯示連接的是從節(jié)點(diǎn)。
什么也不做。
連接的節(jié)點(diǎn)實(shí)例是從節(jié)點(diǎn),但是INFO命令顯示連接的是主節(jié)點(diǎn)。
連接的從節(jié)點(diǎn)是被晉升的從節(jié)點(diǎn),且他的主節(jié)點(diǎn)處于等待該從節(jié)點(diǎn)晉升的狀態(tài),那么會(huì)更新一些屬性。
連接的從節(jié)點(diǎn)是被晉升的從節(jié)點(diǎn),但是主節(jié)點(diǎn)在發(fā)生故障轉(zhuǎn)移的超時(shí)時(shí)間限制內(nèi)又重新上線,因此要將該晉升的從節(jié)點(diǎn)重新降級(jí)為普通的從節(jié)點(diǎn),并從屬原來的主節(jié)點(diǎn),通過發(fā)送slaveof命令。
連接的節(jié)點(diǎn)實(shí)例是從節(jié)點(diǎn),INFO命令顯示連接的也是主節(jié)點(diǎn),但是發(fā)現(xiàn)該從節(jié)點(diǎn)從屬的主節(jié)點(diǎn)地址發(fā)生了變化。
發(fā)送slaveof命令使其從屬新的主節(jié)點(diǎn)。
連接的節(jié)點(diǎn)實(shí)例是從節(jié)點(diǎn),INFO命令顯示連接的也是主節(jié)點(diǎn),但是該從節(jié)點(diǎn)處于已經(jīng)接受slaveof命令(SRI_RECONF_SENT)或者正在根據(jù)slaveof命令指定的主節(jié)點(diǎn)執(zhí)行同步操作(SRI_RECONF_INPROG)的狀態(tài)。
將他們的狀態(tài)設(shè)置為下一步狀態(tài),表示當(dāng)前狀態(tài)的操作已經(jīng)完成。
當(dāng)這些處理只是當(dāng)收到INFO命令的回復(fù)時(shí)才會(huì)進(jìn)行處理。我們繼續(xù)分析下一個(gè)發(fā)送監(jiān)控的命令。
- 第二個(gè)是PING命令
這個(gè)發(fā)送的函數(shù)sentinelSendPing()函數(shù)和在第一次創(chuàng)建命令連接時(shí)執(zhí)行的函數(shù)操作一樣。
int sentinelSendPing(sentinelRedisInstance *ri) {
// 異步發(fā)送一個(gè)PING命令給實(shí)例ri
int retval = redisAsyncCommand(ri->link->cc,
sentinelPingReplyCallback, ri, "PING");
// 發(fā)送成功
if (retval == C_OK) {
// 已發(fā)送未回復(fù)的命令個(gè)數(shù)加1
ri->link->pending_commands++;
// 更新最近一次發(fā)送PING命令的時(shí)間
ri->link->last_ping_time = mstime();
// 更新最近一次發(fā)送PING命令,但沒有收到PONG命令的時(shí)間
if (ri->link->act_ping_time == 0)
ri->link->act_ping_time = ri->link->last_ping_time;
return 1;
} else {
return 0;
}
}
該函數(shù),發(fā)送給實(shí)例一個(gè)PING并且更新所有連接的狀態(tài)。設(shè)置sentinelPingReplyCallback()來處理PING命令的回復(fù)。
PING命令的回復(fù)有以下兩種:
- 狀態(tài)回復(fù)或者錯(cuò)誤回復(fù)
PONG、LOADING、MASTERDOWN這三個(gè)是可以接受的回復(fù),會(huì)更新最近的交互時(shí)間,用來判斷實(shí)例和哨兵之間的網(wǎng)絡(luò)可達(dá)。
忙回復(fù) - BUSY這個(gè)可能會(huì)是因?yàn)閳?zhí)行腳本而表現(xiàn)為下線狀態(tài)。所以會(huì)發(fā)送一個(gè)SCRIPT KILL命令來終止腳本的執(zhí)行。
無論如何,只要接受到回復(fù),都會(huì)更新最近一次收到PING命令回復(fù)的狀態(tài),表示連接可達(dá)。
第三個(gè)是PUBLISH命令
發(fā)送PUBLISH命令,可以叫發(fā)送hello信息。因?yàn)檫@個(gè)操作像是和訂閱該主節(jié)點(diǎn)的其他哨兵節(jié)點(diǎn)打招呼。
函數(shù)sentinelSendHello()用來發(fā)送hello信息,該函數(shù)主要做了兩步操作:
1.構(gòu)建hello信息的內(nèi)容。hello信息的格式如下:
sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch這些信息包含有:當(dāng)前哨兵的信息和主節(jié)點(diǎn)信息。
2.發(fā)送PUBLISH命令,將hello信息發(fā)布到創(chuàng)建連接時(shí)建立的頻道。
設(shè)置sentinelPublishReplyCallback()函數(shù)為處理PUBLISH命令的回復(fù)。該命令主要就是更新通過頻道進(jìn)行通信的時(shí)間,以便保持發(fā)布訂閱連接的可達(dá)。
通過發(fā)送PUBLISH命令給任意類型實(shí)例,最終都是將主節(jié)點(diǎn)信息和當(dāng)前哨兵信息廣播給所有的訂閱指定頻道的哨兵節(jié)點(diǎn),這樣就可以將監(jiān)控相同主節(jié)點(diǎn)的哨兵保存在哨兵實(shí)例的sentinels字典中。
發(fā)送完這些命令,就會(huì)獲取所有節(jié)點(diǎn)的新的狀態(tài)。因此,要根據(jù)這些狀態(tài)要判斷是否出現(xiàn)網(wǎng)絡(luò)故障。
判斷節(jié)點(diǎn)的主觀下線狀態(tài)
當(dāng)前哨兵節(jié)點(diǎn)發(fā)送完所有的監(jiān)控命令,有可能發(fā)送成功且順利收到回復(fù),也有可能發(fā)送和回復(fù)都沒有成功收到等等可能,因此要對(duì)當(dāng)前節(jié)點(diǎn)實(shí)例(所有類型都要進(jìn)行判斷)調(diào)用sentinelCheckSubjectivelyDown()函數(shù)進(jìn)行主觀下線判斷
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
// 獲取ri實(shí)例回復(fù)命令已經(jīng)過去的時(shí)長(zhǎng)
if (ri->link->act_ping_time)
// 獲取最近一次發(fā)送PING命令過去了多少時(shí)間
elapsed = mstime() - ri->link->act_ping_time;
// 如果實(shí)例的連接已經(jīng)斷開
else if (ri->link->disconnected)
// 獲取最近一次回復(fù)PING命令過去了多少時(shí)間
elapsed = mstime() - ri->link->last_avail_time;
// 如果連接處于低活躍度,那么進(jìn)行重新連接
// cc命令連接超過了1.5s,并且之前發(fā)送過PING命令但是連接活躍度很低
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
ri->link->act_ping_time != 0 && /* Ther is a pending ping... */
/* The pending ping is delayed, and we did not received
* error replies as well. */
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
{ // 斷開ri實(shí)例的cc命令連接
instanceLinkCloseConnection(ri->link,ri->link->cc);
}
// 檢查pc發(fā)布訂閱的連接是否也處于低活躍狀態(tài)
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{ // 斷開ri實(shí)例的pc發(fā)布訂閱連接
instanceLinkCloseConnection(ri->link,ri->link->pc);
}
// 更新主觀下線標(biāo)志,條件如下:
/*
1. 沒有回復(fù)命令
2. Sentinel節(jié)點(diǎn)認(rèn)為ri是主節(jié)點(diǎn),但是它報(bào)告它是從節(jié)點(diǎn)
*/
// ri實(shí)例回復(fù)命令已經(jīng)過去的時(shí)長(zhǎng)已經(jīng)超過主觀下線的時(shí)限,并且ri實(shí)例是主節(jié)點(diǎn),但是報(bào)告是從節(jié)點(diǎn)
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
/* Is subjectively down */
// 設(shè)置主觀下線的標(biāo)識(shí)
if ((ri->flags & SRI_S_DOWN) == 0) {
// 發(fā)送"+sdown"的事件通知
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
// 設(shè)置實(shí)例被判斷主觀下線的時(shí)間
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
/* Is subjectively up */
// 如果設(shè)置了主觀下線的標(biāo)識(shí),則取消標(biāo)識(shí)
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}
該函數(shù)主要做了兩件事:
- 根據(jù)命令連接和發(fā)布訂閱連接的活躍度來判斷是否要執(zhí)行斷開對(duì)應(yīng)連接的操作。以便下次時(shí)鐘循環(huán)在重新連接,以保證可靠性。
- 獲取回復(fù)PING命令過去的時(shí)間,然后進(jìn)行判斷是否已經(jīng)下線。如果滿足主觀下線的條件,那么會(huì)設(shè)置主觀下線的標(biāo)識(shí)。主觀下線條件有兩個(gè):
1.回復(fù)PING命令超時(shí)
2.哨兵節(jié)點(diǎn)發(fā)現(xiàn)他的角色發(fā)生變化。認(rèn)為它是主節(jié)點(diǎn)但是報(bào)告顯示它是從節(jié)點(diǎn)。
當(dāng)判斷完主觀下線厚,雖然對(duì)實(shí)例設(shè)置了主觀下線的標(biāo)識(shí),但是只有該實(shí)例是主節(jié)點(diǎn),才會(huì)執(zhí)行進(jìn)一步的判斷。否則對(duì)于其他類型節(jié)點(diǎn)來說,他們的周期性操作已經(jīng)執(zhí)行完成
判斷主節(jié)點(diǎn)的客觀下線狀態(tài)
客觀下線狀態(tài)的判斷只針對(duì)主節(jié)點(diǎn)而言。之前已經(jīng)判斷過主觀下線,因此只有被當(dāng)前哨兵節(jié)點(diǎn)判斷為主觀下線的主節(jié)點(diǎn)才會(huì)繼續(xù)執(zhí)行客觀下線的判斷。
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;
// 如果該master實(shí)例已經(jīng)被當(dāng)前Sentinel節(jié)點(diǎn)判斷為主觀下線
if (master->flags & SRI_S_DOWN) {
/* Is down for enough sentinels? */
// 當(dāng)前Sentinel節(jié)點(diǎn)認(rèn)為下線投1票
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels);
// 遍歷監(jiān)控該master實(shí)例的所有的Sentinel節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// 如果Sentinel也認(rèn)為master實(shí)例主觀下線,那么增加投票數(shù)
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
// 如果超過master設(shè)置的客觀下線票數(shù),則設(shè)置客觀下線標(biāo)識(shí)
if (quorum >= master->quorum) odown = 1;
}
/* Set the flag accordingly to the outcome. */
// 如果被判斷為客觀下線
if (odown) {
// master沒有客觀下線標(biāo)識(shí)則要設(shè)置
if ((master->flags & SRI_O_DOWN) == 0) {
// 發(fā)送"+odown"事件通知
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
// 設(shè)置master客觀下線標(biāo)識(shí)
master->flags |= SRI_O_DOWN;
// 設(shè)置master被判斷客觀下線的時(shí)間
master->o_down_since_time = mstime();
}
// master實(shí)例沒有客觀下線
} else {
// 取消master客觀下線標(biāo)識(shí)
if (master->flags & SRI_O_DOWN) {
// 發(fā)送"-odown"事件通知
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}
該函數(shù)做了兩個(gè)工作:
1.遍歷監(jiān)控該主節(jié)點(diǎn)的所有其他的哨兵節(jié)點(diǎn),如果這些哨兵節(jié)點(diǎn)也認(rèn)為當(dāng)前主節(jié)點(diǎn)下線(SRI_MASTER_DOWN),那么投票數(shù)加1,當(dāng)超過設(shè)置的投票數(shù),標(biāo)識(shí)客觀下線的標(biāo)志。
2.如果客觀下線的標(biāo)志(odown)為真,那么打開主節(jié)點(diǎn)的客觀下線的表示,否則取消主節(jié)點(diǎn)客觀下線的標(biāo)識(shí)。
- 這種方法存在一個(gè)缺陷,那么就是客觀下線意味這有足夠多的Sentinel節(jié)點(diǎn)報(bào)告該主節(jié)點(diǎn)在一個(gè)時(shí)間范圍內(nèi)不可達(dá)。但是信息可能被延遲,不能保證N個(gè)實(shí)例在同一時(shí)間都同意該實(shí)例進(jìn)入下線狀態(tài)。
執(zhí)行完的客觀下線判斷,如果發(fā)現(xiàn)主節(jié)點(diǎn)打開了客觀下線的狀態(tài)標(biāo)識(shí),那么就進(jìn)一步進(jìn)行判斷,否則就執(zhí)行跳過判斷。執(zhí)行這進(jìn)一步判斷的函數(shù)是:sentinelStartFailoverIfNeeded()。該函數(shù)用來判斷能不能進(jìn)行故障轉(zhuǎn)移:
- 主節(jié)點(diǎn)必須處于客觀下線狀態(tài)。如果沒有打開客觀下線的標(biāo)識(shí),就會(huì)直接返回0。
- 沒有正在對(duì)主節(jié)點(diǎn)進(jìn)行故障轉(zhuǎn)移。
- 一段時(shí)間內(nèi)沒有嘗試進(jìn)行故障轉(zhuǎn)移,防止頻繁執(zhí)行故障轉(zhuǎn)移。
如果以上條件都滿足,那么會(huì)調(diào)用sentinelStartFailover()函數(shù),將更新主節(jié)點(diǎn)的故障轉(zhuǎn)移狀態(tài),會(huì)執(zhí)行下面這句關(guān)鍵的代碼
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;
并且返回1,執(zhí)行if條件中的代碼
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
由于指定了一個(gè)SENTINEL_ASK_FORCED標(biāo)識(shí),因此會(huì)強(qiáng)制發(fā)送一個(gè)SENTINEL is-master-down-by-addr命令來真正判斷是否主節(jié)點(diǎn)下線,不會(huì)被時(shí)間條件所拒絕執(zhí)行。
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->sentinels);
// 遍歷監(jiān)控master的所有的Sentinel節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// 當(dāng)前Sentinel實(shí)例最近一個(gè)回復(fù)SENTINEL IS-MASTER-DOWN-BY-ADDR命令所過去的時(shí)間
mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
char port[32];
int retval;
/* If the master state from other sentinel is too old, we clear it. */
// 如果master狀態(tài)太舊沒有更新,則清除它保存的主節(jié)點(diǎn)狀態(tài)
if (elapsed > SENTINEL_ASK_PERIOD*5) {
ri->flags &= ~SRI_MASTER_DOWN;
sdsfree(ri->leader);
ri->leader = NULL;
}
// 滿足以下條件向其他Sentinel節(jié)點(diǎn)詢問主節(jié)點(diǎn)是否下線
/*
1. 當(dāng)前Sentinel節(jié)點(diǎn)認(rèn)為它已經(jīng)下線,并且處于故障轉(zhuǎn)移狀態(tài)
2. 其他Sentinel與當(dāng)前Sentinel保持連接狀態(tài)
3. 在SENTINEL_ASK_PERIOD毫秒內(nèi)沒有收到INFO回復(fù)
*/
// 主節(jié)點(diǎn)沒有處于客觀下線狀態(tài),則跳過當(dāng)前Sentinel節(jié)點(diǎn)
if ((master->flags & SRI_S_DOWN) == 0) continue;
// 如果當(dāng)前Sentinel節(jié)點(diǎn)斷開連接,也跳過
if (ri->link->disconnected) continue;
// 最近回復(fù)SENTINEL IS-MASTER-DOWN-BY-ADDR命令在SENTINEL_ASK_PERIODms時(shí)間內(nèi)已經(jīng)回復(fù)過了,則跳過
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;
/* Ask */
// 發(fā)送SENTINEL IS-MASTER-DOWN-BY-ADDR命令
ll2string(port,sizeof(port),master->addr->port);
// 異步發(fā)送命令
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, ri,
"SENTINEL is-master-down-by-addr %s %s %llu %s",
master->addr->ip, port,
sentinel.current_epoch,
// 如果主節(jié)點(diǎn)處于故障轉(zhuǎn)移的狀態(tài),那么發(fā)送該Sentinel的ID,讓收到命令的Sentinel節(jié)點(diǎn)選舉自己為領(lǐng)頭
// 否則發(fā)送"*"表示發(fā)送投票
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
// 已發(fā)送未回復(fù)的命令個(gè)數(shù)加1
if (retval == C_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}
函數(shù)遍歷所有監(jiān)控該主節(jié)點(diǎn)的哨兵節(jié)點(diǎn),跳過三種不符合下線的條件的哨兵節(jié)點(diǎn),然后就發(fā)送SENTINEL is-master-down-by-addr命令,之前在if判斷時(shí),就設(shè)置了主節(jié)點(diǎn)的故障轉(zhuǎn)移狀態(tài)為SENTINEL_FAILOVER_STATE_WAIT_START,因此發(fā)送的SENTINEL命令中會(huì)加上自己的runid,用來請(qǐng)求所有收到命令的哨兵節(jié)點(diǎn)將自己選舉為執(zhí)行故障轉(zhuǎn)移的領(lǐng)頭。
由于發(fā)送的是異步命令,所以會(huì)設(shè)置回調(diào)函數(shù)sentinelReceiveIsMasterDownReply()來處理命令回復(fù)。
如果收到的回復(fù)的第一個(gè)整型值為1則打開該哨兵節(jié)點(diǎn)的主節(jié)點(diǎn)下線標(biāo)識(shí)(SRI_MASTER_DOWN)。這里就是前面說的那么缺陷,因?yàn)槭腔卣{(diào)函數(shù),該主節(jié)點(diǎn)下線標(biāo)識(shí)(SRI_MASTER_DOWN)不會(huì)立即打開,可能存在延遲。
至此,主節(jié)點(diǎn)的客觀下線判斷完畢,如果確認(rèn)了客觀下線,那么就會(huì)執(zhí)行故障轉(zhuǎn)移操作。
對(duì)主節(jié)點(diǎn)執(zhí)行故障轉(zhuǎn)移
故障轉(zhuǎn)移操作的過程非常清晰,正如函數(shù)sentinelFailoverStateMachine()所寫的那樣
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
// ri實(shí)例必須是主節(jié)點(diǎn)
serverAssert(ri->flags & SRI_MASTER);
// 如果主節(jié)點(diǎn)不處于進(jìn)行故障轉(zhuǎn)移操作的狀態(tài),則直接返回
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
// 根據(jù)故障轉(zhuǎn)移的狀態(tài),執(zhí)行合適的操作
switch(ri->failover_state) {
// 故障轉(zhuǎn)移開始
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
// 選擇一個(gè)要晉升的從節(jié)點(diǎn)
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
// 發(fā)送slaveof no one命令,使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
// 等待被選擇的從節(jié)點(diǎn)晉升為主節(jié)點(diǎn),如果超時(shí)則重新選擇晉升的從節(jié)點(diǎn)
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
// 給所有的從節(jié)點(diǎn)發(fā)送slaveof命令,同步新的主節(jié)點(diǎn)
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}
在之前判斷主節(jié)點(diǎn)客觀下線的時(shí)候,會(huì)將故障轉(zhuǎn)移的狀態(tài)打開,就是下面這樣:
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;
所以,主節(jié)點(diǎn)如果沒有被判斷為主觀下線,就不會(huì)判斷為客觀下線,因此也就不會(huì)執(zhí)行故障轉(zhuǎn)移操作。
之前設(shè)置的這些狀態(tài)正好可以執(zhí)行故障轉(zhuǎn)移操作。這個(gè)過程分為五部:
- 故障轉(zhuǎn)移開始。
SENTINEL_FAILOVER_STATE_WAIT_START - 選擇一個(gè)要晉升的從節(jié)點(diǎn)。
SENTINEL_FAILOVER_STATE_SELECT_SLAVE - 發(fā)送slaveof no one命令,使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)。
SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE - 等待被選擇的從節(jié)點(diǎn)晉升為主節(jié)點(diǎn),如果超時(shí)則重新選擇晉升的從節(jié)點(diǎn)。
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION - 給所有的從節(jié)點(diǎn)發(fā)送slaveof命令,同步新的主節(jié)點(diǎn)。
SENTINEL_FAILOVER_STATE_RECONF_SLAVES
這五部是連續(xù)的,成功執(zhí)行完一步操作,都會(huì)將狀態(tài)設(shè)置為下一步狀態(tài)。而且這五部是分開執(zhí)行的,意思是,每一次時(shí)間事件處理只處理一步,倘若已經(jīng)執(zhí)行了幾部故障轉(zhuǎn)移操作,但是在接下來的故障檢測(cè)時(shí),發(fā)現(xiàn)主節(jié)點(diǎn)是可達(dá)的,因此在之前的下線判斷中都會(huì)將下線標(biāo)識(shí)取消,會(huì)中斷執(zhí)行故障轉(zhuǎn)移。
故障轉(zhuǎn)移開始
sentinelFailoverWaitStart()函數(shù)會(huì)處理故障轉(zhuǎn)移的開始,主要是選舉出一個(gè)領(lǐng)頭哨兵節(jié)點(diǎn),用來領(lǐng)導(dǎo)故障轉(zhuǎn)移,并且更新故障轉(zhuǎn)移的狀態(tài)。
sentinel 自動(dòng)故障遷移使用Raft算法來選舉領(lǐng)頭(leader) Sentinel ,從而確保在一個(gè)給定的紀(jì)元(epoch)里,只有一個(gè)領(lǐng)頭產(chǎn)生。
這表示在同一個(gè)紀(jì)元中,不會(huì)有兩個(gè) Sentinel 同時(shí)被選中為領(lǐng)頭,并且各個(gè) Sentinel 在同一個(gè)紀(jì)元中只會(huì)對(duì)一個(gè)領(lǐng)頭進(jìn)行投票。
更高的配置紀(jì)元總是優(yōu)于較低的紀(jì)元,因此每個(gè) Sentinel 都會(huì)主動(dòng)使用更新的紀(jì)元來代替自己的配置。
簡(jiǎn)單來說,我們可以將 Sentinel 配置看作是一個(gè)帶有版本號(hào)的狀態(tài)。一個(gè)狀態(tài)會(huì)以最后寫入者勝出(last-write-wins)的方式(也即是,最新的配置總是勝出)傳播至所有其他 Sentinel
選擇一個(gè)要晉升的從節(jié)點(diǎn)
sentinelFailoverSelectSlave()函數(shù)用來選擇一個(gè)要晉升的從節(jié)點(diǎn)。該函數(shù)調(diào)用sentinelSelectSlave()函數(shù)來選則一個(gè)晉升的從節(jié)點(diǎn)。
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
// 從節(jié)點(diǎn)數(shù)組
sentinelRedisInstance **instance =
zmalloc(sizeof(instance[0])*dictSize(master->slaves));
sentinelRedisInstance *selected = NULL;
int instances = 0;
dictIterator *di;
dictEntry *de;
mstime_t max_master_down_time = 0;
// master主節(jié)點(diǎn)處于主觀下線,計(jì)算出主節(jié)點(diǎn)被判斷為處于主觀下線的最大時(shí)長(zhǎng)
if (master->flags & SRI_S_DOWN)
max_master_down_time += mstime() - master->s_down_since_time;
max_master_down_time += master->down_after_period * 10;
di = dictGetIterator(master->slaves);
// 迭代下線的主節(jié)點(diǎn)的所有從節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;
// 跳過下線的從節(jié)點(diǎn)
if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
// 跳過已經(jīng)斷開主從連接的從節(jié)點(diǎn)
if (slave->link->disconnected) continue;
// 跳過回復(fù)PING命令過于久遠(yuǎn)的從節(jié)點(diǎn)
if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
// 跳過優(yōu)先級(jí)為0的從節(jié)點(diǎn)
if (slave->slave_priority == 0) continue;
// 如果主節(jié)點(diǎn)處于主觀下線狀態(tài),Sentinel每秒發(fā)送INFO命令給從節(jié)點(diǎn),否則以默認(rèn)的頻率發(fā)送。
// 為了檢查命令是否合法,因此計(jì)算一個(gè)延遲值
if (master->flags & SRI_S_DOWN)
info_validity_time = SENTINEL_PING_PERIOD*5;
else
info_validity_time = SENTINEL_INFO_PERIOD*3;
// 如果從節(jié)點(diǎn)接受到INFO命令的回復(fù)已經(jīng)過期,跳過該從節(jié)點(diǎn)
if (mstime() - slave->info_refresh > info_validity_time) continue;
// 跳過下線時(shí)間過長(zhǎng)的從節(jié)點(diǎn)
if (slave->master_link_down_time > max_master_down_time) continue;
// 否則將選中的節(jié)點(diǎn)保存到數(shù)組中
instance[instances++] = slave;
}
dictReleaseIterator(di);
// 如果有選中的從節(jié)點(diǎn)
if (instances) {
// 將數(shù)組中的從節(jié)點(diǎn)排序
qsort(instance,instances,sizeof(sentinelRedisInstance*),
compareSlavesForPromotion);
// 將排序最低的從節(jié)點(diǎn)返回
selected = instance[0];
}
zfree(instance);
return selected;
}
總結(jié)一下就是這些條件:
- 不選有以下狀態(tài)的從節(jié)點(diǎn): S_DOWN, O_DOWN, DISCONNECTED.
- 最近一次回復(fù)PING命令超過5s的從節(jié)點(diǎn)
- 最近一次獲取INFO命令回復(fù)的時(shí)間不超過
info_refresh的三倍時(shí)間長(zhǎng)度 - 主從節(jié)點(diǎn)之間斷開操作的時(shí)間不超過:從當(dāng)前的Sentinel節(jié)點(diǎn)來看,主節(jié)點(diǎn)處于下線狀態(tài),從節(jié)點(diǎn)和主節(jié)點(diǎn)斷開連接的時(shí)間不能超過down-after-period的10倍,這看起來非常魔幻(black magic),但是實(shí)際上,當(dāng)主節(jié)點(diǎn)不可達(dá)時(shí),主從連接會(huì)斷開,但是必然不超過一定時(shí)間。意思是,主從斷開,一定是主節(jié)點(diǎn)造成的,而不是從節(jié)點(diǎn)。無論如何,我們將根據(jù)復(fù)制偏移量選擇最佳的從節(jié)點(diǎn)。
- 從節(jié)點(diǎn)的優(yōu)先級(jí)不能為0,優(yōu)先級(jí)為0的從節(jié)點(diǎn)被拋棄。
如果以上條件都滿足,那么按照一下順序排序,compareSlavesForPromotion()函數(shù)指定排序方法:
- 最低的優(yōu)先級(jí)的優(yōu)先。
- 復(fù)制偏移量較大的優(yōu)先。
- 運(yùn)行runid字典序小的優(yōu)先。
- 如果runid相同,那么選擇執(zhí)行命令更多的從節(jié)點(diǎn)。
因此,當(dāng)選擇出一個(gè)適合晉升的從節(jié)點(diǎn)后,sentinelFailoverSelectSlave()會(huì)打開該從節(jié)點(diǎn)的SRI_PROMOTED晉升標(biāo)識(shí),并且保存起來,最后更新故障轉(zhuǎn)移到下一步狀態(tài)。
使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)
函數(shù)sentinelFailoverSendSlaveOfNoOne()會(huì)調(diào)用sentinelSendSlaveOf()函數(shù)發(fā)送一個(gè)slaveof no one命令,使從晉升的節(jié)點(diǎn)和原來的主節(jié)點(diǎn)斷絕主從關(guān)系,成為新的主節(jié)點(diǎn)。
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
int retval;
// 如果要晉升的從節(jié)點(diǎn)處于斷開連接的狀態(tài),那么不能發(fā)送命令。在當(dāng)前狀態(tài),在規(guī)定的故障轉(zhuǎn)移超時(shí)時(shí)間內(nèi)可以重試。
if (ri->promoted_slave->link->disconnected) {
// 如果超出 配置的故障轉(zhuǎn)移超時(shí)時(shí)間,那么中斷本次故障轉(zhuǎn)移后返回
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
if (retval != C_OK) return;
// 命令發(fā)送成功,發(fā)送事件通知
sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
ri->promoted_slave,"%@");
// 設(shè)置故障轉(zhuǎn)移狀態(tài)為等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
// 更新故障轉(zhuǎn)移操作狀態(tài)改變時(shí)間
ri->failover_state_change_time = mstime();
}
發(fā)送成功后,會(huì)更新故障轉(zhuǎn)移狀態(tài)到下一步狀態(tài)。
等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)
調(diào)用sentinelFailoverWaitPromotion()來等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn),但是該函數(shù)只是處理故障轉(zhuǎn)移操作超時(shí)的情況。
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
// 所以,在這里只是處理故障轉(zhuǎn)移超時(shí)的情況
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
// 如果超出配置的故障轉(zhuǎn)移超時(shí)時(shí)間,那么中斷本次故障轉(zhuǎn)移后返回
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}
這個(gè)函數(shù)并沒有更改故障轉(zhuǎn)移操作的狀態(tài),因?yàn)?,?dāng)從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)時(shí),故障轉(zhuǎn)移狀態(tài)的改變?cè)谔幚鞩NFO命令的回復(fù)時(shí)發(fā)生。
所以,當(dāng)下個(gè)時(shí)間事件發(fā)生(在故障轉(zhuǎn)移設(shè)置的超時(shí)時(shí)間內(nèi)),就會(huì)處理下一個(gè)故障轉(zhuǎn)移的狀態(tài)。如果等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)超時(shí),那么會(huì)調(diào)用sentinelAbortFailover()函數(shù)中止當(dāng)前的故障轉(zhuǎn)移操作,清空所有故障轉(zhuǎn)移的狀態(tài),下個(gè)時(shí)間事件發(fā)生時(shí)重新執(zhí)行。
從節(jié)點(diǎn)同步新的主節(jié)點(diǎn)
調(diào)用sentinelFailoverReconfNextSlave()函數(shù),給所有沒有同步新主節(jié)點(diǎn)的從節(jié)點(diǎn)發(fā)送SLAVE OF <new master address>命令。
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
int in_progress = 0;
di = dictGetIterator(master->slaves);
// 遍歷所有的從節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
// 計(jì)算處于已經(jīng)發(fā)送同步命令或者已經(jīng)正在同步的從節(jié)點(diǎn)
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
in_progress++;
}
dictReleaseIterator(di);
di = dictGetIterator(master->slaves);
// 如果已經(jīng)發(fā)送同步命令或者已經(jīng)正在同步的從節(jié)點(diǎn)個(gè)數(shù)小于設(shè)置的同步個(gè)數(shù)限制,那么遍歷所有的從節(jié)點(diǎn)
while(in_progress < master->parallel_syncs &&
(de = dictNext(di)) != NULL)
{
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
// 跳過被晉升的從節(jié)點(diǎn)和已經(jīng)完成同步的從節(jié)點(diǎn)
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
// 如果從節(jié)點(diǎn)設(shè)置了發(fā)送slaveof命令,但是故障轉(zhuǎn)移更新到下一個(gè)狀態(tài)超時(shí)
if ((slave->flags & SRI_RECONF_SENT) &&
(mstime() - slave->slave_reconf_sent_time) >
SENTINEL_SLAVE_RECONF_TIMEOUT)
{
sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
// 清除已發(fā)送slaveof命令的標(biāo)識(shí)
slave->flags &= ~SRI_RECONF_SENT;
// 設(shè)置為完成同步的標(biāo)識(shí),隨后重新發(fā)送SLAVEOF命令,進(jìn)行同步
slave->flags |= SRI_RECONF_DONE;
}
// 跳過已經(jīng)發(fā)送了命令或者已經(jīng)正在同步的從節(jié)點(diǎn)
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
// 跳過連接斷開的從節(jié)點(diǎn)
if (slave->link->disconnected) continue;
/* Send SLAVEOF <new master>. */
// 發(fā)送 SLAVEOF <new master> 命令給從節(jié)點(diǎn),包括剛才超時(shí)的從節(jié)點(diǎn)
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
// 如果發(fā)送成功
if (retval == C_OK) {
// 設(shè)置已經(jīng)發(fā)送了SLAVEOF命令標(biāo)識(shí)
slave->flags |= SRI_RECONF_SENT;
// 設(shè)置發(fā)送slaveof命令的時(shí)間
slave->slave_reconf_sent_time = mstime();
sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
in_progress++;
}
}
dictReleaseIterator(di);
// 判斷故障轉(zhuǎn)移是否結(jié)束
sentinelFailoverDetectEnd(master);
}
主要是給沒有被發(fā)送同步新主節(jié)點(diǎn)命令的從節(jié)點(diǎn)和雖然發(fā)送但是同步超時(shí)的從節(jié)點(diǎn)重新發(fā)送SLAVEOF <new master>命令。
函數(shù)最后調(diào)用了sentinelFailoverDetectEnd()函數(shù)來判斷故障轉(zhuǎn)移是否結(jié)束,但是結(jié)束的情況有兩種:
1.故障轉(zhuǎn)移超時(shí)被動(dòng)結(jié)束
2.從節(jié)點(diǎn)已經(jīng)完成同步新晉升的主節(jié)點(diǎn)結(jié)束
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
int not_reconfigured = 0, timeout = 0;
dictIterator *di;
dictEntry *de;
// 自從上次更新故障轉(zhuǎn)移狀態(tài)的時(shí)間差
mstime_t elapsed = mstime() - master->failover_state_change_time;
/* We can't consider failover finished if the promoted slave is
* not reachable. */
// 如果被晉升的從節(jié)點(diǎn)不可達(dá),直接返回
if (master->promoted_slave == NULL ||
master->promoted_slave->flags & SRI_S_DOWN) return;
/* The failover terminates once all the reachable slaves are properly
* configured. */
// 遍歷所有的從節(jié)點(diǎn),找出還沒有完成同步從節(jié)點(diǎn)
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
// 如果是被晉升為主節(jié)點(diǎn)的從節(jié)點(diǎn)或者是完成同步的從節(jié)點(diǎn),則跳過
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
// 如果從節(jié)點(diǎn)處于客觀下線,則跳過
if (slave->flags & SRI_S_DOWN) continue;
// 沒有完成同步的節(jié)點(diǎn)數(shù)加1
not_reconfigured++;
}
dictReleaseIterator(di);
// 強(qiáng)制結(jié)束故障轉(zhuǎn)移超時(shí)的節(jié)點(diǎn)
if (elapsed > master->failover_timeout) {
// 忽略未完成同步的從節(jié)點(diǎn)
not_reconfigured = 0;
// 設(shè)置超時(shí)標(biāo)識(shí)
timeout = 1;
sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
}
// 如果所有的從節(jié)點(diǎn)完成了同步,那么表示故障轉(zhuǎn)移結(jié)束
if (not_reconfigured == 0) {
sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
// 監(jiān)控晉升的主節(jié)點(diǎn),更新配置
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
// 更新故障轉(zhuǎn)移操作狀態(tài)改變時(shí)間
master->failover_state_change_time = mstime();
}
// 如果是因?yàn)槌瑫r(shí)導(dǎo)致故障轉(zhuǎn)移結(jié)束
if (timeout) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->slaves);
// 遍歷所有的從節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
// 跳過完成同步和發(fā)送同步slaveof命令的從節(jié)點(diǎn)
if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
// 跳過連接斷開的從節(jié)點(diǎn)
if (slave->link->disconnected) continue;
// 給沒有被發(fā)送同步命令的從節(jié)點(diǎn)發(fā)送同步新晉升主節(jié)點(diǎn)的slaveof IP port 命令
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
// 如果發(fā)送成功,將這些從節(jié)點(diǎn)設(shè)置為已經(jīng)發(fā)送slaveof命令的標(biāo)識(shí)
if (retval == C_OK) {
sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
dictReleaseIterator(di);
}
}
該函數(shù)先會(huì)尋找沒有完成同步的從節(jié)點(diǎn),如果存在,則會(huì)強(qiáng)制將主節(jié)點(diǎn)的故障狀態(tài)更新,如下:
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
這么做是為了讓主進(jìn)程繼續(xù)向下執(zhí)行,不要總是在此等待故障狀態(tài)的變化。
雖然強(qiáng)制將主節(jié)點(diǎn)的故障狀態(tài)更新,但是還是要將沒有完成同步的從節(jié)點(diǎn)發(fā)送slaveof IP port讓他們重新同步。
執(zhí)行完這五步故障轉(zhuǎn)移操作后,回到sentinelHandleRedisInstance()函數(shù),該函數(shù)就剩最后一步操作了。
更新主節(jié)點(diǎn)的狀態(tài)
執(zhí)行該函數(shù),嘗試發(fā)送SENTINEL IS-MASTER-DOWN-BY-ADDR給所有的哨兵節(jié)點(diǎn)獲取回復(fù),更新一些主節(jié)點(diǎn)狀態(tài)。
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
這次則是有時(shí)間限制的發(fā)送SENTINEL命令,來更新其他哨兵節(jié)點(diǎn)對(duì)主節(jié)點(diǎn)是否下線的判斷。有可能發(fā)生主節(jié)點(diǎn)在故障轉(zhuǎn)移時(shí)重新上線的情況。
當(dāng)執(zhí)行完這一步,sentinelHandleRedisInstance()的所有操作全部剖析完成。
執(zhí)行完所有類型的節(jié)點(diǎn)的周期性任務(wù)之后,會(huì)接下來處理主從切換的情況。
處理主從切換
當(dāng)執(zhí)行完所有類型的節(jié)點(diǎn)的周期性任務(wù)之后,會(huì)執(zhí)行sentinelHandleDictOfRedisInstances()下面的代碼:
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// 遞歸對(duì)所有節(jié)點(diǎn)執(zhí)行周期性操作
......
// 如果ri實(shí)例處于完成故障轉(zhuǎn)移操作的狀態(tài),所有從節(jié)點(diǎn)已經(jīng)完成對(duì)新主節(jié)點(diǎn)的同步
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
// 設(shè)置主從轉(zhuǎn)換的標(biāo)識(shí)
switch_to_promoted = ri;
}
}
// 如果主從節(jié)點(diǎn)發(fā)生了轉(zhuǎn)換
if (switch_to_promoted)
// 將原來的主節(jié)點(diǎn)從主節(jié)點(diǎn)表中刪除,并用晉升的主節(jié)點(diǎn)替代
// 意味著已經(jīng)用新晉升的主節(jié)點(diǎn)代替舊的主節(jié)點(diǎn),包括所有從節(jié)點(diǎn)和舊的主節(jié)點(diǎn)從屬當(dāng)前新的主節(jié)點(diǎn)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
還記得當(dāng)前強(qiáng)制將主節(jié)點(diǎn)的故障狀態(tài)更新的狀態(tài)嗎?對(duì),就是SENTINEL_FAILOVER_STATE_UPDATE_CONFIG狀態(tài)。這個(gè)狀態(tài)表示已經(jīng)完成在故障轉(zhuǎn)移狀態(tài)下,所有從節(jié)點(diǎn)對(duì)新主節(jié)點(diǎn)的同步操作。因此需要調(diào)用sentinelFailoverSwitchToPromotedSlave()函數(shù)特殊處理發(fā)送主從切換的情況。
該函數(shù)會(huì)發(fā)送事件通知然后調(diào)用sentinelResetMasterAndChangeAddress()來用新晉升的主節(jié)點(diǎn)代替舊的主節(jié)點(diǎn),包括所有從節(jié)點(diǎn)和舊的主節(jié)點(diǎn)從屬當(dāng)前新的主節(jié)點(diǎn)。
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
sentinelAddr *oldaddr, *newaddr;
sentinelAddr **slaves = NULL;
int numslaves = 0, j;
dictIterator *di;
dictEntry *de;
// 創(chuàng)建ip:port地址字符串
newaddr = createSentinelAddr(ip,port);
if (newaddr == NULL) return C_ERR;
// 創(chuàng)建一個(gè)從節(jié)點(diǎn)表,將重置后的主節(jié)點(diǎn)添加到該表中
// 不包含有我們要轉(zhuǎn)換地址的那一個(gè)從節(jié)點(diǎn)
di = dictGetIterator(master->slaves);
// 遍歷所有的從節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
// 如果當(dāng)前從節(jié)點(diǎn)的地址和指定的地址相同,說明該從節(jié)點(diǎn)是要晉升為主節(jié)點(diǎn)的,因此跳過該從節(jié)點(diǎn)
if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
// 否則將該從節(jié)點(diǎn)加入到一個(gè)數(shù)組中
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
slave->addr->port);
}
dictReleaseIterator(di);
// 如果指定的地址和主節(jié)點(diǎn)地址不相同,說明,該主節(jié)點(diǎn)是要被替換的,那么將該主節(jié)點(diǎn)地址加入到從節(jié)點(diǎn)數(shù)組中
if (!sentinelAddrIsEqual(newaddr,master->addr)) {
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(master->addr->ip,
master->addr->port);
}
// 重置主節(jié)點(diǎn),但不刪除所有監(jiān)控自己的Sentinel節(jié)點(diǎn)
sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
// 備份舊地址
oldaddr = master->addr;
// 設(shè)置新地址
master->addr = newaddr;
// 下線時(shí)間清零
master->o_down_since_time = 0;
master->s_down_since_time = 0;
/* Add slaves back. */
// 為新的主節(jié)點(diǎn)加入從節(jié)點(diǎn)
for (j = 0; j < numslaves; j++) {
sentinelRedisInstance *slave;
// 遍歷所有的從節(jié)點(diǎn)表,創(chuàng)建從節(jié)點(diǎn)實(shí)例,并將該實(shí)例從屬到當(dāng)前新的主節(jié)點(diǎn)中
slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
slaves[j]->port, master->quorum, master);
// 釋放原有的表中的從節(jié)點(diǎn)
releaseSentinelAddr(slaves[j]);
// 事件通知
if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
}
// 釋放從節(jié)點(diǎn)表
zfree(slaves);
// 將原主節(jié)點(diǎn)地址釋放
releaseSentinelAddr(oldaddr);
// 刷新配置文件
sentinelFlushConfig();
return C_OK;
}