flutter mqtt的使用看這里,持續(xù)更新。。。

mqtt網(wǎng)絡(luò)協(xié)議,相信跟物聯(lián)網(wǎng)相關(guān)的公司都會(huì)遇到,在Android,iOS原生開(kāi)發(fā)是可以很好的實(shí)現(xiàn),相關(guān)的資料也是很多!但是在flutter里面還算比較嘗鮮的一個(gè)領(lǐng)域吧!

幸虧flutter里面 已經(jīng)有一個(gè)還不錯(cuò)的第三庫(kù)mqtt_client,我在項(xiàng)目中用的版本是比較低一點(diǎn)的版本,如下圖

image

此版本在項(xiàng)目中,已經(jīng)運(yùn)行了有一段時(shí)間了,相關(guān)的app,也已經(jīng)上線了,目前沒(méi)有發(fā)現(xiàn)有什么問(wèn)題.但是,秉承我一貫的原則,干貨,干貨,干貨;當(dāng)然是要為大家?guī)?lái)最新版本的mqtt的示例啦~~此時(shí)應(yīng)該有掌聲

這篇文章介紹 怎么封裝一個(gè)屬于自己的工具,解決在使用過(guò)程 中文亂碼等問(wèn)題。

首先我希望用單例模式,這樣整個(gè)項(xiàng)目隨處都可以調(diào)用,使用起來(lái)比較方便

然后希望支持加密與不加密的情況!

最后希望支持中文

話不多說(shuō),新建一個(gè)工程flutter_app_mqtt,在pubspec.yaml文件中,添加依賴庫(kù)mqtt_client,然后pub get一下,下載庫(kù)

mqtt_client: ^7.3.0

準(zhǔn)備工作好了,我們準(zhǔn)備封裝工具類MqttTool


file.png

核心代碼

MqttTool工具類代碼如下:

import 'dart:async';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:typed_data/typed_buffers.dart';

typedef ConnectedCallback = void Function();

class MqttTool {
  MqttQos qos = MqttQos.atLeastOnce;
  MqttServerClient mqttClient;
  static MqttTool _instance;
  static MqttTool getInstance() {
    if (_instance == null) {
      _instance = MqttTool();
    }
    return _instance;
  }

  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false}) {
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);

    mqttClient.onConnected = onConnected;

    mqttClient.onSubscribed = _onSubscribed;

    mqttClient.onSubscribeFail = _onSubscribeFail;

    mqttClient.onUnsubscribed = _onUnSubscribed;

    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("_正在連接中...");
    return mqttClient.connect(username, password);
  }

  disconnect() {
    mqttClient.disconnect();
    _log("_disconnect");
  }

  int publishMessage(String pTopic, String msg) {

    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$msg");
    Uint8Buffer uint8buffer = Uint8Buffer();
    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(codeUnits);

    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  int publishRawMessage(String pTopic, List<int> list) {
    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$list");
    Uint8Buffer uint8buffer = Uint8Buffer();
//    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(list);
    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  Subscription subscribeMessage(String subtopic) {
    return mqttClient.subscribe(subtopic, qos);
  }

  unsubscribeMessage(String unSubtopic) {
    mqttClient.unsubscribe(unSubtopic);
  }

  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient.connectionStatus;
  }

  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_監(jiān)聽(tīng)成功!");
    return mqttClient.updates;
  }

  onConnected() {
//    mqttClient.onConnected = callback;
    _log("_onConnected");
  }

  onDisConnected(ConnectedCallback callback) {
    mqttClient.onDisconnected = callback;
  }

  _onDisconnected() {
    _log("_onDisconnected");
  }

  _onSubscribed(String topic) {
    _log("_訂閱主題成功---topic:$topic");
  }

  _onUnSubscribed(String topic) {
    _log("_取消訂閱主題成功---topic:$topic");
  }

  _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  _log(String msg) {
    print("MQTT-->$msg");
  }
}

工具類封裝完成了,現(xiàn)在就需要去項(xiàng)目中應(yīng)用了,因此,我們改造一下HomePage的布局,來(lái)完成測(cè)試驗(yàn)證工作

Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            Container(
              child: RaisedButton(
                onPressed: _connect,
                child: Text(
                  "connect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _subscribeTopic,
                child: Text(
                  "subscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _unSubscribeTopic,
                child: Text(
                  "unSubscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _publishTopic,
                child: Text(
                  "publish topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _startListen,
                child: Text(
                  "start listen",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _disconnect,
                child: Text(
                  "disconnect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            )
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ), // This trailing comma makes auto-formatting nicer for build methods.
    );
  }

對(duì)應(yīng)的UI圖是這樣的


changeImage.png

RaiseButton對(duì)應(yīng)的點(diǎn)擊事件函數(shù)如下

//  建立連接
  _connect() async {
    String server = "your server name";
    int port = 1883;
    String clientId = "86-1885999fuehxz5f3ced1e";
    String userName = "86-18859995315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt連接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt連接失敗 --密碼錯(cuò)誤!!!");
      } else {
        print("有事做了~ ====mqtt連接失敗!!!");
      }
    });
  }

//  訂閱主題
  _subscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";

    String topic = "device/F4CFA26F1E43/#";

    String topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }
  
//  取消訂閱
  _unSubscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";
    String topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }
  
//  發(fā)布消息
  _publishTopic() {
    String topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    String str1 = "2950";

    String topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    String str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }
  
//  監(jiān)聽(tīng)消息的具體實(shí)現(xiàn)
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    final MqttPublishMessage recMess = data[0].payload;
    final String topic = data[0].topic;
    final String pt = Utf8Decoder().convert(recMess.payload.message);
    String desString = "topic is <$topic>, payload is <-- $pt -->";
    print("string =$desString");
    Map p = Map();
    p["topic"] = topic;
    p["type"] = "string";
    p["payload"] = pt;
    ListEventBus.getDefault().post(p);
  }

//  開(kāi)啟監(jiān)聽(tīng)消息
  _startListen() {
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }
  
//  斷開(kāi)連接
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }

點(diǎn)擊RaiseButton順序 connect--->subscribe topic---->start listen---->publish topic----> disconnect

應(yīng)的控制臺(tái)log 如下:


log.png

至此,Mqtt協(xié)議的封裝,加應(yīng)用完成得差不多了.

還有二點(diǎn)需要補(bǔ)充一下,

一個(gè)是加密問(wèn)題,
另一個(gè)是 整個(gè)工程在監(jiān)聽(tīng)mqtt回來(lái)的數(shù)據(jù)該如何處理

加密問(wèn)題,其實(shí)MqttTool工具類代碼,已經(jīng)處理好了,相關(guān)代碼如下

    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }

所以用加密端口時(shí),_connect()方法里面要多傳一個(gè)參數(shù) isSsl:true

//  建立連接
  _connect() async {
    String server = "connect.owon.com";
    int port = 8883;
    String clientId = "86-1885999fuehxz5f3ced1e";
    String userName = "86-188599895315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password,isSsl: true)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt連接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt連接失敗 --密碼錯(cuò)誤!!!");
      } else {
        print("有事做了~ ====mqtt連接失敗!!!");
      }
    });
  }

另一個(gè)是 整個(gè)工程在監(jiān)聽(tīng)mqtt回來(lái)的數(shù)據(jù)該如何處理
我這邊是把mqtt工具類接收到數(shù)據(jù)用evenbus發(fā)送出去了, 其他需要監(jiān)聽(tīng)數(shù)據(jù)的page,就去用evenbus去監(jiān)聽(tīng)接收的數(shù)據(jù).

ListEventBus.getDefault().post(p);

evenbus類的代碼如下

import 'dart:async';

class ListEventBus {
  static ListEventBus _instance;
  StreamController _streamController;
  factory ListEventBus.getDefault() {
    if (_instance == null) {
      _instance = ListEventBus._init();
    }
    return _instance;
  }

  ListEventBus._init() {
    _streamController = StreamController.broadcast();
  }

  StreamSubscription<T> register<T>(void onData(T event)) {
    ///需要返回訂閱者,所以不能使用下面這種形式
//   return _streamController.stream.listen((event) {
//      if (event is T) {
//        onData(event);
//      }
//    });
    ///沒(méi)有指定類型,全類型注冊(cè)
    if (T == dynamic) {
      return _streamController.stream.listen(onData);
    } else {
      ///篩選出 類型為 T 的數(shù)據(jù),獲得只包含T的Stream
      Stream<T> stream =
          _streamController.stream.where((type) => type is T).cast<T>();
      return stream.listen(onData);
    }
  }

  void post(event) {
    _streamController.add(event);
  }

  void unregister() {
    _streamController.close();
  }

  void pause() {
    _streamController.onPause();
  }

  void resume() {
    _streamController.onResume();
  }
}

在需要監(jiān)聽(tīng)數(shù)據(jù)的page里面添加以下代碼就可以啦

  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;
@override
  void initState() {
    // TODO: implement initState
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
          String topic = msg["topic"];
          Map<String, dynamic> payload = msg["payload"];

        });
  }

main.dart文件里面的完整代碼

import 'dart:async';
import 'dart:convert';
import 'live_even_bus.dart';
import 'package:flutter/material.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'mqtt_tool.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        primarySwatch: Colors.blue,
        visualDensity: VisualDensity.adaptivePlatformDensity,
      ),
      home: MyHomePage(title: 'Flutter Demo Home Page'),
    );
  }
}

class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);
  final String title;
  @override
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>
      _listenSubscription;

  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;

  void _incrementCounter() {
    setState(() {
      _counter++;
    });
  }

@override
  void initState() {
    // TODO: implement initState
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
          String topic = msg["topic"];
          Map<String, dynamic> payload = msg["payload"];
          print("監(jiān)聽(tīng)的 topc= $topic, payload= $payload");
        });
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            Container(
              child: RaisedButton(
                onPressed: _connect,
                child: Text(
                  "connect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _subscribeTopic,
                child: Text(
                  "subscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _unSubscribeTopic,
                child: Text(
                  "unSubscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _publishTopic,
                child: Text(
                  "publish topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _startListen,
                child: Text(
                  "start listen",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _disconnect,
                child: Text(
                  "disconnect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            )
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ), // This trailing comma makes auto-formatting nicer for build methods.
    );
  }

//  建立連接
  _connect() async {
    String server = "your server name";
    int port = 8883;
    String clientId = "86-1885999713jlb5f3d01f3";
    String userName = "86-188599895315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password, isSsl: true)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt連接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt連接失敗 --密碼錯(cuò)誤!!!");
      } else {
        print("有事做了~ ====mqtt連接失敗!!!");
      }
    });
  }

//  訂閱主題
  _subscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";

    String topic = "device/F4CFA26F1E43/#";

    String topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }

//  取消訂閱
  _unSubscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";
    String topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }

//  發(fā)布消息
  _publishTopic() {
    String topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    String str1 = "2950";

    String topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    String str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }

//  監(jiān)聽(tīng)消息的具體實(shí)現(xiàn)
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    final MqttPublishMessage recMess = data[0].payload;
    final String topic = data[0].topic;
    final String pt = Utf8Decoder().convert(recMess.payload.message);
    String desString = "topic is <$topic>, payload is <-- $pt -->";
    print("string =$desString");
    Map p = Map();
    p["topic"] = topic;
    p["type"] = "string";
    p["payload"] = pt;
    ListEventBus.getDefault().post(p);
  }

//  開(kāi)啟監(jiān)聽(tīng)消息
  _startListen() {
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }

//  斷開(kāi)連接
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }


}

結(jié)尾

mqtt這個(gè)庫(kù)也是在不斷的更新,如果從低版本升到高版本,可能會(huì)遇到不兼容的問(wèn)題,希望到家,冷靜沉著應(yīng)對(duì)!祝君好運(yùn)~ 最后,小伙伴們覺(jué)得有點(diǎn)幫助,請(qǐng)幫忙點(diǎn)個(gè)贊吧。如果有什么問(wèn)題需要探討的,也歡迎留言~

2023年4月15日更新,希望能幫助到小伙伴們,好運(yùn)~

mqtt庫(kù)更新到如下版本

mqtt_client: ^9.6.6

主要變化:

  • MqttTool工具類適配空安全,
  • 增加mqtt連接成功監(jiān)聽(tīng)
  • 增加mqtt連接斷開(kāi)監(jiān)聽(tīng)
  • 增加網(wǎng)絡(luò)異常時(shí)提示
  • 解決中文亂碼問(wèn)題
    如下
import 'dart:async';
import 'dart:convert';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:owoncare/owon_res/owon_constant.dart';
import '../generated/l10n.dart';
import '../owon_utils/owon_log.dart';
import 'package:typed_data/typed_buffers.dart';
import 'owon_toast.dart';



class OwonMqtt {
  MqttQos qos = MqttQos.atLeastOnce;
  MqttServerClient mqttClient;
  static OwonMqtt _instance;
  StreamSubscription<ConnectivityResult> _subscription;
  static Set<String> keepTopics = new Set<String>();

  ConnectivityResult _currentConnectivityResult;
  
  static OwonMqtt getInstance() {
    if (_instance == null) {
      _instance = OwonMqtt();
    }
    return _instance;
  }

  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false, Function onDisconnected, Function onConnected}) {
    _subscription = Connectivity()
        .onConnectivityChanged
        .listen((ConnectivityResult result) {
      _currentConnectivityResult = result;
    });

    print("mqtt connect to $server:$port, clientId:$clientIdentifier");
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);

    mqttClient.onSubscribed = _onSubscribed;

    mqttClient.onSubscribeFail = _onSubscribeFail;

    mqttClient.onUnsubscribed = _onUnSubscribed;
    mqttClient.onDisconnected = onDisconnected;
    mqttClient.onConnected = onConnected;

    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    mqttClient.keepAlivePeriod = 120;
    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("mqtt正在連接中...");
    return mqttClient.connect(username, password);
  }

  disconnect() {
    if (mqttClient != null) {
      mqttClient.disconnect();
      _subscription.cancel();
      _log("mqtt disconnect");
    } else {
      _log("mqtt client is null , disconnect failure");
    }
  }

//   int publishMessage(String pTopic, String msg) {
//     if (_currentConnectivityResult == ConnectivityResult.none) {
//       Future.delayed(Duration(milliseconds: 0), () {
//         OwonToast.show(S.of(OwonConstant.context).login_no_network,);
//       });
//       return null;
//     }
// //    AbnormalType currentType = Provider.of<AbnormalProvider>(currentContext).abnormalType;
// //    if(currentType == AbnormalType.failedTwice && (mqttClient.connectionStatus.state == MqttConnectionState.disconnected)){
// //      print("mqtt.dart第73行");
// //      ListEventBus.getDefault().post(2);
// //    }
//
//     _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$msg");
//     Uint8Buffer uint8buffer = Uint8Buffer();
//     var codeUnits = msg.codeUnits;
//     uint8buffer.addAll(codeUnits);
//
//     return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
//   }

  bool isEnable() {
    if (_currentConnectivityResult == ConnectivityResult.none) {
      OwonLog.e("mqtt 網(wǎng)絡(luò)異常");
      Future.delayed(Duration(milliseconds: 0), () {
        OwonToast.show(
          S.of(OwonConstant.context).login_no_network,
        );
      });
      return false;
    }
    if (mqttClient == null ||
        mqttClient.connectionStatus.state != MqttConnectionState.connected) {
      OwonLog.e("mqtt 連接狀態(tài)異常");
      if (mqttClient != null) {
        OwonLog.e("主動(dòng)斷開(kāi)mqtt連接");
        mqttClient.disconnect();
      }
      return false;
    } else {
      return true;
    }
  }

  ///此方法可以兼容中文
  int publishMessage(String pTopic, String msg) {
    if (!isEnable()) return null;

    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$msg");

    final bytes = utf8.encode(msg);
    // 創(chuàng)建MQTT消息并發(fā)送
    final message = MqttClientPayloadBuilder();
    bytes.forEach((element) {
      message.addByte(element);
    });

    return mqttClient.publishMessage(pTopic, qos, message.payload,
        retain: false);
  }

  int publishRawMessage(String pTopic, List<int> list) {
    if (!isEnable()) return null;
    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$list");
    Uint8Buffer uint8buffer = Uint8Buffer();
//    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(list);
    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  Subscription subscribeMessage(String subtopic, {bool keep = false}) {
    if (keep) {
      keepTopics.add(subtopic);
    }
    if (!isEnable()) return null;
    return mqttClient.subscribe(subtopic, qos);
  }

  unsubscribeMessage(String unSubtopic) {
    if (!isEnable()) return null;
    mqttClient.unsubscribe(unSubtopic);
  }

  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient != null ? mqttClient.connectionStatus : null;
  }

  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_監(jiān)聽(tīng)成功!");
    return mqttClient.updates;
  }

  _onSubscribed(String topic) {
    _log("_訂閱主題成功---topic:$topic");
  }

  _onUnSubscribed(String topic) {
    _log("_取消訂閱主題成功---topic:$topic");
  }

  _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  _log(String msg) {
    // print("MQTT-->$msg");
    OwonLog.e("MQTT-->$msg");
  }

  static cleanKeepTopics() {
    keepTopics.clear();
  }
}

建立連接的代碼

      OwonMqtt.getInstance()
          .connect(_mMqttServer, _mIsSSLFlag ? _mMqttSSLPort : _mMqttPort,
              _mClientId, _mUserName, accessToken,
              isSsl: _mIsSSLFlag,
              onConnected: _onMqttConnectedState,
              onDisconnected: _onDisconnectedState)
          .then((v) {
        OwonLog.e("mqtt connect result = $v");
        if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
          OwonLog.e("恭喜你~ ====mqtt連接成功");
          
        } else if (v.returnCode ==
            MqttConnectReturnCode.badUsernameOrPassword) {
          OwonLog.e("有事做了~ ====mqtt連接失敗 --密碼錯(cuò)誤!!!");
        } else {
          OwonLog.e("有事做了~ ====mqtt連接失敗!!!");
        }
      }).catchError((e) {
          OwonLog.e("有事做了~ ====mqtt連接失敗!!!");
      });

監(jiān)聽(tīng)mqtt連接成功

 _onMqttConnectedState() {
    OwonLog.e("mqtt連接已成功")
    //do something,比如:去訂閱相關(guān)主題,或者更新UI
  }

監(jiān)聽(tīng)mqtt連接斷開(kāi)

 _onDisconnectedState() {
    OwonLog.e("mqtt連接已斷開(kāi)");
    //do something, 比如:給出提示,或者去重新連接mqtt等
  }
補(bǔ)充一下,當(dāng)前版本信息
Justin-Mac-mini:care-vefify$ flutter doctor
Doctor summary (to see all details, run flutter doctor -v):
[?] Flutter (Channel stable, 3.0.3, on macOS 13.3 22E252 darwin-x64, locale zh-Hans-CN)
[?] Android toolchain - develop for Android devices (Android SDK version 32.0.0)
[?] Xcode - develop for iOS and macOS (Xcode 14.3)
[?] Chrome - develop for the web
[?] Android Studio (version 2021.2)
[?] IntelliJ IDEA Community Edition (version 2021.2.2)
[?] VS Code (version 1.77.1)
[?] Connected device (2 available)
[?] HTTP Host Availability

? No issues found!
Justin-Mac-mini:care-vefify$ 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容