-
主動數(shù)據(jù)庫配置用來降低單個redis的壓力(主要是master)。通常的方案是master用做數(shù)據(jù)寫入,slave用做數(shù)據(jù)讀取。
主從復(fù)制,服務(wù)器雙方數(shù)據(jù)庫將保存相同的數(shù)據(jù),這種現(xiàn)象稱為“數(shù)據(jù)庫狀態(tài)一致”
127.0.0.1:6380> slaveof 127.0.0.1 6379
舊版復(fù)制功能的實現(xiàn)(2.8以前的版本)
復(fù)制功能都分為兩個基本步驟:同步和命令傳播
1.同步:將從服務(wù)器的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器當(dāng)前所處的數(shù)據(jù)庫狀態(tài)。
2.命令傳播:主服務(wù)器的數(shù)據(jù)庫狀態(tài)被修改,導(dǎo)致主從服務(wù)器的數(shù)據(jù)庫狀態(tài)不一致,讓主從服務(wù)器數(shù)據(jù)庫重新回到一致狀態(tài)。
同步
當(dāng)客戶端向從服務(wù)器發(fā)送slaveof命令,要求從服務(wù)器復(fù)制主服務(wù)器時,從服務(wù)器首先需要執(zhí)行同步操作,也就是將從服務(wù)器的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器當(dāng)前所處的數(shù)據(jù)庫狀態(tài)。而從服務(wù)器對主服務(wù)器的同步操作需要通過向主服務(wù)器發(fā)送SYNC命令來完成。
-
從服務(wù)器發(fā)送SYNC命令的執(zhí)行步驟:
1.從服務(wù)器向主服務(wù)器發(fā)送SYNC命令。
2.收到SYNC命令的主服務(wù)器執(zhí)行BGSAVE命令,在后臺生成一個RDB文件,并使用一個緩沖區(qū)記錄從現(xiàn)在開始執(zhí)行的所有寫命令。
3.當(dāng)主服務(wù)器的BGSAVE命令執(zhí)行完畢時,主服務(wù)器會將BGSAVE命令生成的RDB文件發(fā)送給從服務(wù)器,從服務(wù)器接收接收并載入這個RBD文件,將自己的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器執(zhí)行BGSAVE命令時的數(shù)據(jù)庫狀態(tài)。
4.主服務(wù)器將記錄在緩沖區(qū)里面的所有寫命令發(fā)送給從服務(wù)器,從服務(wù)器執(zhí)行這些寫命令,將自己的數(shù)據(jù)庫狀態(tài)更新至主服務(wù)器當(dāng)前所處的狀態(tài)。
命令傳播
- 在執(zhí)行完同步操作以后,如果客戶端又再次向主服務(wù)器發(fā)送寫命令,如果此時該命令沒有傳播到從服務(wù)器,那么主從服務(wù)器的數(shù)據(jù)庫狀態(tài)必然會不一樣,因此,在執(zhí)行完同步操作以后,還必須得執(zhí)行命令傳播,用來傳播主服務(wù)器接收到的新的命令請求。
- 為了讓主從服務(wù)器再次回到一致狀態(tài),主服務(wù)器需要對從服務(wù)器執(zhí)行命令傳播操作:主服務(wù)器會將自己執(zhí)行的那條寫命令,發(fā)送給從服務(wù)器,當(dāng)從服務(wù)器執(zhí)行了相同的寫命令之后,主從服務(wù)器將再次回到一致狀態(tài)。
舊版復(fù)制存在的缺陷
從服務(wù)器對主服務(wù)器的復(fù)制分為以下兩種:
1.初次復(fù)制:從服務(wù)器沒有復(fù)制任何主服務(wù)器,或者從服務(wù)器當(dāng)前要復(fù)制的主服務(wù)器和上一次復(fù)制的主服務(wù)器不同。
2.斷線后重復(fù)制:處理命令傳播階段的主從服務(wù)器因為網(wǎng)絡(luò)原因而中斷了復(fù)制,但從服務(wù)器通過自動重連接重新連接上主服務(wù)器,并繼續(xù)復(fù)制主服務(wù)器。

- 當(dāng)主從服務(wù)器斷開以后,從服務(wù)器通過自動重連連上主服務(wù)器,然后從服務(wù)器向主服務(wù)器發(fā)送SYNC命令,進行同步操作,但是主服務(wù)器此時會將數(shù)據(jù)庫狀態(tài)寫入到RDB文件中,如上述紅色方框(重復(fù)復(fù)制了許多鍵值對),這部分就是舊版復(fù)制存在的缺陷。
舊版復(fù)制問題的解決方案
為了解決舊版復(fù)制功能在處理斷線重復(fù)復(fù)制情況時的低效問題,redis從2.8以后,使用PSYNC命令代替SYNC命令來執(zhí)行復(fù)制時的同步操作。
psync命令具有完整重同步和部分重同步兩種模式。
1.完整重同步:用以解決初次復(fù)制的問題。執(zhí)行操作與sync一模一樣。
2.部分重同步:用于處理斷線后重復(fù)制情況:當(dāng)從服務(wù)器在斷線后重新連上主服務(wù)器時,如果條件允許,主服務(wù)器可以將主從服務(wù)器連接斷開期間執(zhí)行的寫命令發(fā)送給從服務(wù)器,從服務(wù)器只要接收并執(zhí)行這些寫命令,就可以將數(shù)據(jù)更新至主服務(wù)器當(dāng)前所處的狀態(tài)。-
PSYNC命令的部分重同步解決了舊版復(fù)制功能在處理斷線后重復(fù)復(fù)制時出現(xiàn)的低效情況。
部分重同步的實現(xiàn)
- 要實現(xiàn)部分重同步,必須解決以下三個問題:
1.當(dāng)前主從服務(wù)器各復(fù)制了多少數(shù)據(jù)??
2.如果主從服務(wù)器斷線以后,主服務(wù)器新接收到的命令請求,該如何處理?
3.如果在一個集群系統(tǒng)中,如何找到上一次復(fù)制的那個主服務(wù)器呢?
部分重同步功能由以下三個部分構(gòu)成:
a.主服務(wù)器的復(fù)制偏移量和從服務(wù)器的復(fù)制偏移量
b.主服務(wù)器的復(fù)制積壓緩沖區(qū)
c.服務(wù)器的運行ID
t/*
* 客戶端結(jié)構(gòu)
*
* 為每個連接到服務(wù)器的客戶端保存維持一個該結(jié)構(gòu)的映射,
* 從而實現(xiàn)多路復(fù)用。
*/
typedef struct redisClient {
// socket 文件描述符
int fd;
// 指向當(dāng)前目標(biāo)數(shù)據(jù)庫的指針
redisDb *db;
// 當(dāng)前目標(biāo)數(shù)據(jù)庫的號碼
int dictid;
// 查詢緩存
sds querybuf;
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
// 參數(shù)的個數(shù)
int argc;
// 字符串表示的命令,以及命令的參數(shù)
robj **argv;
// 命令,以及上個命令
struct redisCommand *cmd, *lastcmd;
// 回復(fù)類型
int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
// 保存回復(fù)的鏈表
list *reply;
// 鏈表中保存的所有回復(fù)的總字節(jié)大小
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
// 統(tǒng)計數(shù)據(jù)
int sentlen;
time_t ctime; /* Client creation time */
time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
// 復(fù)制功能相關(guān)
int slaveseldb; /* slave selected db, if this client is a slave */
int authenticated; /* when requirepass is non-NULL */
// 客戶端當(dāng)前的同步狀態(tài)
int replstate; /* replication state if this is a slave */
// 同步數(shù)據(jù)庫的文件描述符
int repldbfd; /* replication DB file descriptor */
// 同步數(shù)據(jù)庫文件的偏移量
long repldboff; /* replication DB file offset */
// 同步數(shù)據(jù)庫文件的大小
off_t repldbsize; /* replication DB file size */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
// 事務(wù)實現(xiàn)
multiState mstate; /* MULTI/EXEC state */
// 阻塞狀態(tài)
blockingState bpop; /* blocking state */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
// 被監(jiān)視的 KEY
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
// 訂閱與發(fā)布
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
/* Response buffer */
// 回復(fù)緩存的當(dāng)前緩存
int bufpos;
// 回復(fù)緩存,可以保存多個回復(fù)
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
復(fù)制偏移量
執(zhí)行復(fù)制的雙方---主從服務(wù)器都會維護一個復(fù)制偏移量。
主服務(wù)器每次向從服務(wù)器傳播N個字節(jié)的數(shù)據(jù)時,就將自己的復(fù)制偏移量的值加上N。
從服務(wù)器每次接收到主服務(wù)器傳播來的N個字節(jié)的數(shù)據(jù)時,就將自己的復(fù)制偏移量加上N。
通過對比主從服務(wù)器的復(fù)制偏移量,程序很容易知道主從服務(wù)器是否處于一致狀態(tài)。
-
主從狀態(tài)一致:
-
主從狀態(tài)不一致:
假如從服務(wù)器A在斷線后就立即重新連接主服務(wù)器,并且成功,那么接下來,從服務(wù)器將向主服務(wù)器發(fā)送PSYNC命令,報告從服務(wù)器A當(dāng)前的復(fù)制偏移量為10086,那么這時,主服務(wù)器應(yīng)該對從服務(wù)器執(zhí)行完全重同步還是部分重同步?如果執(zhí)行部分重同步的話,主服務(wù)器又如何補償從服務(wù)器A在斷線期間丟失的那部分?jǐn)?shù)據(jù)呢?
復(fù)制積壓區(qū)
復(fù)制積壓區(qū)是由主服務(wù)器維護的一個固定長度的隊列,默認(rèn)大小為1M。
當(dāng)主服務(wù)器進行命令傳播時,它不僅將寫命令發(fā)送給所有從服務(wù)器,還會將寫命令入列到復(fù)制積壓區(qū)緩沖區(qū)里面。
因此,主服務(wù)器的復(fù)制積壓區(qū)里面會保存著一部分最近傳播的寫命令,并且復(fù)制積壓緩沖區(qū)會為隊列中的每個字節(jié)記錄相應(yīng)的復(fù)制偏移量。

當(dāng)從服務(wù)器重新連上主服務(wù)器時,從服務(wù)器會通過PSYNC命令將自己的復(fù)制偏移量offset發(fā)送給主服務(wù)器,主服務(wù)器會根據(jù)這個復(fù)制偏移量來決定對主服務(wù)器進行何種復(fù)制操作:
1.如果offset偏移量之后的數(shù)據(jù),仍然存在于復(fù)制積壓區(qū)里面,那么主服務(wù)器將對從服務(wù)器執(zhí)行部分重同步操作。
2.如果offset偏移量之后的數(shù)據(jù),不在復(fù)制積壓區(qū)里面,那么主服務(wù)器將會對從服務(wù)器進行完全重同步操作。
- 服務(wù)器允許ID
每個Redis服務(wù)器,不論是主服務(wù)器還是從服務(wù)器都會有自己的運行ID。這個ID在服務(wù)器啟動時自動生成,由40個隨機十六進制字符組成。
當(dāng)從服務(wù)器對主服務(wù)器進行初次復(fù)制時,主服務(wù)器會將自己的運行ID傳送給從服務(wù)器,而從服務(wù)器會將這個運行ID保存起來。
當(dāng)從服務(wù)器斷線并重連上一個主服務(wù)器時,從服務(wù)器將向當(dāng)前連接的主服務(wù)器發(fā)送自己的之前保存的運行ID:
1.如果ID一致,說明短線后重連的就是之前連接的服務(wù)器;
2.如果ID不一致,說明短信后重連的不是之前鏈接的服務(wù)器,那么主服務(wù)器將對從服務(wù)器進行完整重同步操作。
源碼解析
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 4 /* Connected to master */
初始化時設(shè)置
server.replstate = REDIS_REPL_CONNECT
即slave需要連接master
slave周期性調(diào)用replicationCron,查看slave狀態(tài):
void replicationCron(void) {
/*判斷是否IO超時*/
if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
replicationAbortSyncTransfer(); //終止連接,并設(shè)置server.replstate = REDIS_REPL_CONNECT;
}
/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.replstate == REDIS_REPL_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
freeClient(server.master);
}
/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (connectWithMaster() == REDIS_OK) { //連接master
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
}
}
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
/* Don't ping slaves that are in the middle of a bulk transfer
* with the master for first synchronization. */
if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
if (slave->replstate == REDIS_REPL_ONLINE) {
/* If the slave is online send a normal ping */
addReplySds(slave,sdsnew("PING\r\n"));
} else {
/* Otherwise we are in the pre-synchronization stage.
* Just a newline will do the work of refreshing the
* connection last interaction time, and at the same time
* we'll be sure that being a single char there are no
* short-write problems. */
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry, it's just a ping. */
}
}
}
}
}
當(dāng)server.replstate == REDIS_REPL_CONNECT時,slave連接master,連接成功后,slave執(zhí)行syncWithMaster函數(shù),syncWithMaster將向master發(fā)送SYNC命令
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
if (fd == -1) {
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
server.repl_transfer_s = fd;
server.replstate = REDIS_REPL_CONNECTING;
return REDIS_OK;
}
master端:
master對于slave的連接和client的連接統(tǒng)一處理,在接收到slave發(fā)出的SYNC命令后,執(zhí)行syncCommand,syncCommand 將查看當(dāng)前狀態(tài),如果正在做快照,則等待,否則啟動后臺進程做快照。
void syncCommand(redisClient *c) {
/* ignore SYNC if aleady slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
addReplyError(c,"Can't SYNC while not connected with my master");
return;
}
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
if (listLength(c->reply) != 0) {
addReplyError(c,"SYNC is invalid with pending input");
return;
}
redisLog(REDIS_NOTICE,"Slave ask for synchronization");
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if (server.bgsavechildpid != -1) {
.....
} else {
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplyError(c,"Unable to perform background save");
return;
}
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
}
c->repldbfd = -1;
c->flags |= REDIS_SLAVE;
c->slaveseldb = 0;
listAddNodeTail(server.slaves,c);
return;
}
在完成快照后,執(zhí)行updateSlavesWaitingBgsave函數(shù),updateSlavesWaitingBgsave將查看當(dāng)前master的各個slave的狀態(tài),如果發(fā)現(xiàn)有在等待bgsave完成的,則注冊事件sendBulkToSlave,sendBulkToSlave將快照文件發(fā)送給slave
void updateSlavesWaitingBgsave(int bgsaveerr) {
listNode *ln;
int startbgsave = 0;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
startbgsave = 1;
slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
struct redis_stat buf;
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
if (startbgsave) {
if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
listIter li;
listRewind(server.slaves,&li);
redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
freeClient(slave);
}
}
}
}
在slave完成第一次的同步后,后續(xù)如果master接收到改變db狀態(tài)的命令,則調(diào)用replicationFeedSlaves將相應(yīng)變更發(fā)送slave
/* Call() is the core of Redis execution of a command */
void call(redisClient *c) {
long long dirty, start = ustime(), duration;
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
duration = ustime()-start;
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
if (server.appendonly && dirty > 0)
feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
server.stat_numcommands++;
}
總結(jié):
- redis主從復(fù)制,并沒有增加太多額外代碼,但是功能強大,支持多個slave,并且支持slave作為master。
- redis雖然宣稱主從復(fù)制無阻塞,但是,由于redis使用單線程服務(wù),而和slave的交互由處理線程統(tǒng)一處理,因此,對性能有影響。在slave第一次和master做同步時,如果master快照文件較大,則快照文件的傳輸將耗費較長時間,文件傳輸過程中master無法提供服務(wù)。





