死磕hyperledger fabric源碼|kafka共識排序
文章及代碼:https://github.com/blockchainGuide/
分支:v1.1.0

概述
Orderer共識組件提供HandleChain()方法創(chuàng)建通道綁定的共識組件鏈對象(consensus.Chain接口),包括Solo(solo.chain類型)、Kafka(kafka.chainImpl類型)等類型,屬于通道共識組件的重要實現(xiàn)模塊,并設(shè)置到鏈支持對象的cs.Chain字段。共識組件鏈對象提供Orderer共識排序服務(wù),負責(zé)關(guān)聯(lián)通道上交易排序、打包出塊、提交賬本、通道管理等工作,目前采用Golang通道或Kafka集群作為共識排序后端,接收來自Broadcast服務(wù)過濾轉(zhuǎn)發(fā)的交易消息并進行排序。
kafka共識排序服務(wù)
orderer服務(wù)集群
Orderer節(jié)點采用Sarama開源的Kafka第三方庫構(gòu)建Kafka共識組件,可以同時接受處理多個客戶端發(fā)送的交易消息請求,能夠有效提高Orderer節(jié)點處理交易消息的并發(fā)能力。同時,可利用Kafka集群在單一分區(qū)內(nèi)按序收集相同主題消息(消息序號唯一)的功能,來保證交易消息具有確定性的順序(以消息序號排序),從而實現(xiàn)對交易排序達成全局共識的目的。
Kafka生產(chǎn)者按照主題(Topic)生產(chǎn)消息并進行發(fā)布,Kafka服務(wù)器集群自動對消息主題進行分類。同一個主題的消息都會被收集到一個或多個分區(qū)文件中,按照FIFO的順序追加到文件尾部,并且每個消息在分區(qū)中都會有一個OFFSET位置偏移量作為該消息的唯一標(biāo)識ID。目前,Hyperledger Fabric基于Kafka集群為每個通道創(chuàng)建綁定了一個主題(即鏈ID,chainID),并且只設(shè)置一個分區(qū)(分區(qū)號為0)。Kafka消費者管理多個分區(qū)消費者并訂閱指定分區(qū)的主題消息,包括主題(即chainID)、分區(qū)號(目前只有1個分區(qū)號為0的分區(qū))、起始偏移量(開始訂閱的消息位置offset)等。
Hyperledger Fabric采用Kafka集群對單個或多個Orderer排序節(jié)點提交的交易消息進行排序。此時,Orderer排序節(jié)點同時充當(dāng)Kafka集群的消息生產(chǎn)者(分區(qū))和消費者,發(fā)布消息與訂閱消息到Kafka集群上的同一個主題分區(qū),即先將Peer節(jié)點提交的交易消息轉(zhuǎn)發(fā)給Kafka服務(wù)端,同時,從指定主題的Kafka分區(qū)上按順序獲取排序后的交易消息并自動過濾重啟的交易消息。這期間可能會存在網(wǎng)絡(luò)時延造成獲取消息時間的差異。如果不考慮丟包造成消息丟失的情況,則所有Orderer節(jié)點獲取消息的順序與數(shù)量應(yīng)該是確定的和一致的。同時,采用相同的Kafka共識組件鏈對象與出塊規(guī)則等,以保證所有Orderer節(jié)點都可以創(chuàng)建與更新相同配置的通道,并切割生成相同的批量交易集合出塊,再“同步”構(gòu)造出相同的區(qū)塊數(shù)據(jù),從而基于Kafka集群達成全局共識,以保證區(qū)塊數(shù)據(jù)的全局一致性。
啟動共識組件鏈對象
啟動入口:
orderer/consensus/kafka/chain.go/Start()
func (chain *chainImpl) Start() {
go startThread(chain)
}
func startThread(chain *chainImpl) {
...
//創(chuàng)建kafka生產(chǎn)者
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
...
// Kafka生產(chǎn)者發(fā)送CONNECT消息建立連接
if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil {
logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
}
...
//創(chuàng)建Kafka消費者
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
...
//創(chuàng)建Kafka分區(qū)消費者
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
...
close(chain.startChan) // 已經(jīng)啟動共識組件鏈對象,不阻塞Broadcast
chain.errorChan = make(chan struct{}) // 創(chuàng)建errorChan通道,不阻塞Deliver服務(wù)處理句柄
...
chain.processMessagesToBlocks() //創(chuàng)建消息處理循環(huán),循環(huán)處理訂閱分區(qū)上接收到的消息
}
startThread函數(shù)首先創(chuàng)建kafka生產(chǎn)者,發(fā)布消息到指定主題(即通道ID)和分區(qū)號的通道分區(qū)(chain.channel)上。
然后發(fā)送CONNECT消息建立連接,該消息指定了主題Topic字段為鏈ID、Key字段為分區(qū)號0、Value字段為CONNECT類型消息負載等。訂閱該主題的Kafka(分區(qū))消費者會接收到該消息。
接著創(chuàng)建指定Kafka分區(qū)和Broker服務(wù)器配置的Kafka消費者對象,并設(shè)置從指定主題(鏈ID)和分區(qū)號(0)的Kafka分區(qū)上獲取消息。
最后,調(diào)用processMessagesToBlocks()方法創(chuàng)建消息處理循環(huán),負責(zé)處理從Kafka集群中接收到的訂閱消息。
處理消息
processMessagesToBlocks接收到正常的Kafka分區(qū)消費者消息會根據(jù)kafka的消息類型進行處理,包括以下幾種類型:
- Kafka- Message_Regular
- KafkaMessage_TimeToCut
- KafkaMessage_Connect
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
...
for { // 消息處理循環(huán)
select {
...
case in, ok := <-chain.channelConsumer.Messages(): //接收到正常的Kafka分區(qū)消費者消息
...
select {
case <-chain.errorChan: // If this channel was closed... // 如果該通道已經(jīng)關(guān)閉,則重新創(chuàng)建該通道
...
switch msg.Type.(type) { //分析Kafka消息類型
case *ab.KafkaMessage_Connect: //Kafka連接消息 由于錯誤而重新恢復(fù)Kafka消費者分區(qū)訂閱流程
_ = chain.processConnect(chain.ChainID()) //處理CONNECT連接消息, 不做任何事情
counts[indexProcessConnectPass]++ // 成功處理消息計數(shù)增1
case *ab.KafkaMessage_TimeToCut: // Kafka定時切割生成區(qū)塊消息
if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
counts[indexProcessTimeToCutError]++
return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
}
counts[indexProcessTimeToCutPass]++ // 成功處理消息計數(shù)增1
case *ab.KafkaMessage_Regular: // Kafka常規(guī)消息
if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil { // 處理Kafka常 規(guī)消息
...
counts[indexProcessRegularError]++
}...
}
case <-chain.timer: // 超時定時器
if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil { //發(fā)送TimeToCut類型消息,請求打包出塊
...
counts[indexSendTimeToCutError]++
} ...
}
}
}
①:KafkaMessage_Connect類型消息
Kafka連接消息用于測試連通Kafka分區(qū)消費者的工作狀態(tài),用于驗證Kafka共識組件的正常工作狀態(tài)與排除故障,并調(diào)用chain.processConnect(chain.ChainID())方法處理該消息。
②:KafkaMessage_TimeToCut類型消息
processMessagesToBlocks()方法可調(diào)用chain.processTimeToCut()方法處理TIMETOCUT類型消息。如果消息中的區(qū)塊號ttcNumber不是當(dāng)前Orderer節(jié)點當(dāng)前通道賬本中下一個打包出塊的區(qū)塊號(最新區(qū)塊號lastCutBlockNumber+1),則直接丟棄不處理。否則,調(diào)用BlockCutter().Cut()方法,切割當(dāng)前該通道上待處理的緩存交易消息列表為批量交易集合batch([]*cb.Envelope),再調(diào)用CreateNextBlock(batch)方法構(gòu)造新區(qū)塊并提交賬本。最后,調(diào)用WriteBlock(block,metadata)方法,更新區(qū)塊元數(shù)據(jù)并提交賬本,同時更新Kafka共識組件鏈對象的最新區(qū)塊號lastCutBlockNumber增1。
事實上,Orderer服務(wù)集群節(jié)點獨立打包出塊的時間點通常不是完全同步的,同時還可能會重復(fù)接收其他Orderer節(jié)點提交的TIMETOCUT類型消息(重復(fù)區(qū)塊號)。此時,Orderer節(jié)點以接收到的第一個TIMETOCUT類型消息為準(zhǔn),打包出塊并提交到賬本,再更新當(dāng)前通道的最新區(qū)塊號lastCutBlockNumber。這樣,processTimeToCut()方法就能利用最新的lastCutBlockNumber過濾掉其他重復(fù)的TIMETOCUT類型消息,以保證所有Orderer節(jié)點上賬本區(qū)塊文件的數(shù)據(jù)同步,實際上是將原先的時間同步機制轉(zhuǎn)換為消息同步機制。
③:KafkaMessage_Regular類型消息
包括通道配置交易消息(KafkaMessageRegular_CONFIG類型)和普通交易消息(KafkaMessageRegular_NORMAL類型)。 詳細的分析將會在processRegular方法中體現(xiàn)。
處理配置交易消息
我們先大概的看一下ProcessRegular中關(guān)于處理配置交易消息的代碼部分,因為這部分相當(dāng)?shù)拈L,必須先看個概覽:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
...
commitConfigMsg := func(message *cb.Envelope, newOffset int64){...}
seq := chain.Sequence() // 獲取當(dāng)前通道的最新配置序號
...
switch regularMessage.Class {
case ab.KafkaMessageRegular_UNKNOWN: // 未知消息類型
...
case ab.KafkaMessageRegular_NORMAL: // 普通交易消息類型
...
case ab.KafkaMessageRegular_CONFIG: // 通道配置交易消息
...
}
...
}
我們直接跳轉(zhuǎn)到case ab.KafkaMessageRegular_CONFIG進行分析:
①:如果regularMessage.OriginalOffset 不為 0
說明這是重新過濾驗證和排序的通道配置交易消息。
1.1 過濾重復(fù)提交的消息
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {}
1.2 確認(rèn)是否是最近重新驗證且重新排序的配置交易消息,并且通道配置序號是最新的
if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset &®ularMessage.ConfigSeq == seq {
// 因此,關(guān)閉通道并解除Broadcast服務(wù)處理句柄阻塞等待,通知重新接收消息進行處理
close(chain.doneReprocessingMsgInFlight)
}
1.3 主動更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted
存在其他Orderer節(jié)點重新提交了配置消息,但是本地Orderer節(jié)點沒有重新提交該消息。因此這里需要更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted。
if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
}
②:regularMessage.OriginalOffset為 0
說明是第一次提交通道配置交易消息,而不是重新驗證和重新排序的。
2.1 如果消息中的配置序號regularMessage.ConfigSeq小于當(dāng)前通道的最新配置序號seq
則說明已經(jīng)更新了通道配置(配置序號較高),然后再處理當(dāng)前配置交易消息(配置序號較低)。將會調(diào)用ProcessConfigMsg重新過濾和處理該消息。
接著通過configure重新提交該配置消息進行排序,重置消息初始偏移量。然后再更新最近重新提交消息的偏移量。
if regularMessage.ConfigSeq < seq {
...
configEnv, configSeq, err := chain.ProcessConfigMsg(env)
if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {...}
// 阻塞接收消息處理,更新最近重新提交消息的偏移量
chain.lastResubmittedConfigOffset = receivedOffset
//創(chuàng)建通道阻塞Broadcast服務(wù)接收處理消息
chain.doneReprocessingMsgInFlight = make(chan struct{})
}
③:提交配置交易消息執(zhí)行通道管理操作
經(jīng)過上面的①和②過濾掉不符合條件的情況,接下來就提交配置交易消息執(zhí)行通道管理操作,核心函數(shù):commitConfigMsg(env, offset)
3.1 將當(dāng)前緩存交易消息切割成批量交易集合
batch := chain.BlockCutter().Cut()
3.2 創(chuàng)建新區(qū)塊block
block := chain.CreateNextBlock(batch)
3.3 構(gòu)造Kafka元數(shù)據(jù)
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ //構(gòu)造Kafka元數(shù)據(jù)
LastOffsetPersisted: receivedOffset - 1, // 偏移量減1
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
3.4 寫入?yún)^(qū)塊
通過區(qū)塊寫組件提交新區(qū)塊到賬本,更新當(dāng)前通道的最新區(qū)塊號chain.lastCutBlockNumber增1
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
接著更新本鏈的lastOriginal- OffsetProcessed為newOffset參數(shù),然后做和上面差不多的事情:
chain.lastOriginalOffsetProcessed = newOffset
block := chain.CreateNextBlock([]*cb.Envelope{message}) // 構(gòu)造新區(qū)塊
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ // 構(gòu)造Kafka元數(shù)據(jù)
LastOffsetPersisted: receivedOffset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteConfigBlock(block, metadata) // 寫入配置區(qū)塊
chain.lastCutBlockNumber++ // 最新區(qū)塊號增1
不管是上面的WriteBlock還是WriteConfigBlock底層都是調(diào)用的commitBlock,如下:
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
... // 添加塊簽名
bw.addBlockSignature(bw.lastBlock)
// 添加最新的配置簽名
bw.addLastConfigSignature(bw.lastBlock)
// 寫入新塊
err := bw.support.Append(bw.lastBlock)
...
}
接下來再討論kafka共識組件如何處理普通交易消息的。
處理普通交易消息
還是先回到 processRegular方法,關(guān)于處理普通消息的方法大概如下:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
...
case ab.KafkaMessageRegular_NORMAL: // 普通交易消息類型
// 如果OriginalOffset不是0,則說明該消息是重新驗證且重新提交排序的
if regularMessage.OriginalOffset != 0 {
...
// 如果消息偏移量不大于lastOriginalOffsetProcessed最近已處理消息的偏移量,
// 則說明已經(jīng)處理過該消息,此時應(yīng)丟棄返回,防止重復(fù)處理其他Orderer提交的相同偏移 量的普通交易消息
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
...
}
// // 檢查通道的配置序號是否更新
if regularMessage.ConfigSeq < seq {
...
//// 消息的配置序號低,需要重新驗證過濾消息
configSeq, err := chain.ProcessNormalMsg(env)
...
//重新提交普通交易消息
if err := chain.order(env, configSeq, receivedOffset); err != nil {}
...
}
// advance lastOriginalOffsetProcessed iff message is re-validated and re-ordered
//當(dāng)且僅當(dāng)消息重新驗證和重新排序時,才需要修正lastOriginalOffsetProcessed偏移量
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
}
// 提交處理普通交易消息,offset為最近處理的普通交易消息偏移量
commitNormalMsg(env, offset)
}
處理普通交易消息的流程與處理配置交易消息的流程基本類似,主要看最后的commitNormalMsg(env, offset),我們來繼續(xù)分析:
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
//// 添加所接收的消息到緩存交易消息列表,并切割成批量交易集合列表batches
batches, pending := chain.BlockCutter().Ordered(message)
...
if len(batches) == 0 {
// 如果不存在批量交易集合,則啟動定時器周期性地發(fā)送切割出塊消息n
chain.lastOriginalOffsetProcessed = newOffset
if chain.timer == nil {
chain.timer = time.After(chain.SharedConfig().BatchTimeout())
...
return
}
chain.timer = nil
offset := receivedOffset // 設(shè)置當(dāng)前消息偏移量
if pending || len(batches) == 2 {
offset-- // 計算第1個批量交易消息的偏移量是offset減1
} else { // 只有1個批量交易集合構(gòu)成1個區(qū)塊
//// 設(shè)置第1個批量交易集合的消息偏移量為newOffset
chain.lastOriginalOffsetProcessed = newOffset
}
//// 構(gòu)造并提交第1個區(qū)塊
block := chain.CreateNextBlock(batches[0])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata) // 更新區(qū)塊元數(shù)據(jù),并提交區(qū)塊到賬本
chain.lastCutBlockNumber++ // 更新當(dāng)前通道上最近出塊的區(qū)塊號增1
...
// Commit the second block if exists
//// 檢查第2個批量交易集合,構(gòu)造并提交第2個區(qū)塊
if len(batches) == 2 {
chain.lastOriginalOffsetProcessed = newOffset
offset++ // 設(shè)置第2個批量交易集合的消息偏移量offset加1
block := chain.CreateNextBlock(batches[1])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: newOffset,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
...
}
}
首先將新的普通交易消息添加到當(dāng)前的緩存交易列表,并切割成批量交易集合列表batches ,但最多只能包含2個批量交易集合,并且第2個批量交易集合最多包含1個交易。最終也是調(diào)用的WriteBlock寫入到賬本。
到此為止整個processRegular()方法處理消息結(jié)束。
總結(jié)及參考
kafka共識排序的邏輯其實是比較簡單的,大概的流程如下 :

https://github.com/blockchainGuide/ (文章圖片代碼資料在里面)
微信公眾號:區(qū)塊鏈技術(shù)棧