kafka golang 客戶端sarama 生產(chǎn)者代碼解析

  1. syncProducer 和asyncProducer的關(guān)系

? syncProducer 是所有功能都是由asyncProducer實(shí)現(xiàn)的,而syncProducer 之所以可以同步發(fā)送消息,答案就在SendMessage 函數(shù)中,源碼如下


 func(sp *syncProducer)SendMessage(msg *ProducerMessage) (partitionint32,offsetint64,errerror) {

   expectation :=make(chan*ProducerError,1)

   msg.expectation = expectation

   sp.producer.Input() <- msg

   if err := <-expectation;err != nil {    // 阻塞等待返回結(jié)果

        return-1,-1,err.Err

    }

   return msg.Partition,msg.Offset,nil

}

而使用asyncProducer 時(shí),只需要 直接將信息producer.Input()<-&ProducerMessage{} 放入進(jìn)producer.Input(), 然后異步讀取返回結(jié)果 chan*ProducerError

  1. 消息傳遞過程
 // one per topic

// partitions messages, then dispatches them by partition

type topicProducer struct{

    parent *asyncProducer

    topic string

    input <-chan*ProducerMessage

    breaker *breaker.Breaker

    handlers map[int32] chan<- *ProducerMessage

    partitioner Partitioner

}



type brokerProducer struct{

    parent *asyncProducer

    broker *Broker

    input  <-chan*ProducerMessage

    output chan<- *produceSet

    responses  <-chan*brokerProducerResponse

    buffer *produceSet

    timer  <-chantime.Time

    timerFired bool

    closing error

    currentRetries map[string]map[int32]error

}

由代碼可以看出topicProducer,partitionProducer,brokerProducer的parent都是asyncProducer

消息傳遞過程:
asyncProducer.dispatcher ->topicProducer.dispath -> partitionProducer.dispatch -> brokerProducer ->produceSet

?其中produceSet 對消息進(jìn)行聚集,若配置了壓縮的參數(shù),則會壓縮一個(gè)set中的所有的msg, 即批量壓縮, 然后構(gòu)建一個(gè)ProduceRequest ,然后由 broker.Produce 將請求發(fā)送出去,其中 broker 結(jié)構(gòu)體代表一個(gè)kafka broker 的連接
?partitionProducer 會選擇leader broker地址 ,若選擇失敗,則會重新選擇leader broker ,然后由這個(gè)連接發(fā)送消息

?根據(jù)kafka版本不同,消息會放入到不同的結(jié)構(gòu)體中若版本大于V0.11,set.recordsToSend.RecordBatch.addRecord(rec) 將一個(gè)rec添加進(jìn)去,否則將set.recordsToSend.MsgSet.addMessage(msgToSend)

?在生成一個(gè)newBrokerProducer時(shí),broker會開啟消費(fèi)output, 而output就是一個(gè)存放produceSet的channel,阻塞等待刷新ProduceRequest 并將其發(fā)送出去

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容