1.首先引入?
github.com/apache/pulsar-client-go/pulsar
2.啟動(dòng)pulsar的client
/*啟動(dòng)一個(gè)pulsar? client*/
defer func() {
??ifr:=recover();r!=nil{
?????logger.Fatalf(ctx,"pulsar is down stack[%+v] recover[%+v]",string(debug.Stack()),r)
?? }
}()
varerrerror
SunacClient,err=pulsar.NewClient(pulsar.ClientOptions{
??URL:conf.PulsarSetting.Url,
})
iferr!=nil{
??logger.Errorf(ctx,"setup pulsar failed to get new client err[%v]",err)
??return
}
3.生產(chǎn)者
funcSendMessage(ctx*gin.Context,clientpulsar.Client,topicstring,msg []byte) {
??/*
???提供生產(chǎn)者方法
?? */
?? /*
?????(1)初始化一個(gè)producer設(shè)置好主題
?? */
??producer,err:=client.CreateProducer(pulsar.ProducerOptions{
?????Topic:topic,
??})
??/*
??? */
??iferr!=nil{
?????logger.Errorf(ctx,"SendMessage Error[%v]",err)
?? }
??/*
??? */
?? //(2)把消息結(jié)構(gòu)體發(fā)給pusar
??_,err=producer.Send(context.Background(), &pulsar.ProducerMessage{
?????Payload:msg,
?????EventTime:time.Now(),
??})
??logger.Infof(ctx,"Pulsar_Published_SendMessage [%v]",string(msg))
??deferproducer.Close()
??iferr!=nil{
?????logger.Errorf(ctx,"Failed to publish message",err)
?? }
}
4.消費(fèi)者
funcnewConsumer(ctx*gin.Context,clientpulsar.Client,topicstring) {
??//創(chuàng)建消費(fèi)者
??defer func() {
?????ifr:=recover();r!=nil{
????????logger.Fatalf(ctx,"Pulsar is down [stack=%+v] [recover=%+v]",string(debug.Stack()),r)
????? }
?? }()
??consumer,err:=client.Subscribe(pulsar.ConsumerOptions{
?????Topics:[]string{topic},
?????SubscriptionName:conf.RcSeverSetting.ServiceName,
?????Type:pulsar.Failover,
??})
??iferr!=nil{
?????logger.Errorf(ctx,"NewConsumer failed to start pulsar consumer,err[%v]",err)
?????return
??}
??deferconsumer.Close()
??//循環(huán)的獲取pulsar的消息???? e
??for{
?????msg,err:=consumer.Receive(context.Background())
?????iferr!=nil{
????????logger.Error(ctx,"newConsumer failed to receive message,err[%v],topic[%s]",err,msg.Topic())
????? }
?????//根據(jù)消息的類(lèi)型 對(duì)消息進(jìn)行處理
?????logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s] msg[%v]",msg.Topic(),string(msg.Payload()))
?????switchmsg.Topic() {
?????casecommon.RCPULSARFACEADD://人臉添加或更新
????????logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s]",msg.Topic())
?????casecommon.RCPULSARPASSRECORDADD://通行記錄添加
????????logger.Infof(ctx,"newConsumer RCPULSARPASSRECORDADD topic[%s]",msg.Topic())
?????default:
logger.Errorf(ctx,"pulsar consumer got an wrong topic,message[%v]",msg)
????? }
?????iferr!=nil{
????????logger.Errorf(ctx,"newConsumer handle got error,msg will nack,topic=[%s],err=[%v]",topic,err)
????????consumer.Nack(msg)//When a message is "negatively acked" it will be marked for redelivery after some fixed delay
????????continue
?????}
?????consumer.Ack(msg)
?? }
}