Fabric PBFT可插拔共識詳解

下面將從下面幾個部分介紹PBFT共識機制

  1. Fabric事件機制
  2. Fabric可插拔共識機制
  3. Fabric共識引擎初始化
  4. Fabric共識接口

部分代碼將以python縮進格式對源代碼進行簡化。

1. Fabric事件機制


fabric中幾乎所有共識的驅(qū)動都是通過event來進行分發(fā)的。想要了解fabric共識時如何運作的,必須先了解fabric的分發(fā)機制。

fabric通過事件管理器用于來管理事件,一般需要管理多個事件并且按事件接收的先后順序來處理。

事件管理器

type Manager interface {##
        Inject(Event)         // 允許事件管理器線程跳過隊列的臨時接口
        Queue() chan<- Event  // 返回一個類型為Event的channel,用于存儲事件
        SetReceiver(Receiver) // 設置事件處理對象
        Start()               // 啟動Manager線程
        Halt()                // 停止Manager線程
}
  1. Queue()接口返回一個類型為Event的channel,用于存儲事件。Fabric用一個隊列來存儲事件,Queue()返回該channel對象。
  2. Start()方法會啟動一個goroutine循環(huán)處理接收到的事件,通過channel能夠保證只有接收到事件才會處理,不用每時每刻循環(huán)檢查隊列去執(zhí)行事件,浪費CPU性能。
  3. SetReceiver(Recevier)需要設置事件管理器的實際處理者,Receiver接口需要實現(xiàn)ProcessEvent(Event) Event方法。
  4. Inject(Event)會在Queue()收到事件之后將事件發(fā)放給Receiver進行處理
  5. Halt()停止Manager線程

總結(jié):Start() 檢測 Queue() 中是否收到event; 收到event后Inject(Event)將event交給Receiver進行處理;不同的consensus實現(xiàn)不同的Receiver功能。


事件來源

這部分比較繁瑣,可以跳過,直接進入總結(jié)

1.1 客戶端通過調(diào)用fabric的RESTful接口/chaincode調(diào)用鏈代碼或者部署鏈代碼,fabric在處理請求的時候(fabric/core/rest/rest_api.go.ProcessChaincode)再通過JSON RPC向peer節(jié)點發(fā)起執(zhí)行事務請求,hyperledger/fabric/core/devops.go的Deplopy、invokeOrQuery方法,會調(diào)用peer.ImplExecuteTransaction方法,如下面代碼所示:

// hyperledger/fabric/core/peer/peer.go
func (p *Impl) ExecuteTransaction(transaction *pb.Transaction) (response *pb.Response) {
        if p.isValidator {
                response = p.sendTransactionsToLocalEngine(transaction)
        } else {
                peerAddresses := p.discHelper.GetRandomNodes(1)
                response = p.SendTransactionsToPeer(peerAddresses[0], transaction)
        }
        return response
}

1.2 peer節(jié)點在啟動時,讀取配置文件core.yaml的文件配置項peer.validator.enabled的值,peer根據(jù)這個值將自身設置為validator或者非validator。validator與非validator的區(qū)別在于:前者能夠直接執(zhí)行事務,而后者不直接執(zhí)行事務而是通過gRPC的方式調(diào)用validator節(jié)點來執(zhí)行事務(相當于轉(zhuǎn)發(fā)事務),詳細請參見SendTransactionsToPeer的實現(xiàn),最終請求會定向到sendTransactionsToLocalEngine

1.3 sendTransactionsToLocalEngin方法會調(diào)用p.engine.ProcessTransactionMsg,p.engine為結(jié)構(gòu)體EngineImpl,這是Engine接口實例。Engine在啟動peer時候創(chuàng)建,后面會詳細說明。Engine這個接口用于管理peer網(wǎng)絡的通訊和處理事務。EngineImpl的結(jié)構(gòu)如下:

// hyperledger/fabric/consensus/helper/engine.go
type EngineImpl struct {
        consenter    consensus.Consenter // 每個共識插件都需要實現(xiàn)Consenter接口,包括RecvMsg方法和ExecutionConsumer接口的里函數(shù)(可以直接返回)
        helper       *Helper // 包含一些工具類方法,可以調(diào)用外部接口,比如獲取網(wǎng)絡信息,消息簽名、驗證,持久化一些對象等
        peerEndpoint *pb.PeerEndpoint
        consensusFan *util.MessageFan
}

1.4 ProcessTransactionMsg的代碼如下,可以看見鏈代碼查詢事務直接執(zhí)行不需要進行共識,因為讀取某個peer節(jié)點的賬本不會影響自身以及其他peer節(jié)點賬本,所以不需要共識來同步。而鏈代碼調(diào)用和部署事務會影響到單個peer節(jié)點賬本和狀態(tài),所以會調(diào)用共識插件的RecvMsg函數(shù)來保證各個peer節(jié)點的賬本和狀態(tài)一致。

// hyperledger/fabric/consensus/helper/engine.go
func (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx *pb.Transaction) (response *pb.Response) {
        //TODO: Do we always verify security, or can we supply a flag on the invoke ot this functions so to bypass check for locally generated transactions?
        if tx.Type == pb.Transaction_CHAINCODE_QUERY {
           // ... 
           result, _, err := chaincode.Execute(cxt, chaincode.GetChain(chaincode.DefaultChain), tx) // 直接執(zhí)行查詢事務,不需要共識
           // ...
        } else {
           // ...
           err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID)  // 使用共識插件保證各個peer節(jié)點賬本和狀態(tài)保持一致
           if err != nil {
                    response = &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(err.Error())}
           }
           // ...

1.5 RecvMsg函數(shù)在Fabric中有兩個版本,一個是PBFT版本,一個是Noops版本。

  • NOOPS:用于開發(fā)和測試使用的插件,當一個validator節(jié)點收到一個事務消息時,會把消息轉(zhuǎn)為共識消息,并會向所有節(jié)點廣播共識消息。一般情況下,所有節(jié)點都會接收到這條共識消息,并執(zhí)行消息里的事務。這是一種比較樸素的共識方式,一旦因為網(wǎng)絡或者其他原因,有些節(jié)點沒收到廣播消息,就會存在狀態(tài)不一致問題,所以不只用于開發(fā)和測試。
  • PBFT:PBFT算法實現(xiàn)。簡單地說當網(wǎng)絡里的錯誤失效節(jié)點數(shù)量f與總的節(jié)點數(shù)量N滿足關(guān)系N>3f時,PBFT算法也能保證各個節(jié)點的狀態(tài)保持一致。但是實現(xiàn)PBFT算法的需要滿足以下的約束條件,所以在選擇共識算法時要對系統(tǒng)進行全面評估,基于系統(tǒng)自身情況選擇,不能盲目選擇。

這里我們對PBFT版本進行分析,查看一個event是如何進入事件管理器如何進行處理:

// consensus/pbft/external.go
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
    eer.manager.Queue() <- batchMessageEvent{
        msg:    ocMsg,
        sender: senderHandle,
    }
    return nil
}

// consensus/util/events/events.go
func (em *managerImpl) Queue() chan<- Event {
    return em.events
}

可以看出PBFT收到消息之后調(diào)用共識插件的RecvMsg函數(shù)將event通過Queue()加入到消息管理器隊列(em.events)中,加入em.events的事件會有一個專門的線程對其進行處理,下面將會對這個過程進行討論。


總結(jié):

  1. 客戶端調(diào)用或部署鏈代碼時,調(diào)用peer.ImplExecuteTransaction方法;
  2. ExecuteTransaction進入到sendTransactionsToLocalEngine函數(shù)進而執(zhí)行p.engine.ProcessTransactionMsg
  3. ProcessTransactionMsg調(diào)用共識插件的RecvMsg函數(shù)將event通過Queue()加入到消息管理器隊列(em.events)中。

消息處理

Fabric的共識消息是通過eventLoop注射給對應處理函數(shù)的。

// consensus/util/events/events.go
func (em *managerImpl) Start() {
        go em.eventLoop()
}

// eventLoop is where the event thread loops, delivering events
func (em *managerImpl) eventLoop() {
    for {
        select {
            case next := <-em.events:
            em.Inject(next)
            case <-em.exit:
            logger.Debug("eventLoop told to exit")
            return
        }
    }
}

// Inject can only safely be called by the managerImpl thread itself
func (em *managerImpl) Inject(event Event) {
    if em.receiver != nil 
        SendEvent(em.receiver, event)
}

// SendEvent performs the event loop on a receiver to completion
func SendEvent(receiver Receiver, event Event) {
    next := event
    for {
        next = receiver.ProcessEvent(next)
        if next == nil 
            break
    }
}

// SetReceiver sets the destination for events
func (em *managerImpl) SetReceiver(receiver Receiver) {
    em.receiver = receiver
}
  1. RecvMsg函數(shù)將event通過Queue()加入到em.events
  2. eventLoop函數(shù)不斷的從em.events里取出事件,通過Inject注射給對應的Receiver,注意,通過SendEvent注射給接收者的ProcessEvent方法
  3. SendEvent循環(huán)獲取receiver.ProcessEvent對象,如果不為nil,則不斷的調(diào)用receiver.ProcessEvent直到找到對應的消息處理函數(shù)
  4. SetReceiver函數(shù)修改receiver對象

關(guān)于ProcessEvent和Receiver的細節(jié)在可插拔共識中詳解。


總結(jié):RecvMsg函數(shù)通過Inject函數(shù)將event分發(fā)給receiver.ProcessEvent


2、可插拔共識


fabric/consensus/consensus.go對外提供共識模塊的方法調(diào)用。
其中最核心也是每個共識必須實現(xiàn)的接口是Consenter。

// fabric/consensus/consensus.go
type ExecutionConsumer interface {
    Executed(tag interface{})                                
    Committed(tag interface{}, target *pb.BlockchainInfo)    
    RolledBack(tag interface{})                              
    StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}

type Consenter interface {
    RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error
    ExecutionConsumer
}
  1. RecvMsg處理共識引擎在收到客戶端發(fā)來的event事件
  2. Executed,Committed,RolledBackStateUpdated等方法來監(jiān)聽異步交易中的各個步驟執(zhí)行情況,并進行處理。

Fabric的PBFT都是通過加入em.events之后通過eventLoop捕獲事件之后再進行處理。所有event都是通過Receiver.ProcessEvent等待事件處理。

// fabric/consensus/pbft/external.go   
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) 
    eer.manager.Queue() <- batchMessageEvent{msg:    ocMsg, sender: senderHandle,}

// Executed is called whenever Execute completes
func (eer *externalEventReceiver) Executed(tag interface{}) 
    eer.manager.Queue() <- executedEvent{tag}

// Committed is called whenever Commit completes
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) 
    eer.manager.Queue() <- committedEvent{tag, target}

// RolledBack is called whenever a Rollback completes
func (eer *externalEventReceiver) RolledBack(tag interface{}) 
    eer.manager.Queue() <- rolledBackEvent{}

// StateUpdated is a signal from the stack that it has fast-forwarded its state
func (eer *...) StateUpdated(tag interface{}, target *pb.BlockchainInfo) 
    eer.manager.Queue() <- stateUpdatedEvent{ chkpt:  tag.(*...),target: target}

PBFT在ProcessEvent中會對各個事件進行處理

// allow the primary to send a batch when the timer expires
func (op *obcBatch) ProcessEvent(event events.Event) events.Event {
        logger.Debugf("Replica %d batch main thread looping", op.pbft.id)
        switch et := event.(type) {  
        case batchMessageEvent:
                ocMsg := et
                return op.processMessage(ocMsg.msg, ocMsg.sender)  // ocMsg的消息類型仍為鏈代碼事務類型
        case executedEvent:
                op.stack.Commit(nil, et.tag.([]byte))
        case committedEvent:
                logger.Debugf("Replica %d received committedEvent", op.pbft.id)
                return execDoneEvent{}
        // ...       
        case stateUpdatedEvent:
                op.reqStore = newRequestStore()
                return op.pbft.ProcessEvent(event)
        default:
                return op.pbft.ProcessEvent(event)
}

pbft.ProcessEvent(event)定義了PBFT共識三個階段的函數(shù)實現(xiàn)

func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
    // ...
    case *RequestBatch:
          err = instance.recvRequestBatch(et)
    case *PrePrepare:
          err = instance.recvPrePrepare(et)
    case *Prepare:
          err = instance.recvPrepare(et)
    case *Commit:
          err = instance.recvCommit(et)
    case *Checkpoint:
          return instance.recvCheckpoint(et)
    case *ViewChange:
          return instance.recvViewChange(et)
    case *NewView:
          return instance.recvNewView(et)
    case *FetchRequestBatch:
          err = instance.recvFetchRequestBatch(et)
    case returnRequestBatchEvent:
          return instance.recvReturnRequestBatch(et)
    // ...
}

總結(jié):

  1. Fabric所有共識必須實現(xiàn)Consenter接口
  2. 修改共識需要修改Receiver來對事件進行處理
  3. PBFT通過事件管理器將event交給receiver.ProcessEvent進行處理

這個時候肯定會有人問了,說好的可插拔共識機制呢,怎么全部變成PBFT實現(xiàn)了。別急,這不準備開始了:

  1. 自定義共識時,首先實現(xiàn)Consenter接口
  2. 其次調(diào)用SetReceiver修改Receiver
  3. 重寫receiver.ProcessEvent
  4. 所有處理可以通過事件管理器將event交給receiver.ProcessEvent進行處理然后放回事件管理器。

沒看明白?為什么實現(xiàn)Consenter接口就可以自定義共識了,為什么就可插拔了?這就對了,因為還沒講到共識引擎初始化呢!上面介紹了一部分可插拔的事件處理機制。后面介紹一下自定義可插拔共識,介紹如何Fabric實現(xiàn)可插拔。

3、Fabric共識引擎初始化


初始化共識引擎

PBFT的共識引擎在啟動peer時自動初始化。具體調(diào)用過程為

// peer/node/start.go
func serve(args []string) error
    peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)

// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
    engineOnce.Do(func() 
        engine = new(EngineImpl)
        engine.helper = NewHelper(coord)
        engine.consenter = controller.NewConsenter(engine.helper) 
        engine.helper.setConsenter(engine.consenter)
        engine.peerEndpoint, err = coord.GetPeerEndpoint()
        engine.consensusFan = util.NewMessageFan()

        go func() 
            for msg := range engine.consensusFan.GetOutChannel() 
                engine.consenter.RecvMsg(msg.Msg, msg.Sender)
    )
    return engine, err
}
  1. GetEngine的作用是進行共識模塊的初始化,同時啟動一個goroutine等待消息進入。
  2. GetEngine初始化一個consenter和一個helper,并互相把一個句柄賦值給了對方。這樣做的目的,就是為了可以讓外部調(diào)用內(nèi)部,內(nèi)部可以調(diào)用外部。

注意:這里的consenter即為第二節(jié)的共識接口。

選擇Consenter

engine.consenter是在consensus/controller/controller.go里選擇

// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error)
    engine.consenter = controller.NewConsenter(engine.helper)

// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {
    plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
    if plugin == "pbft" 
        return pbft.GetPlugin(stack)
    return noops.GetNoops(stack)
}

默認選擇的是noops,如果需要添加自己編寫的共識模塊需要在這里自行添加判斷。

NOOPS:用于開發(fā)和測試使用的插件,當一個validator節(jié)點收到一個事務消息時,會把消息轉(zhuǎn)為共識消息,并會向所有節(jié)點廣播共識消息。一般情況下,所有節(jié)點都會接收到這條共識消息,并執(zhí)行消息里的事務。

初始化PBFT

如果選擇了PBFT則會調(diào)用consensus/pbft/pbft.go進行初始化。
使用PBFTbatch模式啟動時會調(diào)用newObcBatch進行PBFT算法初始化

// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
    ...
    op.manager = events.NewManagerImpl()    
    op.manager.SetReceiver(op)
    etf := events.NewTimerFactoryImpl(op.manager)
    op.pbft = newPbftCore(id, config, op, etf)
    op.manager.Start()
    blockchainInfoBlob := stack.GetBlockchainInfoBlob()
    op.externalEventReceiver.manager = op.manager
    ...
    return op
}

newObcBatch主要做了這幾項工作

  • 初始化了eventLoop的消息隊列
  • 設置了消息的接收者,用來處理對應的消息
  • 創(chuàng)建監(jiān)聽消息超時的定時器
  • 初始化pbft算法
  • 啟動消息隊列,不斷監(jiān)聽事件的到來并且分發(fā)給接收者處理

總結(jié):可以看出共識引擎初始化時的幾個步驟:

  1. 初始化eventLoop
  2. 設置Receiver
  3. 初始化共識算法
  4. 啟動事件管理器

至此,之前所講的所有東西全部啟動了?;貞浺幌抡麄€流程:

  1. 事件由客戶端發(fā)起,經(jīng)過一系列步驟執(zhí)行到RecvMsg函數(shù)
  2. RecvMsg函數(shù)由Consenter定義,ConsenterNewConsenter內(nèi)初始化,NewConsenter可插播
  3. Consenter提供共識模塊的方法調(diào)用ExecutionConsumer接口, Consenter可插拔
  4. receiver.ProcessEvent負責處理事件管理器隊列內(nèi)的事件,Receiver可插播

總結(jié): 所有與共識有關(guān)的接口全部可插拔,并且提供了相應的接口,自定義實現(xiàn)ConsenterReceiver,在初始化的時候 SetReceiver,即可實現(xiàn)自定義共識。

4. Fabric共識接口


consensus.go主要包含了共識算法插件內(nèi)部對外暴露的接口和外部對內(nèi)暴露的接口??偨Y(jié)如下:

Stack
Consenter
Manager
目錄結(jié)構(gòu):

可以看到共識模塊目錄如下。

consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events
目錄含義如下

controller 用來控制Fabric選擇什么樣的共識算法,默認是noops。
executor 封裝了消息隊列中對交易的處理。
helper 對外提供接口調(diào)用和數(shù)據(jù)持久化接口。
noops 提供了如何編寫Fabric共識算法的Demo。
pbft PBFT算法的具體實現(xiàn)。
util 實現(xiàn)了一個peer節(jié)點到共識算法的一個消息通道,和一個消息隊列。

推薦閱讀:

[1] Hyperledger Fabric中PBFT算法詳解:
https://zhuanlan.zhihu.com/p/48899458
[2] Fabric源碼分析-共識模塊:
https://zhuanlan.zhihu.com/p/35255567

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • Fabric 0.6與1.0+ Fabric結(jié)構(gòu) Fabric 0.6的特點 結(jié)構(gòu)簡單: 應用-成員管理-Peer...
    CodingCattwo閱讀 25,453評論 2 20
  • 轉(zhuǎn)載:區(qū)塊鏈開源實現(xiàn) hyperledger fabric 架構(gòu)詳解 (http://t.cn/R14zaCC) ...
    vdes閱讀 10,921評論 0 27
  • 概要 區(qū)塊鏈網(wǎng)絡使用 gRPC 協(xié)議 Protocol Buffers(格式的 API) 使用的協(xié)議 gRPC P...
    簡聞閱讀 4,108評論 1 6
  • 手寫日記第34天,堅持手寫已經(jīng)一月有余了,女兒的日記也寫到54天了,繼續(xù)加油!
    徐夢賽媽媽李蕾閱讀 298評論 0 2
  • 從小到大,這樣的言語一直縈繞在我們的耳邊:堅持到底,就是勝利!如果你堅持下去,你將會怎么怎么樣,諸如此類。只有付出...
    爆米花123閱讀 443評論 1 0

友情鏈接更多精彩內(nèi)容