Android實(shí)現(xiàn)MQTT客戶端

目錄

MQTT簡介

MQTT是機(jī)器對(duì)機(jī)器(M2M)/物聯(lián)網(wǎng)(IoT)連接協(xié)議。它被設(shè)計(jì)為一個(gè)極其輕量級(jí)的發(fā)布/訂閱消息傳輸協(xié)議。對(duì)于需要較小代碼占用空間和/或網(wǎng)絡(luò)帶寬非常寶貴的遠(yuǎn)程連接非常有用,是專為受限設(shè)備和低帶寬、高延遲或不可靠的網(wǎng)絡(luò)而設(shè)計(jì)。這些原則也使該協(xié)議成為新興的“機(jī)器到機(jī)器”(M2M)或物聯(lián)網(wǎng)(IoT)世界的連接設(shè)備,以及帶寬和電池功率非常高的移動(dòng)應(yīng)用的理想選擇。例如,它已被用于通過衛(wèi)星鏈路與代理通信的傳感器、與醫(yī)療服務(wù)提供者的撥號(hào)連接,以及一系列家庭自動(dòng)化和小型設(shè)備場(chǎng)景。它也是移動(dòng)應(yīng)用的理想選擇,因?yàn)樗w積小,功耗低,數(shù)據(jù)包最小,并且可以有效地將信息分配給一個(gè)或多個(gè)接收器。

效果演示

這是我連接的MQTT中文網(wǎng)的公共服務(wù)器

基礎(chǔ)知識(shí)

這里開發(fā)客戶端需要的知識(shí)不多,paho的核心庫都封裝好了,我們只需要了解下基礎(chǔ)的知識(shí)就行了。

1.連接

這里我使用的MQTT3.1.1協(xié)議,服務(wù)器地址是以tcp開頭的末尾加上端口號(hào)1883

val server = "tcp://mqtt.p2hp.com:1883" //服務(wù)端地址

其他的一些參數(shù)如下:
clientId:(作為客戶端的標(biāo)識(shí)),這里我使用的是AndroidID,獲取不到的話就使用生成的一個(gè)UUID
CleanSession:設(shè)置不持久化的話,每次都是一次新會(huì)話
keepAliveInterval:發(fā)送心跳包的時(shí)間

 val androidId = DeviceUtils.getAndroidID()
        val clientId = if(!TextUtils.isEmpty(androidId)){
            androidId
        }else{
            UUID.randomUUID().toString()
        }
        mqttClient = MqttAndroidClient(context,serverUrl,clientId)
        connectOptions = MqttConnectOptions().apply {
            isCleanSession = false //是否會(huì)話持久化
            connectionTimeout = 30 //連接超時(shí)時(shí)間
            keepAliveInterval = 10 //發(fā)送心跳時(shí)間
            userName = name //如果設(shè)置了認(rèn)證,填的用戶名
            password = pass.toCharArray() //用戶密碼
        }
2.訂閱和發(fā)布


(1) 訂閱
這里的概念就好像你微博關(guān)注了一個(gè)博主,然后當(dāng)博主發(fā)布新的動(dòng)態(tài),你這就可以收到,而這里的訂閱就是類似關(guān)注,訂閱的主題格式跟文件路徑差不多,比如訂閱一個(gè)topic/1,當(dāng)然這里也有帶通配符的訂閱方式比如topic/#,#的意思就是匹配所有,也就是當(dāng)你訂閱了topic/#的主題你就可以收到所有topic開頭的主題消息,像topic/1、topic/2、topic/3等。
(2) 發(fā)布
發(fā)布消息的時(shí)候也需要指定一個(gè)主題,比如topic/1,但是不能指定帶通配符的主題。
(3) QOS
另外還有一個(gè)比較重要的概念就是QOS(服務(wù)質(zhì)量),這里有3個(gè)值(0,1,2),代表的意義如下:
0:代表,Sender 發(fā)送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,也就算了。
1:代表,Sender 發(fā)送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver 發(fā)送消息,如果發(fā)送失敗,會(huì)繼續(xù)重試,直到 Receiver 收到消息為止,但是因?yàn)橹貍鞯脑?,Receiver 有可能會(huì)收到重復(fù)的消息。
2:代表,Sender 發(fā)送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發(fā)送消息,如果發(fā)送失敗,會(huì)繼續(xù)重試,直到 Receiver 收到消息為止,同時(shí)保證 Receiver 不會(huì)因?yàn)橄⒅貍鞫盏街貜?fù)的消息。

實(shí)現(xiàn)步驟

1.引入依賴

這里本來我是用的這個(gè)庫https://github.com/eclipse/paho.mqtt.android,但是這個(gè)庫不適配Android12因此我下載了源碼調(diào)整了下,重新自己封裝了一個(gè),如下:

allprojects {
        repositories {
            ...
            maven { url 'https://jitpack.io' }
        }
    }

dependencies {
            implementation 'com.github.itfitness:MQTTAndroid:1.0.0'
    }
2.封裝方法

我對(duì)這個(gè)庫進(jìn)行了一些封裝,如下:

class MQTTHelper{
    private val mqttClient: MqttAndroidClient
    private val connectOptions: MqttConnectOptions
    private var mqttActionListener: IMqttActionListener? = null
    constructor(context: Context, serverUrl:String, name:String, pass:String){
        val macAddress = DeviceUtils.getAndroidID()
        val clientId = if(!TextUtils.isEmpty(macAddress)){
            macAddress
        }else{
            UUID.randomUUID().toString()
        }
        mqttClient = MqttAndroidClient(context,serverUrl,clientId)
        connectOptions = MqttConnectOptions().apply {
            isCleanSession = false
            connectionTimeout = 30
            keepAliveInterval = 10
            userName = name
            password = pass.toCharArray()
        }
    }

    /**
     * 連接
     * @param mqttCallback 接到訂閱的消息的回調(diào)
     * @param isFailRetry 失敗是否重新連接
     */
    fun connect(topic: Topic, qos: Qos, isFailRetry:Boolean, mqttCallback: MqttCallback){
        mqttClient.setCallback(mqttCallback)
        if(mqttActionListener == null){
            mqttActionListener = object :IMqttActionListener{
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    LogUtils.eTag("連接","連接成功")
                    subscribe(topic,qos)
                }
                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    //失敗重連
                    LogUtils.eTag("連接","連接失敗重試${exception?.message}")
                    if (isFailRetry){
                        mqttClient.connect(connectOptions,null,mqttActionListener)
                    }
                }
            }
        }
        mqttClient.connect(connectOptions,null,mqttActionListener)
    }

    /**
     * 訂閱
     */
    private fun subscribe(topic: Topic,qos:Qos){
        mqttClient.subscribe(topic.value(),qos.value())
    }

    /**
     * 發(fā)布
     */
    fun publish(topic:Topic,message:String,qos:Qos){
        val msg = MqttMessage()
        msg.isRetained = false
        msg.payload = message.toByteArray()
        msg.qos = qos.value()
        mqttClient.publish(topic.value(),msg)
    }

    /**
     * 斷開連接
     */
    fun disconnect(){
        mqttClient.disconnect()
    }
}
enum class Qos{
    QOS_ZERO{
        override fun value():Int{
            return 0
        }
    },
    QOS_ONE{
        override fun value():Int{
            return 1
        }
    },
    QOS_TWO{
        override fun value():Int{
            return 2
        }
    };
    abstract fun value(): Int
}
enum class Topic{
    //訂閱主題
    TOPIC_MSG{
        override fun value():String{
            return "testtopic/#"
        }
    },
    //發(fā)布主題
    TOPIC_SEND{
        override fun value():String{
            return "testtopic/1"
        }
    };
    abstract fun value(): String
}
3.使用

在Activity中的使用如下:

class MainActivity : AppCompatActivity() {
    @RequiresApi(Build.VERSION_CODES.N)
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val server = "tcp://mqtt.p2hp.com:1883" //服務(wù)端地址
        val mqttHelper = MQTTHelper(this,server,"123","123")
        mqttHelper.connect(Topic.TOPIC_MSG, Qos.QOS_TWO,false,object : MqttCallback {
            override fun connectionLost(cause: Throwable?) {

            }

            override fun messageArrived(topic: String?, message: MqttMessage?) {
                //收到消息
                message?.payload?.let { ToastUtils.showShort(String(it)) }
                LogUtils.eTag("消息", message?.payload?.let { String(it) })
            }

            override fun deliveryComplete(token: IMqttDeliveryToken?) {



            }
        })
        val etMsg = findViewById<EditText>(R.id.et_msg)
        findViewById<Button>(R.id.tv_send).setOnClickListener {
            //發(fā)送消息
            mqttHelper.publish(Topic.TOPIC_SEND,etMsg.text.toString(),Qos.QOS_TWO)
        }
    }
}

案例源碼

https://github.com/itfitness/MQTTAndroid

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容