Redis學(xué)習(xí)之事務(wù)

介紹

???Redis 事務(wù)可以一次執(zhí)行多個命令, 并且?guī)в幸韵聝蓚€重要的保證:

  • 批量操作在發(fā)送 EXEC 命令前被放入隊列緩存。

  • 收到 EXEC 命令后進入事務(wù)執(zhí)行,事務(wù)中任意命令執(zhí)行失敗,其余的命令依然被執(zhí)行。

事務(wù)過程不同的客戶端是沒有事務(wù)之間的影響的,一個事務(wù)從開始到執(zhí)行會經(jīng)歷以下三個階段:

  • 開始事務(wù)

  • 命令入隊

  • 執(zhí)行事務(wù)

一、相關(guān)命令

  • Exec 執(zhí)行所有事務(wù)塊內(nèi)的命令。

  • Watch 監(jiān)視一個(或多個) key ,如果在事務(wù)執(zhí)行之前這個(或這些) key 被其他命令所改動,那么事務(wù)將被打斷。

  • Discard 取消事務(wù),放棄執(zhí)行事務(wù)塊內(nèi)的所有命令。

  • Unwatch 取消 WATCH 命令對所有 key 的監(jiān)視。

  • Multi 標記一個事務(wù)塊的開始。

二、數(shù)據(jù)結(jié)構(gòu)

typedef struct client {
    ...
    list *watched_keys; /* 事務(wù)操作 監(jiān)控的keys鏈表 值為watchedKey結(jié)構(gòu)*/
    multiState mstate;  /* 事務(wù)狀態(tài) */
    ...
}client;
typedef struct multiState {
    multiCmd *commands;     /*事務(wù)命令隊列 */
    int count;              /*命令總個數(shù)*/
    ...
} multiState;
其中、client.watched_keys里面放的Node都是數(shù)據(jù)時如下結(jié)構(gòu)體
//監(jiān)視key
typedef struct watchedKey {
    robj *key;//監(jiān)視的key
    redisDb *db;//哪個db里面的
} watchedKey;

//redisDb結(jié)構(gòu)
typedef struct redisDb {
    ...
    dict *watched_keys;         /*數(shù)據(jù)庫的監(jiān)控key字典用來實現(xiàn)事務(wù)的 key -> client鏈表*/
    ...
} redisDb;

三、API

void multiCommand(client *c);//事務(wù)開始
void execCommand(client *c);//事務(wù)執(zhí)行
void discardCommand(client *c);//取消事務(wù)
void watchCommand(client *c);//監(jiān)控客戶端的key
void unwatchCommand(client *c);//取消監(jiān)控客戶端正在監(jiān)控的key

四、重要API解析

  • multiCommand

    void multiCommand(client *c) {//設(shè)置事務(wù)標志
        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"MULTI calls can not be nested");
            return;
        }
        c->flags |= CLIENT_MULTI;//設(shè)置事務(wù)標志位
        addReply(c,shared.ok);
    }
    
  • watchCommand

    /* 監(jiān)視特定的key */
    void watchForKey(client *c, robj *key) {
        list *clients = NULL;
        listIter li;
        listNode *ln;
        watchedKey *wk;
        listRewind(c->watched_keys,&li);
        while((ln = listNext(&li))) {
            wk = listNodeValue(ln);
            if (wk->db == c->db && equalStringObjects(key,wk->key))
                return; /* 如果這個key已經(jīng)被監(jiān)控了 直接返回*/
        }
        /* 否則加入監(jiān)視 */
        clients = dictFetchValue(c->db->watched_keys,key);
        if (!clients) {
            clients = listCreate();
            dictAdd(c->db->watched_keys,key,clients);
            incrRefCount(key);
        }
        listAddNodeTail(clients,c);//加入到監(jiān)視keys的鏈表
        wk = zmalloc(sizeof(*wk));
        wk->key = key;
        wk->db = c->db;
        incrRefCount(key);
        listAddNodeTail(c->watched_keys,wk);//加入到watched_keys中
    }
    
    /* 鍵如果被改變 事務(wù)執(zhí)行會失敗*/
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;
    
        if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);
        if (!clients) return;
    
        //如果key被改變了,那么監(jiān)視這個key的所有client都要被設(shè)置CLIENT_DIRTY_CAS
        listRewind(clients,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
    
            c->flags |= CLIENT_DIRTY_CAS;
        }
    }
    
  • execCommand

    void execCommand(client *c) {//執(zhí)行事務(wù)
        int j;
        robj **orig_argv;
        int orig_argc;
        struct redisCommand *orig_cmd;
        int must_propagate = 0; /* 需要將 MULTI/EXEC 同步到 AOF / slaves? */
        int was_master = server.masterhost == NULL;
    
        if (!(c->flags & CLIENT_MULTI)) {//如果并沒有設(shè)置MULTI標志,無操作
            addReplyError(c,"EXEC without MULTI");
            return;
        }
        //如果事務(wù)期間被監(jiān)視的key被修改了 停止事務(wù),返回nil
        //入隊的命令如果有誤的話,執(zhí)行事務(wù)error
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
            addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                       shared.nullarray[c->resp]);
            discardTransaction(c);
            goto handle_monitor;
        }
        //如果對于從庫有修改key的操作 取消事務(wù)
        if (!server.loading && server.masterhost && server.repl_slave_ro &&
            !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
        {
            addReplyError(c,
                "Transaction contains write commands but instance "
                "is now a read-only replica. EXEC aborted.");
            discardTransaction(c);
            goto handle_monitor;
        }
    
        /* 執(zhí)行所有命令*/
        unwatchAllKeys(c); /* 取消此用戶所有監(jiān)視key 提升cpu效率 */
        orig_argv = c->argv;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyArrayLen(c,c->mstate.count);
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->cmd = c->mstate.commands[j].cmd;
            //如果有修改key的操作的話 需要同步到 aof 和 從庫
            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                execCommandPropagateMulti(c);
                must_propagate = 1;
            }
            //執(zhí)行命令
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
            /* 以上函數(shù)可能會改變argc argv. */
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
        c->argv = orig_argv;
        c->argc = orig_argc;
        c->cmd = orig_cmd;
        discardTransaction(c);
        /* 需要將 MULTI/EXEC 同步到 AOF / slaves */
        if (must_propagate) {
            int is_master = server.masterhost == NULL;
            server.dirty++;
            if (server.repl_backlog && was_master && !is_master) {
                char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
                feedReplicationBacklog(execcmd,strlen(execcmd));
            }
        }
    handle_monitor:
        if (listLength(server.monitors) && !server.loading)
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
    

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容