關(guān)于“訂閱主題”與“取消訂閱主題”函數(shù)
int MQTTSubscribe (Client*, const char*, enum QoS, messageHandler); //訂閱某個主題
int MQTTUnsubscribe (Client*, const char*); //取消訂閱某個主題
#define MQTT_TOPIC_SIZE (256) //訂閱和發(fā)布主題長度
#define MQTT_BUF_SIZE (8 * 1024) //接收后發(fā)送緩沖區(qū)大小
typedef struct {
Network Network;
Client Client;
char sub_topic[MQTT_TOPIC_SIZE]; //存放訂閱主題
char pub_topic[4][MQTT_TOPIC_SIZE]; //存放發(fā)布主題
char mqtt_buffer[MQTT_BUF_SIZE]; //發(fā)送緩沖區(qū)
char mqtt_read_buffer[MQTT_BUF_SIZE]; //接收緩沖區(qū)
unsigned char willFlag;
MQTTPacket_willOptions will;
char will_topic[MQTT_TOPIC_SIZE]; //存放遺囑主題
pMessageArrived_Fun DataArrived_CallBack;
}Cloud_MQTT_t;
typedef void (*pMessageArrived_Fun)(void*,int len, void*);
struct opts_struct {
char *clientid;
int nodelimiter;
char *delimiter;
enum QoS qos;
char *username;
char *password;
char *host;
int port;
int showtopics;
} opts = {
(char *)"iot-dev", 0, (char *)"\n", QOS0, "admin", "password", (char *)"localhost", 1883, 0
};//初始化結(jié)構(gòu)體
MQTTSubscribe調(diào)用示例:
/* main.c 的while(1)之前 */
iot_mqtt_init(&Iot_mqtt); //初始化主題
mqtt_will_msg_set(&Iot_mqtt, Char2Json_1(will_msg, "will"), strlen(will_msg)); //設(shè)置遺囑
ret = mqtt_device_connect(&Iot_mqtt); //初始化并連接mqtt服務(wù)器
while (ret < 0) {
printf("ret = %d\r\n", ret);
sleep(3);
ret = mqtt_device_connect(&Iot_mqtt);
}
MyMQTT_Subscribe(&Iot_mqtt);
/* 子函數(shù)(被調(diào)用函數(shù)) */
int MyMQTT_Subscribe(Cloud_MQTT_t *MQTT)
{
int ret;
int rc = MQTTSubscribe(&MQTT->Client, MQTT->sub_topic, opts.qos, MQTTMessageArrived_CallbBack);
if (rc) {
printf("mqtt subscribe fail \n");
ret = -105;
goto __END;
}
printf("Subscribed %d\n", rc);
MQTT->DataArrived_CallBack = mqtt_data_rx_callback;
__END:
return ret;
}
void MQTTMessageArrived_CallbBack(MessageData* md)
{
MQTTMessage *message = md->message;
MQTTString *topicN = md->topicName;
Cloud_MQTT_t *piot_mqtt = &Iot_mqtt;
if (NULL != piot_mqtt->DataArrived_Cb) {
char topicBuf[MQTT_TOPIC_SIZE];
char msgBuf[MQTT_BUF_SIZE];
memcpy(topicBuf, topicN->lenstring.data, topicN->lenstring.len);
topicBuf[topicN->lenstring.len] = 0;
memcpy(msgBuf, message->payload, message->payloadlen);
msgBuf[message->payloadlen] = 0;
piot_mqtt->DataArrived_CallBack((void *)msgBuf, message->payloadlen, (void *)topicBuf);//異步消息體
}
}
void mqtt_data_rx_callback(void *pbuf, int len, void *ptopic)
{
printf("[%s]>>%s\n",(unsigned char *)ptopic, (unsigned char *)pbuf); //打印接收到的數(shù)據(jù)
}
MQTTUnsubscribe調(diào)用示例:
int My_MQTT_Unsubscribe(Cloud_MQTT_t *MQTT)
{
int ret;
int rc = MQTTUnsubscribe(&MQTT->Client, MQTT->sub_topic[4]); //取消訂閱第5個主題
if (rc) {
printf("mqtt UnSubscribe failed \n");
ret = -106;
goto __END;
}
printf("UnSubscribed %d\n", rc);
__END:
return ret;
]
MQTTSubscribe(...)——訂閱主題函數(shù)-源代碼
/**********************************************************************************
Client* c: 指向MQTT客戶端結(jié)構(gòu)體(句柄)的指針
const char* topicFilter: 主題過濾器(定義著要訂閱/退訂的主題的字符串名字)
enum QoS qos: 所訂閱的主題的服務(wù)質(zhì)量QoS (0、1 或 2)
messageHandler messageHandler: 訂閱主題的消息回調(diào)函數(shù)
返回值: 0------成功 非0------失敗
**********************************************************************************/
int MQTTSubscribe(Client* c, const char* topicFilter, \
enum QoS qos, messageHandler messageHandler)
{
int rc = FAILURE;
Timer timer;
int len = 0;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected)
goto exit;
len = MQTTSerialize_subscribe(c->buf, c->buf_size, \
0, getNextPacketId(c), 1, &topic, (int*)&qos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{
int count = 0, grantedQoS = -1;
unsigned short mypacketid;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, \
c->readbuf, c->readbuf_size) == 1)
rc = grantedQoS; // 0, 1, 2 or 0x80
if (rc != 0x80)
{
int i;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == 0)
{
c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = messageHandler;
rc = 0;
break;
}
}
}
}
else
rc = FAILURE;
exit:
return rc;
}
??每個訂閱都是由一對主題和QoS級別組成。借助通配符,使得可以訂閱特定的主題模式。而且,通過參數(shù)topicFilter的含義我們也可以看出,傳進(jìn)去的主題字符串是會被Broker解析一個或多個主題并由此形成實(shí)際可以訂閱的主題列表的。
MQTTUnsubscribe(...)——取消訂閱主題函數(shù)-源代碼
/**********************************************************************************
Client* c: 指向MQTT客戶端結(jié)構(gòu)體(句柄)的指針
const char* topicFilter: 主題過濾器(定義著要訂閱/退訂的主題的字符串名字)
返回值: 0------成功 非0------失敗
**********************************************************************************/
int MQTTUnsubscribe(Client* c, const char* topicFilter)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
int len = 0;
InitTimer(&timer);
countdown_ms(&timer, c->command_timeout_ms);
if (!c->isconnected)
goto exit;
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, \
0, getNextPacketId(c), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
{
unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, \
c->readbuf, c->readbuf_size) == 1)
rc = 0;
}
else
rc = FAILURE;
exit:
return rc;
}
??MQTTUnsubscribe從代理服務(wù)器上刪除某個客戶端已存在的訂閱。退訂消息和訂閱消息很相似,同樣包括一個Packet Identifier(包標(biāo)識符:標(biāo)識消息的唯一標(biāo)識符)和 TopicFilter(主題列表,包含客戶端希望退訂的任意數(shù)量的主題)。
PS:不管最初訂閱時QoS級別是什么,topicfilter列表中的主題都將被退訂。