1.5 Go關(guān)于kafka的使用

kafaka非常好用的消息隊(duì)列 一堆好處,不廢話上代碼

1.生產(chǎn)者

package?main


import?(

????"fmt"

????"github.com/Shopify/sarama"

????"log"

)


func?main()? {

????// 構(gòu)建 生產(chǎn)者

????// 生成 生產(chǎn)者配置文件

????config := sarama.NewConfig()

????// 設(shè)置生產(chǎn)者 消息 回復(fù)等級(jí) 0 1 all

????config.Producer.RequiredAcks = sarama.WaitForAll

????// 設(shè)置生產(chǎn)者 成功 發(fā)送消息 將在什么 通道返回

????config.Producer.Return.Successes = true

????// 設(shè)置生產(chǎn)者 發(fā)送的分區(qū)

????config.Producer.Partitioner = sarama.NewRandomPartitioner

????// 構(gòu)建 消息

????msg := &sarama.ProducerMessage{}

????msg.Topic =?"aaa"

????msg.Value = sarama.StringEncoder("123")


????// 連接 kafka

????producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)

????if?err != nil {

????????log.Print(err)

????????return

????}

????defer?producer.Close()

????// 發(fā)送消息

????message, offset, err := producer.SendMessage(msg)

????if?err != nil {

????????log.Println(err)

????????return

????}

????fmt.Println(message,?" ", offset)


}

2.消費(fèi)者

packageconsumer

import(

??"errors"

?? "fmt"

?? "github.com/Shopify/sarama"

?? "gitlab.aiforward.cn/inf/golib/app/context"

)

varconsumersarama.Consumer

//消費(fèi)者回調(diào)函數(shù)

typeConsumerCallbackfunc(data []byte)

//初始化消費(fèi)者

funcInitConsumer(hoststring)error{

??config:=sarama.NewConfig()

??client,err:=sarama.NewClient([]string{host},config)

??iferr!=nil{

?????returnerrors.New("unable to create kafka client"+err.Error())

?? }

??consumer,err=sarama.NewConsumerFromClient(client)

??iferr!=nil{

?????returnerr

??}

??fmt.Println("InitConsumer Success")

??returnerr

}

//消費(fèi)者循環(huán)

funcLoopConsumer(topicstring,callbackConsumerCallback)error{

??ctx:=context.GetGinContextWithRequestId()

??partitionList,err:=consumer.Partitions(topic)

??iferr!=nil||len(partitionList)==0{

?????fmt.Println(ctx,"SubscribeTopic_Topic partitionList Error[%v]",err)

?? }

??for_,part:=rangepartitionList{

?????go func(partint32) {

????????partitionConsumer,err:=consumer.ConsumePartition(topic,part,sarama.OffsetNewest)

????????iferr!=nil{

???????????fmt.Println(err)

???????? }

????????deferpartitionConsumer.Close()

????????for{

???????????msg:= <-partitionConsumer.Messages()

???????????ifcallback!=nil{

?????????????? callback(msg.Value)

??????????? }

???????? }

????? }(part)

?? }

??returnnil

}

funcClose() {

??ifconsumer!=nil{

?????consumer.Close()

?? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容