第一步 閱讀4B的文檔
弄清楚里面的每個(gè)段落
第二步 基于LAB3,寫出可以通過單GROUP的代碼
Your first task is to pass the very first shardkv test. In this test, there is only a single assignment of shards, so your code should be very similar to that of your Lab 3 server. The biggest modification will be to have your server detect when a configuration happens and start accepting requests whose keys match shards that it now owns.
基于LAB3,把代碼可以抄的都抄過去

抄代碼就不展示了?;敬蠖鄶?shù)都是把代碼復(fù)制過去。
因?yàn)檫@邊的CLIENT 給了代碼 是需要看ERR這個(gè)屬性的。

所以和LAB3不同(那里我沒用這個(gè)REPLY ERR的屬性),需要加上返回值。

很快,測(cè)試1通過了

第三步 增加拉CONFIG,和拒絕不屬于自己的SHARD的代碼
來自HINT
Add code to server.go to periodically fetch the latest configuration from the shardmaster, and add code to reject client requests if the receiving group isn't responsible for the client's key's shard. You should still pass the first test.




測(cè)試之后,依然確??梢赃^第一個(gè)TEST
第四步 思考
思考 CONFIG變化后,如何轉(zhuǎn)移SHARD
我的思考是這樣的。如果一個(gè)REPLICA GROUP A得到一個(gè)SHARD 1,對(duì)應(yīng)B 失去一個(gè)SHARD 1
如果是A檢測(cè)到多了,去等待別人來發(fā)給我,會(huì)比較被動(dòng)。因?yàn)椴恢酪榷嗑谩?br>
其次B還需要發(fā)現(xiàn)自己失去SHARD 1后,要主動(dòng)去發(fā)給A,這增加了B 的工作量。
因?yàn)槲覀冞@邊在做SHARD MIGRATION的時(shí)候,是不能響應(yīng)請(qǐng)求。但對(duì)B來說,他可以立刻更新CONFIG,即使沒把SHARD 1發(fā)送出去,他也是可以響應(yīng)請(qǐng)求。
但對(duì)A來說,一定要拿到SHARD 1后,它才可以繼續(xù)服務(wù)。
基于上述思考,決定讓A去問B要SHARD,這樣會(huì)簡(jiǎn)化設(shè)計(jì)。因?yàn)檫@樣做B可以如果發(fā)現(xiàn)了新的CONFIG,可以直接更新讓它立刻生效。A需要等待PULL成功后,更新CONFIG讓它生效。
Reconfiguration 會(huì)影響到 PutAppend/Get,因此同樣需要利用 raft 保證 group 內(nèi)的一致性,確保集群內(nèi)完成了之前的操作后同時(shí)進(jìn)行Reconfiguration;
要思考的第二個(gè)點(diǎn),是傳輸SHARD的RPC。
首先SHARD DATA是一定要發(fā)過去的。
但是只是發(fā)SHARD DATA是不夠的。
比如一個(gè)APPEND REQUEST 在向A SERVER發(fā)送的時(shí)候,TIMEOUT了。這個(gè)時(shí)候A server 已經(jīng)做了這個(gè)更新操作。
在這個(gè)點(diǎn)之后,Reconfiguration 發(fā)生,CLIENT 去問B SERVER發(fā)送APPEND REQ。如果只是SHARD DATA過去。會(huì)造成APPEND2次。
所以我們還需要把去重的MAP也一起發(fā)過去。
發(fā)過去的參數(shù)除了要訴說我要哪個(gè)SHARD之外,還需要加上CONFIG NUM,因?yàn)橛锌赡芪野l(fā)的CONFIG NUM比那邊還要大,說明那邊的CONFIG還沒同步到。
基于上述思路。我設(shè)計(jì)的RPC如下。

第五步 實(shí)現(xiàn)MIGRATE SHARD RPC HANDLER
這里有個(gè)很重要的思路,每個(gè)RAFT GROUP都是由LEADER負(fù)責(zé)發(fā)送和接受RPC。FOLLOWER只負(fù)責(zé)從APPLY MSG里去和LEADER SYNC狀態(tài)。
還有一個(gè)點(diǎn),就是我們不能直接從DB里去取數(shù)據(jù),如果我們沒有實(shí)現(xiàn)清理數(shù)據(jù)的前提下,因?yàn)閿?shù)據(jù)不清理。所以我們會(huì)有多的數(shù)據(jù),想象一下。我們先接受SHARD1,然后不接受,再重新接受SHARD1,此時(shí)做遷移,會(huì)是一個(gè)并集。而我們只是希望是重新接受的那部分。基于上述考慮。我們需要基于每一個(gè)CONFIG,單獨(dú)把要遷移的數(shù)據(jù)給抽出來。這樣依據(jù)CONFIG來做遷移。

第六步 思考難點(diǎn)
如何去PULL DATA?如果我們選擇讓LEADER去交互,我們必須要HANDLER RAFT Leader掛掉,得有新的LEADER來負(fù)責(zé)PULL DATA。
所以在所有節(jié)點(diǎn)上必須得存好要問哪里去PULL DATA。如果PULL到,我們需要確保LEADER會(huì)往RAFT里發(fā)CMD(這個(gè)CMD是讓節(jié)點(diǎn)同步數(shù)據(jù),同時(shí)刪掉那個(gè)維護(hù)的哪里去PULL DATA的地方)
而且我們必須額外開一個(gè)后臺(tái)進(jìn)程與循環(huán)的做這件事。不然LEADER轉(zhuǎn)移過去之后,就沒有人PULL DATA了。 因?yàn)镻ULL DATA 這件事是沒有CLIENT超時(shí)重試的。
因?yàn)橐笈_(tái)循環(huán)去PULL DATA,我們拿到DATA后,送進(jìn)RAFT,再進(jìn)入到APPLY CH,需要所有的節(jié)點(diǎn)都可以同步這個(gè)數(shù)據(jù)。一旦同步成功,我們需要清理這個(gè)要等待的數(shù)據(jù)。這樣后臺(tái)線程可以少發(fā)很多無用的RPC。
同時(shí)我們?cè)谒饕獢?shù)據(jù)的時(shí)候也要知道往哪個(gè)REPLICA GROUP要。




第7步
目前我們已經(jīng)再LEADER端,把收到的新的CONFIG和拿到的MIGRATION DATA打給放進(jìn)RAFT的LOG去做線性一致的排序。
所以當(dāng)這個(gè)2個(gè)消息從APPLY MSG出來的時(shí)候,需要去做一些事情。
為此,我單獨(dú)開了一個(gè)函數(shù)去寫APPLY的邏輯

第8步 實(shí)現(xiàn)APPLY MSG 是MIGRATION DATA REPLY
這里的點(diǎn)(后面大量調(diào)試獲得的),因?yàn)镽EPLY 發(fā)到RAFT里面,雖然有順序,但返回的時(shí)候順序可能是亂的。比如現(xiàn)在我的CONFIG已經(jīng)更新到9,這個(gè)時(shí)候RAFT才把CONFIG的6 返回回來。我們應(yīng)該直接忽略這個(gè)版本。如果更新了,就會(huì)產(chǎn)生不一致。
那么依據(jù)亂序思想,我們不得不CHECK 就是當(dāng)前REPLY的CONFIG版本號(hào)必須是當(dāng)前CONFIG版本號(hào)小一個(gè)。
為什么?
這里我們?cè)谑盏紺ONFIG 變更,我們就會(huì)刷新CONFIG。但是此時(shí)CONFIG刷新之后,我們會(huì)更新COME IN SHARD,隨后后臺(tái)線程會(huì)去PULL。從更新COME IN SHARD到數(shù)據(jù)SHARD過來,這段時(shí)間內(nèi),我們必須得拒絕掉所有的索要該SHARD的請(qǐng)求。所以我們不能直接從CONFIG來判斷是不是WRONG GROUP。
至此,我們需要額外再維護(hù)一個(gè)我現(xiàn)在能HANDLER哪些SHARD的數(shù)據(jù)結(jié)構(gòu)。

那么發(fā)出去的SHARD,我可以直接從這個(gè)數(shù)據(jù)結(jié)構(gòu)里刪掉。要進(jìn)來的話,等真的進(jìn)來了,再添加到這個(gè)數(shù)據(jù)結(jié)構(gòu)中。

判斷是不是WRONG GROUP,也依據(jù)這個(gè)數(shù)據(jù)結(jié)構(gòu)來看。
第9步 實(shí)現(xiàn)updateInAndOutDataShard
這邊我們會(huì)根據(jù)新的CONFIG來,判斷自己要送出去的數(shù)據(jù)是哪些,自己要接受進(jìn)來的數(shù)組是哪些。在我的設(shè)計(jì)里這2個(gè)數(shù)據(jù)結(jié)構(gòu)必要性在第5,6步討論過了。

第十步 判斷WRONG GROUP的時(shí)機(jī)
在前面的版本中,我們是在SERVER端接受到請(qǐng)求的時(shí)候,就直接去依據(jù)CONFIG判斷WRONG GROUP?,F(xiàn)在我們改成依據(jù)MYSHARD來看,但是這還是不足的。
還記得我們?cè)谧鯨AB 3的時(shí)候,判斷去重,必須得再消息回來的時(shí)候再看一次。 因?yàn)榭赡茉谡?qǐng)求發(fā)送的時(shí)候,數(shù)據(jù)還在REPLICA GROUP 1??墒堑较腞AFT返回來的時(shí)候,當(dāng)中發(fā)生過更新CONFIG。數(shù)據(jù)不再GROUP 1了。所以要把判斷WRONG GROUP的邏輯,加在數(shù)據(jù)返回層。
同時(shí)因?yàn)閿?shù)據(jù)會(huì)在APPLY CH收到新的CONFIG,一部分要TO OUT的數(shù)據(jù)就會(huì)從DB里DELETE掉。為了確保NOTIFY CH的傳輸過程中,這個(gè)DB的更改不會(huì)影響到實(shí)際的GET的返回值。我們需要在接到APPLY CH的時(shí)候就把結(jié)果給注入到OP里。不然等OP發(fā)過去再從DB拿,有一定概率此時(shí)另一個(gè)線程已經(jīng)再DELETE DB了。



同時(shí)根據(jù)這個(gè)思路,我把SHARD MASTER的QUERY 也加在返回層來做。

第11步 初始化新加的屬性

第12步 更新POLL NEW CONFIG代碼,需要一個(gè)個(gè)更新
來自HINT
Process re-configurations one at a time, in order.
同時(shí)注意如果當(dāng)前CONFIG,那些需要轉(zhuǎn)移的SHARD還沒做完。不要立刻去拿下一個(gè)CONFIG。


第13步 測(cè)試JOIN AND LEAVE
發(fā)現(xiàn)有時(shí)可以過,有時(shí)過不了會(huì)阻塞。
BUG 1 死鎖
通過幾小時(shí)的調(diào)試發(fā)現(xiàn),是一個(gè)3維死鎖。首先RAFT里面拿了RAFT的鎖,阻塞在APPLY CH那。APPLY CH的后臺(tái)線程阻塞在KV SERVER的鎖上。 還有一個(gè)PULL CONFIG的線程,持有了KV SERVER的鎖,阻塞在RAFT的鎖上。

FIX方法,交換代碼順序。

測(cè)試通過
但是寫了這么多代碼,很多地方都沒注意保護(hù)共享變量。所以用TEST DATA RACE的時(shí)候會(huì)出問題。
檢查思路,先看3個(gè)后臺(tái)進(jìn)程,隨后看幾個(gè)RPC handler
這邊就自己加一下鎖吧。
GO TEST RACE OK之后,會(huì)在第三個(gè)測(cè)試敗掉。是SNAPSHOT
我們需要存儲(chǔ)更多的狀態(tài)進(jìn)SNAPSHOT。
第14步 實(shí)現(xiàn)新的SNAPSHOT



再直接測(cè)試,發(fā)現(xiàn)阻塞在UNRELIABLE 3

BUG 2 一處地方?jīng)]有釋放鎖

修復(fù)后重新對(duì)這個(gè)CASE單獨(dú)測(cè)試100次通過,測(cè)全集。只剩下CHANLLEGE 1,DELETE的TASK了

第15步 思考如何刪除不必要的狀態(tài)
在上面的實(shí)現(xiàn)里,我們開了3個(gè)數(shù)據(jù)結(jié)構(gòu),一個(gè)是TO OUT,一個(gè)是COME IN,一個(gè)是MY SHARD;
第三個(gè)是固定大小的。不用考慮
第二個(gè),我們已經(jīng)再接受到DATA之后會(huì)去刪除它。
唯一沒有回收的就是第一個(gè)。
最NAIVE的實(shí)現(xiàn)是當(dāng)我們把數(shù)據(jù)當(dāng)做REPLY發(fā)過去的時(shí)候,就直接刪掉。這是危險(xiǎn)的。因?yàn)楹苡锌赡苓@個(gè)消息會(huì)丟失,被那邊服務(wù)器拒絕,造成這個(gè)數(shù)據(jù)就永遠(yuǎn)不會(huì)被回收。
正確的做法是等到對(duì)方服務(wù)器,成功接收了DATA,然后刪除了對(duì)應(yīng)的COME IN,這個(gè)時(shí)候應(yīng)該發(fā)REQUEST告訴TO OUT一方,你可以安全的把TO OUT里的這塊DATA給回收了。
但是依然存在RPC會(huì)丟失的情況。和PULL的思想一樣。(用一個(gè)COME IN LIST+ 后臺(tái)線程,來不斷重試,成功時(shí)候刪除COME IN LIST內(nèi)容,就不再去PULL直到有新的COME IN來。失敗的話,因?yàn)镃OME IN 內(nèi)容還在,就會(huì)自動(dòng)重試,不怕網(wǎng)絡(luò)不穩(wěn)定)
那么我針對(duì)這個(gè)CASE,用相同的套路。后臺(tái)GC線程+Garbage List.
具體思路就是當(dāng)COME IN 的DATA收到后,我們要把這塊數(shù)據(jù)標(biāo)記進(jìn)Garbage List。 后臺(tái)GC線程發(fā)現(xiàn)Garbage List有內(nèi)容,就會(huì)往對(duì)應(yīng)的GROUP發(fā)送GC RPC。對(duì)應(yīng)的GROUP清理成功后,REPLY告知。我們把Garbage List對(duì)應(yīng)的內(nèi)容刪除。
同樣我們依然只和LEADER交互,并且利用RAFT LOG,來確保所有節(jié)點(diǎn)都成功刪除GARBAGE,再RPC回復(fù)SUCCESS
第16步 寫GC RPC HANDLER,抽一個(gè)TEMPLATE
發(fā)現(xiàn)可以用ERR 里面加一個(gè)WRONGLEADER來代表LEADER不對(duì)。就可以去掉一個(gè)參數(shù)。
當(dāng)OP TYPE是GC的時(shí)候,KEY 是CONFIG NUM,SEQNUM是SHARD。



第17步 實(shí)現(xiàn)GC


第18步 實(shí)現(xiàn)GC后臺(tái)進(jìn)程


第19步 往GARBAGE里添加值

第20步,更新SNAPSHOT
這里小伙伴自行更新吧
測(cè)試通過

第21步
因?yàn)槲业腞EPLICA GROUP里會(huì)往MASTER 發(fā)送QUERY請(qǐng)求,這個(gè)時(shí)候可能會(huì)造成LAST LEADER的DATA RACE。
所以我用原子方法改寫。

第22步 CONCISE代碼
1.我把WRONDLEADER給去掉了。同時(shí)用ERR 的WRONGLEADER來表示。

2.把幾個(gè)RPC HANDLER用TEMPLATE 提取公有邏輯

最終430 行代碼
GO TEST 測(cè)試200次
這個(gè)測(cè)試不適合并行,因?yàn)闀?huì)大量開線程在做。并行測(cè)試會(huì)造成有些CASE跑的巨慢。所以串行測(cè)試了。
同時(shí)CONFIG里因?yàn)镸ASTER的資源沒有回收。越到后面TEST 跑的越慢,我加了如下代碼來提速測(cè)試

基本跑完一整套是2分鐘

測(cè)試200次的結(jié)果 是
BUG 3
TestChallenge2Unaffected 會(huì)有1/50的概率阻塞。
經(jīng)過打LOG 發(fā)現(xiàn),是還沒來得及把所有DATA SHARD完,超過了1秒,之后就有數(shù)據(jù)再也MIGRATE不過來。造成拿不到而阻塞。
這里分享一個(gè)打LOG的技巧,避免淹沒在茫茫LOG海里。就是出了問題,再打LOG




原因如下:

下圖這個(gè)4號(hào)數(shù)據(jù)塊 在測(cè)試?yán)飳儆?01的OWN,但是還沒來得及拿到,100的網(wǎng)就斷了。再也取不到了。

解決方案,加快PULL的頻率

加快QUERY CONFIG的速度。思路是如果是拿已知的CONFIG,因?yàn)镃ONFIG的APPEND不會(huì)修改,所以可以直接返回。

縮短MASTER CLIENT的睡眠時(shí)間。

測(cè)試200次后無阻塞。
SHARD KV 測(cè)試200次通過
測(cè)試腳本
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6.824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 200; i++))
do
echo $i
(go test) > ./res/$i
grep -nr "FAIL.*" res
done

回歸測(cè)試SHARD MASTER500次通過

回歸測(cè)試RAFT 300次通過
有2次時(shí)之前說的不是代碼問題的KNOWN ISSUE,具體參考文集的2C部分

回歸測(cè)試KVRAFT 210次通過

CONCISE SERVER
package shardkv
import (
"bytes"
"labrpc"
"log"
"shardmaster"
"strconv"
"time"
)
import "raft"
import "sync"
import "labgob"
type Op struct {
OpType string "operation type(eg. put/append/gc/get)"
Key string "key for normal, config num for gc"
Value string
Cid int64 "cid for put/append, operation uid for get/gc"
SeqNum int "seqnum for put/append, shard for gc"
}
type ShardKV struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
make_end func(string) *labrpc.ClientEnd
gid int
masters []*labrpc.ClientEnd
maxraftstate int // snapshot if log grows this big
// Your definitions here.
mck *shardmaster.Clerk
cfg shardmaster.Config
persist *raft.Persister
db map[string]string
chMap map[int]chan Op
cid2Seq map[int64]int
toOutShards map[int]map[int]map[string]string "cfg num -> (shard -> db)"
comeInShards map[int]int "shard->config number"
myShards map[int]bool "to record which shard i can offer service"
garbages map[int]map[int]bool "cfg number -> shards"
killCh chan bool
}
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
originOp := Op{"Get",args.Key,"",Nrand(),0}
reply.Err,reply.Value = kv.templateStart(originOp)
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
reply.Err,_ = kv.templateStart(originOp)
}
func (kv *ShardKV) templateStart(originOp Op) (Err, string) {
index,_,isLeader := kv.rf.Start(originOp)
if isLeader {
ch := kv.put(index, true)
op := kv.beNotified(ch, index)
if equalOp(originOp, op) { return OK, op.Value }
if op.OpType == ErrWrongGroup { return ErrWrongGroup, "" }
}
return ErrWrongLeader,""
}
func (kv *ShardKV) GarbageCollection(args *MigrateArgs, reply *MigrateReply) {
reply.Err = ErrWrongLeader
if _, isLeader := kv.rf.GetState(); !isLeader {return}
kv.mu.Lock()
defer kv.mu.Unlock()
if _,ok := kv.toOutShards[args.ConfigNum]; !ok {return}
if _,ok := kv.toOutShards[args.ConfigNum][args.Shard]; !ok {return}
originOp := Op{"GC",strconv.Itoa(args.ConfigNum),"",Nrand(),args.Shard}
kv.mu.Unlock()
reply.Err,_ = kv.templateStart(originOp)
kv.mu.Lock()
}
func (kv *ShardKV) ShardMigration(args *MigrateArgs, reply *MigrateReply) {
reply.Err, reply.Shard, reply.ConfigNum = ErrWrongLeader, args.Shard, args.ConfigNum
if _,isLeader := kv.rf.GetState(); !isLeader {return}
kv.mu.Lock()
defer kv.mu.Unlock()
reply.Err = ErrWrongGroup
if args.ConfigNum >= kv.cfg.Num {return}
reply.Err,reply.ConfigNum, reply.Shard = OK, args.ConfigNum, args.Shard
reply.DB, reply.Cid2Seq = kv.deepCopyDBAndDedupMap(args.ConfigNum,args.Shard)
}
func (kv *ShardKV) deepCopyDBAndDedupMap(config int,shard int) (map[string]string, map[int64]int) {
db2 := make(map[string]string)
cid2Seq2 := make(map[int64]int)
for k, v := range kv.toOutShards[config][shard] {
db2[k] = v
}
for k, v := range kv.cid2Seq {
cid2Seq2[k] = v
}
return db2, cid2Seq2
}
func (kv *ShardKV) beNotified(ch chan Op,index int) Op{
select {
case notifyArg,ok := <- ch :
if ok {
close(ch)
}
kv.mu.Lock()
delete(kv.chMap,index)
kv.mu.Unlock()
return notifyArg
case <- time.After(time.Duration(1000)*time.Millisecond):
return Op{}
}
}
func (kv *ShardKV) put(idx int,createIfNotExists bool) chan Op{
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.chMap[idx]; !ok {
if !createIfNotExists {return nil}
kv.chMap[idx] = make(chan Op,1)
}
return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
return a.Key == b.Key && a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}
func (kv *ShardKV) Kill() {
kv.rf.Kill()
select{
case <-kv.killCh:
default:
}
kv.killCh <- true
}
func (kv *ShardKV) readSnapShot(snapshot []byte) {
kv.mu.Lock()
defer kv.mu.Unlock()
if snapshot == nil || len(snapshot) < 1 {return}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var db map[string]string
var cid2Seq map[int64]int
var toOutShards map[int]map[int]map[string]string
var comeInShards map[int]int
var myShards map[int]bool
var garbages map[int]map[int]bool
var cfg shardmaster.Config
if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil || d.Decode(&comeInShards) != nil ||
d.Decode(&toOutShards) != nil || d.Decode(&myShards) != nil || d.Decode(&cfg) != nil ||
d.Decode(&garbages) != nil {
log.Fatal("readSnapShot ERROR for server %v",kv.me)
} else {
kv.db, kv.cid2Seq, kv.cfg = db, cid2Seq, cfg
kv.toOutShards, kv.comeInShards, kv.myShards, kv.garbages = toOutShards,comeInShards,myShards,garbages
}
}
func (kv *ShardKV) needSnapShot() bool {
kv.mu.Lock()
defer kv.mu.Unlock()
threshold := 10
return kv.maxraftstate > 0 &&
kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *ShardKV) doSnapShot(index int) {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
kv.mu.Lock()
e.Encode(kv.db)
e.Encode(kv.cid2Seq)
e.Encode(kv.comeInShards)
e.Encode(kv.toOutShards)
e.Encode(kv.myShards)
e.Encode(kv.cfg)
e.Encode(kv.garbages)
kv.mu.Unlock()
kv.rf.DoSnapShot(index,w.Bytes())
}
func (kv *ShardKV) tryPollNewCfg() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.comeInShards) > 0{
kv.mu.Unlock()
return
}
next := kv.cfg.Num + 1
kv.mu.Unlock()
cfg := kv.mck.Query(next)
if cfg.Num == next {
kv.rf.Start(cfg) //sync follower with new cfg
}
}
func (kv *ShardKV) tryGC() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.garbages) == 0{
kv.mu.Unlock()
return
}
var wait sync.WaitGroup
for cfgNum, shards := range kv.garbages {
for shard := range shards {
wait.Add(1)
go func(shard int, cfg shardmaster.Config) {
defer wait.Done()
args := MigrateArgs{shard, cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := MigrateReply{}
if ok := srv.Call("ShardKV.GarbageCollection", &args, &reply); ok && reply.Err == OK {
kv.mu.Lock()
defer kv.mu.Unlock()
delete(kv.garbages[cfgNum], shard)
if len(kv.garbages[cfgNum]) == 0 {
delete(kv.garbages, cfgNum)
}
}
}
}(shard, kv.mck.Query(cfgNum))
}
}
kv.mu.Unlock()
wait.Wait()
}
func (kv *ShardKV) tryPullShard() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.comeInShards) == 0 {
kv.mu.Unlock()
return
}
var wait sync.WaitGroup
for shard, idx := range kv.comeInShards {
wait.Add(1)
go func(shard int, cfg shardmaster.Config) {
defer wait.Done()
args := MigrateArgs{shard, cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := MigrateReply{}
if ok := srv.Call("ShardKV.ShardMigration", &args, &reply); ok && reply.Err == OK {
kv.rf.Start(reply)
}
}
}(shard, kv.mck.Query(idx))
}
kv.mu.Unlock()
wait.Wait()
}
func (kv *ShardKV) daemon(do func(), sleepMS int) {
for {
select {
case <-kv.killCh:
return
default:
do()
}
time.Sleep(time.Duration(sleepMS) * time.Millisecond)
}
}
func (kv *ShardKV) apply(applyMsg raft.ApplyMsg) {
if cfg, ok := applyMsg.Command.(shardmaster.Config); ok {
kv.updateInAndOutDataShard(cfg)
} else if migrationData, ok := applyMsg.Command.(MigrateReply); ok{
kv.updateDBWithMigrateData(migrationData)
}else {
op := applyMsg.Command.(Op)
if op.OpType == "GC" {
cfgNum,_ := strconv.Atoi(op.Key)
kv.gc(cfgNum,op.SeqNum);
} else {
kv.normal(&op)
}
if notifyCh := kv.put(applyMsg.CommandIndex,false); notifyCh != nil {
send(notifyCh,op)
}
}
if kv.needSnapShot() {
go kv.doSnapShot(applyMsg.CommandIndex)
}
}
func (kv *ShardKV) gc(cfgNum int, shard int) {
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.toOutShards[cfgNum]; ok {
delete(kv.toOutShards[cfgNum], shard)
if len(kv.toOutShards[cfgNum]) == 0 {
delete(kv.toOutShards, cfgNum)
}
}
}
func (kv *ShardKV) updateInAndOutDataShard(cfg shardmaster.Config) {
kv.mu.Lock()
defer kv.mu.Unlock()
if cfg.Num <= kv.cfg.Num { //only consider newer config
return
}
oldCfg, toOutShard := kv.cfg, kv.myShards
kv.myShards, kv.cfg = make(map[int]bool), cfg
for shard, gid := range cfg.Shards {
if gid != kv.gid {continue}
if _, ok := toOutShard[shard]; ok || oldCfg.Num == 0 {
kv.myShards[shard] = true
delete(toOutShard, shard)
} else {
kv.comeInShards[shard] = oldCfg.Num
}
}
if len(toOutShard) > 0 { // prepare data that needed migration
kv.toOutShards[oldCfg.Num] = make(map[int]map[string]string)
for shard := range toOutShard {
outDb := make(map[string]string)
for k, v := range kv.db {
if key2shard(k) == shard {
outDb[k] = v
delete(kv.db, k)
}
}
kv.toOutShards[oldCfg.Num][shard] = outDb
}
}
}
func (kv *ShardKV) updateDBWithMigrateData(migrationData MigrateReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
if migrationData.ConfigNum != kv.cfg.Num-1 {return}
delete(kv.comeInShards, migrationData.Shard)
//this check is necessary, to avoid use kv.cfg.Num-1 to update kv.cfg.Num's shard
if _, ok := kv.myShards[migrationData.Shard]; !ok {
kv.myShards[migrationData.Shard] = true
for k, v := range migrationData.DB {
kv.db[k] = v
}
for k, v := range migrationData.Cid2Seq {
kv.cid2Seq[k] = Max(v,kv.cid2Seq[k])
}
if _, ok := kv.garbages[migrationData.ConfigNum]; !ok {
kv.garbages[migrationData.ConfigNum] = make(map[int]bool)
}
kv.garbages[migrationData.ConfigNum][migrationData.Shard] = true
}
}
func (kv *ShardKV) normal(op *Op) {
shard := key2shard(op.Key)
kv.mu.Lock()
if _, ok := kv.myShards[shard]; !ok {
op.OpType = ErrWrongGroup
} else {
maxSeq,found := kv.cid2Seq[op.Cid]
if !found || op.SeqNum > maxSeq {
if op.OpType == "Put" {
kv.db[op.Key] = op.Value
} else if op.OpType == "Append" {
kv.db[op.Key] += op.Value
}
kv.cid2Seq[op.Cid] = op.SeqNum
}
if op.OpType == "Get" {
op.Value = kv.db[op.Key]
}
}
kv.mu.Unlock()
}
func send(notifyCh chan Op,op Op) {
select{
case <-notifyCh:
default:
}
notifyCh <- op
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
labgob.Register(MigrateArgs{})
labgob.Register(MigrateReply{})
labgob.Register(shardmaster.Config{})
kv := new(ShardKV)
kv.me = me
kv.maxraftstate = maxraftstate
kv.make_end = make_end
kv.gid = gid
kv.masters = masters
// Your initialization code here.
kv.persist = persister
// Use something like this to talk to the shardmaster:
kv.mck = shardmaster.MakeClerk(kv.masters)
kv.cfg = shardmaster.Config{}
kv.db = make(map[string]string)
kv.chMap = make(map[int]chan Op)
kv.cid2Seq = make(map[int64]int)
kv.toOutShards = make(map[int]map[int]map[string]string)
kv.comeInShards = make(map[int]int)
kv.myShards = make(map[int]bool)
kv.garbages = make(map[int]map[int]bool)
kv.readSnapShot(kv.persist.ReadSnapshot())
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.killCh = make(chan bool,1)
go kv.daemon(kv.tryPollNewCfg,50)
go kv.daemon(kv.tryPullShard,80)
go kv.daemon(kv.tryGC,100)
go func() {
for {
select {
case <- kv.killCh:
return
case applyMsg := <- kv.applyCh:
if !applyMsg.CommandValid {
kv.readSnapShot(applyMsg.SnapShot)
continue
}
kv.apply(applyMsg)
}
}
}()
return kv
}
CONCISE CLIENT
package shardkv
//
// client code to talk to a sharded key/value service.
//
// the client first talks to the shardmaster to find out
// the assignment of shards (keys) to groups, and then
// talks to the group that holds the key's shard.
//
import (
"labrpc"
)
import "crypto/rand"
import "math/big"
import "shardmaster"
import "time"
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
shard = int(key[0])
}
shard %= shardmaster.NShards
return shard
}
func Nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
type Clerk struct {
sm *shardmaster.Clerk
config shardmaster.Config
make_end func(string) *labrpc.ClientEnd
// You will have to modify this struct.
lastLeader int
id int64
seqNum int
}
func MakeClerk(masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.sm = shardmaster.MakeClerk(masters)
ck.make_end = make_end
// You'll have to add code here.
ck.id = Nrand()//give each client a unique identifier, and then have them
ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
ck.lastLeader = 0
return ck
}
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
// You will have to modify this function.
//
func (ck *Clerk) Get(key string) string {
args := GetArgs{}
args.Key = key
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
// try each server for the shard.
for i := 0; i < len(servers); i++ {
si := (i + ck.lastLeader) % len(servers)
srv := ck.make_end(servers[si])
var reply GetReply
ok := srv.Call("ShardKV.Get", &args, &reply)
if ok && reply.Err == OK {
ck.lastLeader = si;
return reply.Value
}
if ok && (reply.Err == ErrWrongGroup) {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}
}
//
// shared by Put and Append.
// You will have to modify this function.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
args := PutAppendArgs{key,value,op,ck.id,ck.seqNum}
ck.seqNum++
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
for i := 0; i < len(servers); i++ {
si := (i + ck.lastLeader) % len(servers)
srv := ck.make_end(servers[si])
var reply PutAppendReply
ok := srv.Call("ShardKV.PutAppend", &args, &reply)
if ok && reply.Err == OK {
ck.lastLeader = si
return
}
if ok && reply.Err == ErrWrongGroup {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
CONCISE COMMON
package shardkv
//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each shard.
// Shardmaster may change shard assignment from time to time.
//
// You will have to modify these definitions.
//
const (
OK = "OK"
ErrWrongLeader = "ErrWrongLeader"
ErrWrongGroup = "ErrWrongGroup"
)
type Err string
// Put or Append
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
Cid int64 "client unique id"
SeqNum int "each request with a monotonically increasing sequence number"
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
}
type GetReply struct {
Err Err
Value string
}
type MigrateArgs struct {
Shard int
ConfigNum int
}
type MigrateReply struct {
Err Err
ConfigNum int
Shard int
DB map[string]string
Cid2Seq map[int64]int
}
func Max(x, y int) int {
if x > y {
return x
}
return y
}
最后再把全部代碼提交進(jìn)GITHUB。待我把MAP REDUCE寫了一起吧。