mqtt簡(jiǎn)單封裝類

mqttmanager

import android.content.Context;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;

import com.licheedev.myutils.LogPlus;
import com.simonfong.mqttdemo.listener.MqttCallback;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author simonfong
 * Created  on 2019/6/6
 */
public class MqttManager {
    private String serverUri;
    private String userName;
    private String passWord;
    private final Context mContext;
    private volatile MqttAndroidClient mqttAndroidClient;
    private static volatile MqttManager mqttManager = null;
    private MqttCallback callback;
    private String[] mqttTopic;
    private int[] qos;
    private boolean autoReconnect = true;

    /**
     * 初始化
     *
     * @param serverUri mqtt域名
     * @param userName  賬號(hào)
     * @param passWord  密碼
     * @param context
     */
    public static MqttManager init(String serverUri, String userName, String passWord, Context context) {
        return init(serverUri,userName,passWord,context,null,null);
    }

    /**
     * 初始化
     *
     * @param serverUri mqtt域名
     * @param userName  賬號(hào)
     * @param passWord  密碼
     * @param context
     * @param topics    主題
     * @param qos       QOS = 0/1/2   最多一次  最少一次 多次
     * @return
     */
    public static MqttManager init(String serverUri, String userName, String passWord, Context context, String[] topics,
                                   int[] qos) {
        if (mqttManager == null) {
            synchronized (MqttManager.class) {
                if (mqttManager == null) {
                    mqttManager = new MqttManager(serverUri, userName, passWord, context, topics, qos);
                }
            }
        }
        return mqttManager;
    }

    /**
     * 修改訂閱主題
     *
     * @param topics 主題
     * @param qos    QOS = 0/1/2   最多一次  最少一次 多次
     */
    public void setTopicAndQos(String[] topics, int[] qos) {
        if (mqttAndroidClient != null || !mqttAndroidClient.isConnected()) {
            try {
                mqttAndroidClient.unsubscribe(mqttTopic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        mqttTopic = topics;
        this.qos = qos;
        subscribeToTopic();
    }

    /**
     * 設(shè)置是否自動(dòng)重連,默認(rèn)為true
     *
     * @param isAuto
     */
    public void setAutoReconnect(boolean isAuto) {
        autoReconnect = isAuto;
    }

    public MqttManager(String serverUri, String userName, String passWord, Context context, String[] topics, int[] qos) {
        this.serverUri = serverUri;
        this.userName = userName;
        this.passWord = passWord;
        this.mqttTopic = topics;
        this.qos = qos;
        mContext = context;
        initConnect();
    }

    public void setCallback(MqttCallback callback) {
        this.callback = callback;
    }

    private boolean isConnect() {
        if (mqttAndroidClient != null) {
            return mqttAndroidClient.isConnected();
        }
        return false;
    }

    private void initConnect() {

        if (isConnect()) {
            return;
        }
        mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, MqttClient.generateClientId());
        mqttAndroidClient.registerResources(mContext);
        mqttAndroidClient.setCallback(new MyMqttCallbackExtended());
        //mqtt連接參數(shù)設(shè)置
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        //設(shè)置自動(dòng)重連
        mqttConnectOptions.setAutomaticReconnect(autoReconnect);
        // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄
        // 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
        mqttConnectOptions.setCleanSession(false);
        //設(shè)置連接的用戶名
        mqttConnectOptions.setUserName(userName);
        //設(shè)置連接的密碼
        mqttConnectOptions.setPassword(passWord.toCharArray());
        // 設(shè)置超時(shí)時(shí)間 單位為秒
        mqttConnectOptions.setConnectionTimeout(10);
        // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔20秒的時(shí)間向客戶端發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制
        mqttConnectOptions.setKeepAliveInterval(20);
        try {
            mqttAndroidClient.connect(mqttConnectOptions);
        } catch (Exception e) {
            LogPlus.e("connect--onFailure:" + e.toString());
            e.printStackTrace();
            if (callback != null) {
                callback.connectFail(e.toString());
            }
        }
    }

    private class MyMqttCallbackExtended implements MqttCallbackExtended {

        /**
         * 連接完成回調(diào)
         *
         * @param reconnect true 斷開重連,false 首次連接
         * @param serverURI 服務(wù)器URI
         */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            LogPlus.e("連接成功connectComplete:是否重連+" + reconnect + "-----serverURI:" + serverURI);
            if (callback != null) {
                callback.connectSuccess(reconnect);
            }
            subscribeToTopic();
        }

        @Override
        public void connectionLost(Throwable cause) {
            LogPlus.e("connectionLost:斷開");
            if (callback != null) {
                callback.connectLost("connectionLost:斷開");
            }
            cause.printStackTrace();
        }

        /**
         * 消息接收,如果在訂閱的時(shí)候沒有設(shè)置IMqttMessageListener,那么收到消息則會(huì)在這里回調(diào)。
         * 如果設(shè)置了IMqttMessageListener,則消息回調(diào)在IMqttMessageListener中
         *
         * @param topic
         * @param message
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            LogPlus.e(message.getId() + "-->receive message: " + message.toString());
            if (callback != null) {
                callback.receiveMessage(topic, message.toString());
            }

        }

        /**
         * 交付完成回調(diào)。在publish消息的時(shí)候會(huì)收到此回調(diào).
         * qos:
         * 0 發(fā)送完則回調(diào)
         * 1 或 2 會(huì)在對(duì)方收到時(shí)候回調(diào)
         *
         * @param token
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            if (callback != null) {
                try {
                    callback.deliveryComplete(token.getMessage().toString());
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
            LogPlus.e(token.toString());
        }
    }

    private void subscribeToTopic() {
        if (mqttAndroidClient == null || !mqttAndroidClient.isConnected() || mqttTopic == null || qos == null) {
            return;
        }
        try {
            //TODO  訂閱的主題,和qos
            mqttAndroidClient.subscribe(mqttTopic, qos, mContext.getApplicationContext(), new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    for (String s : mqttTopic) {
                        LogPlus.e("訂閱成功" + "topic:" + s);
                    }
                    if (callback != null) {
                        callback.subscribedSuccess(mqttTopic);
                    }
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogPlus.e("訂閱失敗");
                    if (callback != null) {
                        callback.subscribedFail(exception.toString());
                    }
                }

            });
        } catch (MqttException e) {
            e.printStackTrace();
            if (callback != null) {
                callback.subscribedFail(e.toString());
            }
        }
    }

    public static MqttManager getInstance() {
        if (mqttManager == null) {
            throw new NullPointerException("請(qǐng)先調(diào)用init方法進(jìn)行初始化");
        }
        return mqttManager;
    }

    /**
     * 發(fā)送
     *
     * @param topic    發(fā)送的主題
     * @param msg      發(fā)送的消息
     * @param qos      QOS = 0/1/2   最多一次  最少一次 多次
     * @param retained 是否保留消息,為true時(shí),后來訂閱該主題的仍然收到該消息
     */
    public void publishMessage(String topic, String msg, int qos, boolean retained) {
        if (isConnect()) {
            try {
                mqttAndroidClient.publish(topic, msg.getBytes(), qos, retained);
                LogPlus.e("Mqtt 發(fā)送消息:" + msg);
                if (!mqttAndroidClient.isConnected()) {
                    LogPlus.e(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
                }
            } catch (MqttException e) {
                LogPlus.e("Error Publishing: " + e.toString());
            }
        }
    }

    public void onDestroy() {
        if (mqttAndroidClient == null) {
            return;
        }
        try {
            mqttAndroidClient.close();
            mqttAndroidClient.disconnect();
            mqttAndroidClient.unregisterResources();
            mqttManager = null;
            mqttAndroidClient = null;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 判斷網(wǎng)絡(luò)是否連接
    private boolean isConnectIsNomarl() {
        ConnectivityManager connectivityManager = (ConnectivityManager) mContext.getApplicationContext()
                .getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo info = connectivityManager.getActiveNetworkInfo();
        if (info != null && info.isAvailable()) {
            String name = info.getTypeName();
            LogPlus.e("MQTT當(dāng)前網(wǎng)絡(luò)名稱:" + name);
            return true;
        } else {
            LogPlus.e("MQTT 沒有可用網(wǎng)絡(luò)");
            return false;
        }
    }

}
image.gif

MqttCallback:

public abstract class MqttCallback {
    /**
     * 訂閱成功
     *
     * @param mqttTopic
     */
    public void subscribedSuccess(String[] mqttTopic) {

    }

    /**
     * 訂閱失敗
     *
     * @param message
     */
    public void subscribedFail(String message) {

    }

    /**
     * 發(fā)送成功
     * @param message
     */
    public void deliveryComplete(String message) {

    }

    /**
     * 接收的數(shù)據(jù)
     *
     * @param topic
     * @param message
     */
    public abstract void receiveMessage(String topic, String message);

    /**
     * 連接成功
     */
    public void connectSuccess(boolean reconnect) {

    }

    /**
     * 連接失敗
     *
     * @param message
     */
    public void connectFail(String message) {

    }

    /**
     * 斷開連接
     *
     * @param message
     */
    public void connectLost(String message) {

    }
}

image.gif

使用:

  private void initMqtt() {
        String[] topics = {myTopic, myTopic2};
        int[] qos = {0, 0};
        MqttManager.init(host, userName, passWord, this, topics, qos);
        MqttManager.getInstance().setCallback(new MyMqttCallback());

    }

    private class MyMqttCallback extends MqttCallback {

        @Override
        public void subscribedSuccess(String[] mqttTopic) {
            for (String s : mqttTopic) {
                LogPlus.e("subscribedSuccess:" + s);
            }
        }

        @Override
        public void subscribedFail(String message) {
            LogPlus.e("subscribedFail:" + message);
        }

        @Override
        public void receiveMessage(String topic, String message) {
            LogPlus.e("topic:" + topic + "------receiveMessage:" + message);
        }

        @Override
        public void connectSuccess(boolean reconnect) {
            LogPlus.e("connectSuccess:" + reconnect);
        }

        @Override
        public void connectFail(String message) {
            LogPlus.e("connectFail:" + message);
        }

        @Override
        public void connectLost(String message) {
            LogPlus.e("connectLost:" + message);
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        MqttManager.getInstance().onDestroy();
    }
image.gif

注意,回調(diào)是在子線程,要進(jìn)行ui操作必須要回到主線程

發(fā)消息

 MqttManager.getInstance().publishMessage(myTopic2, content, 0, false);
image.gif
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,681評(píng)論 1 32
  • Object C中創(chuàng)建線程的方法是什么?如果在主線程中執(zhí)行代碼,方法是什么?如果想延時(shí)執(zhí)行代碼、方法又是什么? 1...
    AlanGe閱讀 1,926評(píng)論 0 17
  • 美圖欣賞 Java、Android知識(shí)點(diǎn)匯集 Java集合類 ** Java集合相關(guān)的博客** java面試相關(guān) ...
    ElvenShi閱讀 1,898評(píng)論 0 2
  • 為了更好的理解 Looper 的工作原理,我們需要對(duì) ThreadLocal 進(jìn)行了解,如果對(duì) ThreadLoc...
    墨染書閱讀 1,602評(píng)論 0 3
  • Android消息處理機(jī)制估計(jì)都被寫爛了,但是依然還是要寫一下,因?yàn)锳ndroid應(yīng)用程序是通過消息來驅(qū)動(dòng)的,An...
    一碼立程閱讀 4,604評(píng)論 4 36

友情鏈接更多精彩內(nèi)容