mqtt簡單封裝類

mqttmanager

import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;

import com.licheedev.myutils.LogPlus;
import com.simonfong.mqttdemo.listener.MqttCallback;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author simonfong
 * Created  on 2019/6/6
 */
public class MqttManager {
    private String serverUri;
    private String userName;
    private String passWord;
    private final Context mContext;
    private volatile MqttAndroidClient mqttAndroidClient;
    private static volatile MqttManager mqttManager = null;
    private MqttCallback callback;
    private String[] mqttTopic;
    private int[] qos;
    private boolean autoReconnect = true;

    /**
     * 初始化
     *
     * @param serverUri mqtt域名
     * @param userName  賬號
     * @param passWord  密碼
     * @param context
     */
    public static MqttManager init(String serverUri, String userName, String passWord, Context context) {
        return init(serverUri,userName,passWord,context,null,null);
    }

    /**
     * 初始化
     *
     * @param serverUri mqtt域名
     * @param userName  賬號
     * @param passWord  密碼
     * @param context
     * @param topics    主題
     * @param qos       QOS = 0/1/2   最多一次  最少一次 多次
     * @return
     */
    public static MqttManager init(String serverUri, String userName, String passWord, Context context, String[] topics,
                                   int[] qos) {
        if (mqttManager == null) {
            synchronized (MqttManager.class) {
                if (mqttManager == null) {
                    mqttManager = new MqttManager(serverUri, userName, passWord, context, topics, qos);
                }
            }
        }
        return mqttManager;
    }

    /**
     * 修改訂閱主題
     *
     * @param topics 主題
     * @param qos    QOS = 0/1/2   最多一次  最少一次 多次
     */
    public void setTopicAndQos(String[] topics, int[] qos) {
        if (mqttAndroidClient != null || !mqttAndroidClient.isConnected()) {
            try {
                mqttAndroidClient.unsubscribe(mqttTopic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        mqttTopic = topics;
        this.qos = qos;
        subscribeToTopic();
    }

    /**
     * 設置是否自動重連,默認為true
     *
     * @param isAuto
     */
    public void setAutoReconnect(boolean isAuto) {
        autoReconnect = isAuto;
    }

    public MqttManager(String serverUri, String userName, String passWord, Context context, String[] topics, int[] qos) {
        this.serverUri = serverUri;
        this.userName = userName;
        this.passWord = passWord;
        this.mqttTopic = topics;
        this.qos = qos;
        mContext = context;
        initConnect();
    }

    public void setCallback(MqttCallback callback) {
        this.callback = callback;
    }

    private boolean isConnect() {
        if (mqttAndroidClient != null) {
            return mqttAndroidClient.isConnected();
        }
        return false;
    }

    private void initConnect() {

        if (isConnect()) {
            return;
        }
        mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, MqttClient.generateClientId());
        mqttAndroidClient.registerResources(mContext);
        mqttAndroidClient.setCallback(new MyMqttCallbackExtended());
        //mqtt連接參數設置
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        //設置自動重連
        mqttConnectOptions.setAutomaticReconnect(autoReconnect);
        // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄
        // 這里設置為true表示每次連接到服務器都以新的身份連接
        mqttConnectOptions.setCleanSession(false);
        //設置連接的用戶名
        mqttConnectOptions.setUserName(userName);
        //設置連接的密碼
        mqttConnectOptions.setPassword(passWord.toCharArray());
        // 設置超時時間 單位為秒
        mqttConnectOptions.setConnectionTimeout(10);
        // 設置會話心跳時間 單位為秒 服務器會每隔20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制
        mqttConnectOptions.setKeepAliveInterval(20);
        try {
            mqttAndroidClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            LogPlus.e("connect--onFailure:" + e.toString());
            e.printStackTrace();
            if (callback != null) {
                callback.connectFail(e.toString());
            }
        }
    }

    private class MyMqttCallbackExtended implements MqttCallbackExtended {

        /**
         * 連接完成回調
         *
         * @param reconnect true 斷開重連,false 首次連接
         * @param serverURI 服務器URI
         */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            LogPlus.e("連接成功connectComplete:是否重連+" + reconnect + "-----serverURI:" + serverURI);
            if (callback != null) {
                callback.connectSuccess(reconnect);
            }
            subscribeToTopic();
        }

        @Override
        public void connectionLost(Throwable cause) {
            LogPlus.e("connectionLost:斷開");
            if (callback != null) {
                callback.connectLost("connectionLost:斷開");
            }
            cause.printStackTrace();
        }

        /**
         * 消息接收,如果在訂閱的時候沒有設置IMqttMessageListener,那么收到消息則會在這里回調。
         * 如果設置了IMqttMessageListener,則消息回調在IMqttMessageListener中
         *
         * @param topic
         * @param message
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            LogPlus.e(message.getId() + "-->receive message: " + message.toString());
            if (callback != null) {
                callback.receiveMessage(topic, message.toString());
            }

        }

        /**
         * 交付完成回調。在publish消息的時候會收到此回調.
         * qos:
         * 0 發(fā)送完則回調
         * 1 或 2 會在對方收到時候回調
         *
         * @param token
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            if (callback != null) {
                try {
                    callback.deliveryComplete(token.getMessage().toString());
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
            LogPlus.e(token.toString());
        }
    }

    private void subscribeToTopic() {
        if (mqttAndroidClient == null || !mqttAndroidClient.isConnected() || mqttTopic == null || qos == null) {
            return;
        }
        try {
            //TODO  訂閱的主題,和qos
            mqttAndroidClient.subscribe(mqttTopic, qos, mContext.getApplicationContext(), new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    for (String s : mqttTopic) {
                        LogPlus.e("訂閱成功" + "topic:" + s);
                    }
                    if (callback != null) {
                        callback.subscribedSuccess(mqttTopic);
                    }
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogPlus.e("訂閱失敗");
                    if (callback != null) {
                        callback.subscribedFail(exception.toString());
                    }
                }

            });
        } catch (MqttException e) {
            e.printStackTrace();
            if (callback != null) {
                callback.subscribedFail(e.toString());
            }
        }
    }

    public static MqttManager getInstance() {
        if (mqttManager == null) {
            throw new NullPointerException("請先調用init方法進行初始化");
        }
        return mqttManager;
    }

    /**
     * 發(fā)送
     *
     * @param topic    發(fā)送的主題
     * @param msg      發(fā)送的消息
     * @param qos      QOS = 0/1/2   最多一次  最少一次 多次
     * @param retained 是否保留消息,為true時,后來訂閱該主題的仍然收到該消息
     */
    public void publishMessage(String topic, String msg, int qos, boolean retained) {
        if (isConnect()) {
            try {
                mqttAndroidClient.publish(topic, msg.getBytes(), qos, retained);
                LogPlus.e("Mqtt 發(fā)送消息:" + msg);
                if (!mqttAndroidClient.isConnected()) {
                    LogPlus.e(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
                }
            } catch (MqttException e) {
                LogPlus.e("Error Publishing: " + e.toString());
            }
        }
    }

    public void onDestroy() {
        if (mqttAndroidClient == null) {
            return;
        }
        try {
            mqttAndroidClient.close();
            mqttAndroidClient.disconnect();
            mqttAndroidClient.unregisterResources();
            mqttManager = null;
            mqttAndroidClient = null;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 判斷網絡是否連接
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) mContext.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            LogPlus.e("MQTT當前網絡名稱:" + name);
            return true;
        } else {
            LogPlus.e("MQTT 沒有可用網絡");
            return false;
        }
    }

}
image.gif

MqttCallback:

public abstract class MqttCallback {
    /**
     * 訂閱成功
     *
     * @param mqttTopic
     */
    public void subscribedSuccess(String[] mqttTopic) {

    }

    /**
     * 訂閱失敗
     *
     * @param message
     */
    public void subscribedFail(String message) {

    }

    /**
     * 發(fā)送成功
     * @param message
     */
    public void deliveryComplete(String message) {

    }

    /**
     * 接收的數據
     *
     * @param topic
     * @param message
     */
    public abstract void receiveMessage(String topic, String message);

    /**
     * 連接成功
     */
    public void connectSuccess(boolean reconnect) {

    }

    /**
     * 連接失敗
     *
     * @param message
     */
    public void connectFail(String message) {

    }

    /**
     * 斷開連接
     *
     * @param message
     */
    public void connectLost(String message) {

    }
}

image.gif

使用:

  private void initMqtt() {
        String[] topics = {myTopic, myTopic2};
        int[] qos = {0, 0};
        MqttManager.init(host, userName, passWord, this, topics, qos);
        MqttManager.getInstance().setCallback(new MyMqttCallback());

    }

    private class MyMqttCallback extends MqttCallback {

        @Override
        public void subscribedSuccess(String[] mqttTopic) {
            for (String s : mqttTopic) {
                LogPlus.e("subscribedSuccess:" + s);
            }
        }

        @Override
        public void subscribedFail(String message) {
            LogPlus.e("subscribedFail:" + message);
        }

        @Override
        public void receiveMessage(String topic, String message) {
            LogPlus.e("topic:" + topic + "------receiveMessage:" + message);
        }

        @Override
        public void connectSuccess(boolean reconnect) {
            LogPlus.e("connectSuccess:" + reconnect);
        }

        @Override
        public void connectFail(String message) {
            LogPlus.e("connectFail:" + message);
        }

        @Override
        public void connectLost(String message) {
            LogPlus.e("connectLost:" + message);
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        MqttManager.getInstance().onDestroy();
    }
image.gif

注意,回調是在子線程,要進行ui操作必須要回到主線程

發(fā)消息

 MqttManager.getInstance().publishMessage(myTopic2, content, 0, false);
image.gif
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴謹 對...
    cosWriter閱讀 11,618評論 1 32
  • Object C中創(chuàng)建線程的方法是什么?如果在主線程中執(zhí)行代碼,方法是什么?如果想延時執(zhí)行代碼、方法又是什么? 1...
    AlanGe閱讀 1,907評論 0 17
  • 美圖欣賞 Java、Android知識點匯集 Java集合類 ** Java集合相關的博客** java面試相關 ...
    ElvenShi閱讀 1,881評論 0 2
  • 為了更好的理解 Looper 的工作原理,我們需要對 ThreadLocal 進行了解,如果對 ThreadLoc...
    墨染書閱讀 1,594評論 0 3
  • Android消息處理機制估計都被寫爛了,但是依然還是要寫一下,因為Android應用程序是通過消息來驅動的,An...
    一碼立程閱讀 4,591評論 4 36

友情鏈接更多精彩內容