kafaka非常好用的消息隊(duì)列 一堆好處,不廢話上代碼
1.生產(chǎn)者
package?main
import?(
????"fmt"
????"github.com/Shopify/sarama"
????"log"
)
func?main()? {
????// 構(gòu)建 生產(chǎn)者
????// 生成 生產(chǎn)者配置文件
????config := sarama.NewConfig()
????// 設(shè)置生產(chǎn)者 消息 回復(fù)等級(jí) 0 1 all
????config.Producer.RequiredAcks = sarama.WaitForAll
????// 設(shè)置生產(chǎn)者 成功 發(fā)送消息 將在什么 通道返回
????config.Producer.Return.Successes = true
????// 設(shè)置生產(chǎn)者 發(fā)送的分區(qū)
????config.Producer.Partitioner = sarama.NewRandomPartitioner
????// 構(gòu)建 消息
????msg := &sarama.ProducerMessage{}
????msg.Topic =?"aaa"
????msg.Value = sarama.StringEncoder("123")
????// 連接 kafka
????producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
????if?err != nil {
????????log.Print(err)
????????return
????}
????defer?producer.Close()
????// 發(fā)送消息
????message, offset, err := producer.SendMessage(msg)
????if?err != nil {
????????log.Println(err)
????????return
????}
????fmt.Println(message,?" ", offset)
}
2.消費(fèi)者
packageconsumer
import(
??"errors"
?? "fmt"
?? "github.com/Shopify/sarama"
?? "gitlab.aiforward.cn/inf/golib/app/context"
)
varconsumersarama.Consumer
//消費(fèi)者回調(diào)函數(shù)
typeConsumerCallbackfunc(data []byte)
//初始化消費(fèi)者
funcInitConsumer(hoststring)error{
??config:=sarama.NewConfig()
??client,err:=sarama.NewClient([]string{host},config)
??iferr!=nil{
?????returnerrors.New("unable to create kafka client"+err.Error())
?? }
??consumer,err=sarama.NewConsumerFromClient(client)
??iferr!=nil{
?????returnerr
??}
??fmt.Println("InitConsumer Success")
??returnerr
}
//消費(fèi)者循環(huán)
funcLoopConsumer(topicstring,callbackConsumerCallback)error{
??ctx:=context.GetGinContextWithRequestId()
??partitionList,err:=consumer.Partitions(topic)
??iferr!=nil||len(partitionList)==0{
?????fmt.Println(ctx,"SubscribeTopic_Topic partitionList Error[%v]",err)
?? }
??for_,part:=rangepartitionList{
?????go func(partint32) {
????????partitionConsumer,err:=consumer.ConsumePartition(topic,part,sarama.OffsetNewest)
????????iferr!=nil{
???????????fmt.Println(err)
???????? }
????????deferpartitionConsumer.Close()
????????for{
???????????msg:= <-partitionConsumer.Messages()
???????????ifcallback!=nil{
?????????????? callback(msg.Value)
??????????? }
???????? }
????? }(part)
?? }
??returnnil
}
funcClose() {
??ifconsumer!=nil{
?????consumer.Close()
?? }
}