go-redis 管道和事務(wù),

在Eino 中indexer 組件中, 會使用redis的pipeline功能, 批量將文檔的內(nèi)容(content), 向量對應(yīng)的bytes(content_vactor)及元數(shù)據(jù)(matadata)信息寫入的hset進(jìn)行存儲。

Redis對于命令的批量處理, 提供pipeline 和 TxPipeline 兩種方式執(zhí)行,

  • pipeline : 通過在一次通信中將多個命令一起發(fā)送到服務(wù)器執(zhí)行,避免了網(wǎng)絡(luò)和處理開銷
  • TxPipeline : 在pipeline 的基礎(chǔ)上, 保證所有包含的命令將完整執(zhí)行,不會被其他客戶端的命令中斷。

pipeline代碼示例 :

func TestRedisStackClient_Do2(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    pipe := cli.Client.Pipeline()
    ctx := context.Background()
    for i := 0; i < 5; i++ {
        pipe.Set(ctx, fmt.Sprintf("seat:%v", i), fmt.Sprintf("#%v", i), 0)
    }
    cmds, err := pipe.Exec(ctx)
    if err != nil {
        panic(err)
    }
    for _, c := range cmds {
        fmt.Printf("%v;", c.(*redis.StatusCmd).Val())
    }
    fmt.Println("")
    pipe = cli.Client.Pipeline()
    get0Result := pipe.Get(ctx, "seat:0")
    get3Result := pipe.Get(ctx, "seat:3")
    get4Result := pipe.Get(ctx, "seat:4")
    cmds, err = pipe.Exec(ctx)
    // The results are available only after the pipeline
    // has finished executing.
    fmt.Println(get0Result.Val()) // >>> #0
    fmt.Println(get3Result.Val()) // >>> #3
    fmt.Println(get4Result.Val()) // >>> #4
}

TxPipeline 代碼示例:

func TestRedisStackClient_Do3(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    trans := cli.Client.TxPipeline()
    ctx := context.Background()
    trans.IncrBy(ctx, "count:test1", 1)
    trans.IncrBy(ctx, "count:test1", 2)
    trans.IncrBy(ctx, "count:test1", 3)
    cmds, err := trans.Exec(ctx)
    if err != nil {
        panic(err)
    }
    for _, c := range cmds {
        fmt.Println(c.(*redis.IntCmd).Val())
    }
    cmd := cli.Client.Do(ctx, "GET", "count:test1")
    fmt.Println(cmd.Val())
}

Redis 的所有單條命令(包括 INCR、INCRBY、DECR、DECRBY、HINCRBY 等)都是原子操作。
無論多少個客戶端并發(fā)調(diào)用,Redis 都會以單線程方式依次執(zhí)行每一條指令,因此不會出現(xiàn)競態(tài)條件,天然線程安全。

Redis 不支持事務(wù)回滾,那么如何保證操作的原子性呢?

redis沒有回滾機(jī)制,但可以通過以下 2 種手段在不同場景下保證“操作的完整性”——即 要么全部成功,要么全部不生效。

樂觀鎖 + 重試(WATCH)

使用 WATCH 監(jiān)視關(guān)鍵 key,事務(wù)里一旦發(fā)現(xiàn) key 被其他客戶端修改,就讓 EXEC 失敗并 重新執(zhí)行整個流程。
下面用 WATCH 實(shí)現(xiàn)“ZPOP”——即原子地彈出有序集合中的最小/最大元素

// zPopMin 原子彈出分值最小的成員
func zPopMin(ctx context.Context, rdb *redis.Client, key string) (member string, score float64, err error) {
    for {
        err = rdb.Watch(ctx, func(tx *redis.Tx) error {
            // 1. 先讀最小成員
            res, err := tx.ZRangeWithScores(ctx, key, 0, 0).Result()
            if err != nil || len(res) == 0 {
                return redis.Nil // 集合為空
            }
            member, score = res[0].Member.(string), res[0].Score

            // 2. 事務(wù)里刪除
            _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
                pipe.ZRem(ctx, key, member)
                return nil
            })
            return err
        }, key)

        switch err {
        case nil: // 成功
            return member, score, nil
        case redis.TxFailedErr: // 樂觀鎖沖突,重試
            continue
        default: // 其他錯誤
            return "", 0, err
        }
    }
}

Lua 腳本(EVAL)——真正原子性

把需要一次性完成的 所有讀寫邏輯 寫成 Lua 腳本,Redis 會 單線程原子執(zhí)行 整個腳本;腳本里一旦檢測到異常,可通過 return redis.error_reply(...) 終止,不會留下中間狀態(tài)。

如下:實(shí)現(xiàn)賬戶A轉(zhuǎn)賬100元到賬戶B:
lua:

-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
    return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return "OK"

bash :

EVAL "$(cat atomic_transfer.lua)" 2 accountA accountB 100
  • 2?→?腳本需要 2 個 key(accountA 和 accountB )
  • accountA accountB 100?→?依次為 KEYS[1], KEYS[2], ARGV[1]

golang:

func TestRedisStackClient_Do4(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    ctx := context.Background()
    script := redis.NewScript(`
        -- atomic_transfer.lua
        local bal = tonumber(redis.call('GET', KEYS[1]))
        if bal < tonumber(ARGV[1]) then
            return redis.error_reply("insufficient")
        end
        redis.call('DECRBY', KEYS[1], ARGV[1])
        redis.call('INCRBY', KEYS[2], ARGV[1])
        return {"OK",  bal - tonumber(ARGV[1])}
    `)

    val, err := script.Run(ctx, cli.Client, []string{"accountB", "accountA"}, 100).Result()
    if err != nil {
        panic(err)
    }
    // 4. 解析結(jié)果
    arr := val.([]interface{})
    result := arr[0].(string)
    score, _ := arr[1].(int64)
    fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}

Redis Cluster 環(huán)境下, 如果事務(wù)跨多個redis節(jié)點(diǎn)如何保證操作的原子性?

  • 可以通過Lua 腳本 + Hash Tag的方式實(shí)現(xiàn) 跨節(jié)點(diǎn)的事務(wù)原子性, 僅對 Redis Cluster 生效;單機(jī) / 哨兵無意義。

Hash Tag 是 Redis Cluster 為 把多個 key 強(qiáng)制映射到同一個 slot 而設(shè)計(jì)的 特殊語法標(biāo)記,用來解決“跨 slot 操作”或“Lua 腳本多 key 必須落在同一 slot”的問題。
用一對花括號 {} 包住 任意子串,只要 {} 里的子串相同,key 就落在 同一 slot(進(jìn)而同一節(jié)點(diǎn),Cluster 通過 哈希槽分片 把 0-16383 共 16384 個槽分配給集群中的節(jié)點(diǎn))

真實(shí) key 參與 CRC16 的片段 slot 結(jié)果 是否同 slot
user:{100}:balance 100 slot 1234 ?
user:{100}:ledger 100 slot 1234 ?
user:100:balance user:100:balance slot 5678 ?
func TestRedisStackClient_Do5(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    ctx := context.Background()
    script := `
        -- atomic_transfer.lua
        local bal = tonumber(redis.call('GET', KEYS[1]))
        if bal < tonumber(ARGV[1]) then
            return redis.error_reply("insufficient")
        end
        redis.call('DECRBY', KEYS[1], ARGV[1])
        redis.call('INCRBY', KEYS[2], ARGV[1])
        return {"OK",  bal - tonumber(ARGV[1])}
    `
    sha, _ := cli.Client.ScriptLoad(ctx, script).Result()
    res, err := cli.Client.EvalSha(ctx, sha,
        []string{"user:{100}:balance", "user:{100}:ledger"},
        100,
    ).Result()

    if err != nil {
        panic(err)
    }
    // 4. 解析結(jié)果
    arr := res.([]interface{})
    result := arr[0].(string)
    score, _ := arr[1].(int64)
    fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容