死磕Redis5.0之事務(wù)

Redis 事務(wù)簡(jiǎn)介

???????Redis 通過(guò) MULTI、EXEC、WATCH 等命令來(lái)實(shí)現(xiàn)事務(wù)功能。事務(wù)提供了一種將多個(gè)命令請(qǐng)求打包,然后一次性、按順序的執(zhí)行多個(gè)命令的機(jī)制,并且在事務(wù)執(zhí)行期間,服務(wù)器不會(huì)中斷事務(wù)而改去執(zhí)行其他客戶端的命令請(qǐng)求,它會(huì)將事務(wù)中的所有命令都執(zhí)行完畢,然后才去處理其他客戶端的命令請(qǐng)求。一個(gè)事務(wù)從開始到結(jié)束通常會(huì)經(jīng)歷以下三個(gè)階段:

  1. 事務(wù)開始
  2. 命令入隊(duì)
  3. 事務(wù)執(zhí)行
    文章也會(huì)重點(diǎn)圍繞上面三個(gè)步驟來(lái)講的。

事務(wù)組成

???????Redis要想了解 Redis 中的事務(wù)自然要了解 Redis 是如何實(shí)現(xiàn)事務(wù)的,那么我們肯定也就要知道 Redis 用了那些結(jié)構(gòu)來(lái)存儲(chǔ)我們的事務(wù)的,下面我們就來(lái)看看事務(wù)的組成。

/*
 * todo: client 結(jié)構(gòu)體
 *
 * With multiplexing we need to take per-client state.
 * Clients are taken in a linked list.
 */
typedef struct client {
    ...
    // 標(biāo)記,如果有事務(wù)進(jìn)來(lái) flags |= CLIENT_MULTI 追加事務(wù)狀態(tài)
    int flags;              
    
    ...
    // 事務(wù)結(jié)構(gòu)體
    multiState mstate;      /* MULTI/EXEC state */

    ...
    
    // 使用 watch 監(jiān)控的所有 key
    list *watched_keys; 
    
    ...
} client;

???????Redis Redis 會(huì)為每個(gè)客戶端創(chuàng)建一個(gè) client 結(jié)構(gòu)體,client 結(jié)構(gòu)體里會(huì)存儲(chǔ)當(dāng)前客戶端的一些信息,而我們的事務(wù)信息也會(huì)保存在里面,下面我們?cè)敿?xì)講解和事務(wù)相關(guān)的幾個(gè)字段。

  1. flags :字段采用位運(yùn)算記錄很多狀態(tài),當(dāng)我們標(biāo)記事務(wù)狀態(tài)的時(shí)候只需要將 flags |= CLIENT_MULTI 即可追加事務(wù)狀態(tài)。
  2. mstate :事務(wù)狀態(tài)結(jié)構(gòu)體,里面會(huì)存儲(chǔ)我們的命令列表
  3. watched_keys :看名字我們就知道,這個(gè)肯定是用監(jiān)控事務(wù)中 key 變化的列表
    現(xiàn)在我們來(lái)看看 multiState 這個(gè)結(jié)構(gòu)體里面到底存儲(chǔ)了哪些東西吧。
redis> multi
ok

redis> set "name" "practical common lisp"
queued

redis> get "name"
queue

redis> set "author" "peter seibel"
queued

redis> get "author"
queued

上面的命令服務(wù)器將為客戶端創(chuàng)建下圖所示的事務(wù)狀態(tài):


image.png
/**
 * 事務(wù)狀態(tài)結(jié)構(gòu)體
 */
typedef struct multiState {
    /**
     * 事務(wù)中命令列表
     */
    multiCmd *commands;     
    /**
     * 事務(wù)隊(duì)列里面命令的個(gè)數(shù)
     */
    int count;              
    /**
     * 用于同步復(fù)制
     */
    int minreplicas;        
    /**
     * 同步復(fù)制超時(shí)時(shí)間
     */
    time_t minreplicas_timeout; 
} multiState;

???????Redis 通過(guò)上面我們可以看到,multiState 保存了我們事務(wù)中所有的命令列表,一旦我們發(fā)送提交事務(wù)的命令,那么 Redis 就會(huì)從 multiState 拿到事務(wù)中所有的命令,然后依次執(zhí)行。上面 commands 保存的是 multiCmd 結(jié)構(gòu)體,而這個(gè)結(jié)構(gòu)體里面就保存了我們命令要執(zhí)行的一些信息。

/**
  * 客戶端事務(wù)命令結(jié)構(gòu)體
  * Client MULTI/EXEC state 
  */
typedef struct multiCmd {
    /**
     * 命令執(zhí)行的參數(shù)列表
     */
    robj **argv;
    /**
     * 命令執(zhí)行的參數(shù)的個(gè)數(shù)
     */
    int argc;
    /**
     * 具體要執(zhí)行的命令指針
     */
    struct redisCommand *cmd;
} multiCmd;

???????Redis 為了更好的理解上面的結(jié)構(gòu),我們可以添加幾條命令,看看 Redis 到底是如何使用這幾個(gè)結(jié)構(gòu)體來(lái)存儲(chǔ)我們的事務(wù)命令的。

事務(wù)開始

???????Redis 事務(wù)是從 multi 命令開始的,那么我們看看輸入 multi 命令,Redis 到底做了哪些操作。我們知道一個(gè) multi 命令在 Redis 里面就對(duì)應(yīng)了一個(gè) multiCommand 方法,那么我就找到該方法一探究竟吧。

/**
 * 開啟一個(gè)事務(wù)的命令
 */
void multiCommand(client *c) {
    // 事務(wù)不支持嵌套(不支持事務(wù)里面再包含事務(wù))
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 將客戶端的 flags 標(biāo)志添加一個(gè)事務(wù)標(biāo)志
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}

???????Redis 從上面我們可以看出,我們使用 multi 開啟一個(gè)事務(wù)的時(shí)候,Redis 只是將當(dāng)前 client 的 flags 追加一個(gè)事務(wù)標(biāo)志。如果當(dāng)前客戶端已經(jīng)開啟了事務(wù),那么在當(dāng)前事務(wù)沒有結(jié)束之前是不允許再發(fā)送 multi 命令的。

事務(wù)入隊(duì)

???????Redis 從上面我們已經(jīng)知道 multi 命令后 Redis 是如何開啟一個(gè)事務(wù)的,也許現(xiàn)在很多人又會(huì)會(huì)很好奇,為什么我們輸入一個(gè) multi 命令后,redis 就會(huì)把 multi 之后的命令都加入命令隊(duì)列里面呢,下面我就來(lái)揭曉這個(gè)答案吧。我們來(lái)看一下所有 redis 命令的入口吧。

int processCommand(client *c) {
    
    ...

    /* Exec the command 這里就是事務(wù)命令執(zhí)行的地方 */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) {
        queueMultiCommand(c);
        addReply(c, shared.queued);
    } else {
        // todo: call 調(diào)用,這里面就會(huì)調(diào)用非事務(wù)命令的方法
        call(c, CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

???????Redis 通過(guò) processCommand 方法我們可以知道,當(dāng) client 狀態(tài) flags 為 CLIENT_MULTI 事務(wù)狀態(tài)的時(shí)候,并且,客戶端輸入的命令非 exec、discard、multi、watch 命令的時(shí)候,Redis 會(huì)將輸入的命令通過(guò) queueMultiCommand 方法加入事務(wù)隊(duì)列,然后向客戶端返回 shared.queued("QUEUED") 字符串。如果不是事物狀態(tài),那么 Redis 會(huì)馬上執(zhí)行我們輸入的命令,看到這里就知道為什么 multi 之后的命令都會(huì)加入命令隊(duì)列了吧??吹竭@里是否有意猶未盡之意,我們繼續(xù)往下看,Redis 的 queueMultiCommand 方法具體是如何實(shí)現(xiàn)的。

/**
 * 將新命令添加到MULTI命令隊(duì)列中
 * 閱讀該方法一定要有 C 語(yǔ)言基礎(chǔ),能看懂指針地址的賦值操作
 */
void queueMultiCommand(client *c) {
    // 事務(wù)命令指針,里面會(huì)指向真正要執(zhí)行的命令
    multiCmd *mc;
    int j;
    // 給新增的 multiCmd 計(jì)算內(nèi)存起始地址,在 commands 鏈表中加入新事務(wù)命令
    c->mstate.commands = zrealloc(c->mstate.commands,
                                  sizeof(multiCmd) * (c->mstate.count + 1));
    // 地址賦值,實(shí)際就是將 multiCmd 加入 mstate.commands 隊(duì)尾
    mc = c->mstate.commands + c->mstate.count;
    // 將客戶端的命令和參數(shù)賦值給事務(wù)命令結(jié)構(gòu)體,方便后面執(zhí)行
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj *) * c->argc);
    memcpy(mc->argv, c->argv, sizeof(robj *) * c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
}

???????Redis 就如上面這樣,Redis 會(huì)把 multi 之后的命令構(gòu)造成一個(gè) multiCmd 結(jié)構(gòu)添加到
mstate.commands 鏈表后面,方便后續(xù)執(zhí)行。代碼看的不過(guò)癮,我們還可以看看執(zhí)行的流程圖,方便大家理解。下圖就是上面代碼執(zhí)行的流程圖。


image.png

事務(wù)執(zhí)行

???????Redis 在事務(wù)執(zhí)行之前,我們不得不提一下 watch 命令。Redis 的官方文檔上說(shuō),WATCH 命令是為了讓 Redis 擁有 check-and-set(CAS) 的特性。CAS 的意思是,一個(gè)客戶端在修改某個(gè)值之前,要檢測(cè)它是否更改;如果沒有更改,修改操作才能成功。通過(guò)上面的 client 結(jié)構(gòu)體我們可以知道 watch 監(jiān)控的 key 是以鏈表的形式存儲(chǔ)在 Redis 的 client 結(jié)構(gòu)體中。具體如下圖所示:


image.png

監(jiān)視鍵值的過(guò)程:

/**
 * 這個(gè)就是 watch 命令執(zhí)行步驟
 */
void watchCommand(client *c) {
    int j;

    // 該命令只能出現(xiàn)在 multi 命令之前
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c, "WATCH inside MULTI is not allowed");
        return;
    }
    // 監(jiān)控指定的 key
    for (j = 1; j < c->argc; j++)
        // todo: 實(shí)際監(jiān)控 key 的操作 
        watchForKey(c, c->argv[j]);
    // 像客戶端緩沖區(qū)返回 ok
    addReply(c, shared.ok);
}

???????Redis 有一些前置操作,比如檢測(cè) watch 命令是否在 multi 命令之前,如果不是則直接報(bào)錯(cuò),實(shí)際監(jiān)控 key 的還是 watchForKey 方法,下面我們重點(diǎn)講解該方法。

/* 
 * Watch for the specified key 
 *
 * 監(jiān)控指定的 key
 */
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    listRewind(c->watched_keys, &li);
    while ((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        // 條件滿足說(shuō)明該 key 已經(jīng)被 watched 了
        if (wk->db == c->db && equalStringObjects(key, wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    // 此DB中尚未監(jiān)視此 key 。 我們加上吧
    // 先從 c->db->watched_keys 中取出該 key 對(duì)應(yīng)的客戶端 client
    clients = dictFetchValue(c->db->watched_keys, key);
    // 如果該 client 為 null,則說(shuō)明該key 沒有被 client 監(jiān)控
    // 則需要在該key 后面創(chuàng)建一個(gè) client list 列表,用來(lái)保存
    // 監(jiān)控了該key 的客戶端 client
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys, key, clients);
        incrRefCount(key);
    }
    // 尾插法。將客戶端添加到鏈表尾部,實(shí)際服務(wù)端也會(huì)保存一份
    listAddNodeTail(clients, c);
    /* Add the new key to the list of keys watched by this client */
    // 將新 key 添加到此客戶端 watched(監(jiān)控) 的 key 列表中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    // 把 wk 賦值給指定的 client 的監(jiān)控key的結(jié)構(gòu)體中
    listAddNodeTail(c->watched_keys, wk);
}

???????Redis 當(dāng)客戶端鍵值被修改的時(shí)候,監(jiān)視該鍵值的所有客戶端都會(huì)被標(biāo)記為 REDISDIRTY-CAS,表示此該鍵值對(duì)被修改過(guò),因此如果這個(gè)客戶端已經(jīng)進(jìn)入到事務(wù)狀態(tài),它命令隊(duì)列中的命令是不會(huì)被執(zhí)行的。

???????Redis touchWatchedKey() 是標(biāo)記某鍵值被修改的函數(shù),它一般不被 signalModifyKey() 函數(shù)包裝。下面是 touchWatchedKey() 的實(shí)現(xiàn)。

// 標(biāo)記鍵值對(duì)的客戶端為REDIS_DIRTY_CAS,表示其所監(jiān)視的數(shù)據(jù)已經(jīng)被修改過(guò)
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
    // 獲取監(jiān)視key 的所有客戶端
    if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;
        // 標(biāo)記監(jiān)視key 的所有客戶端REDIS_DIRTY_CAS
        /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
        /* Check if we are already watching for this key */
        listRewind(clients,&li);
    while((ln = listNext(&li))) {
        redisClient *c = listNodeValue(ln);
        // REDIS_DIRTY_CAS 更改的時(shí)候會(huì)設(shè)置此標(biāo)記
        c->flags |= REDIS_DIRTY_CAS;
    }
}

???????Redis 當(dāng)用戶發(fā)出 EXEC 的時(shí)候,在它 MULTI 命令之后提交的所有命令都會(huì)被執(zhí)行。從代碼的實(shí)現(xiàn)來(lái)看,如果客戶端監(jiān)視的數(shù)據(jù)被修改,它會(huì)被標(biāo)記 REDIS_DIRTY_CAS,會(huì)調(diào)用 discardTransaction() 從而取消該事務(wù)。特別的,用戶開啟一個(gè)事務(wù)后會(huì)提交多個(gè)命令,如果命令在入隊(duì)過(guò)程中出現(xiàn)錯(cuò)誤,譬如提交的命令本身不存在,參數(shù)錯(cuò)誤和內(nèi)存超額等,都會(huì)導(dǎo)致客戶端被標(biāo)記 REDIS_DIRTY_EXEC,被標(biāo)記 REDIS_DIRTY_EXEC 會(huì)導(dǎo)致事務(wù)被取消。

因此總結(jié)一下:

  1. REDIS_DIRTY_CAS 更改的時(shí)候會(huì)設(shè)置此標(biāo)記
  2. REDIS_DIRTY_EXEC 命令入隊(duì)時(shí)出現(xiàn)錯(cuò)誤,此標(biāo)記會(huì)導(dǎo)致 EXEC 命令執(zhí)行失敗
    下面是執(zhí)行事務(wù)的過(guò)程:
/**
 * 執(zhí)行事務(wù)的命令
 */
void execCommand(client *c) {
    
    ...
    
    // 是否需要將MULTI/EXEC命令傳播到slave節(jié)點(diǎn)/AOF
    int must_propagate = 0; 
    int was_master = server.masterhost == NULL;

    // 事務(wù)有可能會(huì)被取消
    if (!(c->flags & CLIENT_MULTI)) {
        // 沒事事務(wù)可以執(zhí)行
        addReplyError(c, "EXEC without MULTI");
        return;
    }

    /* 
     * 停止執(zhí)行事務(wù)命令的情況::
     * 1. 有一些被監(jiān)控的 key 被修改了
     * 2. 由于命令隊(duì)列里面出現(xiàn)了錯(cuò)誤
     *
     * 第一種情況下失敗的EXEC返回一個(gè)多塊nil對(duì)象
     * 技術(shù)上它不是錯(cuò)誤,而是特殊行為,而在第二個(gè)中返回EXECABORT錯(cuò)誤
     */
    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                    shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }

    /* 執(zhí)行所有排隊(duì)的命令 */

    // 取消對(duì)所有key的監(jiān)控,否則會(huì)浪費(fèi)CPU資源
    //  因?yàn)?redis 是單線程。所以不用擔(dān)心 key 再被修改了
    unwatchAllKeys(c); 
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c, c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        // todo: 遇到包含寫操作的命令需要將MULTI 命令寫入AOF 文件
        if (!must_propagate && !(c->cmd->flags & (CMD_READONLY | CMD_ADMIN))) {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }
        // 執(zhí)行我們事務(wù)隊(duì)列里面的命令
        call(c, CMD_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 清除事務(wù)狀態(tài)
    discardTransaction(c);
    
    ...

    handle_monitor:
  
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c, server.monitors, c->db->id, c->argv, c->argc);
}

如上所說(shuō),被監(jiān)視的鍵值被修改或者命令入隊(duì)出錯(cuò)都會(huì)導(dǎo)致事務(wù)被取消:

/**
 * 取消事務(wù),比如遇到事務(wù)中的語(yǔ)法錯(cuò)誤問(wèn)題
 */
void discardTransaction(client *c) {
    // 清空命令隊(duì)列
    freeClientMultiState(c);
    // 初始化命令隊(duì)列
    initClientMultiState(c);
    // 取消標(biāo)記flag
    c->flags &= ~(CLIENT_MULTI | CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC);
    // 取消對(duì) client 所有被監(jiān)控的 key
    unwatchAllKeys(c);
}

Redis 事務(wù)番外篇

你可能已經(jīng)注意到「事務(wù)」這個(gè)詞。在學(xué)習(xí)數(shù)據(jù)庫(kù)原理的時(shí)候有提到過(guò)事務(wù)的 ACID,即原子性、一致性、隔離性、持久性。接下來(lái),看看 Redis 事務(wù)是否支持 ACID。

  1. 原子性,即一個(gè)事務(wù)中的所有操作,要么全部完成,要么全部不完成,不會(huì)結(jié)束在中間某個(gè)環(huán)節(jié)。Redis 事務(wù)不支持原子性,最明顯的是 Redis 不支持回滾操作。一致性,在事務(wù)開始之前和事務(wù)結(jié)束以后,數(shù)據(jù)庫(kù)的完整性沒有被破壞。這一點(diǎn),Redis 事務(wù)能夠保證。

  2. 隔離性,當(dāng)兩個(gè)或者多個(gè)事務(wù)并發(fā)訪問(wèn)(此處訪問(wèn)指查詢和修改的操作)數(shù)據(jù)庫(kù)的同一數(shù)據(jù)時(shí)所表現(xiàn)出的相互關(guān)系。Redis 不存在多個(gè)事務(wù)的問(wèn)題,因?yàn)?Redis 是單進(jìn)程單線程的工作模式。

  3. 持久性,在事務(wù)完成以后,該事務(wù)對(duì)數(shù)據(jù)庫(kù)所作的更改便持久地保存在數(shù)據(jù)庫(kù)之中,并且是完全的。Redis 提供兩種持久化的方式,即 RDB 和 AOF。RDB 持久化只備份當(dāng)前內(nèi)存中的數(shù)據(jù)集,事務(wù)執(zhí)行完畢時(shí),其數(shù)據(jù)還在內(nèi)存中,并未立即寫入到磁盤,所以 RDB 持久化不能保證 Redis 事務(wù)的持久性。再來(lái)討論 AOF 持久化,Redis AOF 有后臺(tái)執(zhí)行和邊服務(wù)邊備份兩種方式。后臺(tái)執(zhí)行和 RDB 持久化類似,只能保存當(dāng)前內(nèi)存中的數(shù)據(jù)集;邊備份邊服務(wù)的方式中,因?yàn)?Redis 只是每間隔 2s 才進(jìn)行一次備份,因此它的持久性也是不完整的!

  4. 一致性:待補(bǔ)充

???????還有一個(gè)亮點(diǎn),就是 check-and-set CAS。一個(gè)修改操作不斷的判斷X 值是否已經(jīng)被修改,直到 X 值沒有被其他操作修改,才設(shè)置新的值。Redis 借助 WATCH/MULTI 命令來(lái)實(shí)現(xiàn) CAS 操作的。
???????實(shí)際操作中,多個(gè)線程嘗試修改一個(gè)全局變量,通常我們會(huì)用鎖,從讀取這個(gè)變量的時(shí)候就開始鎖住這個(gè)資源從而阻擋其他線程的修改,修改完畢后才釋放鎖,這是悲觀鎖的做法。相對(duì)應(yīng)的有一種樂(lè)觀鎖,樂(lè)觀鎖假定其他用戶企圖修改你正在修改的對(duì)象的概率很小,直到提交變更的時(shí)候才加鎖,讀取和修改的情況都不加鎖。一般情況下,不同客戶端會(huì)訪問(wèn)修改不同的鍵值對(duì),因此一般 check 一次就可以 set 了,而不需要重復(fù) check 多次。

???????注意:Redis 是不支持事務(wù)回滾的,據(jù)說(shuō)作者認(rèn)為回滾操作會(huì)影響 Redis 的性能,所以沒有事務(wù)回滾的功能。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容