在 2020 年 5 月推出的 Redis 6.0 版本中,Redis 在執(zhí)行模型中還進一步使用了多線程來處理 IO 任務。之前在:http://www.itdecent.cn/p/0323fc06a36f 簡單討論過Redis執(zhí)行命令的過程大致分為:讀取命令、解析命令、執(zhí)行命令、返回結果四個階段。而多線程處理 IO 任務的目的,就是為了充分利用當前服務器的多核特性,使用多核運行多線程,讓多線程幫助加速命令讀取、命令解析以及數(shù)據(jù)寫回的速度,提升 Redis 整體性能。
源碼地址:https://github.com/redis/redis/tree/6.0/src
1. 基本步驟
1.1. 輸入、輸出緩沖區(qū)
為了避免客戶端和服務器端的請求發(fā)送和處理速度不匹配,服務器端給每個連接的客戶端都設置了一個輸入緩沖區(qū)和輸出緩沖區(qū),我們稱之為客戶端輸入緩沖區(qū)和輸出緩沖區(qū)。
輸入緩沖區(qū)會先把客戶端發(fā)送過來的命令暫存起來,Redis 主線程再從輸入緩沖區(qū)中讀取命令,進行處理。當 Redis 主線程處理完數(shù)據(jù)后,會把結果寫入到輸出緩沖區(qū),再通過輸出緩沖區(qū)返回給客戶端,如下圖所示:

1.2. 多線程處理網(wǎng)絡請求
引入多I/O線程優(yōu)化之后,為了避免多線程訪問共享資源造成的線程安全問題,執(zhí)行命令階段仍然是在主線程中執(zhí)行的,而I/O線程只是在讀取客戶端請求、解析命令、將命令執(zhí)行結果返回給客戶端的時候起作用。而且,在同一時刻,讀寫客戶端命令操作和執(zhí)行命令操作只有一種在運行。下面用一張表格來簡單描述下這個過程:
| 主線程 | I/O線程 | |
|---|---|---|
| T1 | 接收客戶端連接,建立連接socket | |
| T2 | 把連接socket分配給I/O線程 | |
| T3 | 等待I/O線程讀取、解析命令 | |
| T4 | 讀取命令 | |
| T5 | 解析命令 | |
| T6 | 執(zhí)行命令 | |
| T7 | 將結果寫到輸出緩沖區(qū) | |
| T8 | 等待I/O線程寫回客戶端 | |
| T9 | 將緩沖區(qū)數(shù)據(jù)寫回客戶端 | |
| T10 | I/O線程寫回客戶端完成,等待后續(xù)請求 |
2. 源碼解析
2.1. 數(shù)據(jù)結構
在Redis中,全局變量都會保存在redisServer結構體類型的變量server中(在server.h文件中),我們的RDB、AOF、主從等配置都是在這個server變量中保存的,而多I/O線程中有一個全局變量io_threads_active,來表示Redis是否開啟了多I/O線程:
- server.io_threads_active = 0,未啟動多I/O線程。
- server.io_threads_active = 1,啟動多I/O線程。
server中還有一個變量io_threads_num保存I/O線程的數(shù)量。
server中有兩個 List 類型的成員變量:clients_pending_write 和 clients_pending_read,它們分別記錄了待寫回數(shù)據(jù)的客戶端和待讀取數(shù)據(jù)的客戶端,如下所示:
struct redisServer {
...
list *clients_pending_write; //待寫回數(shù)據(jù)的客戶端
list *clients_pending_read; //待讀取數(shù)據(jù)的客戶端
...
}
在networking.c文件中,定義了四個數(shù)組,用來保存多I/O線程的相關數(shù)據(jù):
- io_threads_list 數(shù)組:每個元素是一個List類型的列表,列表保存了每個線程待處理的客戶端,比如io_threads_list[0]保存了0號線程要處理的客戶端列表;
- io_threads_pending 數(shù)組:保存等待每個 IO 線程處理的客戶端個數(shù);
- io_threads_mutex 數(shù)組:保存線程互斥鎖,可以對標Java的ReentrantLock,到后面會介紹這個數(shù)組的作用;
- io_threads 數(shù)組:保存每個 IO 線程。
pthread_t io_threads[IO_THREADS_MAX_NUM]; //記錄線程描述符的數(shù)組
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; //記錄線程互斥鎖的數(shù)組
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; //記錄線程待處理的客戶端個數(shù)
list *io_threads_list[IO_THREADS_MAX_NUM]; //記錄線程對應處理的客戶端
networking.c還定義了一個int型的變量io_threads_op,表示I/O線程當前要執(zhí)行的操作是讀操作還是寫操作,這個變量也表明所有的I/O線程要么都在讀,要么都在寫,不可能一部分I/O線程在讀,一部分I/O線程在寫,該變量有兩個值:
- IO_THREADS_OP_WRITE:這表明該 IO 線程要做的是寫操作。
- IO_THREADS_OP_READ:這表明該 IO 線程要做的是讀操作。
每一次客戶端連接請求進來的時候,redis都會為這個客戶端創(chuàng)建一個client變量,而client有一個屬性flags,flags標記了當前客戶端的狀態(tài),這次僅分析和多I/O線程有關的三個狀態(tài):
- CLIENT_PENDING_READ:有命令等待被讀取
- CLIENT_PENDING_WRITE:等待被寫回
- CLIENT_PENDING_COMMAND:命令已經(jīng)被解析,等待被執(zhí)行
2.2. 初始化
redis啟動的main函數(shù)(在server.c文件中)會執(zhí)行server的初始化過程,server 在初始化過程的最后,調(diào)用 InitSeverLast 函數(shù),而 InitServerLast 函數(shù)再進一步調(diào)用 initThreadedIO 函數(shù)(在networking.c文件中)來完成多I/O線程的初始化操作。具體如下所示:
void InitServerLast() {
bioInit();
initThreadedIO(); //調(diào)用initThreadedIO函數(shù)初始化IO線程
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
下面來看下initThreadedIO函數(shù)的主要執(zhí)行流程,他主要分為3步:
- 設置激活I/O線程的標志為未啟動,即server.io_threads_active = 0。
- 用兩個if來判斷是否推出方法:設置的線程數(shù)是否為1;設置的線程數(shù)是否超過最大閾值。
- 初始化I/O線程,即上面提到的networking.c文件中定義的有關I.O線程的數(shù)據(jù)結構。
具體如下:
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
//1. 設置激活I/O線程的標志為未啟動
server.io_threads_active = 0; /* We start with threads not active. */
//2. 設置的線程數(shù)是否為1
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
//2. 設置的線程數(shù)是否超過最大閾值
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
//3.初始化I/O線程
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
在初始化I/O線程的時候,會調(diào)用pthread_create函數(shù)來創(chuàng)建I/O線程,pthread_create可以對標Java的 new Thread(runnable),然后IOThreadMain函數(shù)就是I/O線程執(zhí)行的主函數(shù),I/O線程獲取客戶端列表,然后根據(jù)當前I/O線程的操作類型,來執(zhí)行讀取命令或?qū)懟乜蛻舳瞬僮?。IOThreadMain還有一些線程同步的操作等到后面再討論。
void *IOThreadMain(void *myid) {
......
while(1) {
......
//獲取IO線程要處理的客戶端列表
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
//如果線程操作是寫操作,則將數(shù)據(jù)寫回客戶端
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
//如果線程操作是讀操作,則從客戶端讀取數(shù)據(jù)
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
2.3. 讀取命令
在http://www.itdecent.cn/p/0323fc06a36f分析命令執(zhí)行過程中有提到,redis是在readQueryFromClient函數(shù)(networking.c文件)中執(zhí)行命令讀取操作的,而在readQueryFromClient函數(shù)一開始,會判斷是否開啟了I/O線程,
void readQueryFromClient(connection *conn) {
......
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
}
而在postponeClientRead函數(shù)中,有五個判斷條件,分別是:
- I/O線程被激活,即server.io_threads_active = 1,在2.2分析初始化的時候,io_threads_active 是被設置為0的,而具體什么時候會把他置為1,會在2.5分析線程間同步的時候再討論。
- I/O線程可以用于讀取命令,這個變量值是在 Redis 配置文件 redis.conf 中,通過配置項 io-threads-do-reads 設置的,默認值為 no,如果想用多 IO 線程處理客戶端讀操作,就需要把 io-threads-do-reads 配置項設為 yes。
- 客戶端沒有被暫停。
- processEventsWhileBlokced 函數(shù)沒有在執(zhí)行。
- 客戶端現(xiàn)有標識不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ,前面兩個常量是關于主從的,這里先不討論,CLIENT_PENDING_READ前面有提到過。
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!clientsArePaused() &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
如果五個條件都滿足的話,就把客戶端的狀態(tài)設置為CLIENT_PENDING_READ,并把當前客戶端添加到server.clients_pending_read列表中。
也就是說當Redis接收到客戶端請求時,如果I/O線程被激活,并不會直接讀取命令,而是把client記為CLIENT_PENDING_READ,并把client添加到“等待被讀取客戶端”clients_pending_read列表中就直接返回了。這個操作是在主線程中進行的。
分析Redis命令處理過程中有提到,Redis會在一個循環(huán)中接收客戶端請求,在阻塞等待客戶端請求到來之前,會調(diào)用beforesleep函數(shù),進而調(diào)用handleClientsWithPendingReadsUsingThreads函數(shù)來處理等待被讀取的客戶端,該函數(shù)主要邏輯分為四步:
- 判定 IO 線程是否激活,以及用戶是否設置了 Redis 可以用 IO 線程處理待讀客戶端,如果不滿足條件直接返回,這兩個判斷和postponeClientRead函數(shù)中的第1、2個條件相同。
- 取出等待被讀取的客戶端,以輪詢的方式分配給各個I/O線程。
- 主線程把自己該讀取的客戶端中的命令,先讀取、解析完。這里要說明的一點就是主線程就是0號線程也會參與I/O操作,并且讀取的是io_threads_list[0]中的元素。
- 當I/O線程讀取、解析完成之后,執(zhí)行server.clients_pending_read列表中所有的客戶端命令。
int handleClientsWithPendingReadsUsingThreads(void) {
//1. 判定 IO 線程是否激活,以及用戶是否設置了 Redis 可以用 IO 線程處理待讀客戶端
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
......
//2. 取出等待被讀取的客戶端,以輪詢的方式分配給各個I/O線程。
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
......
//3.主線程把自己該讀取的客戶端中的命令,先讀取、解析完
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
//等待I/O線程解析完所有的命令
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
//4.執(zhí)行所有的命令
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
......
//執(zhí)行所有命令
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
......
}
......
return processed;
}
這里再重新說一下readQueryFromClient函數(shù),剛開始該函數(shù)只是把客戶端添加到全局列表中,并標記client.flags 為 CLIENT_PENDING_READ,就直接返回了。這個時候再進入這個函數(shù),就會進入后面的讀取邏輯,進而調(diào)用processInputBuffer解析命令。這一部分邏輯可以參考http://www.itdecent.cn/p/0323fc06a36f。
而processInputBuffer函數(shù)的最后,會判斷當前客戶端client.flags是否為CLIENT_PENDING_READ,如果是的話,就把該客戶端標識設置為CLIENT_PENDING_COMMAND,即解析完客戶端的命令,就直接返回了,并沒有執(zhí)行客戶端的命令。
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
2.4. 寫回客戶端
在2.3中分析的handleClientsWithPendingReadsUsingThreads函數(shù)中,當所有客戶端的命令都解析完成以后,循環(huán)遍歷所有的客戶端,調(diào)用processPendingCommandsAndResetClient函數(shù)執(zhí)行客戶端命令,而processPendingCommandsAndResetClient會進而調(diào)用processCommand(在server.c文件中)函數(shù)來處理客戶端命令,處理完客戶端命令后,調(diào)用addReply函數(shù)(在networking.c文件中)寫回客戶端。
而addReply函數(shù)一開始,會調(diào)用 prepareClientToWrite 函數(shù),來判斷是否使用I/O線程來寫回客戶端:
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
...
}
prepareClientToWrite函數(shù)前面一些主從的判斷這里先忽略,在最后會調(diào)用 clientHasPendingReplies 函數(shù),判斷當前客戶端是否還有留存在輸出緩沖區(qū)中的數(shù)據(jù)等待寫回。在前面2.3最后解析完客戶端命令之后,就會把client.flags設置為CLIENT_PENDING_COMMAND,所以第二個條件也會滿足
如果沒有的話,那么,prepareClientToWrite 就會調(diào)用 clientInstallWriteHandler 函數(shù),再進一步判斷能否推遲該客戶端寫操作。
int prepareClientToWrite(client *c) {
......
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
}
clientInstallWriteHandler函數(shù)中,忽略掉主從相關判斷,就判斷c->flags 是否為 CLIENT_PENDING_WRITE,而這個時候client.flags為CLIENT_PENDING_COMMAND,滿足條件。把c->flags設置為CLIENT_PENDING_WRITE,并把當前client添加到全局變量server.clients_pending_write中,就返回了。
void clientInstallWriteHandler(client *c) {
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
prepareClientToWrite函數(shù)返回之后,addReply函數(shù)會進一步把寫回給客戶端的數(shù)據(jù)寫到輸出緩沖區(qū)中,這時候并沒有真正的把數(shù)據(jù)寫回給客戶端。
在2.3中提到Redis在循環(huán)接收客戶端請求時,會調(diào)用beforesleep函數(shù)會調(diào)用handleClientsWithPendingReadsUsingThreads函數(shù)讀取、解析客戶端請求并執(zhí)行客戶端命令,然后beforeSleep函數(shù)會調(diào)用handleClientsWithPendingWritesUsingThreads函數(shù)來把緩沖區(qū)中的數(shù)據(jù)寫回客戶端。
handleClientsWithPendingWritesUsingThreads函數(shù)的執(zhí)行邏輯大致可以分為四步:
- 判斷是否需要使用I/O線程來寫回客戶端。
- 把待寫客戶端,按照輪詢方式分配給 I/O 線程,添加到 io_threads_list 數(shù)組各元素中。
- 主 I/O 線程處理其待寫客戶端,并執(zhí)行 while(1) 循環(huán)等待所有 I/O 線程完成處理。
- 再次檢查是否還有緩沖區(qū)的數(shù)據(jù)未被寫回客戶端。這里是通過事件驅(qū)動框架來將緩沖區(qū)的數(shù)據(jù)寫回客戶端的,具體就不詳細討論了。
int handleClientsWithPendingWritesUsingThreads(void) {
......
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
//1. 判斷是否需要使用I/O線程來寫回客戶端。
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
......
//2. 把待寫客戶端,按照輪詢方式分配給 IO 線程
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
......
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
//3. 主I/O線程處理其待寫客戶端
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
//等待所有 I/O 線程完成處理
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where
* needed. */
//4.再次檢查是否還有緩沖區(qū)的數(shù)據(jù)未被寫回客戶端
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
......
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
2.5. 線程間同步
2.5.1 啟動I/O線程
在前面2.1分析數(shù)據(jù)結構的時候,有一個互斥鎖數(shù)組io_threads_mutex,數(shù)組里面的每一個元素就是一把I/O線程對應的互斥鎖。
在2.2初始化I/O線程的時候,initThreadedIO函數(shù)中,循環(huán)創(chuàng)建io_threads_num個線程,在真正調(diào)用pthread_create創(chuàng)建線程之前,主線程會先調(diào)用pthread_mutex_lock函數(shù),獲取到這個I/O線程對應的鎖,具體代碼如下:
void initThreadedIO(void) {
......
//3.初始化I/O線程
for (int i = 0; i < server.io_threads_num; i++) {
......
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
......
}
io_threads[i] = tid;
}
}
而I/O線程執(zhí)行的主函數(shù)中,首先會循環(huán)100w次,來等待I/O線程任務的到來,如果循環(huán)100w次還沒有任務到來的時候,就會調(diào)用pthread_mutex_lock獲取該I/O線程的互斥鎖,而這把互斥鎖在該線程創(chuàng)建之前,已經(jīng)被主線程拿到了,所以I/O線程就阻塞在這里,等待被喚醒。就算I/O線程獲取到互斥鎖,也會立刻釋放掉,讓主線程可以隨時停止I/O線程。
void *IOThreadMain(void *myid) {
......
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
}
}
在2.4寫回客戶端時候,handleClientsWithPendingWritesUsingThreads函數(shù)首先會進行兩個判斷,如下所示:
int handleClientsWithPendingWritesUsingThreads(void) {
......
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();
}
第一個if判斷:如果設置的I/O線程數(shù)量為1(io_threads_num通過redis.conf配置),并且有必要停止I/O線程的話,就使用單線程的方式執(zhí)行寫回操作。
而stopThreadedIOIfNeeded會判斷當前等待被處理的客戶端數(shù)量pending,是否小于server.io_threads_num*2,如果小于的話,就會停止I/O線程,停止I/O線程同樣是主線程獲取該I/O線程的互斥鎖。
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
void stopThreadedIO(void) {
......
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
server.io_threads_active = 0;
}
第二個if判斷:判斷server.io_threads_active的值,由于在初始化的時候,該值是被設為0的,并且走到這個判斷,就說明客戶端數(shù)量pending > server.io_threads_num*2,這個時候會調(diào)用startThreadedIO函數(shù)來啟動I/O線程,startThreadxianedIO函數(shù)里面,其實就是釋放掉每個I/O線程的互斥鎖,這樣I/O線程就可以獲取到互斥鎖繼續(xù)執(zhí)行了。
void startThreadxianedIO(void) {
......
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
}
2.5.2 寫回客戶端同步
在前面2.4已經(jīng)介紹過寫回客戶端函數(shù)handleClientsWithPendingWritesUsingThreads的執(zhí)行邏輯,他是在主線程中執(zhí)行的,而在2.2初始化時候也討論過I/O線程執(zhí)行的主函數(shù)IOThreadMain的主要邏輯。
在這兩個線程的執(zhí)行方法中,有兩個變量是都會被這兩個線程進行讀寫操作的,io_threads_list和io_threads_pending,io_threads_list存放每個I/O線程需要處理的客戶端列表,io_threads_pending保存每個I/O線程待處理客戶端的數(shù)量。
下面通過一張表格來分析主線程和I/O線程是否會出現(xiàn)線程安全問題:

下面簡單分析下上面這個表格:
- T1、T2時刻對應了I/O線程的創(chuàng)建,即2.5.1討論的過程
- T3、T4時刻,在I/O線程被啟動之后,主線程首先會給io_threads_list添加client,而此時I/O線程在循環(huán)判斷io_threads_pending是否等于0。
- T5時刻主線程設置了I/O線程的操作類型為寫操作。
- T6時刻主線程設置了I/O線程需要處理的客戶端數(shù)量之后,I/O線程才開始繼續(xù)執(zhí)行while循環(huán)里面后續(xù)操作。
- T7時刻主線程循環(huán)判斷io_threads_pending是否等于0,而這時I/O線程在寫回客戶端。
- T8、T9時刻I/O線程清空客戶端列表,并設置io_threads_pending = 0,主線程才退出循環(huán)繼續(xù)執(zhí)行后續(xù)操作。
分析讀取客戶端命令的同步操作,其實和上面本小節(jié)分析的差不多,因此不再累述了,如果有問題的話,可以留言一起討論。
雖然兩個線程都有對io_threads_list和io_threads_pending變量的讀、寫操作,但是在同一時刻,只有一個線程在寫其中的一個變量,因此并不會出現(xiàn)線程安全問題,真是藝高人膽大啊,希望有朝一日我也能寫出如此騷氣的代碼,繼續(xù)加油吧。
參考資料:
- 極客時間專欄《Redis源碼剖析與實戰(zhàn)》.蔣德鈞.2021
- 極客時間專欄《Redis核心技術與實戰(zhàn)》.蔣德鈞.2020
- Redis 6.0源碼:https://github.com/redis/redis/tree/6.0/src