MQTT簡(jiǎn)介和使用

前言

最近在做的智能家居項(xiàng)目中有用到MQTT做消息的推送,主要是為了實(shí)現(xiàn)低流量下的智能家居控制(我們用到的是勞沃協(xié)議),在使用的時(shí)候也是遇到很多坑(特別是重連),這里講講自己的個(gè)人經(jīng)驗(yàn)和解決問(wèn)題的方式。

一.MQTT介紹

1.簡(jiǎn)介

MQTT(message queuing telemetry transport)是IBM開(kāi)發(fā)的即時(shí)通訊協(xié)議,是一種發(fā)布/訂閱極其輕量級(jí)的消息傳輸協(xié)議,專門(mén)為網(wǎng)絡(luò)受限設(shè)備、低寬帶以及高延遲和不可靠的網(wǎng)絡(luò)而設(shè)計(jì)的。由于以上輕量級(jí)的特點(diǎn),是實(shí)現(xiàn)智能家居的首選傳輸協(xié)議,相比于XMPP,更加輕量級(jí)而且占用寬帶低。

2.特點(diǎn)

a.由于采用發(fā)布/訂閱的消息模式,可以提供一對(duì)多的消息發(fā)布
b.輕量級(jí),網(wǎng)絡(luò)開(kāi)銷小
c.對(duì)負(fù)載內(nèi)容會(huì)有屏蔽的消息傳輸
d.有三種消息發(fā)布質(zhì)量(Qos):
qos=0:“至多一次”,這一級(jí)別會(huì)發(fā)生消息丟失或重復(fù),消息發(fā)布依賴于TCP/IP網(wǎng)絡(luò)
qos=1:“至少一次”,確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生
qos=2:“只有一次”,確保消息到達(dá)一次
e.通知機(jī)制,異常中斷時(shí)會(huì)通知雙方

3.原理

14523188625918865.png

MQTT協(xié)議有三種身份:發(fā)布者、代理、訂閱者,發(fā)布者和訂閱者都為客戶端,代理為服務(wù)器,同時(shí)消息的發(fā)布者也可以是訂閱者(為了節(jié)約內(nèi)存和流量發(fā)布者和訂閱者一般都會(huì)定義在一起)。
MQTT傳輸?shù)南⒎譃橹黝}(Topic,可理解為消息的類型,訂閱者訂閱后,就會(huì)收到該主題的消息內(nèi)容(payload))和負(fù)載(payload,可以理解為消息的內(nèi)容)兩部分。

二.MQTT通用使用

通用的使用方式的重連機(jī)制在安卓系統(tǒng)中需要自己去編寫(xiě),下一篇會(huì)詳細(xì)講解阿里專門(mén)針對(duì)Android客戶端的實(shí)現(xiàn)方式MqttAndroidClient

1.集成

1.build.grade中導(dǎo)入

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'

2.添加權(quán)限

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />

2.使用

直接上代碼,以下就是發(fā)布者和訂閱者一體的實(shí)現(xiàn)方式,自己添加了重連機(jī)制,對(duì)應(yīng)的注釋也比較清晰,如果對(duì)于MQTT入門(mén)最好看下阿里MQTT的官方文檔

public class MQTTManager {

    private String broker = "";//固定配置
    private String secretKey = "";//固定配置
    private String acessKey = "";//固定配置

    private String topic = "";//自己定義
    private String groupId = "";//自己定義
    private String clientId = "";//自己定義

    private MqttClient mqttClient;
    private volatile static MQTTManager manager;

    private int[] qos = {0, 0};//訂閱個(gè)數(shù)就是數(shù)組的長(zhǎng)度

    private ScheduledExecutorService reconnectPool;//重連線程池

    public static MQTTManager getInstance() {
        if (manager == null) {
            synchronized (MQTTManager.class) {
                if (manager == null)
                    manager = new MQTTManager();
            }
        }
        return manager;
    }

    public MQTTManager() {
        clientId = String.format("%s@@@%s", groupId, MQTTCons.Sep_SEND);//這是根據(jù)自己需求定義的clientId
    }

    /**
     * 發(fā)送信息
     * @param msg
     */
    public void sendMessage(String msg) {
        MqttMessage message = new MqttMessage(msg.getBytes());
        message.setQos(0);
        try {
            if (mqttClient != null)
                mqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
            TVLog.i("MqttException-sendMQTT-" + e);
        }
    }

    /**
     * 開(kāi)啟MQTT連接和訂閱
     */
    public void startSendMQTT() {
        try {
            closeMQTT();//斷開(kāi)和關(guān)閉連接的操作,由于我們需求需要,切換用戶要重新創(chuàng)建新的連接,一般應(yīng)用中基本都會(huì)始終訂閱一條
            MemoryPersistence persistence = new MemoryPersistence();
            mqttClient = new MqttClient(broker, clientId, persistence);
            final MqttConnectOptions connOpts = new MqttConnectOptions();
            String sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
            connOpts.setUserName(acessKey);
            connOpts.setServerURIs(new String[]{broker});
            connOpts.setPassword(sign.toCharArray());
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(20);
            connOpts.setConnectionTimeout(10);
            connOpts.setMqttVersion(MQTT_VERSION_3_1_1);
            connOpts.setAutomaticReconnect(false);//禁用自帶重連機(jī)制,用于TV端會(huì)出現(xiàn)不穩(wěn)定性,所以自己寫(xiě)了重連
            mqttClient.setCallback(new MqttCallbackExtended() {
                public void connectComplete(boolean reconnect, String serverURI) {
                    TVLog.i("Send connect success" + topic);
                    closeReconnectTask();
                    subscribeFilter();//mqtt每次連接成功都得訂閱Topic
                }

                public void connectionLost(Throwable throwable) {
                    TVLog.i("mqtt connection lost");
                    startReconnectTask();
                }

                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    TVLog.i("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                }

                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    try {
                        TVLog.i("deliveryComplete:" + iMqttDeliveryToken.getMessage().toString());
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            mqttClient.connect(connOpts);
        } catch (Exception me) {
            me.printStackTrace();
        }
    }

    /**
     * 訂閱Topic
     */
    private void subscribeFilter() {
        String registerTopic = "";//自定義
        String controlTopic = "";//自定義,作為示例訂閱了兩個(gè)
        String[] topicFilters = new String[]{registerTopic, controlTopic};

        try {
            mqttClient.subscribe(topicFilters, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private synchronized void startReconnectTask() {//開(kāi)啟重連任務(wù)
        if (reconnectPool != null) return;
        reconnectPool = Executors.newScheduledThreadPool(1);
        reconnectPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (mqttClient == null || mqttClient.isConnected()) return;
                    mqttClient.reconnect();
                    TVLog.d("reconnectSendMQTT" + topic);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 0, 5 * 1000, TimeUnit.MILLISECONDS);
    }

    public synchronized void closeReconnectTask() {//程序銷毀的時(shí)候也記得關(guān)閉
        if (reconnectPool != null) {
            reconnectPool.shutdownNow();
            reconnectPool = null;
        }
    }

    public void closeMQTT() {//在程序銷毀的時(shí)候也記得調(diào)用
        try {
            closeReconnectTask();
            if (mqttClient != null) {
                mqttClient.disconnect();
                mqttClient.close();
                mqttClient = null;
                TVLog.d("closeSendMQTT" + topic);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

注意:mqttClient.disconnect()和mqttClient.connect(connOpts)都為耗時(shí)操作,網(wǎng)絡(luò)差的時(shí)候會(huì)阻塞主線程,可以新開(kāi)線程或者直接用線程池ScheduledExecutorService管理

結(jié)語(yǔ)

以上就是MQTT的簡(jiǎn)介、以及由官方Java的通用實(shí)現(xiàn)Demo改寫(xiě)后,在Android客戶端的實(shí)現(xiàn)方式,下一篇會(huì)詳細(xì)講解阿里專門(mén)針對(duì)Android客戶端的實(shí)現(xiàn)方式MqttAndroidClient

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容