下面將從下面幾個部分介紹PBFT共識機制
- Fabric事件機制
- Fabric可插拔共識機制
- Fabric共識引擎初始化
- Fabric共識接口
部分代碼將以python縮進格式對源代碼進行簡化。
1. Fabric事件機制
fabric中幾乎所有共識的驅(qū)動都是通過event來進行分發(fā)的。想要了解fabric共識時如何運作的,必須先了解fabric的分發(fā)機制。
fabric通過事件管理器用于來管理事件,一般需要管理多個事件并且按事件接收的先后順序來處理。
事件管理器
type Manager interface {##
Inject(Event) // 允許事件管理器線程跳過隊列的臨時接口
Queue() chan<- Event // 返回一個類型為Event的channel,用于存儲事件
SetReceiver(Receiver) // 設置事件處理對象
Start() // 啟動Manager線程
Halt() // 停止Manager線程
}
-
Queue()接口返回一個類型為Event的channel,用于存儲事件。Fabric用一個隊列來存儲事件,Queue()返回該channel對象。 -
Start()方法會啟動一個goroutine循環(huán)處理接收到的事件,通過channel能夠保證只有接收到事件才會處理,不用每時每刻循環(huán)檢查隊列去執(zhí)行事件,浪費CPU性能。 -
SetReceiver(Recevier)需要設置事件管理器的實際處理者,Receiver接口需要實現(xiàn)ProcessEvent(Event) Event方法。 -
Inject(Event)會在Queue()收到事件之后將事件發(fā)放給Receiver進行處理 -
Halt()停止Manager線程
總結(jié):Start() 檢測 Queue() 中是否收到event; 收到event后Inject(Event)將event交給Receiver進行處理;不同的consensus實現(xiàn)不同的Receiver功能。
事件來源
這部分比較繁瑣,可以跳過,直接進入總結(jié)
1.1 客戶端通過調(diào)用fabric的RESTful接口/chaincode調(diào)用鏈代碼或者部署鏈代碼,fabric在處理請求的時候(fabric/core/rest/rest_api.go.ProcessChaincode)再通過JSON RPC向peer節(jié)點發(fā)起執(zhí)行事務請求,hyperledger/fabric/core/devops.go的Deplopy、invokeOrQuery方法,會調(diào)用peer.Impl的ExecuteTransaction方法,如下面代碼所示:
// hyperledger/fabric/core/peer/peer.go
func (p *Impl) ExecuteTransaction(transaction *pb.Transaction) (response *pb.Response) {
if p.isValidator {
response = p.sendTransactionsToLocalEngine(transaction)
} else {
peerAddresses := p.discHelper.GetRandomNodes(1)
response = p.SendTransactionsToPeer(peerAddresses[0], transaction)
}
return response
}
1.2 peer節(jié)點在啟動時,讀取配置文件core.yaml的文件配置項peer.validator.enabled的值,peer根據(jù)這個值將自身設置為validator或者非validator。validator與非validator的區(qū)別在于:前者能夠直接執(zhí)行事務,而后者不直接執(zhí)行事務而是通過gRPC的方式調(diào)用validator節(jié)點來執(zhí)行事務(相當于轉(zhuǎn)發(fā)事務),詳細請參見SendTransactionsToPeer的實現(xiàn),最終請求會定向到sendTransactionsToLocalEngine。
1.3 sendTransactionsToLocalEngin方法會調(diào)用p.engine.ProcessTransactionMsg,p.engine為結(jié)構(gòu)體EngineImpl,這是Engine接口實例。Engine在啟動peer時候創(chuàng)建,后面會詳細說明。Engine這個接口用于管理peer網(wǎng)絡的通訊和處理事務。EngineImpl的結(jié)構(gòu)如下:
// hyperledger/fabric/consensus/helper/engine.go
type EngineImpl struct {
consenter consensus.Consenter // 每個共識插件都需要實現(xiàn)Consenter接口,包括RecvMsg方法和ExecutionConsumer接口的里函數(shù)(可以直接返回)
helper *Helper // 包含一些工具類方法,可以調(diào)用外部接口,比如獲取網(wǎng)絡信息,消息簽名、驗證,持久化一些對象等
peerEndpoint *pb.PeerEndpoint
consensusFan *util.MessageFan
}
1.4 ProcessTransactionMsg的代碼如下,可以看見鏈代碼查詢事務直接執(zhí)行不需要進行共識,因為讀取某個peer節(jié)點的賬本不會影響自身以及其他peer節(jié)點賬本,所以不需要共識來同步。而鏈代碼調(diào)用和部署事務會影響到單個peer節(jié)點賬本和狀態(tài),所以會調(diào)用共識插件的RecvMsg函數(shù)來保證各個peer節(jié)點的賬本和狀態(tài)一致。
// hyperledger/fabric/consensus/helper/engine.go
func (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx *pb.Transaction) (response *pb.Response) {
//TODO: Do we always verify security, or can we supply a flag on the invoke ot this functions so to bypass check for locally generated transactions?
if tx.Type == pb.Transaction_CHAINCODE_QUERY {
// ...
result, _, err := chaincode.Execute(cxt, chaincode.GetChain(chaincode.DefaultChain), tx) // 直接執(zhí)行查詢事務,不需要共識
// ...
} else {
// ...
err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID) // 使用共識插件保證各個peer節(jié)點賬本和狀態(tài)保持一致
if err != nil {
response = &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(err.Error())}
}
// ...
1.5 RecvMsg函數(shù)在Fabric中有兩個版本,一個是PBFT版本,一個是Noops版本。
- NOOPS:用于開發(fā)和測試使用的插件,當一個validator節(jié)點收到一個事務消息時,會把消息轉(zhuǎn)為共識消息,并會向所有節(jié)點廣播共識消息。一般情況下,所有節(jié)點都會接收到這條共識消息,并執(zhí)行消息里的事務。這是一種比較樸素的共識方式,一旦因為網(wǎng)絡或者其他原因,有些節(jié)點沒收到廣播消息,就會存在狀態(tài)不一致問題,所以不只用于開發(fā)和測試。
- PBFT:PBFT算法實現(xiàn)。簡單地說當網(wǎng)絡里的錯誤失效節(jié)點數(shù)量f與總的節(jié)點數(shù)量N滿足關(guān)系N>3f時,PBFT算法也能保證各個節(jié)點的狀態(tài)保持一致。但是實現(xiàn)PBFT算法的需要滿足以下的約束條件,所以在選擇共識算法時要對系統(tǒng)進行全面評估,基于系統(tǒng)自身情況選擇,不能盲目選擇。
這里我們對PBFT版本進行分析,查看一個event是如何進入事件管理器如何進行處理:
// consensus/pbft/external.go
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
eer.manager.Queue() <- batchMessageEvent{
msg: ocMsg,
sender: senderHandle,
}
return nil
}
// consensus/util/events/events.go
func (em *managerImpl) Queue() chan<- Event {
return em.events
}
可以看出PBFT收到消息之后調(diào)用共識插件的RecvMsg函數(shù)將event通過Queue()加入到消息管理器隊列(em.events)中,加入em.events的事件會有一個專門的線程對其進行處理,下面將會對這個過程進行討論。
總結(jié):
- 客戶端調(diào)用或部署鏈代碼時,調(diào)用
peer.Impl的ExecuteTransaction方法; -
ExecuteTransaction進入到sendTransactionsToLocalEngine函數(shù)進而執(zhí)行p.engine.ProcessTransactionMsg; -
ProcessTransactionMsg調(diào)用共識插件的RecvMsg函數(shù)將event通過Queue()加入到消息管理器隊列(em.events)中。
消息處理
Fabric的共識消息是通過eventLoop注射給對應處理函數(shù)的。
// consensus/util/events/events.go
func (em *managerImpl) Start() {
go em.eventLoop()
}
// eventLoop is where the event thread loops, delivering events
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
case <-em.exit:
logger.Debug("eventLoop told to exit")
return
}
}
}
// Inject can only safely be called by the managerImpl thread itself
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil
SendEvent(em.receiver, event)
}
// SendEvent performs the event loop on a receiver to completion
func SendEvent(receiver Receiver, event Event) {
next := event
for {
next = receiver.ProcessEvent(next)
if next == nil
break
}
}
// SetReceiver sets the destination for events
func (em *managerImpl) SetReceiver(receiver Receiver) {
em.receiver = receiver
}
-
RecvMsg函數(shù)將event通過Queue()加入到em.events中 -
eventLoop函數(shù)不斷的從em.events里取出事件,通過Inject注射給對應的Receiver,注意,通過SendEvent注射給接收者的ProcessEvent方法 -
SendEvent循環(huán)獲取receiver.ProcessEvent對象,如果不為nil,則不斷的調(diào)用receiver.ProcessEvent直到找到對應的消息處理函數(shù) -
SetReceiver函數(shù)修改receiver對象
關(guān)于ProcessEvent和Receiver的細節(jié)在可插拔共識中詳解。
總結(jié):RecvMsg函數(shù)通過Inject函數(shù)將event分發(fā)給receiver.ProcessEvent
2、可插拔共識
fabric/consensus/consensus.go對外提供共識模塊的方法調(diào)用。
其中最核心也是每個共識必須實現(xiàn)的接口是Consenter。
// fabric/consensus/consensus.go
type ExecutionConsumer interface {
Executed(tag interface{})
Committed(tag interface{}, target *pb.BlockchainInfo)
RolledBack(tag interface{})
StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error
ExecutionConsumer
}
-
RecvMsg處理共識引擎在收到客戶端發(fā)來的event事件 -
Executed,Committed,RolledBack,StateUpdated等方法來監(jiān)聽異步交易中的各個步驟執(zhí)行情況,并進行處理。
Fabric的PBFT都是通過加入em.events之后通過eventLoop捕獲事件之后再進行處理。所有event都是通過Receiver.ProcessEvent等待事件處理。
// fabric/consensus/pbft/external.go
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID)
eer.manager.Queue() <- batchMessageEvent{msg: ocMsg, sender: senderHandle,}
// Executed is called whenever Execute completes
func (eer *externalEventReceiver) Executed(tag interface{})
eer.manager.Queue() <- executedEvent{tag}
// Committed is called whenever Commit completes
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo)
eer.manager.Queue() <- committedEvent{tag, target}
// RolledBack is called whenever a Rollback completes
func (eer *externalEventReceiver) RolledBack(tag interface{})
eer.manager.Queue() <- rolledBackEvent{}
// StateUpdated is a signal from the stack that it has fast-forwarded its state
func (eer *...) StateUpdated(tag interface{}, target *pb.BlockchainInfo)
eer.manager.Queue() <- stateUpdatedEvent{ chkpt: tag.(*...),target: target}
PBFT在ProcessEvent中會對各個事件進行處理
// allow the primary to send a batch when the timer expires
func (op *obcBatch) ProcessEvent(event events.Event) events.Event {
logger.Debugf("Replica %d batch main thread looping", op.pbft.id)
switch et := event.(type) {
case batchMessageEvent:
ocMsg := et
return op.processMessage(ocMsg.msg, ocMsg.sender) // ocMsg的消息類型仍為鏈代碼事務類型
case executedEvent:
op.stack.Commit(nil, et.tag.([]byte))
case committedEvent:
logger.Debugf("Replica %d received committedEvent", op.pbft.id)
return execDoneEvent{}
// ...
case stateUpdatedEvent:
op.reqStore = newRequestStore()
return op.pbft.ProcessEvent(event)
default:
return op.pbft.ProcessEvent(event)
}
pbft.ProcessEvent(event)定義了PBFT共識三個階段的函數(shù)實現(xiàn)
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
// ...
case *RequestBatch:
err = instance.recvRequestBatch(et)
case *PrePrepare:
err = instance.recvPrePrepare(et)
case *Prepare:
err = instance.recvPrepare(et)
case *Commit:
err = instance.recvCommit(et)
case *Checkpoint:
return instance.recvCheckpoint(et)
case *ViewChange:
return instance.recvViewChange(et)
case *NewView:
return instance.recvNewView(et)
case *FetchRequestBatch:
err = instance.recvFetchRequestBatch(et)
case returnRequestBatchEvent:
return instance.recvReturnRequestBatch(et)
// ...
}
總結(jié):
- Fabric所有共識必須實現(xiàn)
Consenter接口 - 修改共識需要修改
Receiver來對事件進行處理 - PBFT通過事件管理器將
event交給receiver.ProcessEvent進行處理
這個時候肯定會有人問了,說好的可插拔共識機制呢,怎么全部變成PBFT實現(xiàn)了。別急,這不準備開始了:
- 自定義共識時,首先實現(xiàn)
Consenter接口 - 其次調(diào)用
SetReceiver修改Receiver - 重寫
receiver.ProcessEvent - 所有處理可以通過事件管理器將
event交給receiver.ProcessEvent進行處理然后放回事件管理器。
沒看明白?為什么實現(xiàn)Consenter接口就可以自定義共識了,為什么就可插拔了?這就對了,因為還沒講到共識引擎初始化呢!上面介紹了一部分可插拔的事件處理機制。后面介紹一下自定義可插拔共識,介紹如何Fabric實現(xiàn)可插拔。
3、Fabric共識引擎初始化
初始化共識引擎
PBFT的共識引擎在啟動peer時自動初始化。具體調(diào)用過程為
// peer/node/start.go
func serve(args []string) error
peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)
// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
engineOnce.Do(func()
engine = new(EngineImpl)
engine.helper = NewHelper(coord)
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()
go func()
for msg := range engine.consensusFan.GetOutChannel()
engine.consenter.RecvMsg(msg.Msg, msg.Sender)
)
return engine, err
}
-
GetEngine的作用是進行共識模塊的初始化,同時啟動一個goroutine等待消息進入。 -
GetEngine初始化一個consenter和一個helper,并互相把一個句柄賦值給了對方。這樣做的目的,就是為了可以讓外部調(diào)用內(nèi)部,內(nèi)部可以調(diào)用外部。
注意:這里的consenter即為第二節(jié)的共識接口。
選擇Consenter
engine.consenter是在consensus/controller/controller.go里選擇
// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error)
engine.consenter = controller.NewConsenter(engine.helper)
// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {
plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
if plugin == "pbft"
return pbft.GetPlugin(stack)
return noops.GetNoops(stack)
}
默認選擇的是noops,如果需要添加自己編寫的共識模塊需要在這里自行添加判斷。
NOOPS:用于開發(fā)和測試使用的插件,當一個validator節(jié)點收到一個事務消息時,會把消息轉(zhuǎn)為共識消息,并會向所有節(jié)點廣播共識消息。一般情況下,所有節(jié)點都會接收到這條共識消息,并執(zhí)行消息里的事務。
初始化PBFT
如果選擇了PBFT則會調(diào)用consensus/pbft/pbft.go進行初始化。
使用PBFT的batch模式啟動時會調(diào)用newObcBatch進行PBFT算法初始化
// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
...
op.manager = events.NewManagerImpl()
op.manager.SetReceiver(op)
etf := events.NewTimerFactoryImpl(op.manager)
op.pbft = newPbftCore(id, config, op, etf)
op.manager.Start()
blockchainInfoBlob := stack.GetBlockchainInfoBlob()
op.externalEventReceiver.manager = op.manager
...
return op
}
newObcBatch主要做了這幾項工作
- 初始化了
eventLoop的消息隊列 - 設置了消息的接收者,用來處理對應的消息
- 創(chuàng)建監(jiān)聽消息超時的定時器
- 初始化
pbft算法 - 啟動消息隊列,不斷監(jiān)聽事件的到來并且分發(fā)給接收者處理
總結(jié):可以看出共識引擎初始化時的幾個步驟:
- 初始化
eventLoop - 設置
Receiver - 初始化共識算法
- 啟動事件管理器
至此,之前所講的所有東西全部啟動了?;貞浺幌抡麄€流程:
- 事件由客戶端發(fā)起,經(jīng)過一系列步驟執(zhí)行到
RecvMsg函數(shù) -
RecvMsg函數(shù)由Consenter定義,Consenter在NewConsenter內(nèi)初始化,NewConsenter可插播 -
Consenter提供共識模塊的方法調(diào)用ExecutionConsumer接口,Consenter可插拔 -
receiver.ProcessEvent負責處理事件管理器隊列內(nèi)的事件,Receiver可插播
總結(jié): 所有與共識有關(guān)的接口全部可插拔,并且提供了相應的接口,自定義實現(xiàn)Consenter和Receiver,在初始化的時候 SetReceiver,即可實現(xiàn)自定義共識。
4. Fabric共識接口
consensus.go主要包含了共識算法插件內(nèi)部對外暴露的接口和外部對內(nèi)暴露的接口??偨Y(jié)如下:



目錄結(jié)構(gòu):
可以看到共識模塊目錄如下。
consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events
目錄含義如下
controller 用來控制Fabric選擇什么樣的共識算法,默認是noops。
executor 封裝了消息隊列中對交易的處理。
helper 對外提供接口調(diào)用和數(shù)據(jù)持久化接口。
noops 提供了如何編寫Fabric共識算法的Demo。
pbft PBFT算法的具體實現(xiàn)。
util 實現(xiàn)了一個peer節(jié)點到共識算法的一個消息通道,和一個消息隊列。
推薦閱讀:
[1] Hyperledger Fabric中PBFT算法詳解:
https://zhuanlan.zhihu.com/p/48899458
[2] Fabric源碼分析-共識模塊:
https://zhuanlan.zhihu.com/p/35255567