「Redis源碼解讀」—多機數(shù)據(jù)庫(一)主從

  • 主動數(shù)據(jù)庫配置用來降低單個redis的壓力(主要是master)。通常的方案是master用做數(shù)據(jù)寫入,slave用做數(shù)據(jù)讀取。



    主從復(fù)制,服務(wù)器雙方數(shù)據(jù)庫將保存相同的數(shù)據(jù),這種現(xiàn)象稱為“數(shù)據(jù)庫狀態(tài)一致”

127.0.0.1:6380> slaveof 127.0.0.1 6379

舊版復(fù)制功能的實現(xiàn)(2.8以前的版本)

復(fù)制功能都分為兩個基本步驟:同步命令傳播
1.同步:將從服務(wù)器的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器當(dāng)前所處的數(shù)據(jù)庫狀態(tài)。
2.命令傳播:主服務(wù)器的數(shù)據(jù)庫狀態(tài)被修改,導(dǎo)致主從服務(wù)器的數(shù)據(jù)庫狀態(tài)不一致,讓主從服務(wù)器數(shù)據(jù)庫重新回到一致狀態(tài)。

同步

當(dāng)客戶端向從服務(wù)器發(fā)送slaveof命令,要求從服務(wù)器復(fù)制主服務(wù)器時,從服務(wù)器首先需要執(zhí)行同步操作,也就是將從服務(wù)器的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器當(dāng)前所處的數(shù)據(jù)庫狀態(tài)。而從服務(wù)器對主服務(wù)器的同步操作需要通過向主服務(wù)器發(fā)送SYNC命令來完成。

  • 從服務(wù)器發(fā)送SYNC命令的執(zhí)行步驟:
    1.從服務(wù)器向主服務(wù)器發(fā)送SYNC命令。
    2.收到SYNC命令的主服務(wù)器執(zhí)行BGSAVE命令,在后臺生成一個RDB文件,并使用一個緩沖區(qū)記錄從現(xiàn)在開始執(zhí)行的所有寫命令。
    3.當(dāng)主服務(wù)器的BGSAVE命令執(zhí)行完畢時,主服務(wù)器會將BGSAVE命令生成的RDB文件發(fā)送給從服務(wù)器,從服務(wù)器接收接收并載入這個RBD文件,將自己的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器執(zhí)行BGSAVE命令時的數(shù)據(jù)庫狀態(tài)。
    4.主服務(wù)器將記錄在緩沖區(qū)里面的所有寫命令發(fā)送給從服務(wù)器,從服務(wù)器執(zhí)行這些寫命令,將自己的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器當(dāng)前所處的狀態(tài)。


命令傳播

  • 在執(zhí)行完同步操作以后,如果客戶端又再次向主服務(wù)器發(fā)送寫命令,如果此時該命令沒有傳播到從服務(wù)器,那么主從服務(wù)器的數(shù)據(jù)庫狀態(tài)必然會不一樣,因此,在執(zhí)行完同步操作以后,還必須得執(zhí)行命令傳播,用來傳播主服務(wù)器接收到的新的命令請求。
  • 為了讓主從服務(wù)器再次回到一致狀態(tài),主服務(wù)器需要對從服務(wù)器執(zhí)行命令傳播操作:主服務(wù)器會將自己執(zhí)行的那條寫命令,發(fā)送給從服務(wù)器,當(dāng)從服務(wù)器執(zhí)行了相同的寫命令之后,主從服務(wù)器將再次回到一致狀態(tài)。

舊版復(fù)制存在的缺陷

從服務(wù)器對主服務(wù)器的復(fù)制分為以下兩種:
1.初次復(fù)制:從服務(wù)器沒有復(fù)制任何主服務(wù)器,或者從服務(wù)器當(dāng)前要復(fù)制的主服務(wù)器和上一次復(fù)制的主服務(wù)器不同。
2.斷線后重復(fù)制:處理命令傳播階段的主從服務(wù)器因為網(wǎng)絡(luò)原因而中斷了復(fù)制,但從服務(wù)器通過自動重連接重新連接上主服務(wù)器,并繼續(xù)復(fù)制主服務(wù)器。


  • 當(dāng)主從服務(wù)器斷開以后,從服務(wù)器通過自動重連連上主服務(wù)器,然后從服務(wù)器向主服務(wù)器發(fā)送SYNC命令,進行同步操作,但是主服務(wù)器此時會將數(shù)據(jù)庫狀態(tài)寫入到RDB文件中,如上述紅色方框(重復(fù)復(fù)制了許多鍵值對),這部分就是舊版復(fù)制存在的缺陷。

舊版復(fù)制問題的解決方案

為了解決舊版復(fù)制功能在處理斷線重復(fù)復(fù)制情況時的低效問題,redis從2.8以后,使用PSYNC命令代替SYNC命令來執(zhí)行復(fù)制時的同步操作。

  • psync命令具有完整重同步和部分重同步兩種模式。
    1.完整重同步:用以解決初次復(fù)制的問題。執(zhí)行操作與sync一模一樣。
    2.部分重同步:用于處理斷線后重復(fù)制情況:當(dāng)從服務(wù)器在斷線后重新連上主服務(wù)器時,如果條件允許,主服務(wù)器可以將主從服務(wù)器連接斷開期間執(zhí)行的寫命令發(fā)送給從服務(wù)器,從服務(wù)器只要接收并執(zhí)行這些寫命令,就可以將數(shù)據(jù)更新至主服務(wù)器當(dāng)前所處的狀態(tài)。

  • PSYNC命令的部分重同步解決了舊版復(fù)制功能在處理斷線后重復(fù)復(fù)制時出現(xiàn)的低效情況。



部分重同步的實現(xiàn)

  • 要實現(xiàn)部分重同步,必須解決以下三個問題:
    1.當(dāng)前主從服務(wù)器各復(fù)制了多少數(shù)據(jù)??
    2.如果主從服務(wù)器斷線以后,主服務(wù)器新接收到的命令請求,該如何處理?
    3.如果在一個集群系統(tǒng)中,如何找到上一次復(fù)制的那個主服務(wù)器呢?

部分重同步功能由以下三個部分構(gòu)成:
  a.主服務(wù)器的復(fù)制偏移量和從服務(wù)器的復(fù)制偏移量
  b.主服務(wù)器的復(fù)制積壓緩沖區(qū)
  c.服務(wù)器的運行ID

t/*
 * 客戶端結(jié)構(gòu)
 *
 * 為每個連接到服務(wù)器的客戶端保存維持一個該結(jié)構(gòu)的映射,
 * 從而實現(xiàn)多路復(fù)用。
 */
typedef struct redisClient {

    // socket 文件描述符
    int fd;

    // 指向當(dāng)前目標(biāo)數(shù)據(jù)庫的指針
    redisDb *db;

    // 當(dāng)前目標(biāo)數(shù)據(jù)庫的號碼
    int dictid;

    // 查詢緩存
    sds querybuf;
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size */

    // 參數(shù)的個數(shù)
    int argc;

    // 字符串表示的命令,以及命令的參數(shù)
    robj **argv;

    // 命令,以及上個命令
    struct redisCommand *cmd, *lastcmd;

    // 回復(fù)類型
    int reqtype;
    int multibulklen;       /* number of multi bulk arguments left to read */
    long bulklen;           /* length of bulk argument in multi bulk request */

    // 保存回復(fù)的鏈表
    list *reply;
    // 鏈表中保存的所有回復(fù)的總字節(jié)大小
    unsigned long reply_bytes; /* Tot bytes of objects in reply list */

    // 統(tǒng)計數(shù)據(jù)
    int sentlen;
    time_t ctime;           /* Client creation time */
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */

    // 復(fù)制功能相關(guān)
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    // 客戶端當(dāng)前的同步狀態(tài)
    int replstate;          /* replication state if this is a slave */
    // 同步數(shù)據(jù)庫的文件描述符
    int repldbfd;           /* replication DB file descriptor */
    // 同步數(shù)據(jù)庫文件的偏移量
    long repldboff;         /* replication DB file offset */
    // 同步數(shù)據(jù)庫文件的大小
    off_t repldbsize;       /* replication DB file size */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */

    // 事務(wù)實現(xiàn)
    multiState mstate;      /* MULTI/EXEC state */

    // 阻塞狀態(tài)
    blockingState bpop;   /* blocking state */
    list *io_keys;          /* Keys this client is waiting to be loaded from the
                             * swap file in order to continue. */

    // 被監(jiān)視的 KEY
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */

    // 訂閱與發(fā)布
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */

    /* Response buffer */
    // 回復(fù)緩存的當(dāng)前緩存
    int bufpos;
    // 回復(fù)緩存,可以保存多個回復(fù)
    char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;

復(fù)制偏移量

執(zhí)行復(fù)制的雙方---主從服務(wù)器都會維護一個復(fù)制偏移量。
主服務(wù)器每次向從服務(wù)器傳播N個字節(jié)的數(shù)據(jù)時,就將自己的復(fù)制偏移量的值加上N。
從服務(wù)器每次接收到主服務(wù)器傳播來的N個字節(jié)的數(shù)據(jù)時,就將自己的復(fù)制偏移量加上N。
通過對比主從服務(wù)器的復(fù)制偏移量,程序很容易知道主從服務(wù)器是否處于一致狀態(tài)。

  • 主從狀態(tài)一致: 


  • 主從狀態(tài)不一致:



    假如從服務(wù)器A在斷線后就立即重新連接主服務(wù)器,并且成功,那么接下來,從服務(wù)器將向主服務(wù)器發(fā)送PSYNC命令,報告從服務(wù)器A當(dāng)前的復(fù)制偏移量為10086,那么這時,主服務(wù)器應(yīng)該對從服務(wù)器執(zhí)行完全重同步還是部分重同步?如果執(zhí)行部分重同步的話,主服務(wù)器又如何補償從服務(wù)器A在斷線期間丟失的那部分?jǐn)?shù)據(jù)呢?

復(fù)制積壓區(qū)

復(fù)制積壓區(qū)是由主服務(wù)器維護的一個固定長度的隊列,默認(rèn)大小為1M。
當(dāng)主服務(wù)器進行命令傳播時,它不僅將寫命令發(fā)送給所有從服務(wù)器,還會將寫命令入列到復(fù)制積壓區(qū)緩沖區(qū)里面。
因此,主服務(wù)器的復(fù)制積壓區(qū)里面會保存著一部分最近傳播的寫命令,并且復(fù)制積壓緩沖區(qū)會為隊列中的每個字節(jié)記錄相應(yīng)的復(fù)制偏移量。


當(dāng)從服務(wù)器重新連上主服務(wù)器時,從服務(wù)器會通過PSYNC命令將自己的復(fù)制偏移量offset發(fā)送給主服務(wù)器,主服務(wù)器會根據(jù)這個復(fù)制偏移量來決定對主服務(wù)器進行何種復(fù)制操作:
1.如果offset偏移量之后的數(shù)據(jù),仍然存在于復(fù)制積壓區(qū)里面,那么主服務(wù)器將對從服務(wù)器執(zhí)行部分重同步操作。
2.如果offset偏移量之后的數(shù)據(jù),不在復(fù)制積壓區(qū)里面,那么主服務(wù)器將會對從服務(wù)器進行完全重同步操作。

  • 服務(wù)器允許ID
    每個Redis服務(wù)器,不論是主服務(wù)器還是從服務(wù)器都會有自己的運行ID。這個ID在服務(wù)器啟動時自動生成,由40個隨機十六進制字符組成。
    當(dāng)從服務(wù)器對主服務(wù)器進行初次復(fù)制時,主服務(wù)器會將自己的運行ID傳送給從服務(wù)器,而從服務(wù)器會將這個運行ID保存起來。
    當(dāng)從服務(wù)器斷線并重連上一個主服務(wù)器時,從服務(wù)器將向當(dāng)前連接的主服務(wù)器發(fā)送自己的之前保存的運行ID:
    1.如果ID一致,說明短線后重連的就是之前連接的服務(wù)器;
    2.如果ID不一致,說明短信后重連的不是之前鏈接的服務(wù)器,那么主服務(wù)器將對從服務(wù)器進行完整重同步操作。

源碼解析

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 4 /* Connected to master */
初始化時設(shè)置
server.replstate = REDIS_REPL_CONNECT
即slave需要連接master
slave周期性調(diào)用replicationCron,查看slave狀態(tài):

void replicationCron(void) {
    /*判斷是否IO超時*/
    if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
        replicationAbortSyncTransfer(); //終止連接,并設(shè)置server.replstate = REDIS_REPL_CONNECT;
    }

    /* Timed out master when we are an already connected slave? */
    if (server.masterhost && server.replstate == REDIS_REPL_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (connectWithMaster() == REDIS_OK) { //連接master
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    
    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            /* Don't ping slaves that are in the middle of a bulk transfer
             * with the master for first synchronization. */
            if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
            if (slave->replstate == REDIS_REPL_ONLINE) {
                /* If the slave is online send a normal ping */
                addReplySds(slave,sdsnew("PING\r\n"));
            } else {
                /* Otherwise we are in the pre-synchronization stage.
                 * Just a newline will do the work of refreshing the
                 * connection last interaction time, and at the same time
                 * we'll be sure that being a single char there are no
                 * short-write problems. */
                if (write(slave->fd, "\n", 1) == -1) {
                    /* Don't worry, it's just a ping. */
                }
            }
        }
    }
}

當(dāng)server.replstate == REDIS_REPL_CONNECT時,slave連接master,連接成功后,slave執(zhí)行syncWithMaster函數(shù),syncWithMaster將向master發(fā)送SYNC命令

int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_s = fd;
    server.replstate = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}

master端:
master對于slave的連接和client的連接統(tǒng)一處理,在接收到slave發(fā)出的SYNC命令后,執(zhí)行syncCommand,syncCommand 將查看當(dāng)前狀態(tài),如果正在做快照,則等待,否則啟動后臺進程做快照。

void syncCommand(redisClient *c) {
    /* ignore SYNC if aleady slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0) {
        addReplyError(c,"SYNC is invalid with pending input");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.bgsavechildpid != -1) {
       .....
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    }
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    listAddNodeTail(server.slaves,c);
    return;
}

在完成快照后,執(zhí)行updateSlavesWaitingBgsave函數(shù),updateSlavesWaitingBgsave將查看當(dāng)前master的各個slave的狀態(tài),如果發(fā)現(xiàn)有在等待bgsave完成的,則注冊事件sendBulkToSlave,sendBulkToSlave將快照文件發(fā)送給slave

void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}

在slave完成第一次的同步后,后續(xù)如果master接收到改變db狀態(tài)的命令,則調(diào)用replicationFeedSlaves將相應(yīng)變更發(fā)送slave

/* Call() is the core of Redis execution of a command */
void call(redisClient *c) {
    long long dirty, start = ustime(), duration;

    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);

    if (server.appendonly && dirty > 0)
        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
        listLength(server.slaves))
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;
}

總結(jié):

  1. redis主從復(fù)制,并沒有增加太多額外代碼,但是功能強大,支持多個slave,并且支持slave作為master。
  2. redis雖然宣稱主從復(fù)制無阻塞,但是,由于redis使用單線程服務(wù),而和slave的交互由處理線程統(tǒng)一處理,因此,對性能有影響。在slave第一次和master做同步時,如果master快照文件較大,則快照文件的傳輸將耗費較長時間,文件傳輸過程中master無法提供服務(wù)。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容