在Eino 中indexer 組件中, 會使用redis的pipeline功能, 批量將文檔的內(nèi)容(content), 向量對應(yīng)的bytes(content_vactor)及元數(shù)據(jù)(matadata)信息寫入的hset進(jìn)行存儲。
Redis對于命令的批量處理, 提供pipeline 和 TxPipeline 兩種方式執(zhí)行,
- pipeline : 通過在一次通信中將多個命令一起發(fā)送到服務(wù)器執(zhí)行,避免了網(wǎng)絡(luò)和處理開銷
- TxPipeline : 在pipeline 的基礎(chǔ)上, 保證所有包含的命令將完整執(zhí)行,不會被其他客戶端的命令中斷。
pipeline代碼示例 :
func TestRedisStackClient_Do2(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
pipe := cli.Client.Pipeline()
ctx := context.Background()
for i := 0; i < 5; i++ {
pipe.Set(ctx, fmt.Sprintf("seat:%v", i), fmt.Sprintf("#%v", i), 0)
}
cmds, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
for _, c := range cmds {
fmt.Printf("%v;", c.(*redis.StatusCmd).Val())
}
fmt.Println("")
pipe = cli.Client.Pipeline()
get0Result := pipe.Get(ctx, "seat:0")
get3Result := pipe.Get(ctx, "seat:3")
get4Result := pipe.Get(ctx, "seat:4")
cmds, err = pipe.Exec(ctx)
// The results are available only after the pipeline
// has finished executing.
fmt.Println(get0Result.Val()) // >>> #0
fmt.Println(get3Result.Val()) // >>> #3
fmt.Println(get4Result.Val()) // >>> #4
}
TxPipeline 代碼示例:
func TestRedisStackClient_Do3(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
trans := cli.Client.TxPipeline()
ctx := context.Background()
trans.IncrBy(ctx, "count:test1", 1)
trans.IncrBy(ctx, "count:test1", 2)
trans.IncrBy(ctx, "count:test1", 3)
cmds, err := trans.Exec(ctx)
if err != nil {
panic(err)
}
for _, c := range cmds {
fmt.Println(c.(*redis.IntCmd).Val())
}
cmd := cli.Client.Do(ctx, "GET", "count:test1")
fmt.Println(cmd.Val())
}
Redis 的所有單條命令(包括 INCR、INCRBY、DECR、DECRBY、HINCRBY 等)都是原子操作。
無論多少個客戶端并發(fā)調(diào)用,Redis 都會以單線程方式依次執(zhí)行每一條指令,因此不會出現(xiàn)競態(tài)條件,天然線程安全。
Redis 不支持事務(wù)回滾,那么如何保證操作的原子性呢?
redis沒有回滾機(jī)制,但可以通過以下 2 種手段在不同場景下保證“操作的完整性”——即 要么全部成功,要么全部不生效。
樂觀鎖 + 重試(WATCH)
使用 WATCH 監(jiān)視關(guān)鍵 key,事務(wù)里一旦發(fā)現(xiàn) key 被其他客戶端修改,就讓 EXEC 失敗并 重新執(zhí)行整個流程。
下面用 WATCH 實(shí)現(xiàn)“ZPOP”——即原子地彈出有序集合中的最小/最大元素
// zPopMin 原子彈出分值最小的成員
func zPopMin(ctx context.Context, rdb *redis.Client, key string) (member string, score float64, err error) {
for {
err = rdb.Watch(ctx, func(tx *redis.Tx) error {
// 1. 先讀最小成員
res, err := tx.ZRangeWithScores(ctx, key, 0, 0).Result()
if err != nil || len(res) == 0 {
return redis.Nil // 集合為空
}
member, score = res[0].Member.(string), res[0].Score
// 2. 事務(wù)里刪除
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.ZRem(ctx, key, member)
return nil
})
return err
}, key)
switch err {
case nil: // 成功
return member, score, nil
case redis.TxFailedErr: // 樂觀鎖沖突,重試
continue
default: // 其他錯誤
return "", 0, err
}
}
}
Lua 腳本(EVAL)——真正原子性
把需要一次性完成的 所有讀寫邏輯 寫成 Lua 腳本,Redis 會 單線程原子執(zhí)行 整個腳本;腳本里一旦檢測到異常,可通過 return redis.error_reply(...) 終止,不會留下中間狀態(tài)。
如下:實(shí)現(xiàn)賬戶A轉(zhuǎn)賬100元到賬戶B:
lua:
-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return "OK"
bash :
EVAL "$(cat atomic_transfer.lua)" 2 accountA accountB 100
- 2?→?腳本需要 2 個 key(accountA 和 accountB )
- accountA accountB 100?→?依次為 KEYS[1], KEYS[2], ARGV[1]
golang:
func TestRedisStackClient_Do4(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
ctx := context.Background()
script := redis.NewScript(`
-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return {"OK", bal - tonumber(ARGV[1])}
`)
val, err := script.Run(ctx, cli.Client, []string{"accountB", "accountA"}, 100).Result()
if err != nil {
panic(err)
}
// 4. 解析結(jié)果
arr := val.([]interface{})
result := arr[0].(string)
score, _ := arr[1].(int64)
fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}
Redis Cluster 環(huán)境下, 如果事務(wù)跨多個redis節(jié)點(diǎn)如何保證操作的原子性?
- 可以通過Lua 腳本 + Hash Tag的方式實(shí)現(xiàn) 跨節(jié)點(diǎn)的事務(wù)原子性, 僅對 Redis Cluster 生效;單機(jī) / 哨兵無意義。
Hash Tag 是 Redis Cluster 為 把多個 key 強(qiáng)制映射到同一個 slot 而設(shè)計(jì)的 特殊語法標(biāo)記,用來解決“跨 slot 操作”或“Lua 腳本多 key 必須落在同一 slot”的問題。
用一對花括號 {} 包住 任意子串,只要 {} 里的子串相同,key 就落在 同一 slot(進(jìn)而同一節(jié)點(diǎn),Cluster 通過 哈希槽分片 把 0-16383 共 16384 個槽分配給集群中的節(jié)點(diǎn))
| 真實(shí) key | 參與 CRC16 的片段 | slot 結(jié)果 | 是否同 slot |
|---|---|---|---|
user:{100}:balance |
100 |
slot 1234 | ? |
user:{100}:ledger |
100 |
slot 1234 | ? |
user:100:balance |
user:100:balance |
slot 5678 | ? |
func TestRedisStackClient_Do5(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
ctx := context.Background()
script := `
-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return {"OK", bal - tonumber(ARGV[1])}
`
sha, _ := cli.Client.ScriptLoad(ctx, script).Result()
res, err := cli.Client.EvalSha(ctx, sha,
[]string{"user:{100}:balance", "user:{100}:ledger"},
100,
).Result()
if err != nil {
panic(err)
}
// 4. 解析結(jié)果
arr := res.([]interface{})
result := arr[0].(string)
score, _ := arr[1].(int64)
fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}