1.2.Go如何使用pulsar

1.首先引入?

github.com/apache/pulsar-client-go/pulsar

2.啟動(dòng)pulsar的client

/*啟動(dòng)一個(gè)pulsar? client*/

defer func() {

??ifr:=recover();r!=nil{

?????logger.Fatalf(ctx,"pulsar is down stack[%+v] recover[%+v]",string(debug.Stack()),r)

?? }

}()

varerrerror

SunacClient,err=pulsar.NewClient(pulsar.ClientOptions{

??URL:conf.PulsarSetting.Url,

})

iferr!=nil{

??logger.Errorf(ctx,"setup pulsar failed to get new client err[%v]",err)

??return

}

3.生產(chǎn)者

funcSendMessage(ctx*gin.Context,clientpulsar.Client,topicstring,msg []byte) {

??/*

???提供生產(chǎn)者方法

?? */

?? /*

?????(1)初始化一個(gè)producer設(shè)置好主題

?? */

??producer,err:=client.CreateProducer(pulsar.ProducerOptions{

?????Topic:topic,

??})

??/*

??? */

??iferr!=nil{

?????logger.Errorf(ctx,"SendMessage Error[%v]",err)

?? }

??/*

??? */

?? //(2)把消息結(jié)構(gòu)體發(fā)給pusar

??_,err=producer.Send(context.Background(), &pulsar.ProducerMessage{

?????Payload:msg,

?????EventTime:time.Now(),

??})

??logger.Infof(ctx,"Pulsar_Published_SendMessage [%v]",string(msg))

??deferproducer.Close()

??iferr!=nil{

?????logger.Errorf(ctx,"Failed to publish message",err)

?? }

}

4.消費(fèi)者

funcnewConsumer(ctx*gin.Context,clientpulsar.Client,topicstring) {

??//創(chuàng)建消費(fèi)者

??defer func() {

?????ifr:=recover();r!=nil{

????????logger.Fatalf(ctx,"Pulsar is down [stack=%+v] [recover=%+v]",string(debug.Stack()),r)

????? }

?? }()

??consumer,err:=client.Subscribe(pulsar.ConsumerOptions{

?????Topics:[]string{topic},

?????SubscriptionName:conf.RcSeverSetting.ServiceName,

?????Type:pulsar.Failover,

??})

??iferr!=nil{

?????logger.Errorf(ctx,"NewConsumer failed to start pulsar consumer,err[%v]",err)

?????return

??}

??deferconsumer.Close()

??//循環(huán)的獲取pulsar的消息???? e

??for{

?????msg,err:=consumer.Receive(context.Background())

?????iferr!=nil{

????????logger.Error(ctx,"newConsumer failed to receive message,err[%v],topic[%s]",err,msg.Topic())

????? }

?????//根據(jù)消息的類(lèi)型 對(duì)消息進(jìn)行處理

?????logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s] msg[%v]",msg.Topic(),string(msg.Payload()))

?????switchmsg.Topic() {

?????casecommon.RCPULSARFACEADD://人臉添加或更新

????????logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s]",msg.Topic())

?????casecommon.RCPULSARPASSRECORDADD://通行記錄添加

????????logger.Infof(ctx,"newConsumer RCPULSARPASSRECORDADD topic[%s]",msg.Topic())

?????default:

logger.Errorf(ctx,"pulsar consumer got an wrong topic,message[%v]",msg)

????? }

?????iferr!=nil{

????????logger.Errorf(ctx,"newConsumer handle got error,msg will nack,topic=[%s],err=[%v]",topic,err)

????????consumer.Nack(msg)//When a message is "negatively acked" it will be marked for redelivery after some fixed delay

????????continue

?????}

?????consumer.Ack(msg)

?? }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容