「Redis源碼解讀」—多機(jī)數(shù)據(jù)庫(二)哨兵

sentinel是redis的高可用性解決方案:由一個(gè)或多個(gè)sentinel實(shí)例組成sentinel系統(tǒng)監(jiān)視多個(gè)master以及master的slave,并在被監(jiān)視的master進(jìn)入下線狀態(tài)時(shí),自動(dòng)將下線master的某個(gè)slave升級(jí)為master,然后新的master代替下線的master處


知識(shí)點(diǎn)

  • 建立連接
  • 周期性的操作
  • 發(fā)送監(jiān)控命令
  • 判斷節(jié)點(diǎn)的主觀下線狀態(tài)
  • 判斷主節(jié)點(diǎn)的客觀下線狀態(tài)
  • 對(duì)主節(jié)點(diǎn)執(zhí)行故障轉(zhuǎn)移
  • 選擇一個(gè)要晉升的從節(jié)點(diǎn)
  • 使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)
  • 從節(jié)點(diǎn)同步新的主節(jié)點(diǎn)
  • 更新主節(jié)點(diǎn)的狀態(tài)
  • 處理主從切換

建立連接

首先,執(zhí)行的第一個(gè)函數(shù)就是sentinelReconnectInstance()函數(shù),因?yàn)樵谳d入配置的時(shí)候,我們將創(chuàng)建的主節(jié)點(diǎn)實(shí)例加入到sentinel.masters字典的時(shí)候,該主節(jié)點(diǎn)的連接是關(guān)閉的,所以第一件事就是為主節(jié)點(diǎn)和哨兵節(jié)點(diǎn)建立網(wǎng)絡(luò)連接。

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    // 如果ri實(shí)例沒有連接中斷,則直接返回
    if (ri->link->disconnected == 0) return;
    // ri實(shí)例地址非法
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    // 如果還沒有最近一次重連的時(shí)間距離現(xiàn)在太短,小于1s,則直接返回
    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    // 設(shè)置最近重連的時(shí)間
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    // cc:命令連接
    if (link->cc == NULL) {
        // 綁定ri實(shí)例的連接地址并建立連接
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        // 命令連接失敗,則事件通知,且斷開cc連接
        if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        // 命令連接成功
        } else {
            // 重置cc連接的屬性
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            // 將服務(wù)器的事件循環(huán)關(guān)聯(lián)到cc連接的上下文中
            redisAeAttach(server.el,link->cc);
            // 設(shè)置確立連接的回調(diào)函數(shù)
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            // 設(shè)置斷開連接的回調(diào)處理
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            // 發(fā)送AUTH 命令認(rèn)證
            sentinelSendAuthIfNeeded(ri,link->cc);
            // 發(fā)送連接名字
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            // 立即向ri實(shí)例發(fā)送PING命令
            sentinelSendPing(ri);
        }
    }
    /* Pub / Sub */
    // pc:發(fā)布訂閱連接
    // 只對(duì)主節(jié)點(diǎn)和從節(jié)點(diǎn)如果沒有設(shè)置pc連接則建立一個(gè)
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        // 綁定指定ri的連接地址并建立連接
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        // pc連接失敗,則事件通知,且斷開pc連接
        if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        // pc連接成功
        } else {
            int retval;

            link->pc_conn_time = mstime();
            link->pc->data = link;
            // 將服務(wù)器的事件循環(huán)關(guān)聯(lián)到pc連接的上下文中
            redisAeAttach(server.el,link->pc);
            // 設(shè)置確立連接的回調(diào)函數(shù)
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            // 設(shè)置斷開連接的回調(diào)處理
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            //  發(fā)送AUTH 命令認(rèn)證
            sentinelSendAuthIfNeeded(ri,link->pc);
            // 發(fā)送連接名字
            sentinelSetClientName(ri,link->pc,"pubsub");
            // 發(fā)送訂閱 __sentinel__:hello 頻道的命令,設(shè)置回調(diào)函數(shù)處理回復(fù)
            // sentinelReceiveHelloMessages是處理Pub/Sub的頻道返回信息的回調(diào)函數(shù),可以發(fā)現(xiàn)訂閱同一master的Sentinel節(jié)點(diǎn)
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
                    SENTINEL_HELLO_CHANNEL);
            // 訂閱頻道出錯(cuò),關(guān)閉
            if (retval != C_OK) {
                // 關(guān)閉pc連接
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    // 如果已經(jīng)建立了新的連接,則清除斷開連接的狀態(tài)。表示已經(jīng)建立了連接
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}

建立連接的函數(shù)redisAsyncConnectBind()是Redis的官方C語言客戶端hiredis的異步連接函數(shù),當(dāng)連接成功時(shí)需要調(diào)用redisAeAttach()函數(shù)來將服務(wù)器的事件循環(huán)(ae)與連接的上下文相關(guān)聯(lián)起來(因?yàn)閔iredis提供了多種適配器,包括事件ae,libev,libevent,libuv),在關(guān)聯(lián)的時(shí)候,會(huì)設(shè)置了網(wǎng)絡(luò)連接的可寫可讀事件的處理程序。接下來還會(huì)設(shè)置該連接的確立時(shí)和斷開時(shí)的回調(diào)函數(shù)redisAsyncSetConnectCallback()和redisAsyncSetDisconnectCallback(),為什么這么做,就是因?yàn)樵撨B接是異步的。

了解了以上這些,繼續(xù)分析節(jié)點(diǎn)實(shí)例和當(dāng)前哨兵的連接建立。從該函數(shù)中可以很明顯的看出來:

無論是主節(jié)點(diǎn)、從節(jié)點(diǎn)還是哨兵節(jié)點(diǎn),都會(huì)與當(dāng)前哨兵建立命令連接(Commands connection)。
只有主節(jié)點(diǎn)或從節(jié)點(diǎn)才會(huì)建立發(fā)布訂閱連接(Pub / Sub connection)。
當(dāng)建立了命令連接(cc)之后立即執(zhí)行了三個(gè)動(dòng)作:



當(dāng)建立了發(fā)布訂閱連接(pc)之后立即執(zhí)行的動(dòng)作:(前兩個(gè)動(dòng)作與命令連接相同,只列出不相同的第三個(gè))



如果成功建立連接,之后會(huì)清除連接斷開的標(biāo)志,以表示連接已建立。
如果不是第一次執(zhí)行,那么會(huì)判斷連接是否建立,如果斷開,則重新給建立,如果沒有斷開,那么什么都不會(huì)做直接返回。

發(fā)送監(jiān)控命令

執(zhí)行完建立網(wǎng)絡(luò)連接的函數(shù),接下來會(huì)執(zhí)行sentinelSendPeriodicCommands()函數(shù),該函數(shù)就是定期發(fā)送一些監(jiān)控命令到主節(jié)點(diǎn)或從節(jié)點(diǎn)或哨兵節(jié)點(diǎn),這些節(jié)點(diǎn)會(huì)將哨兵節(jié)點(diǎn)作為客戶端來處理,我們接下來仔細(xì)分析

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    // 如果ri實(shí)例連接處于關(guān)閉狀態(tài),直接返回
    if (ri->link->disconnected) return;

    // 對(duì)于不是發(fā)送關(guān)鍵命令的INFO,PING,PUBLISH,我們也有SENTINEL_MAX_PENDING_COMMANDS的限制。 我們不想使用大量的內(nèi)存,只是因?yàn)檫B接對(duì)象無法正常工作(請(qǐng)注意,無論如何,還有一個(gè)冗余的保護(hù)措施,即如果檢測(cè)到長(zhǎng)時(shí)間的超時(shí)條件,連接將被斷開連接并重新連接
    // 每個(gè)實(shí)例的已發(fā)送未回復(fù)的命令個(gè)數(shù)不能超過100個(gè),否則直接返回
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    // 如果主節(jié)點(diǎn)處于O_DOWN狀態(tài)下,那么Sentinel默認(rèn)每秒發(fā)送INFO命令給它的從節(jié)點(diǎn),而不是通常的SENTINEL_INFO_PERIOD(10s)周期。在這種狀態(tài)下,我們想更密切的監(jiān)控從節(jié)點(diǎn),萬一他們被其他的Sentinel晉升為主節(jié)點(diǎn)
    // 如果從節(jié)點(diǎn)報(bào)告和主節(jié)點(diǎn)斷開連接,我們同樣也監(jiān)控INFO命令的輸出更加頻繁,以便我們能有一個(gè)更新鮮的斷開連接的時(shí)間

    // 如果ri是從節(jié)點(diǎn),且他的主節(jié)點(diǎn)處于故障狀態(tài)的狀態(tài)或者從節(jié)點(diǎn)和主節(jié)點(diǎn)斷開復(fù)制了
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        // 設(shè)置INFO命令的周期時(shí)間為1s
        info_period = 1000;
    } else {
        // 否則就是默認(rèn)的10s
        info_period = SENTINEL_INFO_PERIOD;
    }

    // 每次最后一次接收到的PONG比配置的 'down-after-milliseconds' 時(shí)間更長(zhǎng),但是如果 'down-after-milliseconds'大于1秒,則每秒鐘進(jìn)行一次ping

    // 獲取ri設(shè)置的主觀下線的時(shí)間
    ping_period = ri->down_after_period;
    // 如果大于1秒,則設(shè)置為1秒
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    // 如果實(shí)例不是Sentinel節(jié)點(diǎn)且Sentinel節(jié)點(diǎn)從該數(shù)據(jù)節(jié)點(diǎn)(主節(jié)點(diǎn)或從節(jié)點(diǎn))沒有收到過INFO回復(fù)或者收到INFO回復(fù)超時(shí)
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        // 發(fā)送INFO命令給主節(jié)點(diǎn)和從節(jié)點(diǎn)
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "INFO");
        // 已發(fā)送未回復(fù)的命令個(gè)數(shù)加1
        if (retval == C_OK) ri->link->pending_commands++;

    // 如果發(fā)送和回復(fù)PING命令超時(shí)
    } else if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        // 發(fā)送一個(gè)PING命令給ri實(shí)例,并且更新act_ping_time
        sentinelSendPing(ri);

    // 發(fā)送頻道的定時(shí)命令超時(shí)
    } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        // 發(fā)布hello信息給ri實(shí)例
        sentinelSendHello(ri);
    }
}

從這個(gè)函數(shù)我們可以了解到一下信息:

  • 一個(gè)連接對(duì)發(fā)送命令的個(gè)數(shù)有限制。因?yàn)檫B接是一個(gè)異步操作,發(fā)送了不一定會(huì)立即接收到,因此會(huì)為了節(jié)約內(nèi)存而有一個(gè)限制,已發(fā)送未回復(fù)的命令個(gè)數(shù)不能超過100個(gè),否則不做操作。
  • 當(dāng)該哨兵節(jié)點(diǎn)正在監(jiān)控從節(jié)點(diǎn)時(shí),但是從節(jié)點(diǎn)從屬的主節(jié)點(diǎn)發(fā)送了故障,那么會(huì)設(shè)置發(fā)送INFO命令的頻率為1s,否則就是默認(rèn)的10s發(fā)送一次INFO命令。
  • PING命令的頻率是1s發(fā)送一次。

接下來,就逐個(gè)分析所發(fā)送的監(jiān)控命令。

  • 第一個(gè)是INFO命令

哨兵節(jié)點(diǎn)只將INFO命令發(fā)送給主節(jié)點(diǎn)或從節(jié)點(diǎn)。并且設(shè)置sentinelInfoReplyCallback()函數(shù)來處理INFO命令的回復(fù)信息。
處理函數(shù)的代碼有300多行,這里就不列出來了,可以上github查看 sentinel.c源碼詳細(xì)注釋。
當(dāng)INFO名的回復(fù)正確時(shí),會(huì)調(diào)用sentinelRefreshInstanceInfo()函數(shù)來處理INFO命令的回復(fù)。處理INFO命令的回復(fù)有兩部分:
1.獲取該連接的節(jié)點(diǎn)實(shí)例最基本的信息,如:run_id,role,如果是發(fā)送給主節(jié)點(diǎn),會(huì)獲取到從節(jié)點(diǎn)信息;如果是發(fā)送給從節(jié)點(diǎn),會(huì)獲取到其主節(jié)點(diǎn)的信息。總之會(huì)獲取當(dāng)前整個(gè)集群網(wǎng)絡(luò)的所有活躍的節(jié)點(diǎn)信息,并將其保存到當(dāng)前哨兵的狀態(tài)中,而且會(huì)刷新配置文件。這就是為什么在配置文件中不需要配置從節(jié)點(diǎn)的信息,因?yàn)橥ㄟ^這一操作會(huì)自動(dòng)發(fā)現(xiàn)從節(jié)點(diǎn)。
2.處理角色變化的情況。當(dāng)接收到INFO命令的回復(fù),有可能發(fā)現(xiàn)當(dāng)前哨兵連接的節(jié)點(diǎn)的角色狀態(tài)發(fā)生變化,因此要處理這些情況。
連接的節(jié)點(diǎn)實(shí)例是主節(jié)點(diǎn),但是INFO命令顯示連接的是從節(jié)點(diǎn)。
什么也不做。
連接的節(jié)點(diǎn)實(shí)例是從節(jié)點(diǎn),但是INFO命令顯示連接的是主節(jié)點(diǎn)。
連接的從節(jié)點(diǎn)是被晉升的從節(jié)點(diǎn),且他的主節(jié)點(diǎn)處于等待該從節(jié)點(diǎn)晉升的狀態(tài),那么會(huì)更新一些屬性。
連接的從節(jié)點(diǎn)是被晉升的從節(jié)點(diǎn),但是主節(jié)點(diǎn)在發(fā)生故障轉(zhuǎn)移的超時(shí)時(shí)間限制內(nèi)又重新上線,因此要將該晉升的從節(jié)點(diǎn)重新降級(jí)為普通的從節(jié)點(diǎn),并從屬原來的主節(jié)點(diǎn),通過發(fā)送slaveof命令。
連接的節(jié)點(diǎn)實(shí)例是從節(jié)點(diǎn),INFO命令顯示連接的也是主節(jié)點(diǎn),但是發(fā)現(xiàn)該從節(jié)點(diǎn)從屬的主節(jié)點(diǎn)地址發(fā)生了變化。
發(fā)送slaveof命令使其從屬新的主節(jié)點(diǎn)。
連接的節(jié)點(diǎn)實(shí)例是從節(jié)點(diǎn),INFO命令顯示連接的也是主節(jié)點(diǎn),但是該從節(jié)點(diǎn)處于已經(jīng)接受slaveof命令(SRI_RECONF_SENT)或者正在根據(jù)slaveof命令指定的主節(jié)點(diǎn)執(zhí)行同步操作(SRI_RECONF_INPROG)的狀態(tài)。
將他們的狀態(tài)設(shè)置為下一步狀態(tài),表示當(dāng)前狀態(tài)的操作已經(jīng)完成。
當(dāng)這些處理只是當(dāng)收到INFO命令的回復(fù)時(shí)才會(huì)進(jìn)行處理。我們繼續(xù)分析下一個(gè)發(fā)送監(jiān)控的命令。

  • 第二個(gè)是PING命令
    這個(gè)發(fā)送的函數(shù)sentinelSendPing()函數(shù)和在第一次創(chuàng)建命令連接時(shí)執(zhí)行的函數(shù)操作一樣。
int sentinelSendPing(sentinelRedisInstance *ri) {
    // 異步發(fā)送一個(gè)PING命令給實(shí)例ri
    int retval = redisAsyncCommand(ri->link->cc,
        sentinelPingReplyCallback, ri, "PING");
    // 發(fā)送成功
    if (retval == C_OK) {
        // 已發(fā)送未回復(fù)的命令個(gè)數(shù)加1
        ri->link->pending_commands++;
        // 更新最近一次發(fā)送PING命令的時(shí)間
        ri->link->last_ping_time = mstime();
        // 更新最近一次發(fā)送PING命令,但沒有收到PONG命令的時(shí)間
        if (ri->link->act_ping_time == 0)
            ri->link->act_ping_time = ri->link->last_ping_time;
        return 1;
    } else {
        return 0;
    }
}

該函數(shù),發(fā)送給實(shí)例一個(gè)PING并且更新所有連接的狀態(tài)。設(shè)置sentinelPingReplyCallback()來處理PING命令的回復(fù)。
PING命令的回復(fù)有以下兩種:

  • 狀態(tài)回復(fù)或者錯(cuò)誤回復(fù)
    PONG、LOADING、MASTERDOWN這三個(gè)是可以接受的回復(fù),會(huì)更新最近的交互時(shí)間,用來判斷實(shí)例和哨兵之間的網(wǎng)絡(luò)可達(dá)。
    忙回復(fù)
  • BUSY這個(gè)可能會(huì)是因?yàn)閳?zhí)行腳本而表現(xiàn)為下線狀態(tài)。所以會(huì)發(fā)送一個(gè)SCRIPT KILL命令來終止腳本的執(zhí)行。
    無論如何,只要接受到回復(fù),都會(huì)更新最近一次收到PING命令回復(fù)的狀態(tài),表示連接可達(dá)。

第三個(gè)是PUBLISH命令
發(fā)送PUBLISH命令,可以叫發(fā)送hello信息。因?yàn)檫@個(gè)操作像是和訂閱該主節(jié)點(diǎn)的其他哨兵節(jié)點(diǎn)打招呼。

函數(shù)sentinelSendHello()用來發(fā)送hello信息,該函數(shù)主要做了兩步操作:

1.構(gòu)建hello信息的內(nèi)容。hello信息的格式如下:
sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch這些信息包含有:當(dāng)前哨兵的信息和主節(jié)點(diǎn)信息。
2.發(fā)送PUBLISH命令,將hello信息發(fā)布到創(chuàng)建連接時(shí)建立的頻道。
設(shè)置sentinelPublishReplyCallback()函數(shù)為處理PUBLISH命令的回復(fù)。該命令主要就是更新通過頻道進(jìn)行通信的時(shí)間,以便保持發(fā)布訂閱連接的可達(dá)。
通過發(fā)送PUBLISH命令給任意類型實(shí)例,最終都是將主節(jié)點(diǎn)信息和當(dāng)前哨兵信息廣播給所有的訂閱指定頻道的哨兵節(jié)點(diǎn),這樣就可以將監(jiān)控相同主節(jié)點(diǎn)的哨兵保存在哨兵實(shí)例的sentinels字典中。

發(fā)送完這些命令,就會(huì)獲取所有節(jié)點(diǎn)的新的狀態(tài)。因此,要根據(jù)這些狀態(tài)要判斷是否出現(xiàn)網(wǎng)絡(luò)故障。

判斷節(jié)點(diǎn)的主觀下線狀態(tài)

當(dāng)前哨兵節(jié)點(diǎn)發(fā)送完所有的監(jiān)控命令,有可能發(fā)送成功且順利收到回復(fù),也有可能發(fā)送和回復(fù)都沒有成功收到等等可能,因此要對(duì)當(dāng)前節(jié)點(diǎn)實(shí)例(所有類型都要進(jìn)行判斷)調(diào)用sentinelCheckSubjectivelyDown()函數(shù)進(jìn)行主觀下線判斷

void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
    // 獲取ri實(shí)例回復(fù)命令已經(jīng)過去的時(shí)長(zhǎng)
    if (ri->link->act_ping_time)
        // 獲取最近一次發(fā)送PING命令過去了多少時(shí)間
        elapsed = mstime() - ri->link->act_ping_time;
    // 如果實(shí)例的連接已經(jīng)斷開
    else if (ri->link->disconnected)
        // 獲取最近一次回復(fù)PING命令過去了多少時(shí)間
        elapsed = mstime() - ri->link->last_avail_time;

    // 如果連接處于低活躍度,那么進(jìn)行重新連接
    // cc命令連接超過了1.5s,并且之前發(fā)送過PING命令但是連接活躍度很低
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 && /* Ther is a pending ping... */
        /* The pending ping is delayed, and we did not received
         * error replies as well. */
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {   // 斷開ri實(shí)例的cc命令連接
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }

    // 檢查pc發(fā)布訂閱的連接是否也處于低活躍狀態(tài)
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {   // 斷開ri實(shí)例的pc發(fā)布訂閱連接
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }
    // 更新主觀下線標(biāo)志,條件如下:
    /*
        1. 沒有回復(fù)命令
        2. Sentinel節(jié)點(diǎn)認(rèn)為ri是主節(jié)點(diǎn),但是它報(bào)告它是從節(jié)點(diǎn)
    */
    // ri實(shí)例回復(fù)命令已經(jīng)過去的時(shí)長(zhǎng)已經(jīng)超過主觀下線的時(shí)限,并且ri實(shí)例是主節(jié)點(diǎn),但是報(bào)告是從節(jié)點(diǎn)
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        // 設(shè)置主觀下線的標(biāo)識(shí)
        if ((ri->flags & SRI_S_DOWN) == 0) {
            // 發(fā)送"+sdown"的事件通知
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            // 設(shè)置實(shí)例被判斷主觀下線的時(shí)間
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        // 如果設(shè)置了主觀下線的標(biāo)識(shí),則取消標(biāo)識(shí)
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

該函數(shù)主要做了兩件事:

  • 根據(jù)命令連接和發(fā)布訂閱連接的活躍度來判斷是否要執(zhí)行斷開對(duì)應(yīng)連接的操作。以便下次時(shí)鐘循環(huán)在重新連接,以保證可靠性。
  • 獲取回復(fù)PING命令過去的時(shí)間,然后進(jìn)行判斷是否已經(jīng)下線。如果滿足主觀下線的條件,那么會(huì)設(shè)置主觀下線的標(biāo)識(shí)。主觀下線條件有兩個(gè):
    1.回復(fù)PING命令超時(shí)
    2.哨兵節(jié)點(diǎn)發(fā)現(xiàn)他的角色發(fā)生變化。認(rèn)為它是主節(jié)點(diǎn)但是報(bào)告顯示它是從節(jié)點(diǎn)。

當(dāng)判斷完主觀下線厚,雖然對(duì)實(shí)例設(shè)置了主觀下線的標(biāo)識(shí),但是只有該實(shí)例是主節(jié)點(diǎn),才會(huì)執(zhí)行進(jìn)一步的判斷。否則對(duì)于其他類型節(jié)點(diǎn)來說,他們的周期性操作已經(jīng)執(zhí)行完成

判斷主節(jié)點(diǎn)的客觀下線狀態(tài)

客觀下線狀態(tài)的判斷只針對(duì)主節(jié)點(diǎn)而言。之前已經(jīng)判斷過主觀下線,因此只有被當(dāng)前哨兵節(jié)點(diǎn)判斷為主觀下線的主節(jié)點(diǎn)才會(huì)繼續(xù)執(zhí)行客觀下線的判斷。

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;
    // 如果該master實(shí)例已經(jīng)被當(dāng)前Sentinel節(jié)點(diǎn)判斷為主觀下線
    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        // 當(dāng)前Sentinel節(jié)點(diǎn)認(rèn)為下線投1票
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        di = dictGetIterator(master->sentinels);
        // 遍歷監(jiān)控該master實(shí)例的所有的Sentinel節(jié)點(diǎn)
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            // 如果Sentinel也認(rèn)為master實(shí)例主觀下線,那么增加投票數(shù)
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        // 如果超過master設(shè)置的客觀下線票數(shù),則設(shè)置客觀下線標(biāo)識(shí)
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    // 如果被判斷為客觀下線
    if (odown) {
        // master沒有客觀下線標(biāo)識(shí)則要設(shè)置
        if ((master->flags & SRI_O_DOWN) == 0) {
            // 發(fā)送"+odown"事件通知
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            // 設(shè)置master客觀下線標(biāo)識(shí)
            master->flags |= SRI_O_DOWN;
            // 設(shè)置master被判斷客觀下線的時(shí)間
            master->o_down_since_time = mstime();
        }
    // master實(shí)例沒有客觀下線
    } else {
        // 取消master客觀下線標(biāo)識(shí)
        if (master->flags & SRI_O_DOWN) {
            // 發(fā)送"-odown"事件通知
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

該函數(shù)做了兩個(gè)工作:

1.遍歷監(jiān)控該主節(jié)點(diǎn)的所有其他的哨兵節(jié)點(diǎn),如果這些哨兵節(jié)點(diǎn)也認(rèn)為當(dāng)前主節(jié)點(diǎn)下線(SRI_MASTER_DOWN),那么投票數(shù)加1,當(dāng)超過設(shè)置的投票數(shù),標(biāo)識(shí)客觀下線的標(biāo)志。
2.如果客觀下線的標(biāo)志(odown)為真,那么打開主節(jié)點(diǎn)的客觀下線的表示,否則取消主節(jié)點(diǎn)客觀下線的標(biāo)識(shí)。

  • 這種方法存在一個(gè)缺陷,那么就是客觀下線意味這有足夠多的Sentinel節(jié)點(diǎn)報(bào)告該主節(jié)點(diǎn)在一個(gè)時(shí)間范圍內(nèi)不可達(dá)。但是信息可能被延遲,不能保證N個(gè)實(shí)例在同一時(shí)間都同意該實(shí)例進(jìn)入下線狀態(tài)。

執(zhí)行完的客觀下線判斷,如果發(fā)現(xiàn)主節(jié)點(diǎn)打開了客觀下線的狀態(tài)標(biāo)識(shí),那么就進(jìn)一步進(jìn)行判斷,否則就執(zhí)行跳過判斷。執(zhí)行這進(jìn)一步判斷的函數(shù)是:sentinelStartFailoverIfNeeded()。該函數(shù)用來判斷能不能進(jìn)行故障轉(zhuǎn)移:

  • 主節(jié)點(diǎn)必須處于客觀下線狀態(tài)。如果沒有打開客觀下線的標(biāo)識(shí),就會(huì)直接返回0。
  • 沒有正在對(duì)主節(jié)點(diǎn)進(jìn)行故障轉(zhuǎn)移。
  • 一段時(shí)間內(nèi)沒有嘗試進(jìn)行故障轉(zhuǎn)移,防止頻繁執(zhí)行故障轉(zhuǎn)移。

如果以上條件都滿足,那么會(huì)調(diào)用sentinelStartFailover()函數(shù),將更新主節(jié)點(diǎn)的故障轉(zhuǎn)移狀態(tài),會(huì)執(zhí)行下面這句關(guān)鍵的代碼

master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;

并且返回1,執(zhí)行if條件中的代碼

sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

由于指定了一個(gè)SENTINEL_ASK_FORCED標(biāo)識(shí),因此會(huì)強(qiáng)制發(fā)送一個(gè)SENTINEL is-master-down-by-addr命令來真正判斷是否主節(jié)點(diǎn)下線,不會(huì)被時(shí)間條件所拒絕執(zhí)行。

void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    // 遍歷監(jiān)控master的所有的Sentinel節(jié)點(diǎn)
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // 當(dāng)前Sentinel實(shí)例最近一個(gè)回復(fù)SENTINEL IS-MASTER-DOWN-BY-ADDR命令所過去的時(shí)間
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* If the master state from other sentinel is too old, we clear it. */
        // 如果master狀態(tài)太舊沒有更新,則清除它保存的主節(jié)點(diǎn)狀態(tài)
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }
        // 滿足以下條件向其他Sentinel節(jié)點(diǎn)詢問主節(jié)點(diǎn)是否下線
        /*
            1. 當(dāng)前Sentinel節(jié)點(diǎn)認(rèn)為它已經(jīng)下線,并且處于故障轉(zhuǎn)移狀態(tài)
            2. 其他Sentinel與當(dāng)前Sentinel保持連接狀態(tài)
            3. 在SENTINEL_ASK_PERIOD毫秒內(nèi)沒有收到INFO回復(fù)
        */
        // 主節(jié)點(diǎn)沒有處于客觀下線狀態(tài),則跳過當(dāng)前Sentinel節(jié)點(diǎn)
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        // 如果當(dāng)前Sentinel節(jié)點(diǎn)斷開連接,也跳過
        if (ri->link->disconnected) continue;
        // 最近回復(fù)SENTINEL IS-MASTER-DOWN-BY-ADDR命令在SENTINEL_ASK_PERIODms時(shí)間內(nèi)已經(jīng)回復(fù)過了,則跳過
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* Ask */
        // 發(fā)送SENTINEL IS-MASTER-DOWN-BY-ADDR命令
        ll2string(port,sizeof(port),master->addr->port);
        // 異步發(fā)送命令
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "SENTINEL is-master-down-by-addr %s %s %llu %s",
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    // 如果主節(jié)點(diǎn)處于故障轉(zhuǎn)移的狀態(tài),那么發(fā)送該Sentinel的ID,讓收到命令的Sentinel節(jié)點(diǎn)選舉自己為領(lǐng)頭
                    // 否則發(fā)送"*"表示發(fā)送投票
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        // 已發(fā)送未回復(fù)的命令個(gè)數(shù)加1
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}

函數(shù)遍歷所有監(jiān)控該主節(jié)點(diǎn)的哨兵節(jié)點(diǎn),跳過三種不符合下線的條件的哨兵節(jié)點(diǎn),然后就發(fā)送SENTINEL is-master-down-by-addr命令,之前在if判斷時(shí),就設(shè)置了主節(jié)點(diǎn)的故障轉(zhuǎn)移狀態(tài)為SENTINEL_FAILOVER_STATE_WAIT_START,因此發(fā)送的SENTINEL命令中會(huì)加上自己的runid,用來請(qǐng)求所有收到命令的哨兵節(jié)點(diǎn)將自己選舉為執(zhí)行故障轉(zhuǎn)移的領(lǐng)頭。

由于發(fā)送的是異步命令,所以會(huì)設(shè)置回調(diào)函數(shù)sentinelReceiveIsMasterDownReply()來處理命令回復(fù)。

如果收到的回復(fù)的第一個(gè)整型值為1則打開該哨兵節(jié)點(diǎn)的主節(jié)點(diǎn)下線標(biāo)識(shí)(SRI_MASTER_DOWN)。這里就是前面說的那么缺陷,因?yàn)槭腔卣{(diào)函數(shù),該主節(jié)點(diǎn)下線標(biāo)識(shí)(SRI_MASTER_DOWN)不會(huì)立即打開,可能存在延遲。

至此,主節(jié)點(diǎn)的客觀下線判斷完畢,如果確認(rèn)了客觀下線,那么就會(huì)執(zhí)行故障轉(zhuǎn)移操作。

對(duì)主節(jié)點(diǎn)執(zhí)行故障轉(zhuǎn)移

故障轉(zhuǎn)移操作的過程非常清晰,正如函數(shù)sentinelFailoverStateMachine()所寫的那樣

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    // ri實(shí)例必須是主節(jié)點(diǎn)
    serverAssert(ri->flags & SRI_MASTER);
    // 如果主節(jié)點(diǎn)不處于進(jìn)行故障轉(zhuǎn)移操作的狀態(tài),則直接返回
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
    // 根據(jù)故障轉(zhuǎn)移的狀態(tài),執(zhí)行合適的操作
    switch(ri->failover_state) {
        // 故障轉(zhuǎn)移開始
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        // 選擇一個(gè)要晉升的從節(jié)點(diǎn)
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        // 發(fā)送slaveof no one命令,使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        // 等待被選擇的從節(jié)點(diǎn)晉升為主節(jié)點(diǎn),如果超時(shí)則重新選擇晉升的從節(jié)點(diǎn)
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        // 給所有的從節(jié)點(diǎn)發(fā)送slaveof命令,同步新的主節(jié)點(diǎn)
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

在之前判斷主節(jié)點(diǎn)客觀下線的時(shí)候,會(huì)將故障轉(zhuǎn)移的狀態(tài)打開,就是下面這樣:

master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;

所以,主節(jié)點(diǎn)如果沒有被判斷為主觀下線,就不會(huì)判斷為客觀下線,因此也就不會(huì)執(zhí)行故障轉(zhuǎn)移操作。

之前設(shè)置的這些狀態(tài)正好可以執(zhí)行故障轉(zhuǎn)移操作。這個(gè)過程分為五部:

  • 故障轉(zhuǎn)移開始。
    SENTINEL_FAILOVER_STATE_WAIT_START
  • 選擇一個(gè)要晉升的從節(jié)點(diǎn)。
    SENTINEL_FAILOVER_STATE_SELECT_SLAVE
  • 發(fā)送slaveof no one命令,使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)。
    SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
  • 等待被選擇的從節(jié)點(diǎn)晉升為主節(jié)點(diǎn),如果超時(shí)則重新選擇晉升的從節(jié)點(diǎn)。
    SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
  • 給所有的從節(jié)點(diǎn)發(fā)送slaveof命令,同步新的主節(jié)點(diǎn)。
    SENTINEL_FAILOVER_STATE_RECONF_SLAVES

這五部是連續(xù)的,成功執(zhí)行完一步操作,都會(huì)將狀態(tài)設(shè)置為下一步狀態(tài)。而且這五部是分開執(zhí)行的,意思是,每一次時(shí)間事件處理只處理一步,倘若已經(jīng)執(zhí)行了幾部故障轉(zhuǎn)移操作,但是在接下來的故障檢測(cè)時(shí),發(fā)現(xiàn)主節(jié)點(diǎn)是可達(dá)的,因此在之前的下線判斷中都會(huì)將下線標(biāo)識(shí)取消,會(huì)中斷執(zhí)行故障轉(zhuǎn)移。

故障轉(zhuǎn)移開始

sentinelFailoverWaitStart()函數(shù)會(huì)處理故障轉(zhuǎn)移的開始,主要是選舉出一個(gè)領(lǐng)頭哨兵節(jié)點(diǎn),用來領(lǐng)導(dǎo)故障轉(zhuǎn)移,并且更新故障轉(zhuǎn)移的狀態(tài)。
sentinel 自動(dòng)故障遷移使用Raft算法來選舉領(lǐng)頭(leader) Sentinel ,從而確保在一個(gè)給定的紀(jì)元(epoch)里,只有一個(gè)領(lǐng)頭產(chǎn)生。

這表示在同一個(gè)紀(jì)元中,不會(huì)有兩個(gè) Sentinel 同時(shí)被選中為領(lǐng)頭,并且各個(gè) Sentinel 在同一個(gè)紀(jì)元中只會(huì)對(duì)一個(gè)領(lǐng)頭進(jìn)行投票。
更高的配置紀(jì)元總是優(yōu)于較低的紀(jì)元,因此每個(gè) Sentinel 都會(huì)主動(dòng)使用更新的紀(jì)元來代替自己的配置。
簡(jiǎn)單來說,我們可以將 Sentinel 配置看作是一個(gè)帶有版本號(hào)的狀態(tài)。一個(gè)狀態(tài)會(huì)以最后寫入者勝出(last-write-wins)的方式(也即是,最新的配置總是勝出)傳播至所有其他 Sentinel

選擇一個(gè)要晉升的從節(jié)點(diǎn)

sentinelFailoverSelectSlave()函數(shù)用來選擇一個(gè)要晉升的從節(jié)點(diǎn)。該函數(shù)調(diào)用sentinelSelectSlave()函數(shù)來選則一個(gè)晉升的從節(jié)點(diǎn)。

sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    // 從節(jié)點(diǎn)數(shù)組
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;
    // master主節(jié)點(diǎn)處于主觀下線,計(jì)算出主節(jié)點(diǎn)被判斷為處于主觀下線的最大時(shí)長(zhǎng)
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    di = dictGetIterator(master->slaves);
    // 迭代下線的主節(jié)點(diǎn)的所有從節(jié)點(diǎn)
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;
        // 跳過下線的從節(jié)點(diǎn)
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        // 跳過已經(jīng)斷開主從連接的從節(jié)點(diǎn)
        if (slave->link->disconnected) continue;
        // 跳過回復(fù)PING命令過于久遠(yuǎn)的從節(jié)點(diǎn)
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        // 跳過優(yōu)先級(jí)為0的從節(jié)點(diǎn)
        if (slave->slave_priority == 0) continue;

        // 如果主節(jié)點(diǎn)處于主觀下線狀態(tài),Sentinel每秒發(fā)送INFO命令給從節(jié)點(diǎn),否則以默認(rèn)的頻率發(fā)送。
        // 為了檢查命令是否合法,因此計(jì)算一個(gè)延遲值
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;
        // 如果從節(jié)點(diǎn)接受到INFO命令的回復(fù)已經(jīng)過期,跳過該從節(jié)點(diǎn)
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        // 跳過下線時(shí)間過長(zhǎng)的從節(jié)點(diǎn)
        if (slave->master_link_down_time > max_master_down_time) continue;
        // 否則將選中的節(jié)點(diǎn)保存到數(shù)組中
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    // 如果有選中的從節(jié)點(diǎn)
    if (instances) {
        // 將數(shù)組中的從節(jié)點(diǎn)排序
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        // 將排序最低的從節(jié)點(diǎn)返回
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}

總結(jié)一下就是這些條件:

  1. 不選有以下狀態(tài)的從節(jié)點(diǎn): S_DOWN, O_DOWN, DISCONNECTED.
  2. 最近一次回復(fù)PING命令超過5s的從節(jié)點(diǎn)
  3. 最近一次獲取INFO命令回復(fù)的時(shí)間不超過info_refresh的三倍時(shí)間長(zhǎng)度
  4. 主從節(jié)點(diǎn)之間斷開操作的時(shí)間不超過:從當(dāng)前的Sentinel節(jié)點(diǎn)來看,主節(jié)點(diǎn)處于下線狀態(tài),從節(jié)點(diǎn)和主節(jié)點(diǎn)斷開連接的時(shí)間不能超過down-after-period的10倍,這看起來非常魔幻(black magic),但是實(shí)際上,當(dāng)主節(jié)點(diǎn)不可達(dá)時(shí),主從連接會(huì)斷開,但是必然不超過一定時(shí)間。意思是,主從斷開,一定是主節(jié)點(diǎn)造成的,而不是從節(jié)點(diǎn)。無論如何,我們將根據(jù)復(fù)制偏移量選擇最佳的從節(jié)點(diǎn)。
  5. 從節(jié)點(diǎn)的優(yōu)先級(jí)不能為0,優(yōu)先級(jí)為0的從節(jié)點(diǎn)被拋棄。

如果以上條件都滿足,那么按照一下順序排序,compareSlavesForPromotion()函數(shù)指定排序方法:

  • 最低的優(yōu)先級(jí)的優(yōu)先。
  • 復(fù)制偏移量較大的優(yōu)先。
  • 運(yùn)行runid字典序小的優(yōu)先。
  • 如果runid相同,那么選擇執(zhí)行命令更多的從節(jié)點(diǎn)。

因此,當(dāng)選擇出一個(gè)適合晉升的從節(jié)點(diǎn)后,sentinelFailoverSelectSlave()會(huì)打開該從節(jié)點(diǎn)的SRI_PROMOTED晉升標(biāo)識(shí),并且保存起來,最后更新故障轉(zhuǎn)移到下一步狀態(tài)。

使從節(jié)點(diǎn)變?yōu)橹鞴?jié)點(diǎn)

函數(shù)sentinelFailoverSendSlaveOfNoOne()會(huì)調(diào)用sentinelSendSlaveOf()函數(shù)發(fā)送一個(gè)slaveof no one命令,使從晉升的節(jié)點(diǎn)和原來的主節(jié)點(diǎn)斷絕主從關(guān)系,成為新的主節(jié)點(diǎn)。

void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    // 如果要晉升的從節(jié)點(diǎn)處于斷開連接的狀態(tài),那么不能發(fā)送命令。在當(dāng)前狀態(tài),在規(guī)定的故障轉(zhuǎn)移超時(shí)時(shí)間內(nèi)可以重試。
    if (ri->promoted_slave->link->disconnected) {
        // 如果超出 配置的故障轉(zhuǎn)移超時(shí)時(shí)間,那么中斷本次故障轉(zhuǎn)移后返回
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    // 命令發(fā)送成功,發(fā)送事件通知
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    // 設(shè)置故障轉(zhuǎn)移狀態(tài)為等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    // 更新故障轉(zhuǎn)移操作狀態(tài)改變時(shí)間
    ri->failover_state_change_time = mstime();
}

發(fā)送成功后,會(huì)更新故障轉(zhuǎn)移狀態(tài)到下一步狀態(tài)。

等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)

調(diào)用sentinelFailoverWaitPromotion()來等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn),但是該函數(shù)只是處理故障轉(zhuǎn)移操作超時(shí)的情況。

void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    // 所以,在這里只是處理故障轉(zhuǎn)移超時(shí)的情況
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        // 如果超出配置的故障轉(zhuǎn)移超時(shí)時(shí)間,那么中斷本次故障轉(zhuǎn)移后返回
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

這個(gè)函數(shù)并沒有更改故障轉(zhuǎn)移操作的狀態(tài),因?yàn)?,?dāng)從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)時(shí),故障轉(zhuǎn)移狀態(tài)的改變?cè)谔幚鞩NFO命令的回復(fù)時(shí)發(fā)生。
所以,當(dāng)下個(gè)時(shí)間事件發(fā)生(在故障轉(zhuǎn)移設(shè)置的超時(shí)時(shí)間內(nèi)),就會(huì)處理下一個(gè)故障轉(zhuǎn)移的狀態(tài)。如果等待從節(jié)點(diǎn)晉升為主節(jié)點(diǎn)超時(shí),那么會(huì)調(diào)用sentinelAbortFailover()函數(shù)中止當(dāng)前的故障轉(zhuǎn)移操作,清空所有故障轉(zhuǎn)移的狀態(tài),下個(gè)時(shí)間事件發(fā)生時(shí)重新執(zhí)行。

從節(jié)點(diǎn)同步新的主節(jié)點(diǎn)

調(diào)用sentinelFailoverReconfNextSlave()函數(shù),給所有沒有同步新主節(jié)點(diǎn)的從節(jié)點(diǎn)發(fā)送SLAVE OF <new master address>命令。

void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    di = dictGetIterator(master->slaves);
    // 遍歷所有的從節(jié)點(diǎn)
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        // 計(jì)算處于已經(jīng)發(fā)送同步命令或者已經(jīng)正在同步的從節(jié)點(diǎn)
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    di = dictGetIterator(master->slaves);
    // 如果已經(jīng)發(fā)送同步命令或者已經(jīng)正在同步的從節(jié)點(diǎn)個(gè)數(shù)小于設(shè)置的同步個(gè)數(shù)限制,那么遍歷所有的從節(jié)點(diǎn)
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        // 跳過被晉升的從節(jié)點(diǎn)和已經(jīng)完成同步的從節(jié)點(diǎn)
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        // 如果從節(jié)點(diǎn)設(shè)置了發(fā)送slaveof命令,但是故障轉(zhuǎn)移更新到下一個(gè)狀態(tài)超時(shí)
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            // 清除已發(fā)送slaveof命令的標(biāo)識(shí)
            slave->flags &= ~SRI_RECONF_SENT;
            // 設(shè)置為完成同步的標(biāo)識(shí),隨后重新發(fā)送SLAVEOF命令,進(jìn)行同步
            slave->flags |= SRI_RECONF_DONE;
        }

        // 跳過已經(jīng)發(fā)送了命令或者已經(jīng)正在同步的從節(jié)點(diǎn)
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
        // 跳過連接斷開的從節(jié)點(diǎn)
        if (slave->link->disconnected) continue;

        /* Send SLAVEOF <new master>. */
        // 發(fā)送 SLAVEOF <new master> 命令給從節(jié)點(diǎn),包括剛才超時(shí)的從節(jié)點(diǎn)
        retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
        // 如果發(fā)送成功
        if (retval == C_OK) {
            // 設(shè)置已經(jīng)發(fā)送了SLAVEOF命令標(biāo)識(shí)
            slave->flags |= SRI_RECONF_SENT;
            // 設(shè)置發(fā)送slaveof命令的時(shí)間
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);

    // 判斷故障轉(zhuǎn)移是否結(jié)束
    sentinelFailoverDetectEnd(master);
}

主要是給沒有被發(fā)送同步新主節(jié)點(diǎn)命令的從節(jié)點(diǎn)和雖然發(fā)送但是同步超時(shí)的從節(jié)點(diǎn)重新發(fā)送SLAVEOF <new master>命令。
函數(shù)最后調(diào)用了sentinelFailoverDetectEnd()函數(shù)來判斷故障轉(zhuǎn)移是否結(jié)束,但是結(jié)束的情況有兩種:
1.故障轉(zhuǎn)移超時(shí)被動(dòng)結(jié)束
2.從節(jié)點(diǎn)已經(jīng)完成同步新晉升的主節(jié)點(diǎn)結(jié)束

void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    // 自從上次更新故障轉(zhuǎn)移狀態(tài)的時(shí)間差
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    // 如果被晉升的從節(jié)點(diǎn)不可達(dá),直接返回
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    // 遍歷所有的從節(jié)點(diǎn),找出還沒有完成同步從節(jié)點(diǎn)
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        // 如果是被晉升為主節(jié)點(diǎn)的從節(jié)點(diǎn)或者是完成同步的從節(jié)點(diǎn),則跳過
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        // 如果從節(jié)點(diǎn)處于客觀下線,則跳過
        if (slave->flags & SRI_S_DOWN) continue;
        // 沒有完成同步的節(jié)點(diǎn)數(shù)加1
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    // 強(qiáng)制結(jié)束故障轉(zhuǎn)移超時(shí)的節(jié)點(diǎn)
    if (elapsed > master->failover_timeout) {
        // 忽略未完成同步的從節(jié)點(diǎn)
        not_reconfigured = 0;
        // 設(shè)置超時(shí)標(biāo)識(shí)
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    // 如果所有的從節(jié)點(diǎn)完成了同步,那么表示故障轉(zhuǎn)移結(jié)束
    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        // 監(jiān)控晉升的主節(jié)點(diǎn),更新配置
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        // 更新故障轉(zhuǎn)移操作狀態(tài)改變時(shí)間
        master->failover_state_change_time = mstime();
    }

    // 如果是因?yàn)槌瑫r(shí)導(dǎo)致故障轉(zhuǎn)移結(jié)束
    if (timeout) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetIterator(master->slaves);
        // 遍歷所有的從節(jié)點(diǎn)
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;
            // 跳過完成同步和發(fā)送同步slaveof命令的從節(jié)點(diǎn)
            if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            // 跳過連接斷開的從節(jié)點(diǎn)
            if (slave->link->disconnected) continue;
            // 給沒有被發(fā)送同步命令的從節(jié)點(diǎn)發(fā)送同步新晉升主節(jié)點(diǎn)的slaveof IP port 命令
            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            // 如果發(fā)送成功,將這些從節(jié)點(diǎn)設(shè)置為已經(jīng)發(fā)送slaveof命令的標(biāo)識(shí)
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}

該函數(shù)先會(huì)尋找沒有完成同步的從節(jié)點(diǎn),如果存在,則會(huì)強(qiáng)制將主節(jié)點(diǎn)的故障狀態(tài)更新,如下:

master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;

這么做是為了讓主進(jìn)程繼續(xù)向下執(zhí)行,不要總是在此等待故障狀態(tài)的變化。
雖然強(qiáng)制將主節(jié)點(diǎn)的故障狀態(tài)更新,但是還是要將沒有完成同步的從節(jié)點(diǎn)發(fā)送slaveof IP port讓他們重新同步。
執(zhí)行完這五步故障轉(zhuǎn)移操作后,回到sentinelHandleRedisInstance()函數(shù),該函數(shù)就剩最后一步操作了。

更新主節(jié)點(diǎn)的狀態(tài)

執(zhí)行該函數(shù),嘗試發(fā)送SENTINEL IS-MASTER-DOWN-BY-ADDR給所有的哨兵節(jié)點(diǎn)獲取回復(fù),更新一些主節(jié)點(diǎn)狀態(tài)。

sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);

這次則是有時(shí)間限制的發(fā)送SENTINEL命令,來更新其他哨兵節(jié)點(diǎn)對(duì)主節(jié)點(diǎn)是否下線的判斷。有可能發(fā)生主節(jié)點(diǎn)在故障轉(zhuǎn)移時(shí)重新上線的情況。
當(dāng)執(zhí)行完這一步,sentinelHandleRedisInstance()的所有操作全部剖析完成。
執(zhí)行完所有類型的節(jié)點(diǎn)的周期性任務(wù)之后,會(huì)接下來處理主從切換的情況。

處理主從切換

當(dāng)執(zhí)行完所有類型的節(jié)點(diǎn)的周期性任務(wù)之后,會(huì)執(zhí)行sentinelHandleDictOfRedisInstances()下面的代碼:

 while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
            // 遞歸對(duì)所有節(jié)點(diǎn)執(zhí)行周期性操作
            ......
            // 如果ri實(shí)例處于完成故障轉(zhuǎn)移操作的狀態(tài),所有從節(jié)點(diǎn)已經(jīng)完成對(duì)新主節(jié)點(diǎn)的同步
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                // 設(shè)置主從轉(zhuǎn)換的標(biāo)識(shí)
                switch_to_promoted = ri;
            }

    }
    // 如果主從節(jié)點(diǎn)發(fā)生了轉(zhuǎn)換
    if (switch_to_promoted)
        // 將原來的主節(jié)點(diǎn)從主節(jié)點(diǎn)表中刪除,并用晉升的主節(jié)點(diǎn)替代
        // 意味著已經(jīng)用新晉升的主節(jié)點(diǎn)代替舊的主節(jié)點(diǎn),包括所有從節(jié)點(diǎn)和舊的主節(jié)點(diǎn)從屬當(dāng)前新的主節(jié)點(diǎn)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);

還記得當(dāng)前強(qiáng)制將主節(jié)點(diǎn)的故障狀態(tài)更新的狀態(tài)嗎?對(duì),就是SENTINEL_FAILOVER_STATE_UPDATE_CONFIG狀態(tài)。這個(gè)狀態(tài)表示已經(jīng)完成在故障轉(zhuǎn)移狀態(tài)下,所有從節(jié)點(diǎn)對(duì)新主節(jié)點(diǎn)的同步操作。因此需要調(diào)用sentinelFailoverSwitchToPromotedSlave()函數(shù)特殊處理發(fā)送主從切換的情況。

該函數(shù)會(huì)發(fā)送事件通知然后調(diào)用sentinelResetMasterAndChangeAddress()來用新晉升的主節(jié)點(diǎn)代替舊的主節(jié)點(diǎn),包括所有從節(jié)點(diǎn)和舊的主節(jié)點(diǎn)從屬當(dāng)前新的主節(jié)點(diǎn)。

int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
    sentinelAddr *oldaddr, *newaddr;
    sentinelAddr **slaves = NULL;
    int numslaves = 0, j;
    dictIterator *di;
    dictEntry *de;

    // 創(chuàng)建ip:port地址字符串
    newaddr = createSentinelAddr(ip,port);
    if (newaddr == NULL) return C_ERR;

    // 創(chuàng)建一個(gè)從節(jié)點(diǎn)表,將重置后的主節(jié)點(diǎn)添加到該表中
    // 不包含有我們要轉(zhuǎn)換地址的那一個(gè)從節(jié)點(diǎn)
    di = dictGetIterator(master->slaves);
    // 遍歷所有的從節(jié)點(diǎn)
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        // 如果當(dāng)前從節(jié)點(diǎn)的地址和指定的地址相同,說明該從節(jié)點(diǎn)是要晉升為主節(jié)點(diǎn)的,因此跳過該從節(jié)點(diǎn)
        if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
        // 否則將該從節(jié)點(diǎn)加入到一個(gè)數(shù)組中
        slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
        slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
                                                 slave->addr->port);
    }
    dictReleaseIterator(di);

    // 如果指定的地址和主節(jié)點(diǎn)地址不相同,說明,該主節(jié)點(diǎn)是要被替換的,那么將該主節(jié)點(diǎn)地址加入到從節(jié)點(diǎn)數(shù)組中
    if (!sentinelAddrIsEqual(newaddr,master->addr)) {
        slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
        slaves[numslaves++] = createSentinelAddr(master->addr->ip,
                                                 master->addr->port);
    }

    // 重置主節(jié)點(diǎn),但不刪除所有監(jiān)控自己的Sentinel節(jié)點(diǎn)
    sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
    // 備份舊地址
    oldaddr = master->addr;
    // 設(shè)置新地址
    master->addr = newaddr;
    // 下線時(shí)間清零
    master->o_down_since_time = 0;
    master->s_down_since_time = 0;

    /* Add slaves back. */
    // 為新的主節(jié)點(diǎn)加入從節(jié)點(diǎn)
    for (j = 0; j < numslaves; j++) {
        sentinelRedisInstance *slave;
        // 遍歷所有的從節(jié)點(diǎn)表,創(chuàng)建從節(jié)點(diǎn)實(shí)例,并將該實(shí)例從屬到當(dāng)前新的主節(jié)點(diǎn)中
        slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
                    slaves[j]->port, master->quorum, master);
        // 釋放原有的表中的從節(jié)點(diǎn)
        releaseSentinelAddr(slaves[j]);
        // 事件通知
        if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
    }
    // 釋放從節(jié)點(diǎn)表
    zfree(slaves);

    // 將原主節(jié)點(diǎn)地址釋放
    releaseSentinelAddr(oldaddr);
    // 刷新配置文件
    sentinelFlushConfig();
    return C_OK;
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容