使用python實(shí)現(xiàn)MQTT通信

MQTT 是一種基于發(fā)布/訂閱模式的 輕量級物聯(lián)網(wǎng)消息傳輸協(xié)議,由IBM在1999年發(fā)布。MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時可靠的消息服務(wù)。作為一種低開銷、低帶寬占用的即時通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動應(yīng)用等方面有較廣泛的應(yīng)用。

MQTT特點(diǎn)

1. 使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應(yīng)用程序耦合。該協(xié)議需要客戶端和服務(wù)端,而協(xié)議中主要有三種身份:發(fā)布者(Publisher)、代理(Broker,服務(wù)器)、訂閱者(Subscriber)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,而消息發(fā)布者可以同時是訂閱者,實(shí)現(xiàn)了生產(chǎn)者與消費(fèi)者的脫耦;
2. 對負(fù)載內(nèi)容屏蔽的消息傳輸;
3. 使用 TCP/IP 提供網(wǎng)絡(luò)連接;
4. 有三種消息發(fā)布服務(wù)質(zhì)量:
a) “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因?yàn)椴痪煤筮€會有第二次發(fā)送。
b) “至少一次”,確保消息到達(dá),但消息重復(fù)可能會發(fā)生。
c) “只有一次”,確保消息到達(dá)一次。這一級別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。
5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;
6. 使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機(jī)制。

實(shí)現(xiàn)MQTT協(xié)議需要客戶端和服務(wù)器端通訊完成,在通訊過程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務(wù)器)、訂閱者(Subscribe)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,消息發(fā)布者可以同時是訂閱者。
發(fā)布者發(fā)送消息到代理服務(wù)器,服務(wù)器轉(zhuǎn)發(fā)消息到訂閱者。

MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:
(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內(nèi)容(payload);
(2)payload,可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。

MQTT廣泛應(yīng)用于物聯(lián)網(wǎng)、移動互聯(lián)網(wǎng)、智能硬件、車聯(lián)網(wǎng)、電力能源等行業(yè)。下面我們在Python 項(xiàng)目中使用 paho-mqtt 客戶端庫 ,實(shí)現(xiàn)客戶端與 MQTT 服務(wù)器的連接、訂閱、收發(fā)消息等功能,
由于目前沒有物聯(lián)網(wǎng)設(shè)備,就在linux虛擬機(jī)中獲取服務(wù)器cpu,磁盤,內(nèi)存等信息來模擬獲取物聯(lián)網(wǎng)設(shè)備信息的消息發(fā)布與訂閱功能。

搭建開發(fā)環(huán)境 創(chuàng)建/root/mymqtt 為項(xiàng)目根目錄

首先搭建MQTT代理服務(wù)器,我們使用EMQX來做MQTT代理服務(wù)器。

  1. 下載 emqx-centos7-4.2.7-x86_64.zip 文件到mymqtt目錄中
    wget https://www.emqx.cn/downloads/broker/v4.2.7/emqx-centos7-4.2.7-x86_64.zip
  2. 安裝
    unzip emqx-centos7-4.2.7-x86_64.zip
  3. 啟動MQTT代理服務(wù)器
    ./emqx/bin/emqx start
  4. 驗(yàn)證代理服務(wù)器是否正常運(yùn)行

    ps aux | grep emqx
    image.png
    可以看到EMQX已經(jīng)在運(yùn)行了,MQTT代理服務(wù)器搭建成功。

搭建python環(huán)境,在mymqtt目錄下創(chuàng)建mypy目錄為python項(xiàng)目根目錄
image.png
  1. 使用pyenv 搭建python虛擬環(huán)境,并安裝依賴包psutil和paho-mqtt
    image.png
  2. 消息發(fā)布代碼
文件名:mypub.py


#!/usr/bin/env python
#coding:utf-8

import time
import json
import psutil
import random
from paho.mqtt import client as mqtt_client

broker = '127.0.0.1'  # mqtt代理服務(wù)器地址
port = 1883
keepalive = 60     # 與代理通信之間允許的最長時間段(以秒為單位)              
topic = "/python/mqtt"  # 消息主題
client_id = f'python-mqtt-pub-{random.randint(0, 1000)}'  # 客戶端id不能重復(fù)

def to_M(n):
    '''將B轉(zhuǎn)換為M'''
    u = 1024 * 1024
    m = round(n / u, 2)
    return m

def get_info():
    '''獲取系統(tǒng)硬件信息:cpu利用率,cpu個數(shù),系統(tǒng)負(fù)載,內(nèi)存信息等'''
    cpu_percent = psutil.cpu_percent(interval=1)
    cpu_count = psutil.cpu_count()
    sys_loadavg = [round(x / psutil.cpu_count() * 100, 2) for x in psutil.getloadavg()]
    mem = psutil.virtual_memory()
    mem_total, men_free = to_M(mem.total), to_M(mem.free)
    mem_percent = mem.percent
    info = {
        'cpu_percent': cpu_percent,
        'cpu_count' : cpu_count,
        'sys_loadavg': sys_loadavg,
        'mem_total': mem_total,
        'mem_percent': mem_percent,
        'men_free': men_free
    }
    # mqtt只能傳輸字符串?dāng)?shù)據(jù)
    return json.dumps(info)

def connect_mqtt():
    '''連接mqtt代理服務(wù)器'''
    def on_connect(client, userdata, flags, rc):
        '''連接回調(diào)函數(shù)'''
        # 響應(yīng)狀態(tài)碼為0表示連接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # 連接mqtt代理服務(wù)器,并獲取連接引用
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive)
    return client

def publish(client):
    '''發(fā)布消息'''
    while True:
        '''每隔4秒發(fā)布一次服務(wù)器信息'''
        time.sleep(4)
        msg = get_info()
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")

def run():
    '''運(yùn)行發(fā)布者'''
    client = connect_mqtt()
    # 運(yùn)行一個線程來自動調(diào)用loop()處理網(wǎng)絡(luò)事件, 非阻塞
    client.loop_start()
    publish(client)

if __name__ == '__main__':
    run()

  1. 消息訂閱代碼
文件名:mysub.py


#!/usr/bin/env python
#coding:utf-8

import random
from paho.mqtt import client as mqtt_client


broker = '127.0.0.1'  # mqtt代理服務(wù)器地址
port = 1883
keepalive = 60     # 與代理通信之間允許的最長時間段(以秒為單位)              
topic = "/python/mqtt"  # 消息主題
client_id = f'python-mqtt-sub-{random.randint(0, 1000)}'  # 客戶端id不能重復(fù)

def connect_mqtt():
    '''連接mqtt代理服務(wù)器'''
    def on_connect(client, userdata, flags, rc):
        '''連接回調(diào)函數(shù)'''
        # 響應(yīng)狀態(tài)碼為0表示連接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive )
    return client

def subscribe(client: mqtt_client):
    '''訂閱主題并接收消息'''
    def on_message(client, userdata, msg):
        '''訂閱消息回調(diào)函數(shù)'''
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
    # 訂閱指定消息主題
    client.subscribe(topic)
    client.on_message = on_message


def run():
    # 運(yùn)行訂閱者
    client = connect_mqtt()
    subscribe(client)
    #  運(yùn)行一個線程來自動調(diào)用loop()處理網(wǎng)絡(luò)事件, 阻塞模式
    client.loop_forever()


if __name__ == '__main__':
    run()
  1. 命令行進(jìn)入python虛擬環(huán)境,啟動發(fā)布者
    image.png

    可以看到發(fā)布者啟動并運(yùn)行成功

  2. 打開第二個命令行窗口,進(jìn)入虛擬環(huán)境,啟動訂閱者
    image.png

    可以看到訂閱者啟動成功,并接受到了發(fā)布者發(fā)布的消息。

至此,我們完成了使用 paho-mqtt 客戶端連接到 本地MQTT 服務(wù)器并實(shí)現(xiàn)了測試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容