MQTT 是一種輕量級(jí)的、靈活的物聯(lián)網(wǎng)消息交換和數(shù)據(jù)傳遞協(xié)議,致力于為 IoT 開發(fā)人員實(shí)現(xiàn)靈活性與硬件/網(wǎng)絡(luò)資源的平衡。
Kotlin 是一門由 JetBrains 公司開發(fā)的編程語言,Kotlin 是基于 JVM 的,所以開發(fā)者可以很方便地用它來進(jìn)行 Android 開發(fā),并且支持 Kotlin 和 Java 的混合編寫。而早在 2017 年,Google 就宣布 Kotlin 成為官方開發(fā)語言。
本文主要介紹使用 Kotlin 語言在 Android 平臺(tái)上使用 MQTT。
新建 Kotlin 項(xiàng)目
打開 Android Studio 新建一個(gè)項(xiàng)目,選擇語言為 Kotlin,Android Studio 會(huì)自動(dòng)創(chuàng)建 Kotlin 相關(guān)配置。若要配置現(xiàn)有項(xiàng)目,則可以參考 將 Kotlin 添加到現(xiàn)有應(yīng)用。
添加依賴
打開項(xiàng)目的 build.gradle,添加 Eclipse Paho Java Client 和 Eclipse Paho Android Service 依賴到 dependencies 部分。
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
配置 AndroidManifest.xml
Android Service 是 Eclipse 開發(fā)的基于 Android 平臺(tái)的一個(gè)后臺(tái)服務(wù),我們需要將它注冊(cè)到AndroidManifest.xml 文件,同時(shí),我們需要注冊(cè)權(quán)限。
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application
...
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>
創(chuàng)建 MQTT 客戶端
private lateinit var mqttClient: MqttAndroidClient
// TAG
companion object {
const val TAG = "AndroidMqttClient"
}
連接 MQTT 服務(wù)器
本文將使用 EMQ X MQTT Cloud 運(yùn)營(yíng)和維護(hù)的免費(fèi)公共 MQTT 服務(wù)器, EMQ X Cloud 是由 EMQ 推出的安全的 MQTT 物聯(lián)網(wǎng)云服務(wù)平臺(tái),它提供一站式運(yùn)維代管、獨(dú)有隔離環(huán)境的 MQTT 5.0 接入服務(wù)。
- Broker: broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
fun connect(context: Context) {
val serverURI = "tcp://broker.emqx.io:1883"
mqttClient = MqttAndroidClient(context, serverURI, "kotlin_client")
mqttClient.setCallback(object : MqttCallback {
override fun messageArrived(topic: String?, message: MqttMessage?) {
Log.d(TAG, "Receive message: ${message.toString()} from topic: $topic")
}
override fun connectionLost(cause: Throwable?) {
Log.d(TAG, "Connection lost ${cause.toString()}")
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
}
})
val options = MqttConnectOptions()
try {
mqttClient.connect(options, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "Connection success")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Connection failure")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
其中,MqttCallback 接口包含 3 個(gè)方法:
- messageArrived:收到 broker 新消息
- connectionLost:與 broker 連接丟失
- deliveryComplete:消息到 broker 傳遞完成
MqttConnectOptions 用于配置連接設(shè)置,包含用戶名密碼,超時(shí)配置等,具體可以查看其方法。
創(chuàng)建 MQTT 訂閱
訂閱 topic
fun subscribe(topic: String, qos: Int = 1) {
try {
mqttClient.subscribe(topic, qos, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "Subscribed to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to subscribe $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
取消訂閱
取消訂閱 topic
fun unsubscribe(topic: String) {
try {
mqttClient.unsubscribe(topic, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "Unsubscribed to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to unsubscribe $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
發(fā)布消息
fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
try {
val message = MqttMessage()
message.payload = msg.toByteArray()
message.qos = qos
message.isRetained = retained
mqttClient.publish(topic, message, null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "$msg published to $topic")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to publish $msg to $topic")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
斷開 MQTT 連接
fun disconnect() {
try {
mqttClient.disconnect(null, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
Log.d(TAG, "Disconnected")
}
override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
Log.d(TAG, "Failed to disconnect")
}
})
} catch (e: MqttException) {
e.printStackTrace()
}
}
測(cè)試
首先將 Android 客戶端連接到 MQTT 服務(wù)器,然后訂閱 topic: a/b,可以看到連接成功和成功訂閱的日志

然后我們使用 MQTT 5.0 客戶端工具 - MQTT X 進(jìn)行測(cè)試,發(fā)布消息到 topic: a/b,客戶端可以看到收到消息的日志


我們?cè)诳蛻舳税l(fā)布消息到 topic: a/b ,因?yàn)槲覀冇嗛喠嗽?topic,同時(shí)也會(huì)收到消息,最后我們斷開客戶端與 MQTT 服務(wù)器的連接,日志如下:

至此,我們已經(jīng)完成了Android 上 MQTT 客戶端的構(gòu)建,實(shí)現(xiàn)了客戶端與 MQTT 服務(wù)器的連接、主題訂閱、收發(fā)消息等功能。
MQTT 可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用。
而 Kotlin 也是 Google 官方主推的一門語言,結(jié)合 MQTT 協(xié)議及 MQTT 云服務(wù),我們可以開發(fā)更多有趣的應(yīng)用。
版權(quán)聲明: 本文為 EMQ 原創(chuàng),轉(zhuǎn)載請(qǐng)注明出處。
原文鏈接:https://www.emqx.io/cn/blog/android-connects-mqtt-using-kotlin