MQTT消息通訊及Apollo通訊服務器的搭建

MQTT通訊及服務器的搭建

Apollo服務器的搭建

1:Apollo服務器下載

首先從http://activemq.apache.org/apollo/download.html官網(wǎng)上下載windows對應的apollo版本,本文下載的是apache-apollo-1.7.1-windows-distro.zip 版本。windows的版本為win10,JDK版本1.8。


2.解壓到C:\apache-apollo下,此時會多出一個apache-apollo-1.7.1文件夾。

3.然后以管理員的身份運行cmd,進入到如下目錄C:\apache-apollo\apache-apollo-1.7.1\bin,如下圖所示:


4.然后就是要創(chuàng)建broker,這里是創(chuàng)建在C:\apache-apollo\broker

的目錄下,執(zhí)行如下命令:apollo create myapolloC:\apache-apollo\broker


5.broker創(chuàng)建成功的提示如下圖所示:


6.創(chuàng)建完broker之后就是要運行apollo,進入C:\apache-apollo\broker\bin目錄下,執(zhí)行如下命令:apollo-brokerrun


7.apollo運行成功的提示,如下圖所示:


8.最后打開瀏覽器,輸入網(wǎng)址http://127.0.0.1:61680/,即可看到如下頁面,默認

賬號:admin?? 密碼:password


9.登錄成功之后的頁面,控制臺頁面如下圖所示 :


測試demo—服務器端代碼---需要下載并引入mqttv3jar包

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

import org.eclipse.paho.client.mqttv3.MqttTopic;

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MqttServerTest {

??? public static final String HOST = "tcp://127.0.0.1:61613";

??? public static final String TOPIC = "toclient/124";

??? public static final String TOPIC125 = "toclient/125";

??? private static final String clientid = "server-id-0";


??? private MqttClient client;

??? private MqttTopic topic;

??? private MqttTopic topic125;

??? private String userName = "admin";

??? private String passWord = "password";


??? private MqttMessage message;


??? public MqttServerTest() throws MqttException {

?????? // MemoryPersistence設(shè)置clientid的保存形式,默認為以內(nèi)存保存

?????? client = new MqttClient(HOST, clientid, newMemoryPersistence());

?????? connect();

??? }


??? private void connect() {

?????? MqttConnectOptions options = new MqttConnectOptions();

??? ??? options.setCleanSession(false);

?????? options.setUserName(userName);

?????? options.setPassword(passWord.toCharArray());

?????? // 設(shè)置超時時間

?????? options.setConnectionTimeout(10);

?????? // 設(shè)置會話心跳時間

?????? options.setKeepAliveInterval(20);

?????? try {

?????????? client.setCallback(new PushCallBack());

?????????? client.connect(options);

?????????? topic = client.getTopic(TOPIC);

?????????? topic125 = client.getTopic(TOPIC125);

?????? } catch (Exception e) {

?????????? e.printStackTrace();

?????? }

??? }


??? public void publish(MqttTopic topic, MqttMessage message)

?????????? throws MqttPersistenceException, MqttException {

?????? MqttDeliveryToken token = topic.publish(message);

?????? token.waitForCompletion();

?????? System.out.println("message is

published completely! "

????????????? + token.isComplete());

??? }


??? public static void main(String[] args) throws MqttException {

?????? MqttServerTest server = new MqttServerTest();


?????? server.message = new MqttMessage();

?????? server.message.setQos(2);

?????? server.message.setRetained(true);

?????? server.message.setPayload("給客戶端124推送的信息".getBytes());

?????? server.publish(server.topic, server.message);


?????? server.message = new MqttMessage();

?????? server.message.setQos(2);

?????? server.message.setRetained(true);

?????? server.message.setPayload("給客戶端125推送的信息".getBytes());

?????? server.publish(server.topic125, server.message);


?????? System.out.println(server.message.isRetained() + "------ratained狀態(tài)");

??? }

}

測試demo—客戶端代碼—需引入mqttv3jar包

1測試類一 代表 客戶端client124

importjava.util.concurrent.ScheduledExecutorService;


importorg.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

importorg.eclipse.paho.client.mqttv3.MqttException;

importorg.eclipse.paho.client.mqttv3.MqttTopic;

importorg.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class MqttClientTest {

?????? publicstatic final String HOST = "tcp://127.0.0.1:61613";

?????? publicstatic final String TOPIC = "toclient/124";

?????? privatestatic final String clientid = "client124";

?????? privateMqttClient client;

?????? privateMqttConnectOptions options;

?????? privateString userName = "admin";

?????? privateString passWord = "password";


?????? privateScheduledExecutorService scheduler;


?????? privatevoid start() {

????????????? try{

???????????????????? //host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設(shè)置clientid的保存形式,默認為以內(nèi)存保存

???????????????????? client= new MqttClient(HOST, clientid, new MemoryPersistence());

???????????????????? //MQTT的連接設(shè)置

???????????????????? options= new MqttConnectOptions();

???????????????????? //設(shè)置是否清空session,這里如果設(shè)置為false表示服務器會保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務器都以新的身份連接

???????????????????? options.setCleanSession(true);

???????????????????? //設(shè)置連接的用戶名

???????????????????? options.setUserName(userName);

???????????????????? //設(shè)置連接的密碼

???????????????????? options.setPassword(passWord.toCharArray());

???????????????????? //設(shè)置超時時間 單位為秒

???????????????????? options.setConnectionTimeout(10);

???????????????????? //設(shè)置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制

???????????????????? options.setKeepAliveInterval(20);

???????????????????? //設(shè)置回調(diào)

???????????????????? client.setCallback(newPushCallBack());

???????????????????? MqttTopictopic = client.getTopic(TOPIC);

???????????????????? //setWill方法,如果項目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息

???????????????????? options.setWill(topic,"close".getBytes(), 2, true);


???????????????????? client.connect(options);

???????????????????? //訂閱消息

???????????????????? int[]Qos = { 1 };

???????????????????? String[]topic1 = { TOPIC };

???????????????????? client.subscribe(topic1,Qos);


????????????? }catch (Exception e) {

???????????????????? e.printStackTrace();

????????????? }

?????? }


?????? publicstatic void main(String[] args) throws MqttException {

????????????? MqttClientTestclient = new MqttClientTest();

????????????? client.start();

?????? }

}

2測試類二 代表 客戶端client125

回調(diào)類demo---需要引入mqttv3jar包

importorg.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

importorg.eclipse.paho.client.mqttv3.MqttCallback;

importorg.eclipse.paho.client.mqttv3.MqttMessage;


public class PushCallBack implementsMqttCallback{

?????? publicvoid connectionLost(Throwable cause) {

????????????? //連接丟失后,一般在這里面進行重連

????????????? System.out.println("連接斷開,可以做重連");

?????? }


?????? publicvoid deliveryComplete(IMqttDeliveryToken token) {

????????????? System.out.println("deliveryComplete---------"+ token.isComplete());

?????? }


?????? publicvoid messageArrived(String topic, MqttMessage message)

???????????????????? throwsException {

????????????? //subscribe后得到的消息會執(zhí)行到這里面

????????????? System.out.println("接收消息主題: " + topic);

????????????? System.out.println("接收消息Qos : " + message.getQos());

????????????? System.out.println("接收消息內(nèi)容: " + new String(message.getPayload()));

?????? }

}

Demo架構(gòu)--只演示客戶端其中一個


服務端運行結(jié)果


客戶端運行結(jié)果


說明

1.? ?客戶端和服務端的demo代碼中的userName和passWord為apollo服務器的登錄賬號和密碼

2.? ?Host地址根據(jù)自己所需要的連接方式選擇


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容