基于 MQTT 協(xié)議的推送服務(wù)

一、簡述

MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用。

MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用。

二、apache-Apollo 服務(wù)器搭建

下載地址:http://activemq.apache.org/apollo/download.html

  • 1.將下載后的 壓縮包進(jìn)行解壓:


    apollo.png
  • 2.打開 cmd 運(yùn)行 bin/apollo.cmd


    apollo.png
  • 3.創(chuàng)建一個(gè)服務(wù)器實(shí)例


    image.png
  • 4.在服務(wù)中打開 apollo 服務(wù)


    image.png
  • 5.啟動(dòng)服務(wù)后,在瀏覽器上面輸入:https://127.0.0.1:61681/http://127.0.0.1:61680/

    apollo控制臺(tái)

初始賬號(hào): admin 密碼:password
  • 6.進(jìn)入apollo控制臺(tái)


    image.png

    進(jìn)入此頁面說明服務(wù)器成功搭建

三、服務(wù)端推送和客戶端監(jiān)聽
  • SeverClass
package com.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ServerMQTT {
    //tcp://MQTT安裝的服務(wù)器地址:MQTT定義的端口號(hào)
    public static final String HOST = "tcp://0.0.0.0:61613";
    //定義一個(gè)主題
    public static final String TOPIC = "message";
    //定義MQTT的ID,可以在MQTT服務(wù)配置中指定
    private static final String clientid = "server11";

    private MqttClient client;
    private MqttTopic topic11;
    private String userName = "admin";  //非必須
    private String passWord = "password";  //非必須

    private MqttMessage message;

    /**
     * 構(gòu)造函數(shù)
     * @throws MqttException
     */
    public ServerMQTT() throws MqttException {
        // MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }

    /**
     *  用來連接服務(wù)器
     */
    private void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 設(shè)置超時(shí)時(shí)間
        options.setConnectionTimeout(10);
        // 設(shè)置會(huì)話心跳時(shí)間
        options.setKeepAliveInterval(20);
        try {
            client.setCallback(new PushCallback());
            client.connect(options);

            topic11 = client.getTopic(TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete());
    }

    /**
     *  啟動(dòng)入口
     * @param args
     * @throws MqttException
     */
    public static void main(String[] args) throws MqttException {
        ServerMQTT server = new ServerMQTT();

        server.message = new MqttMessage();
        server.message.setQos(1);  //保證消息能到達(dá)一次
        server.message.setRetained(true);
        server.message.setPayload("這是服務(wù)器發(fā)出的信息的內(nèi)容".getBytes());
        server.publish(server.topic11 , server.message);
        System.out.println(server.message.isRetained() + "------ratained狀態(tài)");

    }

}
  • ClientClass
package com.mqtt;

import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientMQTT {

    public static final String HOST = "tcp://0.0.0.0:61613";
    public static final String TOPIC1 = "message";
    private static final String clientid = "client11";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "admin";    //非必須
    private String passWord = "password";  //非必須
    @SuppressWarnings("unused")
    private ScheduledExecutorService scheduler;

    private void start() {
        try {
            // host為主機(jī)名,clientid即連接MQTT的客戶端ID,一般以唯一標(biāo)識(shí)符表示,MemoryPersistence設(shè)置clientid的保存形式,默認(rèn)為以內(nèi)存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的連接設(shè)置
            options = new MqttConnectOptions();
            // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
            options.setCleanSession(false);
            // 設(shè)置連接的用戶名
            options.setUserName(userName);
            // 設(shè)置連接的密碼
            options.setPassword(passWord.toCharArray());
            // 設(shè)置超時(shí)時(shí)間 單位為秒
            options.setConnectionTimeout(10);
            // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制
            options.setKeepAliveInterval(20);
            // 設(shè)置回調(diào)
            client.setCallback(new PushCallback());
            MqttTopic topic = client.getTopic(TOPIC1);
            //setWill方法,如果項(xiàng)目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息
            //options.setWill(topic, "close".getBytes(), 2, true);
            client.connect(options);
            //訂閱消息
            //訂閱消息

            client.subscribe(TOPIC1, 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MqttException {
        ClientMQTT client = new ClientMQTT();
        client.start();
    }
}
  • server


    server
  • client


    client
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容