MQTT簡析

由于部門做的設(shè)備需要使用到MQTT,在此總結(jié)下。

基本介紹

MQTT(Message Queue Telemetry Transport),遙測傳輸協(xié)議,提供訂閱/發(fā)布模式,更為簡約、輕量,易于使用,針對受限環(huán)境(帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定),可以簡單概括為物聯(lián)網(wǎng)打造,官方總結(jié)特點如下:

  1. 使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應(yīng)用程序耦合。
  1. 對負載內(nèi)容屏蔽的消息傳輸。
  2. 使用 TCP/IP 提供網(wǎng)絡(luò)連接。
  3. 有三種消息發(fā)布服務(wù)質(zhì)量:
    “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。
    “至少一次”,確保消息到達,但消息重復(fù)可能會發(fā)生。
    “只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。
  4. 小型傳輸,開銷很?。ü潭ㄩL度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量。
  5. 使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機制。

XMPP和MQTT對比

XMPP
優(yōu)點:

  • 要涉及到上層業(yè)務(wù),設(shè)計到用戶分群分組,用戶層次關(guān)系的維護,xmpp已經(jīng)為你做了很多很多,后期的開發(fā)會很省心;
  • 協(xié)議成熟、強大、可擴展性強、目前主要應(yīng)用于許多聊天系統(tǒng)中,且已有開源的Java版的開發(fā)實例androidpn;
  • 協(xié)議成熟,強大,可擴展性強,并且有成熟的開源方案。
  • 分布式:任何人都可以運行自己的XMPP服務(wù)器,它沒有主服務(wù)器
  • 安全性高:使用TLS等技術(shù)
  • 跨平臺

缺點:

  • 協(xié)議雖然完整擴展性雖然好,它耗費網(wǎng)絡(luò)流量很大,交互此說太多,跑起來比MQTT慢很多;另外有高達70%的流量是耗費在XMPP本身的標簽和編解碼上面。
  • 協(xié)議較復(fù)雜、冗余(基于XML)、費流量、費電,部署硬件成本高;

MQTT
優(yōu)點:

  • 專門針對移動互聯(lián)網(wǎng)開發(fā)的輕量級傳輸協(xié)議,二進制、協(xié)議簡潔、小巧、可擴展性強、省流量、省電,適合做大量節(jié)點弱網(wǎng)絡(luò)差的場景,非常適合現(xiàn)在移動互聯(lián)網(wǎng)的基礎(chǔ)設(shè)施
  • 開源的協(xié)議和實現(xiàn),擴展方便且輕量級;
  • MQTT代碼量也比XMPP小很多,使用容易。
  • 支持的設(shè)備從智能硬件到智能手機無所不包。

MQTT快速示例

市面上有相當多的高質(zhì)量MQTT代理,其中mosquitto是一個開源的輕量級的C實現(xiàn),完全兼容了MQTT 3.1和MQTT 3.1.1。下面我們就以mosquitto為例演示一下MQTT的使用。

  • 安裝mosquitto以及搭配的客戶端:
apt-get install mosquitto
apt-get install mosquitto-clients
  • 訂閱一個主題:
mosquitto_sub -d -t baidu/chatroom
Received CONNACK
Received SUBACK
Subscribed (mid: 1): 0
  • 另外打開一個SSH連接然后在這個主題里面打個招呼:
mosquitto_pub -d -t baidu/chatroom -m "Hello World"
Received CONNACK
Sending PUBLISH (d0, q0, r0, m1, 'baidu/chatroom', ... (11 bytes))
  • 此時回到第一個SSH客戶端可以看到信息已經(jīng)接收到了,之后便是心跳消息:
Received PUBLISH (d0, q0, r0, m0, 'baidu/chatroom', ... (11 bytes))
Hello World
Sending PINGREQ
Received PINGRESP
  • 需要注意的是mosquitto客戶端默認使用QoS 0,下面我們使用QoS 2訂閱這個主題:
mosquitto_sub -d -q 2 -t baidu/chatroom
Received CONNACK
Received SUBACK
Subscribed (mid: 1): 2
  • 切換到另外SSH連接然后在這個主題里面打個招呼:
mosquitto_pub -d -q 2 -t baidu/chatroom -m "Hello World"
Received CONNACK
Sending PUBLISH (d0, q2, r0, m1, 'baidu/chatroom', ... (11 bytes))
Received PUBREC (Mid: 1)
Sending PUBREL (Mid: 1)
Received PUBCOMP (Mid: 1)
  • 此時回到第一個SSH客戶端可以看到信息已經(jīng)接收到了,以及相應(yīng)的多次握手消息:
Received PUBLISH (d0, q2, r0, m1, 'baidu/chatroom', ... (11 bytes))
Sending PUBREC (Mid: 1)
Received PUBREL (Mid: 1)
Hello World
Sending PUBCOMP (Mid: 1)

MQTT Java客戶端實現(xiàn)

使用開源項目https://www.eclipse.org/paho/提供的MQTT服務(wù)端tcp://iot.eclipse.org:1883,進行如下實驗:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


/**
 *@Description:
 *@author lx
 *@date 2017-1-12 下午1:19:42
 */
public class TestMQTT {

    public static void main(String args[]){

        //消息的類型
          String topic        = "TOPIC MQTT Examples";
          //消息內(nèi)容
          String content      = "XX發(fā)布了消息";
          //消息發(fā)送的模式   選擇消息發(fā)送的次數(shù),依據(jù)不同的使用環(huán)境使用不同的模式
          int qos             = 2;
          //服務(wù)器地址
          String broker       = "tcp://iot.eclipse.org:1883";
          //客戶端的唯一標識
          String clientId     = "CLIENTID JavaSample";
          //消息緩存的方式  內(nèi)存緩存
          MemoryPersistence persistence = new MemoryPersistence();

          try {
              //創(chuàng)建以惡搞MQTT客戶端
              MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
              //消息的配置參數(shù)
              MqttConnectOptions connOpts = new MqttConnectOptions();
              //不記憶上一次會話
              connOpts.setCleanSession(true);
              System.out.println("Connecting to broker: "+broker);
              //鏈接服務(wù)器
              sampleClient.connect(connOpts);
              System.out.println("Connected");
              System.out.println("Publishing message: "+content);
              //創(chuàng)建消息
              MqttMessage message = new MqttMessage(content.getBytes());
              //給消息設(shè)置發(fā)送的模式
              message.setQos(qos);
              //發(fā)布消息到服務(wù)器
              sampleClient.publish(topic, message);
              System.out.println("Message published");
              //斷開鏈接
              sampleClient.disconnect();
              System.out.println("Disconnected");
              System.exit(0);
          } catch(MqttException me) {
              System.out.println("reason "+me.getReasonCode());
              System.out.println("msg "+me.getMessage());
              System.out.println("loc "+me.getLocalizedMessage());
              System.out.println("cause "+me.getCause());
              System.out.println("excep "+me);
              me.printStackTrace();
          }

    }
}

參考https://github.com/eclipse/paho.mqtt.android

服務(wù)端添加SSL加密

這里采用了Moqutte推薦的SSL加密方式(http://andsel.github.io/moquette/),屬于SSL加密中的單向加密。

  1. 生成服務(wù)器的keystore
    執(zhí)行命令:
$JAVA_HOME/bin/keytool -keystore serverkeystore.jks -alias testserver -genkey -keyalg RSA

過程中提示輸入名字時(CN),必須填寫服務(wù)器的域名,本地調(diào)試時可填寫localhost。
然后修改工程里的 /config/moquette.conf 中的jks_path 對應(yīng)值為serverkeystore.jks 的路徑,把同時serverkeystore.jks復(fù)制到工程根目錄下。

  1. 生成客戶端的keystore
    1)導(dǎo)出服務(wù)器keystore的證書
    執(zhí)行命令:
$JAVA_HOME/bin/keytool -export -alias testserver -keystore serverkeystore.jks -file testserver.crt

2)生成客戶端的keystore
執(zhí)行命令:

$JAVA_HOME/bin/keytool -keystore clientkeystore.jks -genkey -keyalg RSA

3)向客戶端的keystore 導(dǎo)入服務(wù)器keystore的證書,使客戶端信任證書。
執(zhí)行命令:

$JAVA_HOME/bin/keytool -keystore clientkeystore.jks -import -alias testserver -file testserver.crt -trustcacerts

4)客戶端啟動時加載clientkeystore.jks然后再與服務(wù)器的SSL端口進行連接即可。
客戶端代碼參考:sslSimplePublisher.groovy

客戶端添加ssl加密

  1. 生成 .bks文件:
    a、根據(jù)上一節(jié),拿到服務(wù)器生成的 .jks證書,
    b、到官網(wǎng)下載 bcprov-ext-jdk15on-146.jar ,將該文件放到j(luò)dk1.6.0_03\jre\lib\ext目錄下.
    c、配置bcprov
    在 jdk_home\jre\lib\security\目錄中找到 java.security 在內(nèi)容增加一行(數(shù)字可以自己定義)
security.provider.11=org.bouncycastle.jce.provider.BouncyCastleProvider

d、生成android平臺的證書:

keytool -exportcert -alias testserver -file test.cert -keystore  local_clientkeystore.jks
keytool -importcert -keystore test.bks -file test.cert -storetype BKS -provider org.bouncycastle.jce.provider.BouncyCastleProvider
  1. 在Android工程中加載證書:
    a、生成SSLSocketFactory對象
public class SslUtil {
    public static SSLSocketFactory createSocketFactory(Context context) {
        SSLContext sslContext;
        try {
            KeyStore ks = KeyStore.getInstance("BKS");
            ks.load(context.getResources().openRawResource(R.raw.peer),
                    "123456".toCharArray());                           //該字符串應(yīng)隨機生成,保證每次session唯一;
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            //    kmf.init(ks, "passw0rd".toCharArray());
            kmf.init(ks, "123456".toCharArray());
            TrustManagerFactory tmf = TrustManagerFactory
                    .getInstance("X509");
            tmf.init(ks);
            TrustManager[] tm = tmf.getTrustManagers();
            sslContext = SSLContext.getInstance("TLS");

            sslContext.init(kmf.getKeyManagers(), tm, null);
            // SocketFactory factory= SSLSocketFactory.getDefault();

            // Socket socket =factory.createSocket("localhost", 10000);
            SSLSocketFactory ssf = sslContext.getSocketFactory();
            return  ssf;
        }catch (Exception e){
            e.printStackTrace();
            return  null;
        }
    }
}

b、在MqttOptions中添加SSL配置:

mOptions = new MqttConnectOptions();
mOptions.setCleanSession(true);
SSLSocketFactory socketFactory = SslUtil.createSocketFactory(getApplicationContext());
if (socketFactory != null) {
    LogUtil.d("socketFactory is not null");
    mOptions.setSocketFactory(socketFactory);
}
  1. 修改url地址前綴:
    修改url地址由:
    tcp://yoursite.com:8402

    ssl://yoursite.com:2883
  2. 使用wireshark抓包驗證,加密成功!

參考文章

MQTT協(xié)議筆記之頭部信息
MQTT快速入門

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容