【物聯(lián)網(wǎng)應用】Flask--MQTT的應用

首先安裝paho.mqtt.client這個python庫:


需求描述:

  • 第一步:客戶端訪問某個路由就可以執(zhí)行固定的提交到MQTT服務器的操作。
  • 第二步:客戶端以get方法訪問某個路由,flask提取get中的參數(shù)并驗證,驗證完成以后將參數(shù)中的數(shù)據(jù)提交到MQTT服務器中。
  • 第三步:將前面的代碼整合到SockIO中去。

一、flask上的簡單應用:

#encoding: utf-8

import paho.mqtt.client as mqtt


# 當連接上服務器后回調此函數(shù)
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # 放在on_connect函數(shù)里意味著
    # 重新連接時訂閱主題將會被更新
    client.subscribe("chat")

# 從服務器接受到消息后回調此函數(shù)
def on_message(client, userdata, msg):
    print("主題:" + msg.topic + " 消息:" + str(msg.payload))


client = mqtt.Client()
# 參數(shù)有 Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
client.on_connect = on_connect  # 設置連接上服務器回調函數(shù)
client.on_message = on_message  # 設置接收到服務器消息回調函數(shù)
client.connect("test.mosquitto.org", 1883, 60)  # 連接服務器,端口為1883,維持心跳為60秒
client.publish('chat','this is a test!') # 往主題chat里發(fā)送消息
client.loop_forever()

二、flaskIO配合MQTT的應用

#encoding: utf-8

from flask import Flask, render_template, request
import eventlet
import json
import paho.mqtt.client as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap

eventlet.monkey_patch()

app = Flask(__name__)

app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_BROKER_URL'] = 'test.mosquitto.org'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 60
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2


# Parameters for SSL enabled
# app.config['MQTT_BROKER_PORT'] = 8883
# app.config['MQTT_TLS_ENABLED'] = True
# app.config['MQTT_TLS_INSECURE'] = True
# app.config['MQTT_TLS_CA_CERTS'] = 'ca.crt'

mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)


@app.route('/')
def hello_world():
    return render_template('index.html')


@app.route('/ws')
def wevsocket():
    return render_template('websocket_mqtt_demo.html')

@socketio.on('publish')
def handle_publish(json_str):
    data = json.loads(json_str)
    mqtt_ws.publish(data['topic'], data['message'])


@socketio.on('subscribe')
def handle_subscribe(json_str):
    data = json.loads(json_str)
    mqtt_ws.subscribe(data['topic'])


@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
    data = dict(
        topic=message.topic,
        payload=message.payload.decode()
    )
    socketio.emit('mqtt_message', data=data)


@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
    print(client, userdata, level, buf)

if __name__ == '__main__':
    socketio.run(app, host='127.0.0.1', port=5001, use_reloader=True, debug=True)

三、flask、flaskIO、加純Python代碼的MQTT

#encoding: utf-8

from flask import Flask, render_template, request
import eventlet
import json
import paho.mqtt.client as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap

eventlet.monkey_patch()

app = Flask(__name__)

app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MQTT_BROKER_URL'] = 'test.mosquitto.org'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 60
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2


# Parameters for SSL enabled
# app.config['MQTT_BROKER_PORT'] = 8883
# app.config['MQTT_TLS_ENABLED'] = True
# app.config['MQTT_TLS_INSECURE'] = True
# app.config['MQTT_TLS_CA_CERTS'] = 'ca.crt'



mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)

# 這里的初始化MQTT可以讓純Python代碼運行起來
mqtt_client = mqtt_client.Client()
mqtt_client.connect("test.mosquitto.org", 1883, 60)  # 連接服務器,端口為1883,維持心跳為60秒


@app.route('/')
def hello_world():
    return render_template('index.html')


# 每次訪問這個路由將向chat主題,提交一次數(shù)據(jù)data
@app.route('/mqtts')
def mqtts():
    # mqtt_client = myMqtt()
    mqtt_client.publish('chat', 'data')
    return 'ok'


@app.route('/ws')
def wevsocket():
    return render_template('websocket_mqtt_demo.html')


@socketio.on('publish')
def handle_publish(json_str):
    data = json.loads(json_str)
    mqtt_ws.publish(data['topic'], data['message'])


@socketio.on('subscribe')
def handle_subscribe(json_str):
    data = json.loads(json_str)
    mqtt_ws.subscribe(data['topic'])


@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
    data = dict(
        topic=message.topic,
        payload=message.payload.decode()
    )
    socketio.emit('mqtt_message', data=data)


@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
    print(client, userdata, level, buf)


if __name__ == '__main__':
    socketio.run(app, host='127.0.0.1', port=5001, use_reloader=True, debug=True)
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容