手擼golang etcd raft協議之9
緣起
最近閱讀 [云原生分布式存儲基石:etcd深入解析] (杜軍, 2019.1)
本系列筆記擬采用golang練習之
raft分布式一致性算法
分布式存儲系統通常會通過維護多個副本來進行容錯,
以提高系統的可用性。
這就引出了分布式存儲系統的核心問題——如何保證多個副本的一致性?
Raft算法把問題分解成了四個子問題:
1. 領袖選舉(leader election)、
2. 日志復制(log replication)、
3. 安全性(safety)、
4. 成員關系變化(membership changes)
這幾個子問題。
目標
- 根據raft協議,實現高可用分布式強一致的kv存儲
子目標(Day 9)
- Leader狀態下的邏輯處理
設計
- tLeaderState: 實現Leader狀態的raft狀態機處理。事件驅動的邏輯編排,讀寫分離的字段管理。
tLeaderState.go
(未完成)實現Leader狀態的raft狀態機處理。事件驅動的邏輯編排,讀寫分離的字段管理。
// tLeaderState.go
// gitee: https://gitee.com/ioly/learning.gooop
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tLeaderState is the raft state object used while this node acts as leader.
// It embeds tEventDrivenModel and is driven through the hook/raise calls made
// in this file: fields are written only inside event handlers (or init) and
// read by the RPC methods.
type tLeaderState struct {
	tEventDrivenModel

	// context is the owning raft state context; it is used to reach the log
	// store and to hand over to the next state.
	context iRaftStateContext

	// mInitOnce guards init(), mStartOnce guards Start().
	mInitOnce, mStartOnce sync.Once

	// mTerm is the term this leader was created with. It is assigned in
	// init() and compared against incoming terms by the RPC handlers.
	mTerm int64

	// mDisposedFlag is reset to false on leInit and set to true on leDiposing.
	mDisposedFlag bool

	// Bookkeeping for the last vote granted by this node. Written on
	// leVoteToCandidate and cleared on leBeforeRequestVote once the vote is
	// older than the election timeout.
	mVotedTerm        int64
	mVotedCandidateID string
	mVotedTimestamp   int64 // UnixNano at the moment the vote was recorded
}
// Event names raised and hooked by tLeaderState.
const (
	// leInit is raised once by init(). No arguments.
	leInit = "leader.init"

	// leStart is raised once by Start(). No arguments.
	leStart = "leader.Start"

	// leDiposing is raised by whenNewLeaderAnnouncedThenSwitchToFollower just
	// before the state is replaced. No arguments.
	// NOTE(review): the identifier is misspelt ("Diposing"); it is kept as is
	// because other code refers to it by this name.
	leDiposing = "leader.Disposing"

	// leNewLeaderAnnounced is raised by Heartbeat() / AppendLog() when the
	// caller carries a greater term. Arguments: term int64.
	leNewLeaderAnnounced = "leader.NewLeaderAnnounced"

	// leBeforeRequestVote is raised at the start of RequestVote().
	// Arguments: *rpc.RequestVoteCmd.
	leBeforeRequestVote = "leader.BeforeRequestVote"

	// leVoteToCandidate is raised by RequestVote() when a vote is granted for
	// a new term. Arguments: *rpc.RequestVoteCmd.
	leVoteToCandidate = "leader.VoteToCandidate"
)
// newLeaderState builds a leader state bound to ctx for the given term and
// returns it already initialised (handlers hooked, leInit raised).
func newLeaderState(ctx iRaftStateContext, term int64) IRaftState {
	leader := &tLeaderState{}
	leader.init(ctx, term)
	return leader
}
// init stores the context and term, registers the event handlers and raises
// leInit. The whole sequence runs at most once per instance; later calls are
// no-ops.
func (me *tLeaderState) init(ctx iRaftStateContext, term int64) {
	setup := func() {
		me.context, me.mTerm = ctx, term

		// Handlers must be hooked before leInit is raised so they observe it.
		me.initEventHandlers()
		me.raise(leInit)
	}
	me.mInitOnce.Do(setup)
}
// initEventHandlers registers every event handler of the leader state.
// Handlers that mutate fields are hooked first, followed by the handlers that
// only read state and trigger further actions.
func (me *tLeaderState) initEventHandlers() {
	// Field writers.
	me.hookEventsForDisposedFlag()
	me.hookEventsForVotedTerm()

	// Field readers / reactions.
	me.hook(leStart, me.whenStartThenBeginHeartbeatToOthers)
	me.hook(leNewLeaderAnnounced, me.whenNewLeaderAnnouncedThenSwitchToFollower)
}
// hookEventsForDisposedFlag wires the events that maintain mDisposedFlag:
// leInit clears it and leDiposing sets it.
func (me *tLeaderState) hookEventsForDisposedFlag() {
	// assign builds a handler that stores a fixed value into mDisposedFlag.
	assign := func(disposed bool) func(string, ...interface{}) {
		return func(_ string, _ ...interface{}) {
			me.mDisposedFlag = disposed
		}
	}

	me.hook(leInit, assign(false))
	me.hook(leDiposing, assign(true))
}
// hookEventsForVotedTerm wires the events that maintain the last-vote fields
// (mVotedTerm, mVotedCandidateID, mVotedTimestamp).
func (me *tLeaderState) hookEventsForVotedTerm() {
	// Before a vote request is evaluated, forget a previous vote once it is at
	// least timeout.ElectionTimeout old.
	me.hook(leBeforeRequestVote, func(_ string, _ ...interface{}) {
		if me.mVotedTerm == 0 {
			// No vote on record.
			return
		}

		age := time.Duration(time.Now().UnixNano() - me.mVotedTimestamp)
		if age < timeout.ElectionTimeout {
			return
		}

		me.mVotedTerm, me.mVotedTimestamp, me.mVotedCandidateID = 0, 0, ""
	})

	// After a vote has been granted, remember whom it went to, for which term,
	// and when. The first event argument must be a *rpc.RequestVoteCmd.
	me.hook(leVoteToCandidate, func(_ string, args ...interface{}) {
		req := args[0].(*rpc.RequestVoteCmd)
		me.mVotedTerm = req.Term
		me.mVotedCandidateID = req.CandidateID
		me.mVotedTimestamp = time.Now().UnixNano()
	})
}
// Heartbeat handles a heartbeat received while this node is leader.
//
// If the sender's term is greater than this leader's term, leNewLeaderAnnounced
// is raised with that term and ret.Code is set to rpc.HBOk. Otherwise ret.Code
// is set to rpc.HBTermMismatch. The returned error is always nil.
func (me *tLeaderState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	if cmd.Term > me.mTerm {
		// A leader with a newer term exists.
		me.raise(leNewLeaderAnnounced, cmd.Term)
		ret.Code = rpc.HBOk
		return nil
	}

	// Same or older term: refuse.
	ret.Code = rpc.HBTermMismatch
	return nil
}
// AppendLog handles an append-log request received while this node is leader.
//
// If the sender's term is not greater than this leader's term, ret.Code is set
// to rpc.ALTermMismatch. Otherwise leNewLeaderAnnounced is raised with the
// sender's term and ret.Code is set to rpc.ALInternalError. The returned error
// is always nil.
func (me *tLeaderState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	if cmd.Term > me.mTerm {
		// A leader with a newer term exists.
		me.raise(leNewLeaderAnnounced, cmd.Term)

		// NOTE(review): the original comment here said "return ok", yet the
		// code reports ALInternalError. Presumably this is because the entry
		// is not appended by this method. Confirm the intended code.
		ret.Code = rpc.ALInternalError
		return nil
	}

	// Same or older term: refuse.
	ret.Code = rpc.ALTermMismatch
	return nil
}
// CommitLog is not acted upon while this node is leader: the request is
// answered with rpc.CLInternalError and a nil error.
func (me *tLeaderState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	ret.Code = rpc.CLInternalError
	return nil
}
// RequestVote decides whether this leader grants its vote to a candidate.
//
// The result is reported through ret.Code; the returned error is always nil.
//
//   - rpc.RVTermMismatch: the candidate's term is not greater than this
//     leader's own term, or is older than the term of the vote on record.
//   - rpc.RVVotedAnother: a vote for the same term was already given to a
//     different candidate.
//   - rpc.RVOk: the vote is granted (or was already granted to this same
//     candidate for this term).
//   - rpc.RVLogMismatch: the candidate's last log index is behind this node's
//     last committed index.
func (me *tLeaderState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	// Gives the vote bookkeeping a chance to expire a stale vote first.
	me.raise(leBeforeRequestVote, cmd)

	// A leader must never grant a vote in its own term or an older one:
	// doing so could let a second leader be elected for the same term.
	if cmd.Term <= me.mTerm {
		ret.Code = rpc.RVTermMismatch
		return nil
	}

	switch {
	case cmd.Term < me.mVotedTerm:
		// Older than the vote already on record.
		ret.Code = rpc.RVTermMismatch

	case cmd.Term == me.mVotedTerm:
		if me.mVotedCandidateID != "" && me.mVotedCandidateID != cmd.CandidateID {
			// This term's vote already went to someone else.
			ret.Code = rpc.RVVotedAnother
		} else {
			// Repeated request from the candidate already voted for.
			ret.Code = rpc.RVOk
		}

	default:
		// New term: grant only if the candidate's log is not behind ours.
		if cmd.LastLogIndex >= me.context.Store().LastCommittedIndex() {
			me.raise(leVoteToCandidate, cmd)
			ret.Code = rpc.RVOk
		} else {
			ret.Code = rpc.RVLogMismatch
		}
	}
	return nil
}
// Role reports the raft role represented by this state, which is always
// roles.Leader.
func (me *tLeaderState) Role() roles.RaftRole {
	return roles.Leader
}
// Start raises leStart. Only the first call has an effect; later calls are
// no-ops.
func (me *tLeaderState) Start() {
	raiseStart := func() {
		me.raise(leStart)
	}
	me.mStartOnce.Do(raiseStart)
}
// whenStartThenBeginHeartbeatToOthers is the leStart handler. It is not
// implemented yet and panics when invoked.
func (me *tLeaderState) whenStartThenBeginHeartbeatToOthers(_ string, _ ...interface{}) {
	// TODO: not implemented yet.
	panic("implements me")
}
// whenNewLeaderAnnouncedThenSwitchToFollower is the leNewLeaderAnnounced
// handler. It raises leDiposing on this state and then asks the context to
// replace it with a follower state for the announced term.
//
// args[0] must be the new term as an int64.
func (me *tLeaderState) whenNewLeaderAnnouncedThenSwitchToFollower(_ string, args ...interface{}) {
	// Mark this state as disposed before handing over.
	me.raise(leDiposing)

	newTerm := args[0].(int64)
	follower := newFollowerState(me.context, newTerm)
	me.context.HandleStateChanged(follower)
}
(未完待續)