從moquette源碼看IOT接入協(xié)議MQTT的實現

背景

閱讀優(yōu)秀的代碼是一種享受,將優(yōu)秀的代碼用自己的世界觀優(yōu)秀地描述出來就十分痛苦了是要死一億個腦細胞的。

這篇源碼閱讀筆記早在一年前就有了當時只是簡單的記錄一下自己的總結,最近將她重新整理一下希望能幫助有需要的人。

隨著移動互聯(lián)網快速進入后半場,越來越多的企業(yè)將注意力轉移到物聯(lián)網。比如共享單車和小米的智能家居產品等都是典型的物聯(lián)網應用。

企業(yè)相信借助于大數據和AI技術可以獲得很多額外的價值產生新的商業(yè)模式。海量數據需要通過接入服務才能流向后端產生后續(xù)價值,在接入服務中MQTT已成為物聯(lián)網中非明確的標準協(xié)議國內外云廠均有其broker實現。

特性

MQTT協(xié)議是為大量計算能力有限,且工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協(xié)議,它具有以下主要的幾項特性:

  1. 使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應用程序耦合
  2. 對負載內容屏蔽的消息傳輸
  3. 使用 TCP/IP 提供網絡連接
  4. 有三種消息發(fā)布服務質量
    • “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網絡。會發(fā)生消息丟失或重復。這一級別可用于如下情況,環(huán)境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。
    • “至少一次”,確保消息到達,但消息重復可能會發(fā)生。
    • “只有一次”,確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復或丟失會導致不正確的結果。
  5. 小型傳輸,開銷很?。ü潭ㄩL度的頭部是2字節(jié)),協(xié)議交換最小化,以降低網絡流量
  6. 使用 Last Will (遺囑)和 Testament 特性通知有關各方客戶端異常中斷的機制

==下文中會對上述特性的實現方式進行講解==

術語

image

客戶端Client

使用MQTT的程序或者設備,如環(huán)境監(jiān)控傳感器、共享單車、共享充電寶等。

服務端Server

一個程序或設備,作為發(fā)送消息的客戶端和請求訂閱的客戶端之間的中介。

發(fā)布、訂閱流程

客戶端-A 給 客戶端-B 發(fā)送消息“hello”流程如下:

  1. 客戶端-B訂閱名稱為msg的主題
  2. 客戶端-A向服務端-Server發(fā)送“hello”,并指明發(fā)送給名為msg的主題
  3. 服務端-Server向客戶端-B轉發(fā)消息“hello”

有別于HTTP協(xié)議的請求響應模式,客戶端-A與客戶端-B不發(fā)生直接連接關系,他們之間的消息傳遞通過服務端Server進行轉發(fā)。
服務端Server又稱 MQTT Broker 即訂閱和發(fā)送的中間人

基于moquette源碼的特性實現分析

在上述的客戶端-A 給 客戶端-B 發(fā)送消息“hello”流程中需要有如下動作。

  1. 客戶端-A 、 客戶端-B 連接到服務端Server
  2. 客戶端-B 訂閱主題
  3. 客戶端-A 發(fā)布消息
  4. 服務端Server 轉發(fā)消息
  5. 客戶端-B 收到消息

下面將基于連接、訂閱、發(fā)布這幾個動作進行源碼跟蹤解讀。

連接

MQTT-連接.png

基本概念:

Session:會話即客戶端(由ClientId作為標示)和服務端之間邏輯層面的通信;生命周期(存在時間):會話 >= 網絡連接。

ClientID:客戶端唯一標識,服務端用于關聯(lián)一個Session
只能包含這些 大寫字母,小寫字母 和 數字(0-9a-zA-Z),23個字符以內
如果 ClientID 在多次 TCP連接中保持一致,客戶端和服務器端會保留會話信息(Session)
同一時間內 Server 和同一個 ClientID 只能保持一個 TCP 連接,再次連接會踢掉前一個。

CleanSession:在Connect時,由客戶端設置

  • 0 開啟會話重用機制。網絡斷開重連后,恢復之前的Session信息。需要客戶端和服務器有相關Session持久化機制;
  • 1 關閉會話重用機制。每次Connect都是一個新Session,會話僅持續(xù)和網絡連接同樣長的時間。

Keep Alive:目的是保持長連接的可靠性,以及雙方對彼此是否在線的確認。
客戶端在Connect的時候設置 Keep Alive 時長。如果服務端在 1.5 * KeepAlive 時間內沒有收到客戶端的報文,它必須斷開客戶端的網絡連接
Keep Alive 的值由具體應用指定,一般是幾分鐘。允許的最大值是 18 小時 12 分 15 秒。

Will:遺囑消息(Will Message)存儲在服務端,當網絡連接關閉時,服務端必須發(fā)布這個遺囑消息,所以被形象地稱之為遺囑,可用于通知異常斷線。
客戶端發(fā)送 DISCONNECT 關閉鏈接,遺囑失效并刪除
遺囑消息發(fā)布的條件,包括:
服務端檢測到了一個 I/O 錯誤或者網絡故障
客戶端在保持連接(Keep Alive)的時間內未能通訊
客戶端沒有先發(fā)送 DISCONNECT 報文直接關閉了網絡連接
由于協(xié)議錯誤服務端關閉了網絡連接
相關設置項,需要在Connect時,由客戶端指定。

Will Flag :遺囑的總開關

  • 0 關閉遺囑功能,Will QoS 和 Will Retain 必須為 0
  • 1 開啟遺囑功能,需要設置 Will Retain 和 Will QoS

Will QoS: 遺囑消息 QoS可取值 0、1、2,含義與消息QoS相同

Will Retain:遺囑是否保留

  • 0 遺囑消息不保留,后面再訂閱不會收到消息
  • 1 遺囑消息保留,持久存儲

Will Topic:遺囑話題

Will Payload:遺囑消息內容

連接流程

  1. 判斷客戶端連接時發(fā)送的MQTT協(xié)議版本號,非3.1和3.1.1版本發(fā)送協(xié)議不支持響應報文并在發(fā)送完成后關閉連接
  2. 在客戶端配置了cleanSession=false 或者服務端不允許clientId不存在的情況下客戶端如果未上傳clientId發(fā)送協(xié)議不支持響應報文并在發(fā)送完成后關閉連接
  3. 判斷用戶名和密碼是否合法
  4. 初始化連接對象并將連接對象引用放入連接管理中,如果發(fā)現連接管理中存在相同客戶端ID的對象則關閉前一個連接并將新的連接對象放入連接管理中
  5. 根據客戶端上傳的心跳時間調整服務端當前連接的心跳判斷時間(keepAlive * 1.5f)
  6. 遺囑消息存儲(當連接意外斷開時向存儲的主題發(fā)布消息)
  7. 發(fā)送連接成功響應
  8. 創(chuàng)建當前連接session
  9. 當cleanSession=false 發(fā)送當前session已經存儲的消息
public void processConnect(Channel channel, MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        String clientId = payload.clientIdentifier();
        LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

        // 1. 判斷客戶端連接時發(fā)送的MQTT協(xié)議版本號,非3.1和3.1.1版本發(fā)送協(xié)議不支持響應報文并在發(fā)送完成后關閉連接
        if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
                && msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);

            LOG.error("MQTT protocol version is not valid. CId={}", clientId);
            channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        final boolean cleanSession = msg.variableHeader().isCleanSession();
        if (clientId == null || clientId.length() == 0) {
            // 2. 在客戶端配置了cleanSession=false 或者服務端不允許clientId不存在的情況下客戶端如果未上傳clientId發(fā)送協(xié)議不支持響應報文并在發(fā)送完成后關閉連接
            if (!cleanSession || !this.allowZeroByteClientId) {
                MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);

                channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
                channel.close().addListener(CLOSE_ON_FAILURE);
                LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
                return;
            }

            // Generating client id.
            clientId = UUID.randomUUID().toString().replace("-", "");
            LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
                payload.userName());
        }
        // 3. 判斷用戶名和密碼是否合法
        if (!login(channel, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        // 4. 初始化連接對象并將連接對象引用放入連接管理中,如果發(fā)現連接管理中存在相同客戶端ID的對象則關閉前一個連接并將新的連接對象放入連接管理中
        ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            //return;
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        // 5. 根據客戶端上傳的心跳時間調整服務端當前連接的心跳判斷時間(keepAlive * 1.5f)
        initializeKeepAliveTimeout(channel, msg, clientId);
        // 6. 遺囑消息存儲(當連接意外斷開時向存儲的主題發(fā)布消息)
        storeWillMessage(msg, clientId);
        // 7. 發(fā)送連接成功響應
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

        if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        // 8. 創(chuàng)建當前連接session
        final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
        // 9. 當cleanSession=false 發(fā)送當前session已經存儲的消息
        if (!republish(descriptor, msg, clientSession)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        
        int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
        setupAutoFlusher(channel, flushIntervalMs);

        final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
        if (!success) {
            channel.close().addListener(CLOSE_ON_FAILURE);
        }

        LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
    }

訂閱

MQTT-訂閱.png

基本概念

訂閱流程

  1. 訂閱的主題校驗(權限、主題path合法性)
  2. 在當前session中存儲訂閱的主題
  3. 采用全局tree結構存儲訂閱信息(主題和訂閱者信息),用于消息轉發(fā)時根據主題查找到對應的訂閱者(tree結構和查找算法下一章節(jié)中介紹
  4. 發(fā)送訂閱回應
  5. 掃描持久化的消息匹配到當前訂閱主題的立即向此連接發(fā)送消息
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);

        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
        if (currentStatus != null) {
            LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
                clientID, messageID);
            return;
        }
        String username = NettyUtils.userName(channel);
        // 1、訂閱的主題校驗(權限、主題path合法性)
        List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
        MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
                "messageId={}", clientID, messageID);
            return;
        }

        LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
        // 2、在當前session中存儲訂閱的主題
        List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);

        // save session, persist subscriptions from session
        // 3、采用全局tree結構存儲訂閱信息(主題和訂閱者信息),用于消息轉發(fā)時根據主題查找到對應的訂閱者
        for (Subscription subscription : newSubscriptions) {
            subscriptions.add(subscription);
        }

        LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
        // 4、發(fā)送訂閱回應
        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);

        // fire the persisted messages in session
        // 5、掃描持久化的消息匹配到當前訂閱主題的立即向此連接發(fā)送消息
        for (Subscription subscription : newSubscriptions) {
            publishRetainedMessagesInSession(subscription, username);
        }

        boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
        if (!success) {
            LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
        } else {
            LOG.info("Client <{}> subscribed to topics", clientID);
        }
    }

發(fā)布

基本概念

Packet Identifier:報文標識存在報文的可變報頭部分,非零兩個字節(jié)整數 (0-65535]。

一個流程中重復:這些報文包含PacketID,而且在一次通信流程內保持一致:PUBLISH(QoS>0時)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。

新的不重復:客戶端每次發(fā)送一個新的這些類型的報文時都必須分配一個當前 未使用的PacketID
當客戶端處理完這個報文對應的確認后,這個報文標識符就釋放可重用。

獨立維護:客戶端和服務端彼此獨立地分配報文標識符。因此,客戶端服務端組合使用相同的報文標識符可以實現并發(fā)的消息交換??蛻舳撕头斩水a生的Packet Identifier一致不算異常。

Payload: 有效載荷即消息體最大允許 256MB。
Publish 的 Payload 允許為空,在很多場合下代表將持久消息(或者遺囑消息)清空。采用UTF-8編碼。

Retain:持久消息(粘性消息)

RETAIN 標記:每個Publish消息都需要指定的標記

  • 0 服務端不能存儲這個消息,也不能移除或替換任何 現存的保留消息
  • 1 服務端必須存儲這個應用消息和它的QoS等級,以便它可以被分發(fā)給未來的訂閱者

每個Topic只會保留最多一個 Retain 持久消息
客戶端訂閱帶有持久消息的Topic,會立即受到這條消息。

服務器可以選擇丟棄持久消息,比如內存或者存儲吃緊的時候。

如果客戶端想要刪除某個Topic 上面的持久消息,可以向這個Topic發(fā)送一個Payload為空的持久消息
遺囑消息(Will)的Retain持久機制同理。

QoS :服務等級(消息可靠性)

發(fā)布流程

public void processPublish(Channel channel, MqttPublishMessage msg) {
        final MqttQoS qos = msg.fixedHeader().qosLevel();
        final String clientId = NettyUtils.clientID(channel);
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
                msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
        switch (qos) {
            case AT_MOST_ONCE:
                this.qos0PublishHandler.receivedPublishQos0(channel, msg);
                break;
            case AT_LEAST_ONCE:
                this.qos1PublishHandler.receivedPublishQos1(channel, msg);
                break;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, msg);
                break;
            default:
                LOG.error("Unknown QoS-Type:{}", qos);
                break;
        }
    }

從上述代碼的switch語句中可以看出會根據消息的Qos級別分別進行處理

QoS0 最多一次
1532872155768.jpg
  1. 權限判斷
  2. 向所有該主題的訂閱者發(fā)布消息
  3. QoS == 0 && retain => clean old retained
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 權限判斷
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);
        // 2. 向所有該主題的訂閱者發(fā)布消息
        this.publisher.publish2Subscribers(toStoreMsg, topic);

        if (msg.fixedHeader().isRetain()) {
            // 3. QoS == 0 && retain => clean old retained
            m_messagesStore.cleanRetained(topic);
        }

        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }
QoS1 至少一次
1532872248691.jpg

1.發(fā)送消息PUBLISH

  1. 權限判斷
  2. 向所有該主題的訂閱者發(fā)布消息(每個session中存儲即將要發(fā)送的消息)
  3. 發(fā)送Ack回應
  4. retain = true => 存儲消息
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 權限判斷
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }

        final int messageID = msg.variableHeader().messageId();

        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        // 2. 向所有該主題的訂閱者發(fā)布消息(每個session中存儲即將要發(fā)送的消息)
        this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);

        // 3. 發(fā)送Ack回應
        sendPubAck(clientID, messageID);

        // 4. retain = true => 存儲消息
        if (msg.fixedHeader().isRetain()) {
            if (!msg.payload().isReadable()) {
                m_messagesStore.cleanRetained(topic);
            } else {
                // before wasn't stored
                m_messagesStore.storeRetained(topic, toStoreMsg);
            }
        }

        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }

2.1發(fā)送消息回應PUBACK

服務端Server接收到PUBACK消息后將執(zhí)行:

  1. 刪除存儲在session中的消息
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.variableHeader().messageId();
        String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);

        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
                                                                                messageID);
        m_interceptor.notifyMessageAcknowledged(wrapped);
    }
QoS2 有且僅有一次
1532872488548.jpg

1.發(fā)送消息PUBLISH

  1. 權限判斷
  2. 存儲消息
  3. 發(fā)送Rec回應
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
        final Topic topic = new Topic(msg.variableHeader().topicName());
        // check if the topic can be wrote
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 權限判斷
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        final int messageID = msg.variableHeader().messageId();

        // 2. 存儲消息
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
        if (LOG.isTraceEnabled()) {
            LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
        }

        this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);

        // 3. 發(fā)送Rec回應
        sendPubRec(clientID, messageID);

        // Next the client will send us a pub rel
        // NB publish to subscribers for QoS 2 happen upon PUBREL from publisher

//        if (msg.fixedHeader().isRetain()) {
//            if (msg.payload().readableBytes() == 0) {
//                m_messagesStore.cleanRetained(topic);
//            } else {
//                m_messagesStore.storeRetained(topic, toStoreMsg);
//            }
//        }
        //TODO this should happen on PUB_REL, else we notify false positive
        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }

2.發(fā)送消息Rel

  1. 刪除消息
  2. 轉發(fā)消息
  3. 發(fā)送Comp 回應給客戶端-A
 void processPubRel(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 刪除消息
        IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
        if (evt == null) {
            LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
            throw new IllegalArgumentException("Can't find inbound inflight message");
        }
        final Topic topic = new Topic(evt.getTopic());

        // 2. 轉發(fā)消息
        this.publisher.publish2Subscribers(evt, topic, messageID);

        if (evt.isRetained()) {
            if (evt.getPayload().readableBytes() == 0) {
                m_messagesStore.cleanRetained(topic);
            } else {
                m_messagesStore.storeRetained(topic, evt);
            }
        }

        //TODO here we should notify to the listeners
        //m_interceptor.notifyTopicPublished(msg, clientID, username);
        // 3.發(fā)送Comp 回應
        sendPubComp(clientID, messageID);
    }

3.發(fā)送消息回應Rec

  1. 刪除消息
  2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
  3. 發(fā)送PUBREL
public void processPubRec(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // remove from the inflight and move to the QoS2 second phase queue
        // 1. 刪除消息
        StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
        // 2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
        targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
        // once received a PUBREC reply with a PUBREL(messageID)
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        // 3. 發(fā)送PUBREL
        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
        channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
    }

3.4發(fā)送消息回應Comp

  1. 刪除消息
public void processPubComp(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
        // once received the PUBCOMP then remove the message from the temp memory
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 刪除消息
        StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
        String username = NettyUtils.userName(channel);
        String topic = inflightMsg.getTopic();
        final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
            username, messageID);
        m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
    }

Topic & Subcribe

基本概念

Topic 話題 和 TopicFilter 話題過濾器

Pub-Sub消息模型的核心機制
UTF-8 編碼字符串,不能超過 65535 字節(jié)。層級數量沒有限制
不能包含任何的下文中提到的特殊符號(/、+、#),必須至少包含一個字符
區(qū)分大小寫,可以包含空格,不能包含空字符 (Unicode U+0000)
在收部或尾部增加 斜杠 “/”,會產生不同的Topic和TopicFilter。舉例:

  • “/A” 和 “A” 是不同的
  • “A” 和 “A/” 是不同的

只包含斜杠 “/” 的 Topic 或 TopicFilter 是合法的

TopicFilter中的特殊符號

層級分隔符 /
用于分割主題的每個層級,為主題名提供一個分層結構
主題層級分隔符可以出現在 Topic 或 TopicFilter 的任何位置
特例:相鄰的主題層次分隔符表示一個零長度的主題層級

單層通配符 +

只能用于單個主題層級匹配的通配符。例如,“a/b/+” 匹配 “a/b/c1” 和 “a/b/c2” ,但是不匹配 “a/b/c/d”
可以匹配 任意層級,包括第一個和最后一個層級。

例如,“+” 是有效的,“sport/+/player1” 也是有效的。
可以在多個層級中使用它,也可以和多層通配符一起使用。

例如,“+/tennis/#” 是有效的。只能匹配本級不能匹配上級。

例如,“sport/+” 不匹配 “sport” 但是卻匹配“sport/”,“/finance” 匹配 “+/+” 和 “/+” ,但是不匹配 “+”。

多層通配符 #

用于匹配主題中任意層級的通配符
匹配包含本身的層級和子層級。

例如 “a/b/c/#" 可以匹配 “a/b/c”、“a/b/c/d” 和 “a/b/c/d/e”
必須是最后的結尾。

例如 “sport/tennis/#/ranking”是無效的

“#”是有效的,會收到所有的應用消息。 (服務器端應將此類 TopicFilter禁掉 )

以$開頭的,服務器保留

服務端不能將 $ 字符開頭的 Topic 匹配通配符 (#或+) 開頭的 TopicFilter

服務端應該阻止客戶端使用這種 Topic 與其它客戶端交換消息。

服務端實現可以將 $ 開頭的主題名用作其他目的。

SYS/ 被廣泛用作包含服務器特定信息或控制接口的主題的前綴 客戶端不特意訂閱開頭的 Topic,就不會收到對應的消息

  • 訂閱 “#” 的客戶端不會收到任何發(fā)布到以 “$” 開頭主題的消息
  • 訂閱 “+/A/B” 的客戶端不會收到任何發(fā)布到 “$SYS/A/B” 的消息
  • 訂閱 “SYS/#” 的客戶端會收到發(fā)布到以 “SYS/” 開頭主題的消息
  • 訂閱 “SYS/A/+” 的客戶端會收到發(fā)布到 “SYS/A/B” 主題的消息

如果客戶端想同時接受以 “SYS/” 開頭主題的消息和不以 開頭主題的消息,它需要同時 訂閱 “#” 和 “$SYS/#”

存儲結構

  • a/b/c
  • a/a
  • a/haha
  • msg

這4個主題會存儲成如下結構:

  1. children 指向下層節(jié)點
  2. subscriptions 存儲當前主題所有的訂閱者
image

查找算法

訂閱
@Override
    public void add(Subscription newSubscription) {
        Action res;
        do {
            res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
        } while (res == Action.REPEAT);
    }

    private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return insert(clientId, remainingTopic, nextInode, fullpath);
        } else {
            if (topic.isEmpty()) {
                return insertSubscription(clientId, fullpath, inode);
            } else {
                return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
            }
        }
    }
刪除訂閱
public void removeSubscription(Topic topic, String clientID) {
        Action res;
        do {
            res = remove(clientID, topic, this.root, NO_PARENT);
        } while (res == Action.REPEAT);
    }

    private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return remove(clientId, remainingTopic, nextInode, inode);
        } else {
            final CNode cnode = inode.mainNode();
            if (cnode instanceof TNode) {
                // this inode is a tomb, has no clients and should be cleaned up
                // Because we implemented cleanTomb below, this should be rare, but possible
                // Consider calling cleanTomb here too
                return Action.OK;
            }
            if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
                // last client to leave this node, AND there are no downstream children, remove via TNode tomb
                if (inode == this.root) {
                    return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
                }
                TNode tnode = new TNode();
                return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
            } else if (cnode.contains(clientId) && topic.isEmpty()) {
                CNode updatedCnode = cnode.copy();
                updatedCnode.removeSubscriptionsFor(clientId);
                return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
            } else {
                //someone else already removed
                return Action.OK;
            }
        }
    }
查找
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
        CNode cnode = inode.mainNode();
        if (Token.MULTI.equals(cnode.token)) {
            return cnode.subscriptions;
        }
        if (topic.isEmpty()) {
            return Collections.emptySet();
        }
        if (cnode instanceof TNode) {
            return Collections.emptySet();
        }
        final Token token = topic.headToken();
        if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
            return Collections.emptySet();
        }
        Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
        Set<Subscription> subscriptions = new HashSet<>();
        if (remainingTopic.isEmpty()) {
            subscriptions.addAll(cnode.subscriptions);
        }
        for (INode subInode : cnode.allChildren()) {
            subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
        }
        return subscriptions;
    }

尾巴

相關參考

MQTT協(xié)議通俗講解

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現,斷路器,智...
    卡卡羅2017閱讀 136,545評論 19 139
  • 一:前言 最近在了解MQTT協(xié)議相關的內容,內容有點多,特此把MQTT協(xié)議,以及其從服務端到客戶端的流程整理出來...
    子夏的不語閱讀 70,752評論 9 92
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong閱讀 22,939評論 1 92
  • MQTT-SN的一個重要設計原則是盡可能與MQTT相近。因此,所有的協(xié)議語義應保持盡可能與MQTT中定義的一致。接...
    aded3e27ac95閱讀 929評論 0 2
  • 下載CentOS ISO鏡像文件 在CentOS mirrors 找到合適的站點下載 CentOS-7-x86_6...
    追陽_41閱讀 3,870評論 0 1

友情鏈接更多精彩內容