Fabric's Kafka integration is built on the sarama package. That library is worth studying in its own right, but I won't expand on it here; the focus stays on the business logic.
I strongly recommend reading the following design document either before or after this post:
A Kafka-based Ordering Service for Fabric
Startup
func (chain *chainImpl) Start() {
go startThread(chain)
}
func startThread(chain *chainImpl) {
var err error // error handling is elided from this excerpt for brevity
// Create the channel's topic if it does not already exist
err = setupTopicForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.consenter.topicDetail(), chain.channel)
// Set up the producer
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
// Have the producer post the CONNECT message
sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)
// Set up the parent consumer
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
// Set up the channel consumer
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
chain.replicaIDs, err = getHealthyClusterReplicaInfo(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.channel)
chain.doneProcessingMessagesToBlocks = make(chan struct{})
chain.errorChan = make(chan struct{}) // Deliver requests will also go through
close(chain.startChan) // Broadcast requests will now go through
chain.processMessagesToBlocks() // Keep up to date with the channel
}
- First, in the Fabric world the channel is the construct that ties everything together, and treating a Kafka topic as an extension of the channel is a natural fit: orderers publish the transactions they receive to the topic, and every orderer consuming that topic cuts blocks from them and writes those blocks into its local ledger. There is one important detail inside setupTopicForChannel: every topic is created with exactly one partition, which is what guarantees strict message ordering. This matters a lot (see the sketch after this list).
- Create the message producer and the consumers according to the configuration.
- Have the producer post a KafkaMessage_Connect message; more on that later.
- close(chain.startChan) signals that the chain has finished starting up, so Broadcast requests can now go through.
- Finally, processMessagesToBlocks starts listening for and processing incoming messages.
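To make the single-partition point concrete, here is a minimal sketch of creating such a topic with sarama. This is an illustration, not Fabric's actual setupTopicForChannel: the broker address, replication factor and timeout are made-up values, and the real code additionally retries and routes the request to the controller broker.
// Sketch only: create a one-partition topic for a channel via sarama.
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func createChannelTopic(brokerAddr, topic string) error {
	conf := sarama.NewConfig()
	conf.Version = sarama.V0_10_1_0 // topic creation over the wire needs Kafka >= 0.10.1.0

	broker := sarama.NewBroker(brokerAddr)
	if err := broker.Open(conf); err != nil {
		return err
	}
	defer broker.Close()

	req := &sarama.CreateTopicsRequest{
		TopicDetails: map[string]*sarama.TopicDetail{
			topic: {
				NumPartitions:     1, // a single partition keeps the channel strictly ordered
				ReplicationFactor: 3, // illustrative value
			},
		},
		Timeout: 3 * time.Second,
	}
	resp, err := broker.CreateTopics(req)
	if err != nil {
		return err
	}
	if topicErr, ok := resp.TopicErrors[topic]; ok && topicErr.Err != sarama.ErrNoError {
		return topicErr.Err
	}
	return nil
}

func main() {
	if err := createChannelTopic("localhost:9092", "mychannel"); err != nil {
		log.Fatal(err)
	}
}
One partition means no consumer parallelism on the ordering path, which is a deliberate trade-off: a total order over the channel's messages is precisely what the service exists to provide.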
NORMAL message processing
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
...
case in, ok := <-chain.channelConsumer.Messages():
if !ok {
logger.Criticalf("[channel: %s] Kafka consumer closed.", chain.ChainID())
return counts, nil
}
...
switch msg.Type.(type) {
case *ab.KafkaMessage_Connect:
_ = chain.processConnect(chain.ChainID())
counts[indexProcessConnectPass]++
case *ab.KafkaMessage_TimeToCut:
if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
counts[indexProcessTimeToCutError]++
return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
}
counts[indexProcessTimeToCutPass]++
case *ab.KafkaMessage_Regular:
if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.ChainID(), err)
counts[indexProcessRegularError]++
} else {
counts[indexProcessRegularPass]++
}
}
...
}
Three message types are handled here, and you can see that a KafkaMessage_Regular message is forwarded to chain.processRegular.
A quick word on the KafkaMessage_Connect case: processConnect does essentially nothing. Remember how, during startup, a CONNECT message was posted right after the producer was created? Seeing it arrive here proves three things at once: the topic was created successfully, messages can be produced to it, and messages can be consumed from it.
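For reference, the CONNECT message carries no payload at all. Below is a minimal sketch of how it is roughly built and posted; ab and utils are the packages already visible in the excerpts, while newProducerMessage (a helper assumed here, wrapping the bytes into a sarama.ProducerMessage for the channel's topic) is not shown.
// Sketch only: post the empty CONNECT marker; its sole purpose is to prove
// the producer -> topic -> consumer round trip works end to end.
func postConnect(producer sarama.SyncProducer, channel channel) error {
	connectMsg := &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Connect{
			Connect: &ab.KafkaMessageConnect{Payload: nil},
		},
	}
	payload := utils.MarshalOrPanic(connectMsg)
	// newProducerMessage is assumed: it targets the channel's topic/partition.
	_, _, err := producer.SendMessage(newProducerMessage(channel, payload))
	return err
}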
Now on to the actual handling of NORMAL messages.
case ab.KafkaMessageRegular_NORMAL:
if regularMessage.OriginalOffset != 0 {
// But we've reprocessed it already
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
logger.Debugf(
"[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
return nil
}
}
// The config sequence has advanced
if regularMessage.ConfigSeq < seq {
logger.Debugf("[channel: %s] Config sequence has advanced since this normal message got validated, re-validating", chain.ChainID())
configSeq, err := chain.ProcessNormalMsg(env)
if err != nil {
return fmt.Errorf("discarding bad normal message because = %s", err)
}
if err := chain.order(env, configSeq, receivedOffset); err != nil {
return fmt.Errorf("error re-submitting normal message because = %s", err)
}
return nil
}
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
}
// During consensus-type migration, drop normal messages that managed to sneak in past Order, possibly from other orderers
if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
return nil
}
commitNormalMsg(env, offset)
- Because a single Kafka partition guarantees ordering, the chain can track the offset of the latest processed message and use it to decide whether an incoming message has already been handled. There is a detail here that I glossed over on first reading, assuming it was just defensive coding, but there is more to it: regularMessage.OriginalOffset != 0. Every message starts out with OriginalOffset equal to 0, so when does it become non-zero? A non-zero value means the message was consumed once already but, for one reason or another, could not be processed at that point and was re-submitted to Kafka, i.e. re-ordered.
- regularMessage.ConfigSeq < seq should look familiar from the solo article. The difference here is what happens when the config has advanced between validation and processing: if the message still passes re-validation, the message itself is fine, only the environment has changed, so it is re-submitted for ordering and comes around again (see the sketch after this list).
- Finally, the real work happens in commitNormalMsg, shown right after the sketch.
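A minimal sketch of what that re-submission roughly looks like, based on the chain.order call in the excerpt above. The field names come from the ab.KafkaMessageRegular type already shown; treat the exact wrapping and the newProducerMessage helper as assumptions.
// Sketch only: re-submit a NORMAL message after it passed re-validation.
// OriginalOffset records where the message was first consumed, which is
// exactly what the OriginalOffset != 0 branch above later detects.
func resubmitNormal(chain *chainImpl, env *cb.Envelope, configSeq uint64, originalOffset int64) error {
	marshaledEnv, err := utils.Marshal(env)
	if err != nil {
		return err
	}
	msg := &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Regular{
			Regular: &ab.KafkaMessageRegular{
				Payload:        marshaledEnv,
				ConfigSeq:      configSeq, // the sequence it was just re-validated against
				Class:          ab.KafkaMessageRegular_NORMAL,
				OriginalOffset: originalOffset, // non-zero marks it as re-ordered
			},
		},
	}
	payload := utils.MarshalOrPanic(msg)
	_, _, err = chain.producer.SendMessage(newProducerMessage(chain.channel, payload))
	return err
}
With that in mind, here is commitNormalMsg itself: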
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
batches, pending := chain.BlockCutter().Ordered(message)
logger.Debugf("[channel: %s] Ordering results: items in batch = %d, pending = %v", chain.ChainID(), len(batches), pending)
switch {
case chain.timer != nil && !pending:
// Timer is already running but there are no messages pending: stop the timer
chain.timer = nil
case chain.timer == nil && pending:
// Timer is not running yet and there are messages pending: start it
chain.timer = time.After(chain.SharedConfig().BatchTimeout())
logger.Debugf("[channel: %s] Just began %s batch timer", chain.ChainID(), chain.SharedConfig().BatchTimeout().String())
default:
// Do nothing when the timer is already running and messages are pending,
// or when the timer is not set and nothing is pending
}
if len(batches) == 0 {
// If no block is cut, we update the `lastOriginalOffsetProcessed`, start the timer if necessary and return
chain.lastOriginalOffsetProcessed = newOffset
return
}
offset := receivedOffset
if pending || len(batches) == 2 {
offset--
} else {
chain.lastOriginalOffsetProcessed = newOffset
}
// Commit the first block
block := chain.CreateNextBlock(batches[0])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", chain.ChainID(), chain.lastCutBlockNumber, offset)
// Commit the second block if exists
if len(batches) == 2 {
chain.lastOriginalOffsetProcessed = newOffset
offset++
block := chain.CreateNextBlock(batches[1])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: newOffset,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", chain.ChainID(), chain.lastCutBlockNumber, offset)
}
}
The first half is the same as solo: wait for the right moment and cut a block. One thing to note: Ordered returns two batches in exactly one situation, namely when a single message is so large that it must be cut into a block of its own (the pending batch is cut first, and the oversized message becomes the second batch). In every other case there is at most one batch.
The check pending || len(batches) == 2 is crucial. If it holds for the message that just arrived, what does that tell us? Either the message could not be committed in this round at all (it is still pending), or two blocks were cut and the message landed in the second one. Either way the block written first does not contain this message yet, so offset is decremented first, keeping LastOffsetPersisted pointing just before this message, and incremented back later when the message itself is actually committed.
If the condition does not hold, the new message arrived at the perfect moment: it not only triggered the cut, it also made it into that very block.
It is worth taking a moment to internalize the offsets involved; once they are clear, the implementation follows naturally.
- receivedOffset: the Kafka offset of the message the consumer has just read.
- newOffset: the offset attributed to the incoming message; there are two cases, a brand-new message and a re-ordered one.
- LastOffsetPersisted: the offset of the last message that has been committed to the ledger in a block.
- LastOriginalOffsetProcessed: the original offset of the newest re-ordered message that has been processed, used to filter out re-submitted duplicates.
The rest is familiar: the cut blocks are written to the orderer's local ledger, with these offsets embedded in each block's metadata so they survive a restart, as the sketch below shows.
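A minimal sketch of the recovery side, loosely modeled on what the consenter does when a chain is instantiated. ab.KafkaMetadata is the type from the excerpts above; the helper name, the proto import (golang/protobuf), and the handling of a brand-new chain are assumptions.
// Sketch only: read the persisted offsets back from the metadata of the last
// written block, so the channel consumer can resume at lastOffsetPersisted+1
// (compare setupChannelConsumerForChannel in the startup excerpt).
func recoverOffsets(metadataValue []byte) (lastOffsetPersisted, lastOriginalOffsetProcessed int64, err error) {
	if metadataValue == nil {
		// Brand-new chain: lastOffsetPersisted+1 should equal sarama.OffsetOldest,
		// i.e. start consuming from the oldest offset Kafka still holds.
		return sarama.OffsetOldest - 1, 0, nil
	}
	kafkaMetadata := &ab.KafkaMetadata{}
	if uerr := proto.Unmarshal(metadataValue, kafkaMetadata); uerr != nil {
		return 0, 0, fmt.Errorf("cannot unmarshal orderer metadata: %s", uerr)
	}
	return kafkaMetadata.LastOffsetPersisted, kafkaMetadata.LastOriginalOffsetProcessed, nil
}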
At this point it might feel as if something is missing; anyone who has read the solo article will notice there has been no timeout handling so far. Go back to the beginning of commitNormalMsg above, it is easy to miss: that is exactly where a batch timer is started whenever there are pending messages waiting to be cut into a block.
Timeout handling
case <-chain.timer:
if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil {
...
}
As you can see, the timeout is handled by publishing a KafkaMessage_TimeToCut message. Why not simply cut the block locally when the timer fires? Remember that orderers run as a cluster: to keep all members consistent, there has to be a single, agreed-upon point where the decision is made, and everyone else synchronizes to it. If each orderer acted on its own local timer, there would be no consensus to speak of.
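For completeness, a minimal sketch of what sendTimeToCut roughly posts; the signature mirrors the call in the excerpt above, and newProducerMessage is the same assumed helper as earlier.
// Sketch only: publish a time-to-cut for the block we believe should be cut
// next; every orderer in the cluster will consume the same message.
func postTimeToCut(producer sarama.SyncProducer, channel channel, blockNumber uint64, timer *<-chan time.Time) error {
	*timer = nil // the timer has done its job once the TTC message is on its way
	msg := &ab.KafkaMessage{
		Type: &ab.KafkaMessage_TimeToCut{
			TimeToCut: &ab.KafkaMessageTimeToCut{BlockNumber: blockNumber},
		},
	}
	payload := utils.MarshalOrPanic(msg)
	_, _, err := producer.SendMessage(newProducerMessage(channel, payload))
	return err
}
On the receiving side, the consumer dispatches the message to processTimeToCut: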
case *ab.KafkaMessage_TimeToCut:
if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
}
func (chain *chainImpl) processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, receivedOffset int64) error {
ttcNumber := ttcMessage.GetBlockNumber()
if ttcNumber == chain.lastCutBlockNumber+1 {
chain.timer = nil
logger.Debugf("[channel: %s] Nil'd the timer", chain.ChainID())
batch := chain.BlockCutter().Cut()
if len(batch) == 0 {
return fmt.Errorf("got right time-to-cut message (for block %d),"+
" no pending requests though; this might indicate a bug", chain.lastCutBlockNumber+1)
}
block := chain.CreateNextBlock(batch)
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: receivedOffset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
return nil
} else if ttcNumber > chain.lastCutBlockNumber+1 {
return fmt.Errorf("got larger time-to-cut message (%d) than allowed/expected (%d)"+
" - this might indicate a bug", ttcNumber, chain.lastCutBlockNumber+1)
}
return nil
}
This follows the same pattern as before, so there is not much new to explain.
One scenario is worth thinking through, though. The orderers in a cluster consume the same messages at roughly the same time, so in the ideal case their state is identical. If they all hit the batch timeout simultaneously and each publishes its own timeout message, does that mean each of them goes off and cuts and commits its own blocks, and everything descends into chaos?
On closer inspection this cannot happen. Timing out simultaneously is indeed possible, precisely because everyone is in the same state; the crucial part is what happens to the simultaneously published TimeToCut messages. Kafka is not to be trifled with here: it serializes them into one global order, which indirectly guarantees that only the first TTC for a given block takes effect. Every orderer cuts exactly one block when it sees that first message, and the duplicates that follow no longer satisfy the condition (ttcNumber == chain.lastCutBlockNumber+1), so they are ignored as stale.
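To make the argument concrete, here is a tiny self-contained simulation (a toy, not Fabric code): three orderers all publish a time-to-cut for block 5 at the same moment, the single partition forces the messages into one sequence, and the consumer applies the same guard as processTimeToCut above.
// Toy simulation: duplicate time-to-cut messages after Kafka has ordered them.
package main

import "fmt"

func main() {
	lastCutBlockNumber := uint64(4)
	// Kafka's single partition turns the concurrent sends into one sequence.
	topic := []uint64{5, 5, 5} // three identical TTC messages, now ordered

	for i, ttcNumber := range topic {
		switch {
		case ttcNumber == lastCutBlockNumber+1:
			lastCutBlockNumber++ // cut exactly one block for this timeout
			fmt.Printf("msg %d: cut block %d\n", i, lastCutBlockNumber)
		case ttcNumber > lastCutBlockNumber+1:
			fmt.Printf("msg %d: unexpected time-to-cut for block %d, would report a bug\n", i, ttcNumber)
		default:
			fmt.Printf("msg %d: stale time-to-cut for block %d, ignored\n", i, ttcNumber)
		}
	}
}
Running it prints one cut for block 5 followed by two ignored stale messages, which is exactly the behavior the guard in processTimeToCut produces on every orderer in the cluster.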