Eclipse Paho:是Eclipse提供的一個(gè)訪問MQTT服務(wù)器的一種開源客戶端庫。

Eclipse目前提供十種不同語言平臺(tái)的客戶端類庫,
對(duì)于Java平臺(tái)而言和MQTT服務(wù)器交互的開源框架還有很多, 例如:
Eclipse Paho Java、 Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等等...
但是, 根據(jù)GIthub上使用次數(shù)來講Eclipse Paho無疑是主流, 就個(gè)人使用而已, Eclipse Paho集成非常方便、簡(jiǎn)單。
對(duì)MQTT協(xié)議不是很了解的可以看一下
Eclipse Paho 官網(wǎng)
Paho.client.mqttv3 在線API
注意:
QoS:服務(wù)質(zhì)量,其作用是當(dāng)網(wǎng)絡(luò)過載或擁塞時(shí),QoS 能確保重要業(yè)務(wù)量不受延遲,同時(shí)保證網(wǎng)絡(luò)的高效運(yùn)行
- 服務(wù)質(zhì)量0 - 表示一條消息最多應(yīng)該發(fā)送一次(零次或一次)。該消息不會(huì)持久保存到磁盤,并且不會(huì)通過網(wǎng)絡(luò)進(jìn)行確認(rèn)。這種QoS是最快的,但只能用于無價(jià)值的消息
- 服務(wù)質(zhì)量1 - 表示一條消息應(yīng)至少傳遞一次(一次或多次)。該消息只能在持久存在的情況下才能被安全地傳遞,因此應(yīng)用程序必須提供一種持久化方法。如果未指定持久性機(jī)制,則在發(fā)生客戶端故障時(shí)不會(huì)傳遞消息。該消息將通過網(wǎng)絡(luò)得到確認(rèn)。這是默認(rèn)的QoS。
- 服務(wù)質(zhì)量2 - 表示應(yīng)該傳遞一次消息。該消息將被保存到磁盤,并且將受到整個(gè)網(wǎng)絡(luò)的兩階段確認(rèn)。該消息只能在持久存在的情況下才能被安全地傳遞,因此應(yīng)用程序必須提供一種持久化方法。如果未指定持久性機(jī)制,則在發(fā)生客戶端故障時(shí)不會(huì)傳遞消息。
如果未配置持久性,則在發(fā)生網(wǎng)絡(luò)或服務(wù)器問題時(shí),仍會(huì)傳送QoS 1和2消息,因?yàn)榭蛻舳藢⒃趦?nèi)存中保持狀態(tài)。如果MQTT客戶端關(guān)閉或失敗,并且未配置持久性,則由于客戶端狀態(tài)將丟失,因此無法保持QoS 1和2消息的傳輸。
publish: 向服務(wù)器上的主題發(fā)布消息, 主要用于即時(shí)通訊(IoT、 聊天...)
subscribe: 訂閱主題, 服務(wù)器通過你訂閱的主題向你發(fā)布消息, 這樣就能實(shí)現(xiàn)服務(wù)器向APP推送信息了
ClientId: 客戶端ID應(yīng)該是唯一的, 多個(gè)用戶同時(shí)使用同一個(gè)ID創(chuàng)建連接會(huì)不斷擠線互踢導(dǎo)致連接不斷中斷, 服務(wù)器也是通過ID來區(qū)分不同用戶的操作
開始使用Eclipse Paho
1. 配置maven庫
repositories {
...
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
...
}
2.添加依賴
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0' // MQTT Eclipse paho
3.添加權(quán)限
<uses-permission android:name="android.permission.WAKE_LOCK"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.INTERNET"/>
4.編寫邏輯代碼
4.1新建一個(gè)MqttManager類, 用于MQTT操作的管理
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
/**
* @author Ai(陳祥林)
* @date 2018/1/3 10:37
* @email Webb@starcc.cc
*/
public class MqttManager {
private static MqttManager mInstance = null;
/**
* Mqtt回調(diào)
*/
private MqttCallback mCallback;
/**
* Mqtt客戶端
*/
private static MqttClient client;
/**
* Mqtt連接選項(xiàng)
*/
private MqttConnectOptions conOpt;
private MqttManager() {
mCallback = new MqttCallbackBus();
}
public static MqttManager getInstance() {
if (null == mInstance) {
synchronized (MqttManager.class) {
if (mInstance == null) {
mInstance = new MqttManager();
}
}
}
return mInstance;
}
/**
* 釋放單例, 及其所引用的資源
*/
public static void release() {
try {
if (mInstance != null) {
disConnect();
mInstance = null;
}
} catch (Exception e) {
Log.e("MqttManager", "release : " + e.toString());
}
}
/**
* 創(chuàng)建Mqtt 連接
*
* @param brokerUrl Mqtt服務(wù)器地址(tcp://xxxx:1863)
* @param userName 用戶名
* @param password 密碼
* @param clientId 客戶端Id
*/
public void creatConnect(String brokerUrl, String userName,
String password, String clientId, String topic) {
// 獲取默認(rèn)的臨時(shí)文件路徑
String tmpDir = System.getProperty("java.io.tmpdir");
/*
* MqttDefaultFilePersistence:
* 將數(shù)據(jù)包保存到持久化文件中,
* 在數(shù)據(jù)發(fā)送過程中無論程序是否奔潰、 網(wǎng)絡(luò)好壞
* 只要發(fā)送的數(shù)據(jù)包客戶端沒有收到,
* 這個(gè)數(shù)據(jù)包會(huì)一直保存在文件中,
* 直到發(fā)送成功為止。
*/
// Mqtt的默認(rèn)文件持久化
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
// 構(gòu)建包含連接參數(shù)的連接選擇對(duì)象
conOpt = new MqttConnectOptions();
// 設(shè)置Mqtt版本
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
// 設(shè)置清空Session,false表示服務(wù)器會(huì)保留客戶端的連接記錄,true表示每次以新的身份連接到服務(wù)器
conOpt.setCleanSession(false);
// 設(shè)置會(huì)話心跳時(shí)間,單位為秒
// 客戶端每隔10秒向服務(wù)端發(fā)送心跳包判斷客戶端是否在線
conOpt.setKeepAliveInterval(10);
// 設(shè)置賬號(hào)
if (userName != null) {
conOpt.setUserName(userName);
}
// 設(shè)置密碼
if (password != null) {
conOpt.setPassword(password.toCharArray());
}
// 最后的遺言(連接斷開時(shí), 發(fā)動(dòng)"close"給訂閱了topic該主題的用戶告知連接已中斷)
conOpt.setWill(topic, "close".getBytes(), 2, true);
// 客戶端是否自動(dòng)嘗試重新連接到服務(wù)器
conOpt.setAutomaticReconnect(true);
// 創(chuàng)建MQTT客戶端
client = new MqttClient(brokerUrl, clientId, dataStore);
// 設(shè)置回調(diào)
client.setCallback(mCallback);
// 連接
doConnect();
} catch (MqttException e) {
Log.e("MqttManager", "creatConnect : " + e.toString());
}
}
/**
* 建立連接
*/
public void doConnect() {
if (client != null) {
try {
client.connect(conOpt);
} catch (Exception e) {
Log.e("MqttManager", "doConnect : " + e.toString());
}
}
}
/**
* 發(fā)布消息
*
* @param topicName 主題名稱
* @param qos 質(zhì)量(0,1,2)
* @param payload 發(fā)送的內(nèi)容
*/
public void publish(String topicName, int qos, byte[] payload) {
if (client != null && client.isConnected()) {
// 創(chuàng)建和配置一個(gè)消息
MqttMessage message = new MqttMessage(payload);
message.setPayload(payload);
message.setQos(qos);
try {
client.publish(topicName, message);
} catch (MqttException e) {
Log.e("MqttManager", "publish : " + e.toString());
}
}
}
public void publish(String topicName, int qos, String payload) {
if (client != null && client.isConnected()) {
// 創(chuàng)建和配置一個(gè)消息
MqttMessage message = new MqttMessage(payload.getBytes);
message.setPayload(payload.getBytes);
message.setQos(qos);
try {
client.publish(topicName, message);
} catch (MqttException e) {
Log.e("MqttManager", "publish : " + e.toString());
}
}
}
/**
* 訂閱主題
*
* @param topicName 主題名稱
* @param qos 消息質(zhì)量
*/
public void subscribe(String topicName, int qos) {
if (client != null && client.isConnected()) {
try {
client.subscribe(topicName, qos);
} catch (MqttException e) {
Log.e("MqttManager", "subscribe : " + e.toString());
}
}
}
/**
* 取消連接
*/
public static void disConnect() throws MqttException {
if (client != null && client.isConnected()) {
client.disconnect();
}
}
/**
* 判斷是否連接
*/
public static boolean isConnected() {
return client != null && client.isConnected();
}
}
4.2新建一個(gè)MqttCallbackBus類用于MQTT異步回調(diào)
我是使用自己封裝的EventBus發(fā)送粘性事件的方式來處理回調(diào)的內(nèi)容
對(duì)EventBus不熟悉可以使用Handler來處理
import com.zhanyun.key.eventbus.EventModel;
import com.zhanyun.key.eventbus.EventBusUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @author Ai(陳祥林)
* @date 2018/1/3 10:45
* @email Webb@starcc.cc
*/
public class MqttCallbackBus implements MqttCallback {
/**
* 連接中斷
*/
@Override
public void connectionLost(Throwable cause) {
Log.e("MqttManager", "cause : " + cause.toString());
// 可在此方法內(nèi)寫重連的邏輯
}
/**
* 消息送達(dá)
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.e("MqttManager", "topic : " + topic + "\t MqttMessage : " + message.toString());
EventBusUtil.sendStickyEvent(new EventModel(10001, topic));
EventBusUtil.sendStickyEvent(new EventModel(10010, message));
}
/**
* 交互完成
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.e("MqttManager", "token : " + token.toString());
}
}
5.使用
在Activity中調(diào)用
/**
* 第一個(gè)參數(shù)是服務(wù)器地址
* 第二個(gè)參數(shù)是用戶名
* 第三個(gè)參數(shù)是密碼
* 第四個(gè)參數(shù)是客戶端ID
* 第五個(gè)參數(shù)是主題
**/
MqttManager
.getInstance()
.creatConnect(
brokerUrl,
userName,
password,
clientId,
topic);

6.總結(jié)
MQTT是基于TCP/IP的, 手機(jī)鎖屏?xí)r會(huì)阻塞TCP, 導(dǎo)致MQTT中斷, MqttConnectOptions設(shè)置isAutomaticReconnect()為true時(shí)可自動(dòng)重連, 但多個(gè)相同的ClientID同時(shí)創(chuàng)建連接時(shí)會(huì)無限的連接中斷和自動(dòng)連接(注意)