- syncProducer 和asyncProducer的關(guān)系
? syncProducer 是所有功能都是由asyncProducer實(shí)現(xiàn)的,而syncProducer 之所以可以同步發(fā)送消息,答案就在SendMessage 函數(shù)中,源碼如下
func(sp *syncProducer)SendMessage(msg *ProducerMessage) (partitionint32,offsetint64,errerror) {
expectation :=make(chan*ProducerError,1)
msg.expectation = expectation
sp.producer.Input() <- msg
if err := <-expectation;err != nil { // 阻塞等待返回結(jié)果
return-1,-1,err.Err
}
return msg.Partition,msg.Offset,nil
}
而使用asyncProducer 時(shí),只需要 直接將信息producer.Input()<-&ProducerMessage{} 放入進(jìn)producer.Input(), 然后異步讀取返回結(jié)果 chan*ProducerError
- 消息傳遞過程
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct{
parent *asyncProducer
topic string
input <-chan*ProducerMessage
breaker *breaker.Breaker
handlers map[int32] chan<- *ProducerMessage
partitioner Partitioner
}
type brokerProducer struct{
parent *asyncProducer
broker *Broker
input <-chan*ProducerMessage
output chan<- *produceSet
responses <-chan*brokerProducerResponse
buffer *produceSet
timer <-chantime.Time
timerFired bool
closing error
currentRetries map[string]map[int32]error
}
由代碼可以看出topicProducer,partitionProducer,brokerProducer的parent都是asyncProducer
消息傳遞過程:
asyncProducer.dispatcher ->topicProducer.dispath -> partitionProducer.dispatch -> brokerProducer ->produceSet
?其中produceSet 對消息進(jìn)行聚集,若配置了壓縮的參數(shù),則會壓縮一個(gè)set中的所有的msg, 即批量壓縮, 然后構(gòu)建一個(gè)ProduceRequest ,然后由 broker.Produce 將請求發(fā)送出去,其中 broker 結(jié)構(gòu)體代表一個(gè)kafka broker 的連接
?partitionProducer 會選擇leader broker地址 ,若選擇失敗,則會重新選擇leader broker ,然后由這個(gè)連接發(fā)送消息
?根據(jù)kafka版本不同,消息會放入到不同的結(jié)構(gòu)體中若版本大于V0.11,set.recordsToSend.RecordBatch.addRecord(rec) 將一個(gè)rec添加進(jìn)去,否則將set.recordsToSend.MsgSet.addMessage(msgToSend)
?在生成一個(gè)newBrokerProducer時(shí),broker會開啟消費(fèi)output, 而output就是一個(gè)存放produceSet的channel,阻塞等待刷新ProduceRequest 并將其發(fā)送出去