本章重點(diǎn)走讀 redis 網(wǎng)絡(luò) I/O 的多線程部分源碼。
哈希表 + 內(nèi)存數(shù)據(jù)庫 + 非阻塞系統(tǒng)調(diào)用 + 多路復(fù)用 I/O 事件驅(qū)動,使得 redis 單線程處理主邏輯足夠高效。當(dāng)并發(fā)上來后,數(shù)據(jù)的邏輯處理肯定要占用大量時間,那樣,客戶端與服務(wù)端通信處理就會變得遲鈍。所以在合適的時候(根據(jù)任務(wù)量自適應(yīng))采用多線程處理,充分地利用多核優(yōu)勢,分擔(dān)主線程壓力,使得客戶端和服務(wù)端通信更加敏捷。
redis 6.0 新增多線程處理網(wǎng)絡(luò) I/O,默認(rèn)是關(guān)閉的,需要修改配置開啟。對于這個新特性,redis 作者建議:如果項目確實遇到性能問題,再開啟多線程處理網(wǎng)絡(luò)讀寫事件。否則開啟沒什么意義,還會浪費(fèi) CPU 資源。線程數(shù)量不要超過 cpu 核心數(shù)量 - 1,預(yù)留一個核心。
?? 文章來源:wenfh2020.com
1. 配置
多線程這兩個設(shè)置項,默認(rèn)是關(guān)閉的。
# redis.conf
# 配置多線程處理線程個數(shù),數(shù)量最好少于 cpu 核心,默認(rèn) 4。
# io-threads 4
#
# 多線程是否處理讀事件,默認(rèn)關(guān)閉。
# io-threads-do-reads no
redis 作者建議:
- 配置線程數(shù)量,最好少于 cpu 核心。起碼預(yù)留一個空閑核心處理系統(tǒng)其它業(yè)務(wù),線程數(shù)量超過 cpu 核心對 redis 性能有一定影響,因為 redis 主線程處理主邏輯,如果被系統(tǒng)頻繁切換,效率會降低。
- 提供了多線程處理網(wǎng)絡(luò)讀事件開關(guān)。多線程處理網(wǎng)絡(luò)讀事件,對 redis 性能影響不大。redis 作為緩存,查詢操作的頻率比較大,系統(tǒng)的網(wǎng)絡(luò)瓶頸一般在查詢返回數(shù)據(jù),根據(jù)系統(tǒng)實際應(yīng)用場景進(jìn)行配置吧。
2. 主線程工作流程

- 主線程通過事件驅(qū)動從內(nèi)核獲取就緒事件,記錄下需要延時操作的客戶端連接。
- 多線程并行處理延時讀事件。
- 多線程處理延時寫事件。
- 重新執(zhí)行第一步,循環(huán)執(zhí)行。
- 加載循環(huán)事件管理。
int main(int argc, char **argv) {
...
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
- 事件循環(huán)管理。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 向內(nèi)核獲取就緒的可讀可寫事件事件進(jìn)行處理,處理時鐘事件。
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
- 獲取就緒事件處理和處理時鐘事件。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
// 從內(nèi)核中取出就緒的可讀可寫事件。
numevents = aeApiPoll(eventLoop, tvp);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
// 處理讀寫事件。
}
...
// 處理時鐘事件。
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
...
}
- 讀寫邏輯處理。
void beforeSleep(struct aeEventLoop *eventLoop) {
...
// write
handleClientsWithPendingWritesUsingThreads();
...
}
void afterSleep(struct aeEventLoop *eventLoop) {
...
// read
handleClientsWithPendingReadsUsingThreads();
}
3. 多線程協(xié)作

3.1. 特點(diǎn)
主線程實現(xiàn)主邏輯,子線程輔助實現(xiàn)任務(wù)。
- redis 主線程實現(xiàn)主邏輯。
- 主線程與子線程共同處理延時客戶端網(wǎng)絡(luò)讀寫事件。
- 主線程根據(jù)寫事件用戶量大小,開啟/關(guān)閉多線程模式。
- 雖然多線程是并行處理邏輯,但是 redis 整體工作流程是串行的。
- 當(dāng)主線程處理延時讀寫事件時,把一次大任務(wù)進(jìn)行取模切割成小任務(wù),平均分配給(主+子)線程處理。這樣每個客戶端連接被獨(dú)立的一個線程處理,不會出現(xiàn)多個線程同時處理一個客戶端連接邏輯。
- 主線程限制多線程子線程同一個時間段只能并行處理一種類型操作:讀/寫。
- 主線程先等待子線程處理完任務(wù)了,再進(jìn)行下一步,處理分配給自己的等待事件。
- 主線程在等待子線程處理任務(wù)過程中,它不是通過
sleep掛起線程讓出使用權(quán),而是通過for循環(huán)進(jìn)行忙等,不斷檢測所有子線程處理的任務(wù)是否已經(jīng)完成,如果完成再進(jìn)行下一步,處理自己的任務(wù)。相當(dāng)于主線程在等待過程中,并沒有做其它任務(wù),只是讓幫手去干活,幫手都把活干完了,它再干自己的,然后做一些善后工作。主線程在這里的角色有點(diǎn)像代理商或者包工頭。 - 子線程在完成分配的任務(wù)后,也會通過
for循環(huán)忙等,檢測主線程的工作調(diào)度,如果任務(wù)很少了,等待主線程通過鎖,把自己掛起。
3.2. 忙等
多線程模式,存在忙等現(xiàn)象,這個處理有點(diǎn)超出了常規(guī)思維。
3.2.1. 源碼實現(xiàn)
- 主線程分配完任務(wù)后,等待所有子線程完成任務(wù)后,再進(jìn)行下一步操作。
// write
int handleClientsWithPendingWritesUsingThreads(void) {
...
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
}
// read
int handleClientsWithPendingReadsUsingThreads(void) {
...
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
...
}
- 子線程完成任務(wù)后,保持繁忙狀態(tài),等待主線程上鎖掛起自己。
void *IOThreadMain(void *myid) {
...
while(1) {
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
...
}
}
3.2.2. 優(yōu)缺點(diǎn)
-
優(yōu)點(diǎn):
- 實現(xiàn)簡單,主線程可以通過鎖開啟/暫停多線程工作模式,不需要復(fù)雜的通信。
- redis 讀寫事件處理基本都是內(nèi)存級別操作,而且非阻塞,多線程處理任務(wù)非???。
- 反應(yīng)快,有任務(wù)能實時處理。
- 宏觀上看,主線程是串行處理邏輯,邏輯清晰:讀寫邏輯順序處理。主線程把一次大任務(wù)進(jìn)行取模切割成小任務(wù),分配給子線程處理。主線程等子線程完成所有任務(wù)后,再完成自己的任務(wù),再進(jìn)行下一步。
- 因為多線程處理的是客戶端鏈接的延時讀寫邏輯,redis 服務(wù)應(yīng)用場景作為緩存,接入對象一般是服務(wù)端級別,而不是面向普通用戶的客戶端,所以鏈接不會太多。而等待的讀寫鏈接通過取模分散到不同的線程去處理,那每個線程處理的鏈接就會相對較少。每個線程處理任務(wù)也很快。
-
缺點(diǎn):
忙等最大的問題是以浪費(fèi)一定 cpu 性能為代價,如果 redis 鏈接并發(fā)量不是很高,redis 作者不建議開啟多線程模式,所以主邏輯會根據(jù)寫事件鏈接數(shù)量大小來開啟/暫停多線程工作模式。
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
// 如果單線程模式就直接返回。
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
3.3. 源碼分析
3.3.1. 概述
-
網(wǎng)絡(luò)讀寫核心接口:
接口 描述 readQueryFromClient 服務(wù)讀客戶端數(shù)據(jù)。 writeToClient 服務(wù)向客戶端寫數(shù)據(jù)。 -
多線程工作模式核心接口(
networking.c),其它延時處理邏輯也有一部分源碼。接口 描述 IOThreadMain 子線程處理邏輯。 initThreadedIO 主線程創(chuàng)建掛起子線程。 startThreadedIO 主線程開啟多線程工作模式。 stopThreadedIO 主線程暫停多線程工作模式。 stopThreadedIOIfNeeded 主線程根據(jù)寫并發(fā)量是否關(guān)閉多線程工作模式。 handleClientsWithPendingWritesUsingThreads 主線程多線程處理延時寫事件。 handleClientsWithPendingReadsUsingThreads 主線程多線程處理延時讀事件。 -
其它延時處理邏輯,看看下面這些變量和宏在代碼中的邏輯,這里不會詳細(xì)展開。
變量/宏 描述 server.clients_pending_read 延時處理讀事件的客戶端連接鏈表。 server.clients_pending_write 延時處理寫事件的客戶端連接鏈表。 CLIENT_PENDING_READ 延時處理讀事件標(biāo)識。 CLIENT_PENDING_WRITE 延時處理寫事件標(biāo)識。 CLIENT_PENDING_COMMAND 延時處理命令邏輯標(biāo)識。
3.3.2. 源碼
-
變量/宏
io_threads_mutex互斥變量數(shù)組,為了方便主線程喚醒/掛起控制子線程。
io_threads_pending原子變量,方便主線程統(tǒng)計子線程是否已經(jīng)處理完所有任務(wù)。
// 最大線程個數(shù)。
#define IO_THREADS_MAX_NUM 128
// 線程讀操作。
#define IO_THREADS_OP_READ 0
// 線程寫操作。
#define IO_THREADS_OP_WRITE 1
// 線程數(shù)組。
pthread_t io_threads[IO_THREADS_MAX_NUM];
// 互斥變量數(shù)組,提供主線程上鎖和解鎖子線程工作。
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
// 原子變量數(shù)組,分別存儲每個線程要處理的延時處理鏈接數(shù)量。主線程用來統(tǒng)計線程是否處理完等待事件,從而進(jìn)行下一步操作。
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
// 是否啟動了多線程處理模式。
int io_threads_active;
// 線程操作類型。多線程每次只能處理一種類型的操作:讀/寫。
int io_threads_op;
// 子線程列表,子線程個數(shù)為 IO_THREADS_MAX_NUM - 1,因為主線程也會處理延時任務(wù)。
list *io_threads_list[IO_THREADS_MAX_NUM];
- 主線程創(chuàng)建子線程
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
if (server.io_threads_num == 1) return;
// 檢查配置的線程數(shù)量是否超出限制。
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
// 創(chuàng)建 server.io_threads_num - 1 個子線程。
for (int i = 0; i < server.io_threads_num; i++) {
io_threads_list[i] = listCreate();
// 0 號線程不創(chuàng)建,0 號就是主線程,主線程也會處理任務(wù)邏輯。
if (i == 0) continue;
// 創(chuàng)建子線程,主線程先對子線程上鎖,掛起子線程,不讓子線程進(jìn)入工作模式。
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]);
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
- 開啟多線程模式
void startThreadedIO(void) {
serverAssert(io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
// 子線程因為上鎖等待主線程解鎖,當(dāng)主線程解鎖子線程,子線程重新進(jìn)入工作狀態(tài)。
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
}
- 子線程邏輯處理
void *IOThreadMain(void *myid) {
// 每個線程在創(chuàng)建的時候會產(chǎn)生一個業(yè)務(wù) id。
long id = (unsigned long)myid;
while(1) {
// 替代 sleep,用忙等,這樣能實時處理業(yè)務(wù)。但是也付出了耗費(fèi) cpu 的代價。
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 留機(jī)會給主線程上鎖,掛起當(dāng)前子線程。
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
// 根據(jù)操作類型,處理對應(yīng)的讀/寫邏輯。
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
}
}
- 是否需要停止多線程模式
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
// 如果單線程模式就直接返回。
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
- 暫停多線程處理模式
void stopThreadedIO(void) {
// 在停止線程前,仍然有等待處理的延時讀數(shù)據(jù)處理,需要先處理再停止線程。
handleClientsWithPendingReadsUsingThreads();
serverAssert(io_threads_active == 1);
// 主給子線程上鎖,掛起子線程。
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
- 處理延時的讀事件
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
// 將等待處理的鏈接,通過取模放進(jìn)不同的隊列中去。
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 分別統(tǒng)計每個隊列要處理鏈接的個數(shù)。
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主線程處理第一個隊列。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 讀客戶端發(fā)送的數(shù)據(jù)到緩存。
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 主線程處理完任務(wù)后,忙等其它線程,全部線程處理完任務(wù)后,再處理命令實現(xiàn)邏輯。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
/* 主線程處理命令邏輯,因為鏈接都標(biāo)識了等待狀態(tài),讀完數(shù)據(jù)后命令對應(yīng)的業(yè)務(wù)邏輯還沒有被處理。
* 這里去掉等待標(biāo)識,處理命令業(yè)務(wù)邏輯。*/
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
// 讀取數(shù)據(jù),解析協(xié)議取出命令參數(shù),執(zhí)行命令,填充回復(fù)緩沖區(qū)。
processCommandAndResetClient(c);
}
// 繼續(xù)解析協(xié)議,取出命令參數(shù),執(zhí)行命令,填充回復(fù)緩沖區(qū)。
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
return processed;
}
- 處理延時的寫事件
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;
// 如果延時寫事件對應(yīng)的 client 鏈接很少,關(guān)閉多線程模式,用主線程處理異步邏輯。
if (stopThreadedIOIfNeeded()) {
// 處理延時寫事件。
return handleClientsWithPendingWrites();
}
if (!io_threads_active) startThreadedIO();
// 將等待處理的鏈接,通過取模放進(jìn)不同的隊列中去,去掉延遲寫標(biāo)識。
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 線程處理寫事件。
io_threads_op = IO_THREADS_OP_WRITE;
// 分別統(tǒng)計每個隊列要處理鏈接的個數(shù)。
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
// 主線程處理第一個隊列。
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 寫數(shù)據(jù),發(fā)送給回復(fù)給客戶端。
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 主線程處理完任務(wù)后,忙等其它線程,全部線程處理完任務(wù)后,再處理命令實現(xiàn)邏輯。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 如果緩存中還有沒有發(fā)送完的數(shù)據(jù),繼續(xù)發(fā)送或者下次繼續(xù)發(fā),否則從事件驅(qū)動刪除 fd 注冊的可寫事件。
if (clientHasPendingReplies(c)
&& connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) {
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
4. 數(shù)據(jù)結(jié)構(gòu)
redisServer 和 client 分別 redis 是服務(wù)端和客戶端的數(shù)據(jù)結(jié)構(gòu),理解結(jié)構(gòu)的成員作用是走讀源碼邏輯的關(guān)鍵。有興趣的朋友下個斷點(diǎn)跑下邏輯,細(xì)節(jié)就不詳細(xì)展開了。
- 客戶端結(jié)構(gòu)
// server.h
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
...
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
...
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
...
}
- 服務(wù)端結(jié)構(gòu)
struct redisServer {
...
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
...
}
5. 測試
8 核心,16G 內(nèi)存, mac book 本地測試。
redis 服務(wù)默認(rèn)開 4 線程,壓測工具開 2 線程。有剩余核心處理機(jī)器的其它業(yè)務(wù),這樣不影響 redis 工作。
Linux 系統(tǒng),如果安裝不了 redis 最新版本,請升級系統(tǒng)
gcc版本。
- 配置,多線程模式測試,開啟讀寫兩個選項;單線程模式測試則會關(guān)閉。
# redis.conf
io-threads 4
io-threads-do-reads yes
- 壓測命令,會針對客戶端鏈接數(shù)/測試包體大小進(jìn)行測試。
命令邏輯已整理成腳本,放到 github,順手錄制了測試視頻:壓力測試 redis 多線程處理網(wǎng)絡(luò) I/O。
# 壓測工具會模擬多個終端,防止超出限制,被停止。
ulimit -n 16384
# 可以設(shè)置對應(yīng)的鏈接數(shù)/包體大小進(jìn)行測試。
./redis-benchmark -c xxxx -r 1000000 -n 100000 -t set,get -q --threads 2 -d yyyy
- 壓測結(jié)果
在 mac book 上測試,從測試結(jié)果看,多線程沒有單線程好??吹骄W(wǎng)上很多同學(xué)用壓測工具測試,性能有很大的提升,有時間用其它機(jī)器跑下??赡苁菣C(jī)器配置不一樣,但是至少一點(diǎn),這個多線程功能目前還有很大的優(yōu)化空間,所以新特性,還需要放到真實環(huán)境中測試過,才能投產(chǎn)。

6. 總結(jié)
- 多線程模式使得網(wǎng)絡(luò)讀寫快速處理。
- 多線程模式會浪費(fèi)一定 cpu,并發(fā)量不高不建議開啟多線程模式。
- 主線程實現(xiàn)主邏輯,子線程輔助完成任務(wù)。
- redis 即便開啟多線程模式處理網(wǎng)絡(luò)讀寫事件,宏觀邏輯還是串行的。
- 實踐是檢驗真理的試金石,壓測過程中,單線程比多線程優(yōu)秀,沒有體現(xiàn)出多線程應(yīng)有的性能提升,其它尚待驗證。