MQTT mosquitto的初步學習

mosquitto簡介

MQTT(MQ Telemetry Transport),消息隊列遙測傳輸協(xié)議,輕量級的發(fā)布/訂閱協(xié)議,適用于一些條件比較苛刻的環(huán)境,進行低帶寬、不可靠或間歇性的通信。
Mosquitto是一個開源(BSD許可證)的消息代理,實現(xiàn)MQTT(消息隊列遙測傳輸)協(xié)議版本3.1。

為每個MQTT消息頭命令消息包含一個固定頭,頭只有兩個字節(jié),格式如下:

消息頭

參考:http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#msg-format

一、安裝mosquitto服務

本文僅介紹mosquitto在Windows上的安裝,Linux系統(tǒng)與之類似

在mosquitto官網(wǎng)下載安裝包http://mosquitto.org/files/binary/win32/mosquitto-1.2.3-install-win32.exe
下載之后直接安裝即可

二、配置和運行

可參照http://www.cnblogs.com/li-baibo/archive/2013/01/21/2869225.html

三、在客戶端的使用(Java)

pc 端客戶端,下載地址:http://download.csdn.net/detail/kuailebeihun/7312947
解壓,運行\(zhòng)ia92\J2SE\wmqttSample.jar即可

1. 客戶端使用的庫

下載地址:http://download.csdn.net/detail/kuailebeihun/7312731(JDK環(huán)境1.6及以上,若以下請下載源碼開發(fā))

2. 客戶端 庫 的源碼

下載地址:http://download.csdn.net/detail/kuailebeihun/7312743

3. 具體使用

(1)消息質量

0: “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡。會發(fā)生消息丟失或重復。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送;
1 :“至少一次”,確保消息到達,但消息重復可能會發(fā)生;
2:“只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復或丟失會導致不正確的結果。

(2)消息主題

構建一個應用程序時,主題樹的設計應考慮以下主題名稱的語法和語義的原則:
主題必須至少一個字符長;  
主題名稱是區(qū)分大小寫的,例如,A和a是兩個不同的主題;
“/”創(chuàng)造了一個獨特的主題,例如,/a與a是不同的主題。/a匹配“+/+”和“/+”,但不匹配“+”; 
任何主題不包含空字符(Unicode \ x0000)。

以下原則適用于主題樹的結構和內(nèi)容:     
64 k的長度是有限的,但在沒有限制水平主題樹的數(shù)量?! ?br> 可以有任意數(shù)量的根節(jié)點,也就是說,可以有任意數(shù)量的主題樹。

(3) subscribe訂閱

使用正斜杠(/)分隔主題樹中的每個級別,并提供一個主題空間的層次結構。主題層面分離器的使用中遇到兩個通配符時重要的主題由用戶指定。

數(shù)字符號(#)是一個通配符匹配任意數(shù)量的水平在一個主題,只能用在最后,如a/#/c是不合法的
例如,如果你訂閱a/b/c/#,你在這些主題接收消息:
a/b/c
a/b/c/d
a/b/c/e

加號(+)是一個通配符匹配的主題只有一個水平
例如,如果你訂閱a/+/c/#,你在這些主題:接收消息:
a/a/c/
a/b/c/d
a/c/c/e

若訂閱"+/#",此主題可接受所有類型主題的消息

(4) publish發(fā)布

發(fā)布的時候主題時,+或者#不能通配,是且僅是一個明確的主題。

<span style="clolor: #ff0000;">貼上一段簡單代碼</span>

import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
import org.eclipse.paho.client.mqttv3.MqttSecurityException;  
import org.eclipse.paho.client.mqttv3.MqttTopic;  
import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;  
  
/**  
 *  
 * @author LP by 2014-04-24 
 * 
 */  
public class MqttServiceClient implements MqttCallback {  
  
    private static final String MQTT_HOST = "tcp://192.168.12.38:1883";  
    private static final String MQTT_CLIENT = "Test_";  
      
    public static MqttServiceClient mqttServiceClient = null;  
      
    private MqttClient client = null;  
    private MqttConnectOptions options = null;  
      
    /** 
     * 單例模式構造類 
     */  
    public static MqttServiceClient getInstance() {  
        if (mqttServiceClient == null) {  
            mqttServiceClient = new MqttServiceClient();  
        }  
        return mqttServiceClient;  
    }  
  
    private MqttServiceClient() {  
        System.out.println("init MQTTClientService");  
        init();  
    }  
    // The major API implementation follows :-  
  
    /** 
     * 初始化 
     */  
    private void init() {  
        try {  
          
            // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內(nèi)存保存  
            client = new MqttClient(MQTT_HOST, MQTT_CLIENT, new MemoryPersistence());  
            // MQTT的連接設置  
            options = new MqttConnectOptions();  
            // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接  
            options.setCleanSession(true);  
            // 設置連接的用戶名  
            // options.setUserName(userName);  
            // 設置連接的密碼  
            // options.setPassword(passWord.toCharArray());  
            // 設置超時時間 單位為秒  
            options.setConnectionTimeout(50);  
            // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制  
            options.setKeepAliveInterval(30);  
            // 設置回調  
            client.setCallback(this);  
              
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
    /** 
     * 連接到MQTT 
     */  
    void connect() {  
        System.out.println("Start connect----------");  
        try {  
            client.connect(options);  
            //訂閱主題的方法,2為消息的質量  
            client.subscribe("+/#", 2);  
            //發(fā)送消息  
            publish("test", "撒打發(fā)水電費水電費");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
      
    /** 
     * 斷開連接到MQTT 
     */  
    public void disconnect() {  
        System.out.println("Start disconnect----------");  
        try {  
            client.disconnect();  
        } catch (MqttSecurityException e) {  
            e.printStackTrace();  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
    }  
  
    /**  
     * 發(fā)布消息 
     * @param topic 主題 
     * @param msg 消息 
     */  
    public void publish(String topic, String msg) {  
        System.out.println("Start publish----------");  
        try {  
            MqttTopic mqttTopic = client.getTopic(topic);  
            //2為消息的質量  
            MqttDeliveryToken messageToken = mqttTopic.publish(msg.getBytes(), 2, true);  
            System.out.println("publish success==>"+messageToken.getMessage());  
//          client.publish(topic, 2, msg);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
      
      
// -------------------------------------------------回調方法------------------------------------------------------------//  
      
    /**  
     * 連接斷開觸發(fā)此方法 
     */  
    @Override  
    public void connectionLost(Throwable cause) {  
        System.out.println("Connection Lost---------->" + cause.getMessage());  
    }  
  
    /**  
     * 消息達到觸發(fā)此方法 
     */  
    @Override  
    public void messageArrived(MqttTopic topic, MqttMessage message)  
            throws Exception {  
        System.out.println(topic + ":" + message.toString());  
    }  
  
    /** 
     * 消息發(fā)送成功觸發(fā)此方法 
     */  
    @Override  
    public void deliveryComplete(MqttDeliveryToken token)  {  
        try {  
            System.out.println("deliveryComplete---------" + token.getMessage());  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
    }  
  
      
    public static void main(String[] args)throws Exception {  
          
        MqttServiceClient.getInstance().disconnect();  
        MqttServiceClient.getInstance().connect();  
          
        new Thread() {  
            public void run() {  
                int count = 0;  
                while(true) {  
                    try {  
                        Thread.sleep(1000*3);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    MqttServiceClient.getInstance().publish("AAA", "hello world ! count=" + count);  
                    count ++;  
                }  
            };  
        }.start();  
    }  
      
}
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,030評論 25 709
  • 《裕語言》速成開發(fā)手冊3.0 官方用戶交流:iApp開發(fā)交流(1) 239547050iApp開發(fā)交流(2) 10...
    葉染柒丶閱讀 28,749評論 5 20
  • 最近一直做物聯(lián)網(wǎng)方面的開發(fā),以下內(nèi)容關于使用MQTT過程中遇到問題的記錄以及需要掌握的機制原理,主要講解理論。 背...
    踐行者閱讀 2,023評論 1 7
  • 春分是一年里我最喜歡的一天。白晝即將變長,夏天已經(jīng)在路上。這種余裕的感覺就像手里握著一把好牌,像旅行前坐在候機大廳...
    tomoyababe閱讀 235評論 0 0

友情鏈接更多精彩內(nèi)容