Hyperledger-Fabric源碼分析(orderer-consensus-kafka)

Fabric的kafka交互用到了sarama包,有興趣可以自行去研究下,這里就不展開了,還是focus在業(yè)務邏輯的部分。
強烈建議在看這個之前或之后去看下這篇文章。
A Kafka-based Ordering Service for Fabric

啟動

func (chain *chainImpl) Start() {
    go startThread(chain)
}
func startThread(chain *chainImpl) {

    err = setupTopicForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.consenter.topicDetail(), chain.channel)
    
    // Set up the producer
    chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
    
    // Have the producer post the CONNECT message
    sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); 

    // Set up the parent consumer
    chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)

    // Set up the channel consumer
    chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)

    chain.replicaIDs, err = getHealthyClusterReplicaInfo(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.channel)

    chain.doneProcessingMessagesToBlocks = make(chan struct{})

    chain.errorChan = make(chan struct{}) // Deliver requests will also go through
    close(chain.startChan)                // Broadcast requests will now go through

    chain.processMessagesToBlocks() // Keep up to date with the channel
}
  1. 首先在Fabric的世界里,channel是連接彼此的第一要素,將Topic當作channel的延申再合適不過,orderer將block發(fā)到topic上,訂閱該topic的消費者會寫入本地orderer賬本。setupTopicForChannel里面有個細節(jié),每個Topic都只有一個分區(qū),這是為了保證消息嚴格有序。這很重要。
  2. 根據(jù)配置創(chuàng)建消息生產(chǎn)者,消費者
  3. 給消息生產(chǎn)者發(fā)送KafkaMessage_Connect消息,后面會講到。
  4. close(chain.startChan)用來提醒,chain啟動已經(jīng)完畢。
  5. 接下來processMessagesToBlocks將監(jiān)聽并處理收到的消息

Normal消息處理

func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
    ...
    case in, ok := <-chain.channelConsumer.Messages():
            if !ok {
                logger.Criticalf("[channel: %s] Kafka consumer closed.", chain.ChainID())
                return counts, nil
            }
            
            ...
            switch msg.Type.(type) {
            case *ab.KafkaMessage_Connect:
                _ = chain.processConnect(chain.ChainID())
                counts[indexProcessConnectPass]++
            case *ab.KafkaMessage_TimeToCut:
                if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
                    logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
                    logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
                    counts[indexProcessTimeToCutError]++
                    return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
                }
                counts[indexProcessTimeToCutPass]++
            case *ab.KafkaMessage_Regular:
                if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
                    logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.ChainID(), err)
                    counts[indexProcessRegularError]++
                } else {
                    counts[indexProcessRegularPass]++
                }
            }
    ...
}

這里針對三種消息類型分別做了處理,可以看到KafkaMessage_Regular類型會轉(zhuǎn)發(fā)到chain.processRegular。

這里稍微提下KafkaMessage_Connect的處理,里面什么都沒有做,還記得啟動的時候創(chuàng)建完producer后,馬上接著發(fā)了一個Connect消息出去么?這一步的意義在于,一是,topic已經(jīng)創(chuàng)建成功。二是,消息能成功發(fā)送。三是,消息能成功消費。

接下來正式開始處理Normal消息

case ab.KafkaMessageRegular_NORMAL:
   if regularMessage.OriginalOffset != 0 {
      // But we've reprocessed it already
      if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
         logger.Debugf(
            "[channel: %s] OriginalOffset(%d) <= LastOriginalOffsetProcessed(%d), message has been consumed already, discard",
            chain.ChainID(), regularMessage.OriginalOffset, chain.lastOriginalOffsetProcessed)
         return nil
      }

   }

   // The config sequence has advanced
   if regularMessage.ConfigSeq < seq {
      logger.Debugf("[channel: %s] Config sequence has advanced since this normal message got validated, re-validating", chain.ChainID())
      configSeq, err := chain.ProcessNormalMsg(env)
      if err != nil {
         return fmt.Errorf("discarding bad normal message because = %s", err)
      }

      if err := chain.order(env, configSeq, receivedOffset); err != nil {
         return fmt.Errorf("error re-submitting normal message because = %s", err)
      }

      return nil
   }

   offset := regularMessage.OriginalOffset
   if offset == 0 {
      offset = chain.lastOriginalOffsetProcessed
   }

   // During consensus-type migration, drop normal messages that managed to sneak in past Order, possibly from other orderers
   if chain.migrationStatusStepper.IsPending() || chain.migrationStatusStepper.IsCommitted() {
      return nil
   }
   commitNormalMsg(env, offset)
  1. 因為kafka單分區(qū)保證有序的特征,所以chain能維持一個最新消息offset,用來判斷新到的消息是不是已經(jīng)處理過了。這里有個細節(jié),之前看的時候忽略了,以為只是個容錯處理,想不到大有文章。regularMessage.OriginalOffset != 0,查了下每個消息的OriginalOffset初始都為0,什么情況會變?不為0意味著這個消息被人消費過但是因為種種原因沒有處理,被丟到kafka,重新reorder過。
  2. regularMessage.ConfigSeq < seq是不是很眼熟,這里solo講過了。不同在于,如果發(fā)現(xiàn)從發(fā)送到接受處理的過程中發(fā)現(xiàn)config有變化,如果能re-validate,說明消息是沒有問題的,只不過是大環(huán)境變了,試著re-order看看。
  3. 最后開始真正的commitNormalMsg
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
   batches, pending := chain.BlockCutter().Ordered(message)
   logger.Debugf("[channel: %s] Ordering results: items in batch = %d, pending = %v", chain.ChainID(), len(batches), pending)

   switch {
   case chain.timer != nil && !pending:
      chain.timer = nil
   case chain.timer == nil && pending:
      chain.timer = time.After(chain.SharedConfig().BatchTimeout())
      logger.Debugf("[channel: %s] Just began %s batch timer", chain.ChainID(), chain.SharedConfig().BatchTimeout().String())
   default:
   }

   if len(batches) == 0 {
      // If no block is cut, we update the `lastOriginalOffsetProcessed`, start the timer if necessary and return
      chain.lastOriginalOffsetProcessed = newOffset
      return
   }

   offset := receivedOffset
   if pending || len(batches) == 2 {
      offset--
   } else {
      chain.lastOriginalOffsetProcessed = newOffset
   }

   // Commit the first block
   block := chain.CreateNextBlock(batches[0])
   metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
      LastOffsetPersisted:         offset,
      LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
      LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
   })
   chain.WriteBlock(block, metadata)
   chain.lastCutBlockNumber++
   logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", chain.ChainID(), chain.lastCutBlockNumber, offset)

   // Commit the second block if exists
   if len(batches) == 2 {
      chain.lastOriginalOffsetProcessed = newOffset
      offset++

      block := chain.CreateNextBlock(batches[1])
      metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
         LastOffsetPersisted:         offset,
         LastOriginalOffsetProcessed: newOffset,
         LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
      })
      chain.WriteBlock(block, metadata)
      chain.lastCutBlockNumber++
      logger.Debugf("[channel: %s] Batch filled, just cut block %d - last persisted offset is now %d", chain.ChainID(), chain.lastCutBlockNumber, offset)
   }
}
  1. 這里前半部分跟solo是一樣了,無非就是看準時機進行blockcut。這里要注意的是,cut出來的結(jié)果只有一種情況會返回2個,也就是單個消息size太大,導致單獨成包的情況。其他情況都是一個。

  2. pending || len(batches) == 2這個判斷很關(guān)鍵,如果新消息進來,這個條件成立,說明什么?說明這個新消息要不這次根本就沒條件去commit,要不cut出來兩個block,自己歸在第二個里面,所以這里會首先offset--,等之后真正提交自己的時候再++回來。

  3. 否則條件不成立就說明這次新來的消息時機簡直完美,不光觸發(fā)了Cut,自己還能趕上這波commit

  4. 這里幾個offset有必要體會下,理解對了,實現(xiàn)就沒問題。

    • receivedOffset : consumer的消費進度
    • newOffset : 新進消息的offset,兩種情況,brandnew和reorder的。
    • LastOffsetPersisted : 已經(jīng)commit到賬本的最后消息的offset
    • LastOriginalOffsetProcessed : 已經(jīng)接受到的最新消息的offset
  5. 接下來很熟悉了,將生成的block都寫入到orderer本地賬本中。

  6. 至此是不是覺得少了點什么,看過solo篇的就知道,少了超時處理?;氐竭@個最開始,差點看漏了,這里正好在生成一個超時的timer,如果有pending消息等待出包的話。

超時處理

case <-chain.timer:
   if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil {
      ...
   }
}

可以看到這里是通過發(fā)送KafkaMessage_TimeToCut來實現(xiàn)的,想想為什么不直接本地做超時處理就好了?這里需要注意的是orderer是集群的情況,怎樣滿足成員間保持一致,首要條件是只能有一個地方寫入,其他成員進行同步,如果都自己做自己的,那共識也就無從談起。

case *ab.KafkaMessage_TimeToCut:
   if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
      return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
   }
func (chain *chainImpl) processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, receivedOffset int64) error {
   ttcNumber := ttcMessage.GetBlockNumber()
   if ttcNumber == chain.lastCutBlockNumber+1 {
      chain.timer = nil
      logger.Debugf("[channel: %s] Nil'd the timer", chain.ChainID())
      batch := chain.BlockCutter().Cut()
      if len(batch) == 0 {
         return fmt.Errorf("got right time-to-cut message (for block %d),"+
            " no pending requests though; this might indicate a bug", chain.lastCutBlockNumber+1)
      }
      block := chain.CreateNextBlock(batch)
      metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
         LastOffsetPersisted:         receivedOffset,
         LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
      })
      chain.WriteBlock(block, metadata)
      chain.lastCutBlockNumber++
      return nil
   } else if ttcNumber > chain.lastCutBlockNumber+1 {
      return fmt.Errorf("got larger time-to-cut message (%d) than allowed/expected (%d)"+
         " - this might indicate a bug", ttcNumber, chain.lastCutBlockNumber+1)
   }
   return nil
}

跟前面也沒有什么區(qū)別,沒什么好講的。

有個場景可以想象一下,orderer集群成員基本上都是同時收到的消息進行處理的,大家的狀態(tài)理想狀態(tài)都是一致的,如果同時觸發(fā)timeout,然后又同時對外發(fā)布timeout消息,那是不是說都各自去創(chuàng)建block去提交,那不是亂套了?

這種情況仔細想想沒有可能性,前面同時觸發(fā)timeout是可能的,因為大家狀態(tài)都一樣,要超時肯定都超時,關(guān)鍵是同時發(fā)布timeout消息的部分,這里kafka不是吃素的,它保證了這些消息是有序的,這也間接保證只有一個orderer能commit本地成功,之后消息執(zhí)行會不滿足條件(ttcNumber == chain.lastCutBlockNumber+1 )。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容