背景
從區(qū)塊鏈的角度來說,kafka的方式是違背初衷的,試問中心化的kafka部署在哪里合適,云?第三方機構?可以說哪都不合適,一個精心包裝的去中心化的架構里面卻包含了中心化的服務,真是如鯁在喉,不吐不快。好在,Fabric早就意識到了這個問題,很早就在計劃要引入raft。一個公開,平等的聯盟鏈體系里,每個企業(yè)都能部署自己的排序服務。
在分布式一致性算法方面raft可以說非常成熟,算法本身非常精妙。想要搞懂這部分實現,還是需要一些背景知識的,強烈建議先去學習下。
Fabric的這部分主要是用到了etcd的raft庫的實現,實際就是raft算法的標準實現,至于網絡通訊及存儲部分,則留給應用層自己。之后你可以看到Fabric還是做了不少工作,以后如果etcdraft能獨立出來,我想更有利于應用接入。
名詞解釋
| 名詞 | 解釋 |
|---|---|
| Term | 任期 |
| Vote | 選舉投票 |
| Entry | 日志數據條目 |
| candidate | 候選人 |
| leader | 領導者 |
| follower | 跟隨者 |
| commit | 提交 |
| propose | 提議 |
配置
Orderer: &OrdererDefaults
OrdererType: etcdraft
Addresses:
- orderer1st-ordererorg:7050
- orderer2nd-ordererorg:7050
- orderer3rd-ordererorg:7050
BatchTimeout: 2s
BatchSize:
MaxMessageCount: 500
AbsoluteMaxBytes: 98 MB
PreferredMaxBytes: 512 KB
EtcdRaft:
Consenters:
- Host: orderer1st-ordererorg
Port: 7050
ClientTLSCert: ...
ServerTLSCert: ...
- Host: orderer2nd-ordererorg
Port: 7050
ClientTLSCert: ...
ServerTLSCert: ...
- Host: orderer3rd-ordererorg
Port: 7050
ClientTLSCert: ...
ServerTLSCert: ...
Options:
TickInterval: 100
ElectionTick: 10
HeartbeatTick: 1
MaxInflightMsgs: 256
MaxSizePerMsg: 1048576
SnapshotInterval: 500
可以看到Raftnode就是Orderer自己啦,并沒有在Orderer上再建立Raft集群的概念,跟kafka還是有區(qū)別。
Raft
Node

Raft庫有提供Node來與應用層互動。
名詞 解釋 Tick 這個就像是Raft的發(fā)條,要每隔一段時間來調度這里,驅動選舉和心跳 Advance 告訴raft,上次推送的ready,我已經處理完畢,準備好處理下一個Ready Ready Raft世界的風吹草動會通知這里,這非常重要,后面會講到 Step 將收到的消息寫入狀態(tài)機 ProposeConfChange 提交配置變更 Propose 提議寫入數據到日志中,可能會返回錯誤。 Campaign 調用該函數將驅動節(jié)點進入候選人狀態(tài),將競爭leader。 ApplyConfChange 應用配置變更
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
在Raft世界里一切風吹草動差不多都在這里了,應用層要跟raft來互動的話,這里是一切動作的起源。搞懂這些字段的作用是理解實現的關鍵。
名詞 解釋 SoftState 記錄的是當前任期的leader是誰,以及該節(jié)點在raft集群的角色,易變的狀態(tài)不需要保存 HardState 需要寫入持久化存儲中,包括:節(jié)點當前Term、Vote、Commit Entries 在向其他集群發(fā)送消息之前需要先寫入持久化存儲的日志數據 Snapshot 需要寫入持久化存儲中的快照數據 CommittedEntries 需要輸入到狀態(tài)機中的數據,這些數據之前已經被保存到持久化存儲中了 Messages 在entries被寫入持久化存儲中以后,需要發(fā)送出去的數據
Raft->Orderer
case rd := <-n.Ready():
if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
}
if !raft.IsEmptySnap(rd.Snapshot) {
n.chain.snapC <- &rd.Snapshot
}
// skip empty apply
if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
}
n.Advance()
// TODO(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
n.send(rd.Messages)
這里處理了Raft發(fā)來的Ready通知。
- 首先不管怎么樣,只要收到Ready,先把Entries,HardState,Snapshot存儲在本地。要注意存下來并不代表會寫入狀態(tài)機,先收下來比較重要,Raft之后會保證哪些是需要應用到狀態(tài)機的。因為Raft庫沒有存儲支持,所以需要應用進行接管。
- 如果含有snapshot快照,通知snapC,這里后面再講
- len(rd.CommittedEntries) != 0 || rd.SoftState != nil,這里說明如果有CommittedEntries或SoftState變更,通知applyC
- 全部處理完,Advance,通知Raft處理完畢,可以發(fā)下一個Ready了。
- 因為Raft庫沒有網絡支持,所以node間的消息交互需要應用進行接管。這個后面再講。
存儲
func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
if err := rs.wal.Save(hardstate, entries); err != nil {
return err
}
if !raft.IsEmptySnap(snapshot) {
if err := rs.saveSnap(snapshot); err != nil {
return err
}
if err := rs.ram.ApplySnapshot(snapshot); err != nil {
if err == raft.ErrSnapOutOfDate {
rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
snapshot.Metadata.Term, snapshot.Metadata.Index)
} else {
rs.lg.Fatalf("Unexpected programming error: %s", err)
}
}
}
if err := rs.ram.Append(entries); err != nil {
return err
}
return nil
}
- HardState和Entries寫入WAL
- Snapshot寫入snap
- Snapshot和Entries放到MemoryStorage,可以看成是storage的cache層
快照
case sn := <-c.snapC:
if sn.Metadata.Index <= c.appliedIndex {
c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
break
}
b := utils.UnmarshalBlockOrPanic(sn.Data)
c.lastSnapBlockNum = b.Header.Number
c.confState = sn.Metadata.ConfState
c.appliedIndex = sn.Metadata.Index
if err := c.catchUp(sn); err != nil {
sn.Metadata.Term, sn.Metadata.Index, err)
}
- 如果狀態(tài)機的index比快照還要新,那繼續(xù)下去沒有意義了
- 將快照的數據給chain做更新
- 注意這里的confState,里面記錄了成員列表,以及l(fā)earner列表。
- 基本上收到快照的都是不成器的follower或新來的learner,要努力跟leader保持一致,所以要調用catchUp
- 多提一句,learner不參加選舉,是因為它落后太多了,為了不擾亂民主程序的正常進行,先靠邊站,等你跟我一致了,你再來把。
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
b, err := utils.UnmarshalBlock(snap.Data)
if err != nil {
return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
}
if c.lastBlock.Header.Number >= b.Header.Number {
c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
return nil
}
puller, err := c.createPuller()
if err != nil {
return errors.Errorf("failed to create block puller: %s", err)
}
defer puller.Close()
var block *common.Block
next := c.lastBlock.Header.Number + 1
c.logger.Infof("Catching up with snapshot taken at block %d, starting from block %d", b.Header.Number, next)
for next <= b.Header.Number {
block = puller.PullBlock(next)
if block == nil {
return errors.Errorf("failed to fetch block %d from cluster", next)
}
if utils.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
} else {
c.support.WriteBlock(block, nil)
}
next++
}
c.lastBlock = block
c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
return nil
}
- 這里有個技巧是快照是怎么構建的,這里后面會講到,其實就是保存的那段快照區(qū)間的最后一個block。
- 這里用到了Puller,這里底層就是對接的Orderer的deliver服務拉取block。
- 下面就很明顯了,去拉取該區(qū)間的block的同時寫入本地賬本,并更新lastblock標記位。
applyC
SoftState
case app := <-c.applyC:
if app.soft != nil {
newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
if newLeader != soft.Lead {
c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)
c.Metrics.LeaderChanges.Add(1)
atomic.StoreUint64(&c.lastKnownLeader, newLeader)
if newLeader == c.raftID {
propC, cancelProp = becomeLeader()
}
if soft.Lead == c.raftID {
becomeFollower()
}
}
...
soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}
// notify external observer
select {
case c.observeC <- soft:
default:
}
}
- 收到這個通知,就代表可能變天了,要換領導。
- 看下新來的領導任命書跟現在所知的是不是一個人,如果不是,不好意思就是這么現實,開始工作交接。
- 看下是不是自己當選,如是becomeLeader。不用懷疑。
- 如果上次是本人當選,這次換人的話,那leader職權得立即停止,becomeFollower。
- 記錄最新得softstate通知observeC,不過當前外部沒有人關注這個事情。
becomeLeader
becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
c.Metrics.IsLeader.Set(1)
c.blockInflight = 0
c.justElected = true
submitC = nil
ch := make(chan *common.Block, c.opts.MaxInflightMsgs)
// if there is unfinished ConfChange, we should resume the effort to propose it as
// new leader, and wait for it to be committed before start serving new requests.
if cc := c.getInFlightConfChange(); cc != nil {
// The reason `ProposeConfChange` should be called in go routine is documented in `writeConfigBlock` method.
go func() {
if err := c.Node.ProposeConfChange(context.TODO(), *cc); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
}()
c.confChangeInProgress = cc
c.configInflight = true
}
// Leader should call Propose in go routine, because this method may be blocked
// if node is leaderless (this can happen when leader steps down in a heavily
// loaded network). We need to make sure applyC can still be consumed properly.
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context, ch <-chan *common.Block) {
for {
select {
case b := <-ch:
data := utils.MarshalOrPanic(b)
if err := c.Node.Propose(ctx, data); err != nil {
c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
return
}
c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)
case <-ctx.Done():
c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
return
}
}
}(ctx, ch)
return ch, cancel
}
其實最主要就是后面的函數,外部調用propC, cancelProp = becomeLeader(),會循環(huán)監(jiān)聽propC通道,然后將data用Node.Propose發(fā)給Raft,這個后面再講。
再Raft的世界里,leader就是王道,它的話就是圣旨,只有l(wèi)eader才有資格Propose東西出去。所以選上的最重要的事情就是拿到與Raft的溝通權力。
如果有配置未提交,c.Node.ProposeConfChange
這里什么地方會通知到ch,這里賣個關子,后面會講到。
becomeFollower
becomeFollower := func() {
cancelProp()
c.blockInflight = 0
_ = c.support.BlockCutter().Cut()
stop()
submitC = c.submitC
bc = nil
c.Metrics.IsLeader.Set(0)
}
交出權力的心情是痛苦的,我們看下做了什么?
原來以為cancelProp會斷開與propC的關系,現在看來do nothing,一是從理論上來說,follower不會有propose的機會,二是給最后一次的超時補償做準備。
blockInflight是代表說leader會記錄propose出去的block,是不是在Raft里面形成了大多數一致,如果達成一致,leader會在本地commit,這個時候才會移除掉這條記錄。
-
c.support.BlockCutter().Cut(), 這里有個疑問,這種調法會清理掉pendingBatch,真的這么肯定到這里不會剩下沒有處理完的么?
func (r *receiver) Cut() []*cb.Envelope { r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds()) r.PendingBatchStartTime = time.Time{} batch := r.pendingBatch r.pendingBatch = nil r.pendingBatchSizeBytes = 0 return batch } stop,既然上面已經清掉了pending,那這里再stop pendingbatch的超時處理,也就沒有什么問題。
submitC通道是代表接受客戶端的數據提交,這個后面再講。
bc就是blockcreator,里面保存的最近一次創(chuàng)建block的信息,既然你都卸任了,這些也就沒什么意義了。
CommittedEntries
func (c *Chain) apply(ents []raftpb.Entry) {
if len(ents) == 0 {
return
}
if ents[0].Index > c.appliedIndex+1 {
c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
}
var appliedb uint64
var position int
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
break
}
// We need to strictly avoid re-applying normal entries,
// otherwise we are writing the same block twice.
if ents[i].Index <= c.appliedIndex {
c.logger.Debugf("Received block with raft index (%d) <= applied index (%d), skip", ents[i].Index, c.appliedIndex)
break
}
block := utils.UnmarshalBlockOrPanic(ents[i].Data)
c.writeBlock(block, ents[i].Index)
appliedb = block.Header.Number
c.Metrics.CommittedBlockNumber.Set(float64(appliedb))
position = i
c.accDataSize += uint32(len(ents[i].Data))
...
if c.accDataSize >= c.sizeLimit {
select {
case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
"taking snapshot at block %d, last snapshotted block number is %d",
c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
c.accDataSize = 0
c.lastSnapBlockNum = appliedb
c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
default:
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
}
}
return
}
再Raft的世界里面,確認提交的有兩種entry啦,一種就是所謂的普通,一種就是配置變更。
- 遍歷普通日志,如果是已經寫入狀態(tài)機,也就是寫入本地賬本的block, 那當然要拒絕,免得重復。
- 接下就是writeblock到本地啦。
- 然后記錄這次處理到第幾個了,最后再統計這次總共處理的datasize,就是blocksize累加啦。這個之后會有妙用,后面再講。
下面我們看下writeBlock的邏輯
writeBlock
func (c *Chain) writeBlock(block *common.Block, index uint64) {
if c.blockInflight > 0 {a
c.blockInflight-- // only reduce on leader
}
c.lastBlock = block
c.logger.Debugf("Writing block %d to ledger", block.Header.Number)
if utils.IsConfigBlock(block) {
c.writeConfigBlock(block, index)
return
}
c.raftMetadataLock.Lock()
c.opts.RaftMetadata.RaftIndex = index
m := utils.MarshalOrPanic(c.opts.RaftMetadata)
c.raftMetadataLock.Unlock()
c.support.WriteBlock(block, m)
}
- blockInflight前面講過了,這里收到代表我發(fā)出去的propose收到了群眾的強烈支持,那這個提案就過了。剩下就是好好把提案落地就好。
- 配置部分有機會單獨講,本身不影響主要的流程,這里先跳過
- c.support.WriteBlock(block, m),就是寫本地賬本啦。
if c.accDataSize >= c.sizeLimit {
select {
case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
"taking snapshot at block %d, last snapshotted block number is %d",
c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
c.accDataSize = 0
c.lastSnapBlockNum = appliedb
c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
default:
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
}
}
func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte) error {
rs.lg.Debugf("Creating snapshot at index %d from MemoryStorage", i)
snap, err := rs.ram.CreateSnapshot(i, &cs, data)
if err != nil {
return errors.Errorf("failed to create snapshot from MemoryStorage: %s", err)
}
if err = rs.saveSnap(snap); err != nil {
return err
}
rs.snapshotIndex = append(rs.snapshotIndex, snap.Metadata.Index)
// Keep some entries in memory for slow followers to catchup
if i > rs.SnapshotCatchUpEntries {
compacti := i - rs.SnapshotCatchUpEntries
rs.lg.Debugf("Purging in-memory raft entries prior to %d", compacti)
if err = rs.ram.Compact(compacti); err != nil {
if err == raft.ErrCompacted {
rs.lg.Warnf("Raft entries prior to %d are already purged", compacti)
} else {
rs.lg.Fatalf("Failed to purge raft entries: %s", err)
}
}
}
rs.lg.Infof("Snapshot is taken at index %d", i)
rs.gc()
return nil
}
- 如果累加的accDataSize超過閾值,這里會將寫入的最后一個block的相關信息通知給gcC通道。
- gcC再轉調takeSnapshot
- TakeSnapshot里面很簡單,就是生成snapshot,包括任期,最后一次的日志下標以及block。保存到本地snap。
- rs.gc里面涉及到一個閾值,MaxSnapshotFiles,如果超過,需要清理文件。首當其沖的是wal,看下是不是有比快照還要老的日志,有的話清掉。既然都有快照了,wal日志也就沒有存在的意義了。Raft的世界里index是一切衡量的基礎。snap文件就簡單,超過多少就刪多少。
if c.justElected {
msgInflight := c.Node.lastIndex() > c.appliedIndex
if msgInflight {
c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
continue
}
if c.configInflight {
c.logger.Debugf("There is config block in flight, new leader should not serve requests")
continue
}
c.logger.Infof("Start accepting requests as Raft leader at block %d", c.lastBlock.Header.Number)
bc = &blockCreator{
hash: c.lastBlock.Header.Hash(),
number: c.lastBlock.Header.Number,
logger: c.logger,
}
submitC = c.submitC
c.justElected = false
} else if c.configInflight {
c.logger.Info("Config block or ConfChange in flight, pause accepting transaction")
submitC = nil
} else if c.blockInflight < c.opts.MaxInflightMsgs {
submitC = c.submitC
}
- justElected就代表剛選上那會。過程自己體會。
- msgInflight就代表有MemoryStorage的entry還沒有寫入賬本啦,不宜出門接客
- configInflight也是一樣,有Raft配置變更或config block進來還沒有生效前,更加不宜出門接客
- 如果前面都過了,submitC = c.submitC就代表結果submitC通道,正式開始開門迎客。需要注意的是之后可進不到這里哦。
- 如果之前有過配置變更的干擾,c.blockInflight < c.opts.MaxInflightMsgs這里就是給她重新出門接客的機會。
- 還記不記得becomeFollower的時候立馬就能接客,而leader條件很多,說明leader要求高嘛。
到這里基本把Raft到Orderer的處理都講完了。
Messages
n.Advance()
// TODO(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
n.send(rd.Messages)
Advance的意思是這波Ready我已經處理完了,我準備好再處理
前面提到過,EtcdRaft只關注算法本身,集群節(jié)點間怎么通訊,不是它關注的點,不過當然了,消息要發(fā)給誰,它是知道的,只不過想讓你代勞而已。
func (n *node) send(msgs []raftpb.Message) {
n.unreachableLock.RLock()
defer n.unreachableLock.RUnlock()
for _, msg := range msgs {
if msg.To == 0 {
continue
}
status := raft.SnapshotFinish
msgBytes := utils.MarshalOrPanic(&msg)
err := n.rpc.SendConsensus(msg.To, &orderer.ConsensusRequest{Channel: n.chainID, Payload: msgBytes})
if err != nil {
// TODO We should call ReportUnreachable if message delivery fails
n.logSendFailure(msg.To, err)
status = raft.SnapshotFailure
} else if _, ok := n.unreachable[msg.To]; ok {
n.logger.Infof("Successfully sent StepRequest to %d after failed attempt(s)", msg.To)
delete(n.unreachable, msg.To)
}
if msg.Type == raftpb.MsgSnap {
n.ReportSnapshot(msg.To, status)
}
}
}
還記得之前將snap寫入存儲吧?到這里一般的情況可以將狀態(tài)置為SnapshotFinish,但是保險起見,這波消息只要發(fā)送失敗就認為這次快照存儲失敗,情愿重發(fā)一次。
最后ReportSnapshot就是用來向Leader報告你發(fā)給我的快照的執(zhí)行情況。
Orderer->Raft
Orderer是怎么把消息發(fā)給Raft的呢?Fabric剝離了底層共識算法與Orderer的耦合,讓替換成為可能??催^之前kafka和solo的對這個應該很熟悉。
type Consenter interface {
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(config *cb.Envelope, configSeq uint64) error
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
// This is useful when consenter needs to temporarily block ingress messages so
// that in-flight messages can be consumed. It could return error if consenter is
// in erroneous states. If this blocking behavior is not desired, consenter could
// simply return nil.
WaitReady() error
}
只要是普通類型的事件都會走Order,來push到后端的共識服務。
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
這里封裝成SubmitRequest繼續(xù)往后傳遞
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
if err := c.isRunning(); err != nil {
c.Metrics.ProposalFailures.Add(1)
return err
}
leadC := make(chan uint64, 1)
select {
case c.submitC <- &submit{req, leadC}:
lead := <-leadC
if lead == raft.None {
c.Metrics.ProposalFailures.Add(1)
return errors.Errorf("no Raft leader")
}
if lead != c.raftID {
if err := c.rpc.SendSubmit(lead, req); err != nil {
c.Metrics.ProposalFailures.Add(1)
return err
}
}
case <-c.doneC:
c.Metrics.ProposalFailures.Add(1)
return errors.Errorf("chain is stopped")
}
return nil
}
當然chain要是running狀態(tài),將收到的事件通知給submitC通道。
等待leadC的通知,先中斷下,看下要干嘛,其實就是返回當前任期的leader,當前任期不準確,應該是最新一任leader,因為有可能在某個任期leader沒有選出來,當然這個幾率非常非常低,因為有隨機超時的存在。
繼續(xù),拿到任命書,看下如果是raft.None, 說明現在還沒有領導,直接return,表示這個事件我不能收,收下真的處理不了。
如果leader不是本人,那問題大了,在Raft的世界里只有l(wèi)eader才能發(fā)號施令,現在這個事件怎么辦?丟掉又可惜,因為如果都發(fā)給leader,那那邊壓力太大了。既然你沒有篡位之意,那努力給你的領導分憂不是,借助rpc將這個事件發(fā)給他好了。RPC模塊是給orderer間通訊用的,也就是Raftnode間通訊用的。沒它整個體系你玩不轉的,以后有機會再講把。
前面Raft->Orderer章節(jié),我們講了用submitC來通知開門迎客,下面我們看下接客會做些什么?
submitC
case s := <-submitC:
if s == nil {
// polled by `WaitReady`
continue
}
if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
s.leader <- raft.None
continue
}
s.leader <- soft.Lead
if soft.Lead != c.raftID {
continue
}
batches, pending, err := c.ordered(s.req)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
}
if pending {
start() // no-op if timer is already started
} else {
stop()
}
c.propose(propC, bc, batches...)
if c.configInflight {
c.logger.Info("Received config block, pause accepting transaction till it is committed")
submitC = nil
} else if c.blockInflight >= c.opts.MaxInflightMsgs {
c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
c.blockInflight, c.opts.MaxInflightMsgs)
submitC = nil
}
- 如果當前節(jié)點的狀態(tài)是準候選人或候選人,那就沒什么好說了,leader現在還沒有產生
- 如果soft.Lead != c.raftID,說明什么,說明最新任期不是自己哦,沒有propose的權利,丟棄這次請求。
- batches, pending, err := c.ordered(s.req),很熟悉了,這里負責出包。
- 如果還有剩下的事件沒有出包,為了保證不浪費,啟動計時器,來做補償,這部分后面再講。
- c.propose(propC, bc, batches...),重點,這里是真正給Raft發(fā)狀態(tài)的地方,后面講到。
- 最后無非就是一些異常情況,會讓leader失去接受請求的能力。
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
for _, batch := range batches {
b := bc.createNextBlock(batch)
c.logger.Debugf("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
select {
case ch <- b:
default:
c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
}
// if it is config block, then we should wait for the commit of the block
if utils.IsConfigBlock(b) {
c.configInflight = true
}
c.blockInflight++
}
return
}
還記不記得前面講becomeLeader的時候提到的ch,這里最后會一個接一個的將block通知到ch。再貼一遍那邊的代碼。
go func(ctx context.Context, ch <-chan *common.Block) {
for {
select {
case b := <-ch:
data := utils.MarshalOrPanic(b)
if err := c.Node.Propose(ctx, data); err != nil {
c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
return
}
c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)
case <-ctx.Done():
c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
return
}
}
}(ctx, ch)
最終會調用c.Node.Propose(ctx, data)的方法。
Propose的意思就是將日志廣播出去,要群眾都盡量保存起來,但還沒有提交,等到leader收到半數以上的群眾都響應說已經保存完了,leader這時就可以提交了,下一次Ready的時候就會帶上committedindex。
超時處理
case <-timer.C():
ticking = false
batch := c.support.BlockCutter().Cut()
if len(batch) == 0 {
c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
continue
}
c.logger.Debugf("Batch timer expired, creating block")
c.propose(propC, bc, batch) // we are certain this is normal block, no need to block
沒有新意,無非就是將pending的做一次Cut,然后propose到Raft。
配置更新
配置部分是Raft不可忽略的一部分,Fabric是怎樣將成員的變更傳遞給Raft的?
首先我們回到Node接口,看下ProposeConfChange和ApplyConfChange。
一個是通知Raft,有配置變更。另外一個是接到Raft通知,有配置更新,立即執(zhí)行。
ProposeConfChange
func (c *Chain) writeBlock(block *common.Block, index uint64) {
...
if utils.IsConfigBlock(block) {
c.writeConfigBlock(block, index)
return
}
...
}
還記得么,前面提到的writeBlock里面會判斷當前寫入的是不是configblock
func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
metadata, raftMetadata := c.newRaftMetadata(block)
var changes *MembershipChanges
if metadata != nil {
changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
}
confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
raftMetadata.RaftIndex = index
raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
// write block with metadata
c.support.WriteConfigBlock(block, raftMetadataBytes)
c.configInflight = false
// update membership
if confChange != nil {
// We need to propose conf change in a go routine, because it may be blocked if raft node
// becomes leaderless, and we should not block `serveRequest` so it can keep consuming applyC,
// otherwise we have a deadlock.
go func() {
// ProposeConfChange returns error only if node being stopped.
// This proposal is dropped by followers because DisableProposalForwarding is enabled.
if err := c.Node.ProposeConfChange(context.TODO(), *confChange); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
}()
c.confChangeInProgress = confChange
switch confChange.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", confChange.NodeID)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", confChange.NodeID)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
c.configInflight = true
}
c.raftMetadataLock.Lock()
c.opts.RaftMetadata = raftMetadata
c.raftMetadataLock.Unlock()
}
第一眼就可以得出一個結論,在Fabric的世界里,Raft的配置更新是包括在ConfigBlock里面的,只不過在block寫入賬本之前會從里面剝離出來涉及到Raft的配置變更的部分,然后去通知Raft。
- 去ConfigBlock里面拿到EtcdRaft部分的配置,以及當前在用的配置
- 做比對,得出一份報告,這次新增了哪幾個節(jié)點,要刪除哪幾個幾點
- 得出報告還不行,還得結合Raft的規(guī)定,出局一份書面的申請。UpdateRaftMetadataAndConfChange就是干這個的。
func (mc *MembershipChanges) UpdateRaftMetadataAndConfChange(raftMetadata *etcdraft.RaftMetadata) *raftpb.ConfChange {
if mc == nil || mc.TotalChanges == 0 {
return nil
}
var confChange *raftpb.ConfChange
// producing corresponding raft configuration changes
if len(mc.AddedNodes) > 0 {
nodeID := raftMetadata.NextConsenterId
raftMetadata.Consenters[nodeID] = mc.AddedNodes[0]
raftMetadata.NextConsenterId++
confChange = &raftpb.ConfChange{
ID: raftMetadata.ConfChangeCounts,
NodeID: nodeID,
Type: raftpb.ConfChangeAddNode,
}
raftMetadata.ConfChangeCounts++
return confChange
}
if len(mc.RemovedNodes) > 0 {
for _, c := range mc.RemovedNodes {
for nodeID, node := range raftMetadata.Consenters {
if bytes.Equal(c.ClientTlsCert, node.ClientTlsCert) {
delete(raftMetadata.Consenters, nodeID)
confChange = &raftpb.ConfChange{
ID: raftMetadata.ConfChangeCounts,
NodeID: nodeID,
Type: raftpb.ConfChangeRemoveNode,
}
raftMetadata.ConfChangeCounts++
break
}
}
}
}
return confChange
}
有沒有發(fā)現,這里執(zhí)行下來每次只會更新一個節(jié)點,意味著每次更新Raft成員信息的時候,每次只能新增或刪除一個節(jié)點,否則剩下的是不會生效的。這里感興趣的可以參考Raft論文,每次只變更一個節(jié)點,是性價比高的實現。
if len(confState.Nodes) == len(c.opts.RaftMetadata.Consenters) { // since configuration change could only add one node or // remove one node at a time, if raft nodes state size // equal to membership stored in block metadata field, // that means everything is in sync and no need to propose // update return nil }寫入ConfigBlock到本地賬本
c.Node.ProposeConfChange,這里就是通知Raft做配置更新了。
設置configInflight=true,表示現在有個配置更新已經提案給Raft了,等通知。
記錄本次更新到confChangeInProgress用來之后的跟蹤進度
ApplyConfChange
回憶下,當我們的提案發(fā)到Raft后,我怎么知道成員都達成一致準備開干呢?當然是等待Ready的CommittedEntries啦,最終會通知applyc通道。
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(ents[i].Data); err != nil {
c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
continue
}
c.confState = *c.Node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
// This ConfChange was introduced by a previously committed config block,
// we can now unblock submitC to accept envelopes.
if c.confChangeInProgress != nil &&
c.confChangeInProgress.NodeID == cc.NodeID &&
c.confChangeInProgress.Type == cc.Type {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
c.confChangeInProgress = nil
c.configInflight = false
// report the new cluster size
c.Metrics.ClusterSize.Set(float64(len(c.opts.RaftMetadata.Consenters)))
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
// calling goroutine, since otherwise it will be blocked
// trying to write into haltC
go c.Halt()
}
}
- c.Node.ApplyConfChange(cc),總算是看到了,這里就是執(zhí)行配置更新了。
- 還記得上面會去記錄confChangeInProgress么?如果相等說明之前給Raft的提案,終于收到了響應,大家都準備好了,開始吧。
- c.configureComm()會在cluster章節(jié)講解,這里簡單的說就是按照最新的成員,構建Raft網絡。
- 釋放configInflight和confChangeInProgress,代表本次配置更新完畢。
- 如果接收到的是刪除節(jié)點的通知,看下是不是本人,如果是,調用Halt,想也知道,最終會去調Node的Stop,停掉該Raft節(jié)點。
最后
關于通訊層也是很重要的部分,這里不光是托管Raft的消息傳遞,也是支撐Orderer cluster的關鍵,下次單獨拿來講吧。
etcdraft的部分差不多就是這樣了,當然了,有很多細節(jié)沒有涉及,比如config的部分。