MQTT深入淺出系列(二)

mqtt源碼分析

為什么要分析源碼呢,一是為了需求擴(kuò)展,二也是為了學(xué)習(xí)優(yōu)秀的源代碼。

連接

逐層跟代碼,到ClientComms類(lèi),該類(lèi)用于與server通訊,發(fā)送和接收mqtt協(xié)議消息。

先看connect代碼:

public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
    final String methodName = "connect";
    synchronized (conLock) {
        if (isDisconnected() && !closePending) {
            //@TRACE 214=state=CONNECTING

            conState = CONNECTING;

            conOptions = options;

            MqttConnect connect = new MqttConnect(client.getClientId(),
                    conOptions.getMqttVersion(),
                    conOptions.isCleanSession(),
                    conOptions.getKeepAliveInterval(),
                    conOptions.getUserName(),
                    conOptions.getPassword(),
                    conOptions.getWillMessage(),
                    conOptions.getWillDestination());

            this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
            this.clientState.setCleanSession(conOptions.isCleanSession());
            this.clientState.setMaxInflight(conOptions.getMaxInflight());

            tokenStore.open();
            ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
            conbg.start();
        }
        else {
            // @TRACE 207=connect failed: not disconnected {0}
            if (isClosed() || closePending) {
                throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
            } else if (isConnecting()) {
                throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
            } else if (isDisconnecting()) {
                throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
            } else {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
            }
        }
    }
}

可以看到,無(wú)非是:獲取之前我們?cè)O(shè)置的一些參數(shù) --> 然后開(kāi)啟ConnectBG連接線程

看ConnectBG的run方法:

NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
callback.start("MQTT Call: "+getClient().getClientId(), executorService);
internalSend(conPacket, conToken);

這段代碼的邏輯是:建立(tcp或者ssl或者WebSocket或者WebSocketSecure)連接 --> 開(kāi)啟接收線程 --> 開(kāi)啟發(fā)送線程 --> 發(fā)送消息

建立上文中的何種連接方式,是根據(jù)serverURI來(lái)判斷,參考MqttAsyncClient的createNetworkModules方法。

我們分析最常見(jiàn)的tcp連接方式(類(lèi)TCPNetworkModule的start方法):

SocketAddress sockaddr = new InetSocketAddress(host, port);
if (factory instanceof SSLSocketFactory) {
    // SNI support
    Socket tempsocket = new Socket();
    tempsocket.connect(sockaddr, conTimeout*1000);
    socket = ((SSLSocketFactory)factory).createSocket(tempsocket, host, port, true);
} else {
    socket = factory.createSocket();
    socket.connect(sockaddr, conTimeout*1000);
}

嗯,最終在這建立的socket連接。

如何發(fā)送消息

先簡(jiǎn)單描述:

將message、topic等字段實(shí)例化IMqttToken --> 接收線程循環(huán)獲取msg --> 寫(xiě)流傳輸

看看具體的代碼:

獲取msg:

message = clientState.get();
if (message != null) {
    //@TRACE 802=network send key={0} msg={1}

    if (message instanceof MqttAck) {
        out.write(message);
        out.flush();
    } else {
        MqttToken token = tokenStore.getToken(message);
        // While quiescing the tokenstore can be cleared so need
        // to check for null for the case where clear occurs
        // while trying to send a message.
        if (token != null) {
            synchronized (token) {
                out.write(message);
                try {
                    out.flush();
                } catch (IOException ex) {
        
                ....

如何接收消息

讀取數(shù)據(jù)流,再分發(fā)通知。

看接收線程的代碼:

while (running && (in != null)) {
        try {
            //@TRACE 852=network read message
            receiving = in.available() > 0;
            MqttWireMessage message = in.readMqttWireMessage();
            receiving = false;

            // instanceof checks if message is null
            if (message instanceof MqttAck) {
                token = tokenStore.getToken(message);
                if (token!=null) {
                    synchronized (token) {
                        // Ensure the notify processing is done under a lock on the token
                        // This ensures that the send processing can complete  before the
                        // receive processing starts! ( request and ack and ack processing
                        // can occur before request processing is complete if not!
                        clientState.notifyReceivedAck((MqttAck)message);
                    }
                } else if

                ...

如何保持長(zhǎng)連接的呢

通過(guò)鬧鐘定時(shí)任務(wù)(AlarmPingSender):

pendingIntent = PendingIntent.getBroadcast(service, 0, new Intent(action), PendingIntent.FLAG_UPDATE_CURRENT);

schedule(comms.getKeepAlive());

從AlarmReceiver類(lèi)onReceive方法入手切入,ClientState的checkForActivity方法有這么一段代碼:

token = new MqttToken(clientComms.getClient().getClientId());
if(pingCallback != null){
    token.setActionCallback(pingCallback);
}
tokenStore.saveToken(token, pingCommand);
pendingFlows.insertElementAt(pingCommand, 0);

nextPingTime = getKeepAlive();

//Wake sender thread since it may be in wait state (in ClientState.get())                                                                                                                             
notifyQueueLock();

原來(lái)是pendingFlows隊(duì)列中插入消息,接收線程收到消息將數(shù)據(jù)寫(xiě)入流。

總結(jié)

源碼結(jié)構(gòu)清晰,功能很完整,寫(xiě)的還不錯(cuò)的,還是值得一讀。

當(dāng)然也便于自己擴(kuò)展。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容