【MQTT客戶端設(shè)計(jì)-C語言】“訂閱主題”與“取消訂閱主題”

【MQTT客戶端設(shè)計(jì)-C語言】“訂閱主題”與“取消訂閱主題”

關(guān)于“訂閱主題”與“取消訂閱主題”函數(shù)

int MQTTSubscribe (Client*, const char*, enum QoS, messageHandler);    //訂閱某個主題
int MQTTUnsubscribe (Client*, const char*);                            //取消訂閱某個主題

#define MQTT_TOPIC_SIZE     (256)       //訂閱和發(fā)布主題長度
#define MQTT_BUF_SIZE       (8 * 1024)  //接收后發(fā)送緩沖區(qū)大小

typedef struct {
    Network Network;
    Client Client;
    char sub_topic[MQTT_TOPIC_SIZE];        //存放訂閱主題
    char pub_topic[4][MQTT_TOPIC_SIZE];     //存放發(fā)布主題
    char mqtt_buffer[MQTT_BUF_SIZE];        //發(fā)送緩沖區(qū)
    char mqtt_read_buffer[MQTT_BUF_SIZE];   //接收緩沖區(qū)

    unsigned char willFlag;                 
    MQTTPacket_willOptions will;
    char will_topic[MQTT_TOPIC_SIZE];       //存放遺囑主題

    pMessageArrived_Fun DataArrived_CallBack;
}Cloud_MQTT_t;

typedef void  (*pMessageArrived_Fun)(void*,int len, void*);

struct opts_struct {
    char    *clientid;
    int     nodelimiter;
    char    *delimiter;
    enum    QoS qos;
    char    *username;
    char    *password;
    char    *host;
    int     port;
    int     showtopics;
} opts = {
    (char *)"iot-dev", 0, (char *)"\n", QOS0, "admin", "password", (char *)"localhost", 1883, 0
};//初始化結(jié)構(gòu)體

MQTTSubscribe調(diào)用示例:

/* main.c 的while(1)之前  */

    iot_mqtt_init(&Iot_mqtt);                                   //初始化主題
    mqtt_will_msg_set(&Iot_mqtt, Char2Json_1(will_msg, "will"), strlen(will_msg));  //設(shè)置遺囑

    ret = mqtt_device_connect(&Iot_mqtt);                       //初始化并連接mqtt服務(wù)器
    while (ret < 0) {
        printf("ret = %d\r\n", ret);
        sleep(3);
        ret = mqtt_device_connect(&Iot_mqtt);
    }

    MyMQTT_Subscribe(&Iot_mqtt);
    
/* 子函數(shù)(被調(diào)用函數(shù)) */
int MyMQTT_Subscribe(Cloud_MQTT_t   *MQTT)
{
    int ret;
    int rc = MQTTSubscribe(&MQTT->Client, MQTT->sub_topic, opts.qos, MQTTMessageArrived_CallbBack);
    if (rc) {
        printf("mqtt subscribe fail \n");
        ret = -105;
        goto __END;
    }
    printf("Subscribed %d\n", rc);
    MQTT->DataArrived_CallBack = mqtt_data_rx_callback;
__END:
    return ret;
}

void MQTTMessageArrived_CallbBack(MessageData* md)
{
    MQTTMessage *message = md->message; 
    MQTTString *topicN = md->topicName;
    Cloud_MQTT_t *piot_mqtt = &Iot_mqtt;

    if (NULL != piot_mqtt->DataArrived_Cb) {
        char topicBuf[MQTT_TOPIC_SIZE];
        char msgBuf[MQTT_BUF_SIZE];
        memcpy(topicBuf, topicN->lenstring.data, topicN->lenstring.len);
        topicBuf[topicN->lenstring.len] = 0;

        memcpy(msgBuf, message->payload, message->payloadlen);
        msgBuf[message->payloadlen] = 0;
        piot_mqtt->DataArrived_CallBack((void *)msgBuf, message->payloadlen, (void *)topicBuf);//異步消息體
    }
}

void mqtt_data_rx_callback(void *pbuf, int len, void *ptopic) 
{
    printf("[%s]>>%s\n",(unsigned char *)ptopic, (unsigned char *)pbuf);    //打印接收到的數(shù)據(jù)
}

MQTTUnsubscribe調(diào)用示例:

int  My_MQTT_Unsubscribe(Cloud_MQTT_t   *MQTT)
{
     int ret;
    int rc = MQTTUnsubscribe(&MQTT->Client, MQTT->sub_topic[4]);  //取消訂閱第5個主題
    if (rc) {
        printf("mqtt UnSubscribe failed \n");
        ret = -106;
        goto __END;
    }
    printf("UnSubscribed %d\n", rc);
__END:
    return ret;
]

\\
\\

MQTTSubscribe(...)——訂閱主題函數(shù)-源代碼

/**********************************************************************************
  Client* c:  指向MQTT客戶端結(jié)構(gòu)體(句柄)的指針
  const char* topicFilter:  主題過濾器(定義著要訂閱/退訂的主題的字符串名字)
  enum QoS qos:  所訂閱的主題的服務(wù)質(zhì)量QoS (0、1 或 2)
  messageHandler messageHandler:  訂閱主題的消息回調(diào)函數(shù)
  返回值:        0------成功      非0------失敗
 **********************************************************************************/
int MQTTSubscribe(Client* c, const char* topicFilter, \
                  enum QoS qos, messageHandler messageHandler)
{ 
    int rc = FAILURE;  
    Timer timer;
    int len = 0;
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicFilter;
    
    InitTimer(&timer);
    countdown_ms(&timer, c->command_timeout_ms);

    if (!c->isconnected)
        goto exit;
    
    len = MQTTSerialize_subscribe(c->buf, c->buf_size, \
          0, getNextPacketId(c), 1, &topic, (int*)&qos);
    if (len <= 0)
        goto exit;
    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
        goto exit;             // there was a problem
    
    if (waitfor(c, SUBACK, &timer) == SUBACK)      // wait for suback 
    {
        int count = 0, grantedQoS = -1;
        unsigned short mypacketid;
        if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, \
        c->readbuf, c->readbuf_size) == 1)
            rc = grantedQoS; // 0, 1, 2 or 0x80 
        if (rc != 0x80)
        {
            int i;
            for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
            {
                if (c->messageHandlers[i].topicFilter == 0)
                {
                    c->messageHandlers[i].topicFilter = topicFilter;
                    c->messageHandlers[i].fp = messageHandler;
                    rc = 0;
                    break;
                }
            }
        }
    }
    else 
        rc = FAILURE;
        
exit:
    return rc;
}

??每個訂閱都是由一對主題和QoS級別組成。借助通配符,使得可以訂閱特定的主題模式。而且,通過參數(shù)topicFilter的含義我們也可以看出,傳進(jìn)去的主題字符串是會被Broker解析一個或多個主題并由此形成實(shí)際可以訂閱的主題列表的。

\\

MQTTUnsubscribe(...)——取消訂閱主題函數(shù)-源代碼

/**********************************************************************************
  Client* c:  指向MQTT客戶端結(jié)構(gòu)體(句柄)的指針
  const char* topicFilter:  主題過濾器(定義著要訂閱/退訂的主題的字符串名字)
  返回值:        0------成功      非0------失敗
 **********************************************************************************/
int MQTTUnsubscribe(Client* c, const char* topicFilter)
{   
    int rc = FAILURE;
    Timer timer;    
    MQTTString topic = MQTTString_initializer;
    topic.cstring = (char *)topicFilter;
    int len = 0;

    InitTimer(&timer);
    countdown_ms(&timer, c->command_timeout_ms);
    
    if (!c->isconnected)
        goto exit;
    
    if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, \
    0, getNextPacketId(c), 1, &topic)) <= 0)
        goto exit;
    if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
        goto exit; // there was a problem
    
    if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
    {
        unsigned short mypacketid;  // should be the same as the packetid above
        if (MQTTDeserialize_unsuback(&mypacketid, \
        c->readbuf, c->readbuf_size) == 1)
            rc = 0; 
    }
    else
        rc = FAILURE;
    
exit:
    return rc;
}

MQTTUnsubscribe總結(jié)
??MQTTUnsubscribe從代理服務(wù)器上刪除某個客戶端已存在的訂閱。退訂消息和訂閱消息很相似,同樣包括一個Packet Identifier(包標(biāo)識符:標(biāo)識消息的唯一標(biāo)識符)和 TopicFilter(主題列表,包含客戶端希望退訂的任意數(shù)量的主題)。

PS:不管最初訂閱時QoS級別是什么,topicfilter列表中的主題都將被退訂。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容