Eclipse Paho 實(shí)現(xiàn)Android推送

Eclipse Paho:是Eclipse提供的一個(gè)訪問MQTT服務(wù)器的一種開源客戶端庫。

MQTT Client 對(duì)照表

Eclipse目前提供十種不同語言平臺(tái)的客戶端類庫,
對(duì)于Java平臺(tái)而言和MQTT服務(wù)器交互的開源框架還有很多, 例如:
Eclipse Paho Java、 Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等等...
但是, 根據(jù)GIthub上使用次數(shù)來講Eclipse Paho無疑是主流, 就個(gè)人使用而已, Eclipse Paho集成非常方便、簡(jiǎn)單。
對(duì)MQTT協(xié)議不是很了解的可以看一下
Eclipse Paho 官網(wǎng)
Paho.client.mqttv3 在線API

注意:

QoS:服務(wù)質(zhì)量,其作用是當(dāng)網(wǎng)絡(luò)過載或擁塞時(shí),QoS 能確保重要業(yè)務(wù)量不受延遲,同時(shí)保證網(wǎng)絡(luò)的高效運(yùn)行

  • 服務(wù)質(zhì)量0 - 表示一條消息最多應(yīng)該發(fā)送一次(零次或一次)。該消息不會(huì)持久保存到磁盤,并且不會(huì)通過網(wǎng)絡(luò)進(jìn)行確認(rèn)。這種QoS是最快的,但只能用于無價(jià)值的消息
  • 服務(wù)質(zhì)量1 - 表示一條消息應(yīng)至少傳遞一次(一次或多次)。該消息只能在持久存在的情況下才能被安全地傳遞,因此應(yīng)用程序必須提供一種持久化方法。如果未指定持久性機(jī)制,則在發(fā)生客戶端故障時(shí)不會(huì)傳遞消息。該消息將通過網(wǎng)絡(luò)得到確認(rèn)。這是默認(rèn)的QoS。
  • 服務(wù)質(zhì)量2 - 表示應(yīng)該傳遞一次消息。該消息將被保存到磁盤,并且將受到整個(gè)網(wǎng)絡(luò)的兩階段確認(rèn)。該消息只能在持久存在的情況下才能被安全地傳遞,因此應(yīng)用程序必須提供一種持久化方法。如果未指定持久性機(jī)制,則在發(fā)生客戶端故障時(shí)不會(huì)傳遞消息。

如果未配置持久性,則在發(fā)生網(wǎng)絡(luò)或服務(wù)器問題時(shí),仍會(huì)傳送QoS 1和2消息,因?yàn)榭蛻舳藢⒃趦?nèi)存中保持狀態(tài)。如果MQTT客戶端關(guān)閉或失敗,并且未配置持久性,則由于客戶端狀態(tài)將丟失,因此無法保持QoS 1和2消息的傳輸。

publish: 向服務(wù)器上的主題發(fā)布消息, 主要用于即時(shí)通訊(IoT、 聊天...)

subscribe: 訂閱主題, 服務(wù)器通過你訂閱的主題向你發(fā)布消息, 這樣就能實(shí)現(xiàn)服務(wù)器向APP推送信息了

ClientId: 客戶端ID應(yīng)該是唯一的, 多個(gè)用戶同時(shí)使用同一個(gè)ID創(chuàng)建連接會(huì)不斷擠線互踢導(dǎo)致連接不斷中斷, 服務(wù)器也是通過ID來區(qū)分不同用戶的操作


開始使用Eclipse Paho



1. 配置maven庫


repositories {

...

    maven {

        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"

    }

...

}



2.添加依賴

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'  // MQTT Eclipse paho



3.添加權(quán)限

<uses-permission android:name="android.permission.WAKE_LOCK"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.INTERNET"/>



4.編寫邏輯代碼

4.1新建一個(gè)MqttManager類, 用于MQTT操作的管理

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

/**
 * @author Ai(陳祥林)
 * @date 2018/1/3  10:37
 * @email Webb@starcc.cc
 */
public class MqttManager {

    private static MqttManager mInstance = null;
    /**
     * Mqtt回調(diào)
     */
    private MqttCallback mCallback;
    /**
     * Mqtt客戶端
     */
    private static MqttClient client;
    /**
     * Mqtt連接選項(xiàng)
     */
    private MqttConnectOptions conOpt;

    private MqttManager() {
        mCallback = new MqttCallbackBus();
    }

    public static MqttManager getInstance() {
        if (null == mInstance) {
            synchronized (MqttManager.class) {
                if (mInstance == null) {
                    mInstance = new MqttManager();
                }
            }
        }
        return mInstance;
    }

    /**
     * 釋放單例, 及其所引用的資源
     */
    public static void release() {
        try {
            if (mInstance != null) {
                disConnect();
                mInstance = null;
            }
        } catch (Exception e) {
            Log.e("MqttManager", "release : " + e.toString());
        }
    }

    /**
     * 創(chuàng)建Mqtt 連接
     *
     * @param brokerUrl Mqtt服務(wù)器地址(tcp://xxxx:1863)
     * @param userName  用戶名
     * @param password  密碼
     * @param clientId  客戶端Id
     */
    public void creatConnect(String brokerUrl, String userName,
                             String password, String clientId, String topic) {
        // 獲取默認(rèn)的臨時(shí)文件路徑
        String tmpDir = System.getProperty("java.io.tmpdir");

        /*
         * MqttDefaultFilePersistence:
         * 將數(shù)據(jù)包保存到持久化文件中,
         * 在數(shù)據(jù)發(fā)送過程中無論程序是否奔潰、 網(wǎng)絡(luò)好壞
         * 只要發(fā)送的數(shù)據(jù)包客戶端沒有收到,
         * 這個(gè)數(shù)據(jù)包會(huì)一直保存在文件中,
         * 直到發(fā)送成功為止。
         */
        // Mqtt的默認(rèn)文件持久化
        MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
        try {
            // 構(gòu)建包含連接參數(shù)的連接選擇對(duì)象
            conOpt = new MqttConnectOptions();
            // 設(shè)置Mqtt版本
            conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            // 設(shè)置清空Session,false表示服務(wù)器會(huì)保留客戶端的連接記錄,true表示每次以新的身份連接到服務(wù)器
            conOpt.setCleanSession(false);
            // 設(shè)置會(huì)話心跳時(shí)間,單位為秒
            // 客戶端每隔10秒向服務(wù)端發(fā)送心跳包判斷客戶端是否在線
            conOpt.setKeepAliveInterval(10);
            // 設(shè)置賬號(hào)
            if (userName != null) {
                conOpt.setUserName(userName);
            }
            // 設(shè)置密碼
            if (password != null) {
                conOpt.setPassword(password.toCharArray());
            }
            // 最后的遺言(連接斷開時(shí), 發(fā)動(dòng)"close"給訂閱了topic該主題的用戶告知連接已中斷)
            conOpt.setWill(topic, "close".getBytes(), 2, true);
            // 客戶端是否自動(dòng)嘗試重新連接到服務(wù)器 
            conOpt.setAutomaticReconnect(true);
            // 創(chuàng)建MQTT客戶端
            client = new MqttClient(brokerUrl, clientId, dataStore);
            // 設(shè)置回調(diào)
            client.setCallback(mCallback);
            // 連接
            doConnect();
     
        } catch (MqttException e) {
            Log.e("MqttManager", "creatConnect : " + e.toString());

        }

    }


    /**
     * 建立連接
     */
    public void doConnect() {
        if (client != null) {
            try {
                client.connect(conOpt);
            } catch (Exception e) {
                Log.e("MqttManager", "doConnect : " + e.toString());
            }
        }
    }


    /**
     * 發(fā)布消息
     *
     * @param topicName 主題名稱
     * @param qos       質(zhì)量(0,1,2)
     * @param payload   發(fā)送的內(nèi)容
     */
    public void publish(String topicName, int qos, byte[] payload) {
        if (client != null && client.isConnected()) {
            // 創(chuàng)建和配置一個(gè)消息
            MqttMessage message = new MqttMessage(payload);
            message.setPayload(payload);
            message.setQos(qos);
            try {
                client.publish(topicName, message);
            } catch (MqttException e) {
                Log.e("MqttManager", "publish : " + e.toString());
            }
        }
    }


     public void publish(String topicName, int qos, String payload) {
        if (client != null && client.isConnected()) {
            // 創(chuàng)建和配置一個(gè)消息
            MqttMessage message = new MqttMessage(payload.getBytes);
            message.setPayload(payload.getBytes);
            message.setQos(qos);
            try {
                client.publish(topicName, message);
            } catch (MqttException e) {
                Log.e("MqttManager", "publish : " + e.toString());
            }
        }
    }


    /**
     * 訂閱主題
     *
     * @param topicName 主題名稱
     * @param qos      消息質(zhì)量
     */
    public void subscribe(String topicName, int qos) {
        if (client != null && client.isConnected()) {
            try {
                client.subscribe(topicName, qos);
            } catch (MqttException e) {
                Log.e("MqttManager", "subscribe : " + e.toString());
            }
        }
    }


    /**
     * 取消連接
     */
    public static void disConnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
        }
    }


    /**
     * 判斷是否連接
     */
    public static boolean isConnected() {
        return client != null && client.isConnected();
    }
}

4.2新建一個(gè)MqttCallbackBus類用于MQTT異步回調(diào)
我是使用自己封裝的EventBus發(fā)送粘性事件的方式來處理回調(diào)的內(nèi)容
對(duì)EventBus不熟悉可以使用Handler來處理

import com.zhanyun.key.eventbus.EventModel;
import com.zhanyun.key.eventbus.EventBusUtil;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author Ai(陳祥林)
 * @date 2018/1/3  10:45
 * @email Webb@starcc.cc
 */
public class MqttCallbackBus implements MqttCallback {

    /**
     * 連接中斷
     */
    @Override
    public void connectionLost(Throwable cause) {
        Log.e("MqttManager", "cause : " + cause.toString());
        // 可在此方法內(nèi)寫重連的邏輯 
    }


    /**
     * 消息送達(dá)
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        Log.e("MqttManager", "topic : " + topic + "\t MqttMessage : " + message.toString());
        EventBusUtil.sendStickyEvent(new EventModel(10001, topic));
        EventBusUtil.sendStickyEvent(new EventModel(10010, message));
    }


    /**
     * 交互完成
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.e("MqttManager", "token : " + token.toString());
    }
}



5.使用
在Activity中調(diào)用

/**
* 第一個(gè)參數(shù)是服務(wù)器地址
* 第二個(gè)參數(shù)是用戶名
* 第三個(gè)參數(shù)是密碼
* 第四個(gè)參數(shù)是客戶端ID
* 第五個(gè)參數(shù)是主題
**/
MqttManager
        .getInstance()
        .creatConnect(
              brokerUrl,
              userName,
              password,
              clientId, 
              topic);


運(yùn)行后打印服務(wù)端返回的數(shù)據(jù)



6.總結(jié)
MQTT是基于TCP/IP的, 手機(jī)鎖屏?xí)r會(huì)阻塞TCP, 導(dǎo)致MQTT中斷, MqttConnectOptions設(shè)置isAutomaticReconnect()為true時(shí)可自動(dòng)重連, 但多個(gè)相同的ClientID同時(shí)創(chuàng)建連接時(shí)會(huì)無限的連接中斷和自動(dòng)連接(注意)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,578評(píng)論 19 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,094評(píng)論 25 709
  • 每年的這個(gè)時(shí)候,伴著北方寒冷的天氣,心情也開始變得復(fù)雜了。不是因?yàn)樘鞖獾脑?,而是因?yàn)槟杲K將至,新年將來,不由自主...
  • 01 “戚本”是《紅樓夢(mèng)》眾多版本中的一種,前面因有戚蓼生的序而得名,通常意義上又專指有正書局石印的大字本和小字本...
    雨如酥閱讀 2,105評(píng)論 0 3

友情鏈接更多精彩內(nèi)容