Redis6.0多I/O線程實現(xiàn)原理

在 2020 年 5 月推出的 Redis 6.0 版本中,Redis 在執(zhí)行模型中還進一步使用了多線程來處理 IO 任務。之前在:http://www.itdecent.cn/p/0323fc06a36f 簡單討論過Redis執(zhí)行命令的過程大致分為:讀取命令、解析命令、執(zhí)行命令、返回結果四個階段。而多線程處理 IO 任務的目的,就是為了充分利用當前服務器的多核特性,使用多核運行多線程,讓多線程幫助加速命令讀取、命令解析以及數(shù)據(jù)寫回的速度,提升 Redis 整體性能。

源碼地址:https://github.com/redis/redis/tree/6.0/src

1. 基本步驟

1.1. 輸入、輸出緩沖區(qū)

為了避免客戶端和服務器端的請求發(fā)送和處理速度不匹配,服務器端給每個連接的客戶端都設置了一個輸入緩沖區(qū)和輸出緩沖區(qū),我們稱之為客戶端輸入緩沖區(qū)和輸出緩沖區(qū)。

輸入緩沖區(qū)會先把客戶端發(fā)送過來的命令暫存起來,Redis 主線程再從輸入緩沖區(qū)中讀取命令,進行處理。當 Redis 主線程處理完數(shù)據(jù)后,會把結果寫入到輸出緩沖區(qū),再通過輸出緩沖區(qū)返回給客戶端,如下圖所示:


1.png

1.2. 多線程處理網(wǎng)絡請求

引入多I/O線程優(yōu)化之后,為了避免多線程訪問共享資源造成的線程安全問題,執(zhí)行命令階段仍然是在主線程中執(zhí)行的,而I/O線程只是在讀取客戶端請求、解析命令、將命令執(zhí)行結果返回給客戶端的時候起作用。而且,在同一時刻,讀寫客戶端命令操作和執(zhí)行命令操作只有一種在運行。下面用一張表格來簡單描述下這個過程:

主線程 I/O線程
T1 接收客戶端連接,建立連接socket
T2 把連接socket分配給I/O線程
T3 等待I/O線程讀取、解析命令
T4 讀取命令
T5 解析命令
T6 執(zhí)行命令
T7 將結果寫到輸出緩沖區(qū)
T8 等待I/O線程寫回客戶端
T9 將緩沖區(qū)數(shù)據(jù)寫回客戶端
T10 I/O線程寫回客戶端完成,等待后續(xù)請求

2. 源碼解析

2.1. 數(shù)據(jù)結構

在Redis中,全局變量都會保存在redisServer結構體類型的變量server中(在server.h文件中),我們的RDB、AOF、主從等配置都是在這個server變量中保存的,而多I/O線程中有一個全局變量io_threads_active,來表示Redis是否開啟了多I/O線程:

  • server.io_threads_active = 0,未啟動多I/O線程。
  • server.io_threads_active = 1,啟動多I/O線程。

server中還有一個變量io_threads_num保存I/O線程的數(shù)量。

server中有兩個 List 類型的成員變量:clients_pending_write 和 clients_pending_read,它們分別記錄了待寫回數(shù)據(jù)的客戶端和待讀取數(shù)據(jù)的客戶端,如下所示:

struct redisServer {
...
list *clients_pending_write;  //待寫回數(shù)據(jù)的客戶端
list *clients_pending_read;  //待讀取數(shù)據(jù)的客戶端
...
}

在networking.c文件中,定義了四個數(shù)組,用來保存多I/O線程的相關數(shù)據(jù):

  • io_threads_list 數(shù)組:每個元素是一個List類型的列表,列表保存了每個線程待處理的客戶端,比如io_threads_list[0]保存了0號線程要處理的客戶端列表;
  • io_threads_pending 數(shù)組:保存等待每個 IO 線程處理的客戶端個數(shù);
  • io_threads_mutex 數(shù)組:保存線程互斥鎖,可以對標Java的ReentrantLock,到后面會介紹這個數(shù)組的作用;
  • io_threads 數(shù)組:保存每個 IO 線程。
pthread_t io_threads[IO_THREADS_MAX_NUM];   //記錄線程描述符的數(shù)組
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];  //記錄線程互斥鎖的數(shù)組
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];  //記錄線程待處理的客戶端個數(shù)
list *io_threads_list[IO_THREADS_MAX_NUM];  //記錄線程對應處理的客戶端

networking.c還定義了一個int型的變量io_threads_op,表示I/O線程當前要執(zhí)行的操作是讀操作還是寫操作,這個變量也表明所有的I/O線程要么都在讀,要么都在寫,不可能一部分I/O線程在讀,一部分I/O線程在寫,該變量有兩個值:

  • IO_THREADS_OP_WRITE:這表明該 IO 線程要做的是寫操作。
  • IO_THREADS_OP_READ:這表明該 IO 線程要做的是讀操作。

每一次客戶端連接請求進來的時候,redis都會為這個客戶端創(chuàng)建一個client變量,而client有一個屬性flags,flags標記了當前客戶端的狀態(tài),這次僅分析和多I/O線程有關的三個狀態(tài):

  • CLIENT_PENDING_READ:有命令等待被讀取
  • CLIENT_PENDING_WRITE:等待被寫回
  • CLIENT_PENDING_COMMAND:命令已經(jīng)被解析,等待被執(zhí)行

2.2. 初始化

redis啟動的main函數(shù)(在server.c文件中)會執(zhí)行server的初始化過程,server 在初始化過程的最后,調(diào)用 InitSeverLast 函數(shù),而 InitServerLast 函數(shù)再進一步調(diào)用 initThreadedIO 函數(shù)(在networking.c文件中)來完成多I/O線程的初始化操作。具體如下所示:

void InitServerLast() {
    bioInit();
    initThreadedIO();  //調(diào)用initThreadedIO函數(shù)初始化IO線程
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

下面來看下initThreadedIO函數(shù)的主要執(zhí)行流程,他主要分為3步:

  1. 設置激活I/O線程的標志為未啟動,即server.io_threads_active = 0。
  2. 用兩個if來判斷是否推出方法:設置的線程數(shù)是否為1;設置的線程數(shù)是否超過最大閾值。
  3. 初始化I/O線程,即上面提到的networking.c文件中定義的有關I.O線程的數(shù)據(jù)結構。

具體如下:

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    //1. 設置激活I/O線程的標志為未啟動
    server.io_threads_active = 0; /* We start with threads not active. */

    //2. 設置的線程數(shù)是否為1
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;
    //2. 設置的線程數(shù)是否超過最大閾值
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
    //3.初始化I/O線程
    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

在初始化I/O線程的時候,會調(diào)用pthread_create函數(shù)來創(chuàng)建I/O線程,pthread_create可以對標Java的 new Thread(runnable),然后IOThreadMain函數(shù)就是I/O線程執(zhí)行的主函數(shù),I/O線程獲取客戶端列表,然后根據(jù)當前I/O線程的操作類型,來執(zhí)行讀取命令或?qū)懟乜蛻舳瞬僮?。IOThreadMain還有一些線程同步的操作等到后面再討論。

void *IOThreadMain(void *myid) {
    ......
    while(1) {
        ......
        //獲取IO線程要處理的客戶端列表
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                //如果線程操作是寫操作,則將數(shù)據(jù)寫回客戶端
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                //如果線程操作是讀操作,則從客戶端讀取數(shù)據(jù)
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

2.3. 讀取命令

http://www.itdecent.cn/p/0323fc06a36f分析命令執(zhí)行過程中有提到,redis是在readQueryFromClient函數(shù)(networking.c文件)中執(zhí)行命令讀取操作的,而在readQueryFromClient函數(shù)一開始,會判斷是否開啟了I/O線程,

void readQueryFromClient(connection *conn) {
    ......
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
}

而在postponeClientRead函數(shù)中,有五個判斷條件,分別是:

  1. I/O線程被激活,即server.io_threads_active = 1,在2.2分析初始化的時候,io_threads_active 是被設置為0的,而具體什么時候會把他置為1,會在2.5分析線程間同步的時候再討論。
  2. I/O線程可以用于讀取命令,這個變量值是在 Redis 配置文件 redis.conf 中,通過配置項 io-threads-do-reads 設置的,默認值為 no,如果想用多 IO 線程處理客戶端讀操作,就需要把 io-threads-do-reads 配置項設為 yes。
  3. 客戶端沒有被暫停。
  4. processEventsWhileBlokced 函數(shù)沒有在執(zhí)行。
  5. 客戶端現(xiàn)有標識不能有 CLIENT_MASTER、CLIENT_SLAVE 和 CLIENT_PENDING_READ,前面兩個常量是關于主從的,這里先不討論,CLIENT_PENDING_READ前面有提到過。
/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !clientsArePaused() &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

如果五個條件都滿足的話,就把客戶端的狀態(tài)設置為CLIENT_PENDING_READ,并把當前客戶端添加到server.clients_pending_read列表中。

也就是說當Redis接收到客戶端請求時,如果I/O線程被激活,并不會直接讀取命令,而是把client記為CLIENT_PENDING_READ,并把client添加到“等待被讀取客戶端”clients_pending_read列表中就直接返回了。這個操作是在主線程中進行的。
分析Redis命令處理過程中有提到,Redis會在一個循環(huán)中接收客戶端請求,在阻塞等待客戶端請求到來之前,會調(diào)用beforesleep函數(shù),進而調(diào)用handleClientsWithPendingReadsUsingThreads函數(shù)來處理等待被讀取的客戶端,該函數(shù)主要邏輯分為四步:

  1. 判定 IO 線程是否激活,以及用戶是否設置了 Redis 可以用 IO 線程處理待讀客戶端,如果不滿足條件直接返回,這兩個判斷和postponeClientRead函數(shù)中的第1、2個條件相同。
  2. 取出等待被讀取的客戶端,以輪詢的方式分配給各個I/O線程。
  3. 主線程把自己該讀取的客戶端中的命令,先讀取、解析完。這里要說明的一點就是主線程就是0號線程也會參與I/O操作,并且讀取的是io_threads_list[0]中的元素。
  4. 當I/O線程讀取、解析完成之后,執(zhí)行server.clients_pending_read列表中所有的客戶端命令。
int handleClientsWithPendingReadsUsingThreads(void) {
    //1. 判定 IO 線程是否激活,以及用戶是否設置了 Redis 可以用 IO 線程處理待讀客戶端
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    ......

    //2. 取出等待被讀取的客戶端,以輪詢的方式分配給各個I/O線程。
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    ......
    //3.主線程把自己該讀取的客戶端中的命令,先讀取、解析完
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
    //等待I/O線程解析完所有的命令
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");
    //4.執(zhí)行所有的命令
    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        ......
        //執(zhí)行所有命令
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }
        ......
    }

    ......
    return processed;
}

這里再重新說一下readQueryFromClient函數(shù),剛開始該函數(shù)只是把客戶端添加到全局列表中,并標記client.flags 為 CLIENT_PENDING_READ,就直接返回了。這個時候再進入這個函數(shù),就會進入后面的讀取邏輯,進而調(diào)用processInputBuffer解析命令。這一部分邏輯可以參考http://www.itdecent.cn/p/0323fc06a36f。

而processInputBuffer函數(shù)的最后,會判斷當前客戶端client.flags是否為CLIENT_PENDING_READ,如果是的話,就把該客戶端標識設置為CLIENT_PENDING_COMMAND,即解析完客戶端的命令,就直接返回了,并沒有執(zhí)行客戶端的命令

/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
    c->flags |= CLIENT_PENDING_COMMAND;
    break;
}

2.4. 寫回客戶端

在2.3中分析的handleClientsWithPendingReadsUsingThreads函數(shù)中,當所有客戶端的命令都解析完成以后,循環(huán)遍歷所有的客戶端,調(diào)用processPendingCommandsAndResetClient函數(shù)執(zhí)行客戶端命令,而processPendingCommandsAndResetClient會進而調(diào)用processCommand(在server.c文件中)函數(shù)來處理客戶端命令,處理完客戶端命令后,調(diào)用addReply函數(shù)(在networking.c文件中)寫回客戶端。

而addReply函數(shù)一開始,會調(diào)用 prepareClientToWrite 函數(shù),來判斷是否使用I/O線程來寫回客戶端:

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    ...
}

prepareClientToWrite函數(shù)前面一些主從的判斷這里先忽略,在最后會調(diào)用 clientHasPendingReplies 函數(shù),判斷當前客戶端是否還有留存在輸出緩沖區(qū)中的數(shù)據(jù)等待寫回。在前面2.3最后解析完客戶端命令之后,就會把client.flags設置為CLIENT_PENDING_COMMAND,所以第二個條件也會滿足

如果沒有的話,那么,prepareClientToWrite 就會調(diào)用 clientInstallWriteHandler 函數(shù),再進一步判斷能否推遲該客戶端寫操作。

int prepareClientToWrite(client *c) {
    ......
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c);
    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}

clientInstallWriteHandler函數(shù)中,忽略掉主從相關判斷,就判斷c->flags 是否為 CLIENT_PENDING_WRITE,而這個時候client.flags為CLIENT_PENDING_COMMAND,滿足條件。把c->flags設置為CLIENT_PENDING_WRITE,并把當前client添加到全局變量server.clients_pending_write中,就返回了。

void clientInstallWriteHandler(client *c) {
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}

prepareClientToWrite函數(shù)返回之后,addReply函數(shù)會進一步把寫回給客戶端的數(shù)據(jù)寫到輸出緩沖區(qū)中,這時候并沒有真正的把數(shù)據(jù)寫回給客戶端。

在2.3中提到Redis在循環(huán)接收客戶端請求時,會調(diào)用beforesleep函數(shù)會調(diào)用handleClientsWithPendingReadsUsingThreads函數(shù)讀取、解析客戶端請求并執(zhí)行客戶端命令,然后beforeSleep函數(shù)會調(diào)用handleClientsWithPendingWritesUsingThreads函數(shù)來把緩沖區(qū)中的數(shù)據(jù)寫回客戶端。
handleClientsWithPendingWritesUsingThreads函數(shù)的執(zhí)行邏輯大致可以分為四步:

  1. 判斷是否需要使用I/O線程來寫回客戶端。
  2. 把待寫客戶端,按照輪詢方式分配給 I/O 線程,添加到 io_threads_list 數(shù)組各元素中。
  3. 主 I/O 線程處理其待寫客戶端,并執(zhí)行 while(1) 循環(huán)等待所有 I/O 線程完成處理。
  4. 再次檢查是否還有緩沖區(qū)的數(shù)據(jù)未被寫回客戶端。這里是通過事件驅(qū)動框架來將緩沖區(qū)的數(shù)據(jù)寫回客戶端的,具體就不詳細討論了。
int handleClientsWithPendingWritesUsingThreads(void) {
    ......
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    //1. 判斷是否需要使用I/O線程來寫回客戶端。
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }
    ......
    //2. 把待寫客戶端,按照輪詢方式分配給 IO 線程
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        ......
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    //3. 主I/O線程處理其待寫客戶端
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    //等待所有 I/O 線程完成處理
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. */
     //4.再次檢查是否還有緩沖區(qū)的數(shù)據(jù)未被寫回客戶端
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ......
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

2.5. 線程間同步

2.5.1 啟動I/O線程

在前面2.1分析數(shù)據(jù)結構的時候,有一個互斥鎖數(shù)組io_threads_mutex,數(shù)組里面的每一個元素就是一把I/O線程對應的互斥鎖。

在2.2初始化I/O線程的時候,initThreadedIO函數(shù)中,循環(huán)創(chuàng)建io_threads_num個線程,在真正調(diào)用pthread_create創(chuàng)建線程之前,主線程會先調(diào)用pthread_mutex_lock函數(shù),獲取到這個I/O線程對應的鎖,具體代碼如下:

void initThreadedIO(void) {
    ......
    //3.初始化I/O線程
    for (int i = 0; i < server.io_threads_num; i++) {
        ......
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            ......
        }
        io_threads[i] = tid;
    }
}

而I/O線程執(zhí)行的主函數(shù)中,首先會循環(huán)100w次,來等待I/O線程任務的到來,如果循環(huán)100w次還沒有任務到來的時候,就會調(diào)用pthread_mutex_lock獲取該I/O線程的互斥鎖,而這把互斥鎖在該線程創(chuàng)建之前,已經(jīng)被主線程拿到了,所以I/O線程就阻塞在這里,等待被喚醒。就算I/O線程獲取到互斥鎖,也會立刻釋放掉,讓主線程可以隨時停止I/O線程。

void *IOThreadMain(void *myid) {
    ......
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
    }
}

在2.4寫回客戶端時候,handleClientsWithPendingWritesUsingThreads函數(shù)首先會進行兩個判斷,如下所示:

int handleClientsWithPendingWritesUsingThreads(void) {
    ......
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but thejboring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();
}

第一個if判斷:如果設置的I/O線程數(shù)量為1(io_threads_num通過redis.conf配置),并且有必要停止I/O線程的話,就使用單線程的方式執(zhí)行寫回操作。

而stopThreadedIOIfNeeded會判斷當前等待被處理的客戶端數(shù)量pending,是否小于server.io_threads_num*2,如果小于的話,就會停止I/O線程,停止I/O線程同樣是主線程獲取該I/O線程的互斥鎖。

int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Return ASAP if IO threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;

    if (pending < (server.io_threads_num*2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

void stopThreadedIO(void) {
    ......
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_lock(&io_threads_mutex[j]);
    server.io_threads_active = 0;
}

第二個if判斷:判斷server.io_threads_active的值,由于在初始化的時候,該值是被設為0的,并且走到這個判斷,就說明客戶端數(shù)量pending > server.io_threads_num*2,這個時候會調(diào)用startThreadedIO函數(shù)來啟動I/O線程,startThreadxianedIO函數(shù)里面,其實就是釋放掉每個I/O線程的互斥鎖,這樣I/O線程就可以獲取到互斥鎖繼續(xù)執(zhí)行了。

void startThreadxianedIO(void) {
    ......
    for (int j = 1; j < server.io_threads_num; j++)
        pthread_mutex_unlock(&io_threads_mutex[j]);
    server.io_threads_active = 1;
}

2.5.2 寫回客戶端同步

在前面2.4已經(jīng)介紹過寫回客戶端函數(shù)handleClientsWithPendingWritesUsingThreads的執(zhí)行邏輯,他是在主線程中執(zhí)行的,而在2.2初始化時候也討論過I/O線程執(zhí)行的主函數(shù)IOThreadMain的主要邏輯。

在這兩個線程的執(zhí)行方法中,有兩個變量是都會被這兩個線程進行讀寫操作的,io_threads_listio_threads_pending,io_threads_list存放每個I/O線程需要處理的客戶端列表,io_threads_pending保存每個I/O線程待處理客戶端的數(shù)量。

下面通過一張表格來分析主線程和I/O線程是否會出現(xiàn)線程安全問題:


3.png

下面簡單分析下上面這個表格:

  • T1、T2時刻對應了I/O線程的創(chuàng)建,即2.5.1討論的過程
  • T3、T4時刻,在I/O線程被啟動之后,主線程首先會給io_threads_list添加client,而此時I/O線程在循環(huán)判斷io_threads_pending是否等于0。
  • T5時刻主線程設置了I/O線程的操作類型為寫操作。
  • T6時刻主線程設置了I/O線程需要處理的客戶端數(shù)量之后,I/O線程才開始繼續(xù)執(zhí)行while循環(huán)里面后續(xù)操作。
  • T7時刻主線程循環(huán)判斷io_threads_pending是否等于0,而這時I/O線程在寫回客戶端。
  • T8、T9時刻I/O線程清空客戶端列表,并設置io_threads_pending = 0,主線程才退出循環(huán)繼續(xù)執(zhí)行后續(xù)操作。

分析讀取客戶端命令的同步操作,其實和上面本小節(jié)分析的差不多,因此不再累述了,如果有問題的話,可以留言一起討論。

雖然兩個線程都有對io_threads_list和io_threads_pending變量的讀、寫操作,但是在同一時刻,只有一個線程在寫其中的一個變量,因此并不會出現(xiàn)線程安全問題,真是藝高人膽大啊,希望有朝一日我也能寫出如此騷氣的代碼,繼續(xù)加油吧。
參考資料:

  1. 極客時間專欄《Redis源碼剖析與實戰(zhàn)》.蔣德鈞.2021
  2. 極客時間專欄《Redis核心技術與實戰(zhàn)》.蔣德鈞.2020
  3. Redis 6.0源碼:https://github.com/redis/redis/tree/6.0/src
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容