官方文檔
https://www.emqx.com/zh/blog/how-to-use-mqtt-in-golang
連接
var once sync.Once
var MqttClient mqtt.Client
var MqttMessage mqtt.Message
func ConnectMqtt(pact, broker, port, username, password string) {
once.Do(func() {
MqttClient = NewClient(pact, broker, port, username, password)
})
}
func NewClient(pact, broker, port, username, password string) mqtt.Client {
//取配置
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("%s://%s:%s", pact, broker, port))
//todo 如果使用固定的clientID會出現(xiàn)在第一次重連成功后,一直斷開再重連再斷開的死循環(huán)
//處理這個問題的方法就是每次重新連接的時候使用新的clientId
u1 := uuid.New()
clientId := u1.String()
opts.SetClientID(clientId)
opts.SetUsername(username)
opts.SetPassword(password)
//Keep Alive 的最大值為 18 小時 12 分 15 秒;
opts.SetKeepAlive(18 * time.Hour)
opts.SetDefaultPublishHandler(MessagePubHandler)
//把配置里的 cleanSession 設為false,客戶端掉線后 服務器端不會清除session,
//當重連后可以接收之前訂閱主題的消息。當客戶端上線后會接受到它離線的這段時間的消息
//但是這個只是進行了重連,重連后還需要再次發(fā)起訂閱
opts.CleanSession = false
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
var MessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//實現(xiàn)數(shù)據(jù)存儲或者入庫
logger.ZapLogger.Infof("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
// 連接回調(diào)
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected Ok")
logger.ZapLogger.Infof("MQTT Connect SUCCESS")
}
// 丟失回調(diào)
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost :%v", err)
logger.ZapLogger.Errorf("Connect lost :%v", err)
}
發(fā)送數(shù)據(jù)和訂閱接收
type MqttInterface interface {
//發(fā)送
PublishMessage(msg any)
//訂閱
Subscription()
}
type MqttC struct {
Topic string
MqttMessage mqtt.Message
}
func NewMqttCRes(topic string) *MqttC {
return &MqttC{
Topic: topic,
}
}
func (m *MqttC) PublishMessage(msg any) {
//等級:0 消息最多傳送一次。如果當前客戶端不可用,它將丟失這條消息。
//1消息至少傳送一次。
//2消息只傳送一次。
qos := byte(1)
retained := false
token := MqttClient.Publish(m.Topic, qos, retained, msg)
if token.Error() != nil {
fmt.Printf("Error while publishing %v", token.Error())
}
token.Wait()
}
func (m *MqttC) Subscription() {
qos := byte(0)
token := MqttClient.Subscribe(m.Topic, qos, MessagePubHandler)
token.Wait()
fmt.Printf("Subscribed to topic: %s", m.Topic)
}