死磕hyperledger fabric源碼|交易廣播

死磕hyperledger fabric源碼|交易廣播

文章及代碼:https://github.com/blockchainGuide/

分支:v1.1.0

40fe3d4a84cc46e22caf5e06071c3aa7

前言

Hyperledger Fabric提供了Broadcast(srv ab.AtomicBroadcast_BroadcastServer)交易廣播服務(wù)接口,接收客戶端提交的簽名交易消息請求,交由共識組件鏈對象對交易進(jìn)行排序與執(zhí)行通道管理,按照交易出塊規(guī)則切割打包,構(gòu)造新區(qū)塊并提交賬本。同時,通過Deliver()區(qū)塊分發(fā)服務(wù)接口,將區(qū)塊數(shù)據(jù)發(fā)送給通道組織內(nèi)發(fā)起請求的Leader主節(jié)點(diǎn),再基于Gossip消息協(xié)議廣播到組織內(nèi)的其他節(jié)點(diǎn)上,從而實(shí)現(xiàn)廣播交易消息的目的。

Broadcast服務(wù)消息處理

Orderer節(jié)點(diǎn)啟動時已經(jīng)在本地的gRPC服務(wù)器上注冊了Orderer排序服務(wù)器,并創(chuàng)建了Broadcast服務(wù)處理句柄。當(dāng)客戶端調(diào)用Broadcast()服務(wù)接口發(fā)起服務(wù)請求時,Orderer排序服務(wù)器會調(diào)用Broadcast()→s.bh.Handle()方法處理請求,流程如下:

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
...
    return s.bh.Handle(&broadcastMsgTracer{
    ...
    })
}
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
  ...
}

主要就是這個Handle的處理,分析如下:

①:等待接收處理消息

msg, err := srv.Recv()

②:解析獲取通道頭部chdr、配置交易消息標(biāo)志位isConfig、通道鏈支持對象(通道消息處理器)

chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)

③:檢查共識組件鏈對象是否準(zhǔn)備好接收新的交易消息

if err = processor.WaitReady(); err != nil {}

④:分類處理消息

處理普通消息

4.1 解析獲取通道的最新配置序號

configSeq, err := processor.ProcessNormalMsg(msg)
/orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
    configSeq = s.support.Sequence()
    err = s.filters.Apply(env)
    return
}

configSeq是最新配置序號,默認(rèn)初始值為0,新建應(yīng)用通道后該配置序號自增為1,通過比較該序號就能判斷當(dāng)前通道配置版本是否發(fā)生了更新,從而確定當(dāng)前交易消息是否需要重新過濾與重新排序。

接著就是使用自帶的默認(rèn)通道消息過濾器過濾消息,有以下過濾條件:

  • 驗(yàn)證不能為空
  • 拒絕過期的簽名者身份證書
  • 消息最大字節(jié)數(shù)過濾器(98MB)
  • 消息簽名驗(yàn)證過濾器

4.2 構(gòu)造新的普通交易消息并發(fā)送到共識組件鏈對象請求處理

err = processor.Order(msg, configSeq) 

這里我們只關(guān)注kafka的共識組件處理。

首先序列化消息,然后將該消息發(fā)送到Kafka集群的指定分區(qū)上請求排序,再轉(zhuǎn)發(fā)給Kafka共識組件鏈對象請求打包出塊。

/orderer/consensus/kafka/chain.go
func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
    marshaledEnv, err := utils.Marshal(env)
    if err != nil {
        return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
    }
    if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
        return fmt.Errorf("cannot enqueue")
    }
    return nil
}

我們來看看enqueue方法是如何做的:

func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
    logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
    select {
    case <-chain.startChan: // // 共識組件在啟動階段啟動完成
        select {
        case <-chain.haltChan: //  已經(jīng)關(guān)閉chain.startChan通道
        ...
            }
            //// 創(chuàng)建Kafka生產(chǎn)者消息
            message := newProducerMessage(chain.channel, payload)
            //// 發(fā)送消息到Kafka集群請求排序
            if _, _, err = chain.producer.SendMessage(message); err != nil {
            ...
    }
}

處理通道配置交易消息

4.3 獲取配置交易消息與通道的最新配置序號

config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)

代碼位置:/orderer/common/msgprocessor/systemchannel.go/ProcessConfigUpdateMsg,大概做了以下事情:

  • 獲取消息中的通道ID
  • 檢查消息中的通道ID與當(dāng)前通道ID是否一致,一致的話交由標(biāo)準(zhǔn)通道處理器處理
  • 創(chuàng)建新應(yīng)用通道的通道配置實(shí)體Bundle結(jié)構(gòu)對象
  • 構(gòu)造新的通道配置更新交易消息(ConfigEnvelope類型),注意將該消息的通道配置序號更新為1
  • 創(chuàng)建內(nèi)層的通道配置交易消息(CONFIG類型)
  • 創(chuàng)建外層的配置交易消息(ORDERER_TRANSACTION類型)
  • 應(yīng)用系統(tǒng)通道的消息過濾器
  • 返回新的通道配置交易消息與當(dāng)前系統(tǒng)通道的配置序號
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    channelID, err := utils.ChannelID(envConfigUpdate) // 獲取消息中的通道ID
    ...
    //檢查消息中的通道ID與當(dāng)前通道ID是否一致
    if channelID == s.support.ChainID() {
        //// 交由標(biāo)準(zhǔn)通道處理器處理
        return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
    }
    ...
    // 創(chuàng)建新的應(yīng)用通道,其通道配置序號默認(rèn)初始化為0
    // 創(chuàng)建新應(yīng)用通道的通道配置實(shí)體Bundle結(jié)構(gòu)對象
    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
    ...
    //構(gòu)造新的通道配置更新交易消息(ConfigEnvelope類型),注意將該消息的通道配置序號更新為1
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
    ...
    //創(chuàng)建內(nèi)層的通道配置交易消息(CONFIG類型)
    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    ...
    //創(chuàng)建外層的配置交易消息(ORDERER_TRANSACTION類型)
    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    ...
    // 應(yīng)用系統(tǒng)通道的消息過濾器
    err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
    ...
    //返回新的通道配置交易消息與當(dāng)前系統(tǒng)通道的配置序號
    return wrappedOrdererTransaction, s.support.Sequence(), nil

4.4 構(gòu)造新的配置交易消息發(fā)送到共識組件鏈對象請求排序

err = processor.Configure(config, configSeq)

這里我們依舊只是考慮kafka共識組件,processor.Configure()方法實(shí)際上是調(diào)用chainImpl.configure()方法,同樣構(gòu)造Kafka常規(guī)消息(KafkaMessageRegular類型)。其中,Class消息類別屬于KafkaMessageRegular_CONFIG類型,包含了通道配置交易消息、 通道配置序號configSeq與初始消息偏移量originalOffset(0)。接著,調(diào)用chain.enqueue()方法,將其發(fā)送到Kafka集群上指定主題(chainID)和分區(qū)號(0)的分區(qū)上,同時,由Kafka共識組件鏈對象分區(qū)消費(fèi)者channelConsumer獲取該消息,再交由給Kafka共識組件鏈對象請求打包出塊。

⑤:發(fā)送成功處理狀態(tài)響應(yīng)消息

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})

整個流程圖如下:

image-20210125112957202

參考

https://github.com/blockchainGuide/ (文章圖片代碼資料)

微信公眾號:區(qū)塊鏈技術(shù)棧

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容