golang mqtt的使用

官方文檔

https://www.emqx.com/zh/blog/how-to-use-mqtt-in-golang

連接

var once sync.Once
var MqttClient mqtt.Client
var MqttMessage mqtt.Message

func ConnectMqtt(pact, broker, port, username, password string) {
    once.Do(func() {
        MqttClient = NewClient(pact, broker, port, username, password)
    })
}

func NewClient(pact, broker, port, username, password string) mqtt.Client {
    //取配置
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("%s://%s:%s", pact, broker, port))
    //todo 如果使用固定的clientID會出現(xiàn)在第一次重連成功后,一直斷開再重連再斷開的死循環(huán)
    //處理這個問題的方法就是每次重新連接的時候使用新的clientId
    u1 := uuid.New()
    clientId := u1.String()
    opts.SetClientID(clientId)
    opts.SetUsername(username)
    opts.SetPassword(password)
    //Keep Alive 的最大值為 18 小時 12 分 15 秒;
    opts.SetKeepAlive(18 * time.Hour)
    opts.SetDefaultPublishHandler(MessagePubHandler)
    //把配置里的 cleanSession 設為false,客戶端掉線后 服務器端不會清除session,
    //當重連后可以接收之前訂閱主題的消息。當客戶端上線后會接受到它離線的這段時間的消息
    //但是這個只是進行了重連,重連后還需要再次發(fā)起訂閱
    opts.CleanSession = false
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    return client

}


var MessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    //實現(xiàn)數(shù)據(jù)存儲或者入庫
    logger.ZapLogger.Infof("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// 連接回調(diào)
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected Ok")
    logger.ZapLogger.Infof("MQTT Connect SUCCESS")
}

// 丟失回調(diào)
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect  lost :%v", err)
    logger.ZapLogger.Errorf("Connect lost :%v", err)
}

發(fā)送數(shù)據(jù)和訂閱接收


type MqttInterface interface {
    //發(fā)送
    PublishMessage(msg any)
    //訂閱
    Subscription()
}

type MqttC struct {
    Topic       string
    MqttMessage mqtt.Message
}

func NewMqttCRes(topic string) *MqttC {
    return &MqttC{
        Topic: topic,
    }
}

func (m *MqttC) PublishMessage(msg any) {
    //等級:0 消息最多傳送一次。如果當前客戶端不可用,它將丟失這條消息。
    //1消息至少傳送一次。
    //2消息只傳送一次。
    qos := byte(1)
    retained := false
    token := MqttClient.Publish(m.Topic, qos, retained, msg)
    if token.Error() != nil {
        fmt.Printf("Error while publishing %v", token.Error())
    }
    token.Wait()
}

func (m *MqttC) Subscription() {
    qos := byte(0)
    token := MqttClient.Subscribe(m.Topic, qos, MessagePubHandler)
    token.Wait()
    fmt.Printf("Subscribed to topic: %s", m.Topic)
}

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容