mosquitto簡介
MQTT(MQ Telemetry Transport),消息隊列遙測傳輸協(xié)議,輕量級的發(fā)布/訂閱協(xié)議,適用于一些條件比較苛刻的環(huán)境,進行低帶寬、不可靠或間歇性的通信。
Mosquitto是一個開源(BSD許可證)的消息代理,實現(xiàn)MQTT(消息隊列遙測傳輸)協(xié)議版本3.1。
為每個MQTT消息頭命令消息包含一個固定頭,頭只有兩個字節(jié),格式如下:

參考:http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#msg-format
一、安裝mosquitto服務
本文僅介紹mosquitto在Windows上的安裝,Linux系統(tǒng)與之類似
在mosquitto官網(wǎng)下載安裝包http://mosquitto.org/files/binary/win32/mosquitto-1.2.3-install-win32.exe
下載之后直接安裝即可
二、配置和運行
可參照http://www.cnblogs.com/li-baibo/archive/2013/01/21/2869225.html
三、在客戶端的使用(Java)
pc 端客戶端,下載地址:http://download.csdn.net/detail/kuailebeihun/7312947
解壓,運行\(zhòng)ia92\J2SE\wmqttSample.jar即可
1. 客戶端使用的庫
下載地址:http://download.csdn.net/detail/kuailebeihun/7312731(JDK環(huán)境1.6及以上,若以下請下載源碼開發(fā))
2. 客戶端 庫 的源碼
下載地址:http://download.csdn.net/detail/kuailebeihun/7312743
3. 具體使用
(1)消息質量
0: “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡。會發(fā)生消息丟失或重復。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送;
1 :“至少一次”,確保消息到達,但消息重復可能會發(fā)生;
2:“只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復或丟失會導致不正確的結果。
(2)消息主題
構建一個應用程序時,主題樹的設計應考慮以下主題名稱的語法和語義的原則:
主題必須至少一個字符長;
主題名稱是區(qū)分大小寫的,例如,A和a是兩個不同的主題;
“/”創(chuàng)造了一個獨特的主題,例如,/a與a是不同的主題。/a匹配“+/+”和“/+”,但不匹配“+”;
任何主題不包含空字符(Unicode \ x0000)。
以下原則適用于主題樹的結構和內(nèi)容:
64 k的長度是有限的,但在沒有限制水平主題樹的數(shù)量?! ?br>
可以有任意數(shù)量的根節(jié)點,也就是說,可以有任意數(shù)量的主題樹。
(3) subscribe訂閱
使用正斜杠(/)分隔主題樹中的每個級別,并提供一個主題空間的層次結構。主題層面分離器的使用中遇到兩個通配符時重要的主題由用戶指定。
數(shù)字符號(#)是一個通配符匹配任意數(shù)量的水平在一個主題,只能用在最后,如a/#/c是不合法的
例如,如果你訂閱a/b/c/#,你在這些主題接收消息:
a/b/c
a/b/c/d
a/b/c/e
加號(+)是一個通配符匹配的主題只有一個水平
例如,如果你訂閱a/+/c/#,你在這些主題:接收消息:
a/a/c/
a/b/c/d
a/c/c/e
若訂閱"+/#",此主題可接受所有類型主題的消息
(4) publish發(fā)布
發(fā)布的時候主題時,+或者#不能通配,是且僅是一個明確的主題。
<span style="clolor: #ff0000;">貼上一段簡單代碼</span>
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
/**
*
* @author LP by 2014-04-24
*
*/
public class MqttServiceClient implements MqttCallback {
private static final String MQTT_HOST = "tcp://192.168.12.38:1883";
private static final String MQTT_CLIENT = "Test_";
public static MqttServiceClient mqttServiceClient = null;
private MqttClient client = null;
private MqttConnectOptions options = null;
/**
* 單例模式構造類
*/
public static MqttServiceClient getInstance() {
if (mqttServiceClient == null) {
mqttServiceClient = new MqttServiceClient();
}
return mqttServiceClient;
}
private MqttServiceClient() {
System.out.println("init MQTTClientService");
init();
}
// The major API implementation follows :-
/**
* 初始化
*/
private void init() {
try {
// host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內(nèi)存保存
client = new MqttClient(MQTT_HOST, MQTT_CLIENT, new MemoryPersistence());
// MQTT的連接設置
options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(true);
// 設置連接的用戶名
// options.setUserName(userName);
// 設置連接的密碼
// options.setPassword(passWord.toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(50);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制
options.setKeepAliveInterval(30);
// 設置回調
client.setCallback(this);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 連接到MQTT
*/
void connect() {
System.out.println("Start connect----------");
try {
client.connect(options);
//訂閱主題的方法,2為消息的質量
client.subscribe("+/#", 2);
//發(fā)送消息
publish("test", "撒打發(fā)水電費水電費");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 斷開連接到MQTT
*/
public void disconnect() {
System.out.println("Start disconnect----------");
try {
client.disconnect();
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 發(fā)布消息
* @param topic 主題
* @param msg 消息
*/
public void publish(String topic, String msg) {
System.out.println("Start publish----------");
try {
MqttTopic mqttTopic = client.getTopic(topic);
//2為消息的質量
MqttDeliveryToken messageToken = mqttTopic.publish(msg.getBytes(), 2, true);
System.out.println("publish success==>"+messageToken.getMessage());
// client.publish(topic, 2, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
// -------------------------------------------------回調方法------------------------------------------------------------//
/**
* 連接斷開觸發(fā)此方法
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection Lost---------->" + cause.getMessage());
}
/**
* 消息達到觸發(fā)此方法
*/
@Override
public void messageArrived(MqttTopic topic, MqttMessage message)
throws Exception {
System.out.println(topic + ":" + message.toString());
}
/**
* 消息發(fā)送成功觸發(fā)此方法
*/
@Override
public void deliveryComplete(MqttDeliveryToken token) {
try {
System.out.println("deliveryComplete---------" + token.getMessage());
} catch (MqttException e) {
e.printStackTrace();
}
}
public static void main(String[] args)throws Exception {
MqttServiceClient.getInstance().disconnect();
MqttServiceClient.getInstance().connect();
new Thread() {
public void run() {
int count = 0;
while(true) {
try {
Thread.sleep(1000*3);
} catch (InterruptedException e) {
e.printStackTrace();
}
MqttServiceClient.getInstance().publish("AAA", "hello world ! count=" + count);
count ++;
}
};
}.start();
}
}