首先安裝 paho-mqtt 這個 Python 庫(pip install paho-mqtt),代碼中通過 paho.mqtt.client 模塊來使用它:
需求描述:
- 第一步:客戶端訪問某個路由,就可以執行固定的提交到 MQTT 服務器的操作。
- 第二步:客戶端以 GET 方法訪問某個路由,Flask 提取 GET 請求中的參數並驗證,驗證完成以後將參數中的數據提交到 MQTT 服務器中。
- 第三步:將前面的代碼整合到 SocketIO 中去。
一、flask上的簡單應用:
#encoding: utf-8
import paho.mqtt.client as mqtt
# Callback invoked after the client has connected to the broker.
def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe to the ``chat`` topic.

    Args:
        client: The client object the callback is attached to; its
            ``subscribe`` method is called here.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; it is only printed.
    """
    print("Connected with result code " + str(rc))
    # Subscribing inside on_connect means the subscription is renewed
    # every time the client reconnects.
    client.subscribe("chat")
# Callback invoked when a message is received from the broker.
def on_message(client, userdata, msg):
    """Print the topic and the text of a received message.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; its ``topic`` and ``payload`` attributes are read.
    """
    payload = msg.payload
    if isinstance(payload, (bytes, bytearray)):
        # str() on bytes would print the repr (b'...'); decode to show the
        # actual text and replace undecodable bytes instead of raising.
        text = payload.decode("utf-8", errors="replace")
    else:
        text = str(payload)
    print("主題:" + msg.topic + " 消息:" + text)
# Create the client with paho's defaults. The constructor also accepts:
# Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
client = mqtt.Client()

# Register the callbacks defined above.
client.on_message = on_message  # runs for every message received from the broker
client.on_connect = on_connect  # runs once the broker connection is established

# Connect to the broker on port 1883 with a 60-second keepalive.
client.connect(host="test.mosquitto.org", port=1883, keepalive=60)

# Send one message to the "chat" topic.
client.publish(topic='chat', payload='this is a test!')

# Hand control to paho's blocking network loop.
client.loop_forever()
二、Flask-SocketIO 配合 MQTT 的應用
#encoding: utf-8
from flask import Flask, render_template, request
import eventlet
import json
import paho.mqtt.client as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
# Apply eventlet's monkey patching before the app and its extensions are created.
eventlet.monkey_patch()

app = Flask(__name__)

# Flask reads its signing key from SECRET_KEY; a bare 'SECRET' entry is not a
# key Flask looks at. The original entry is kept so that any code reading
# app.config['SECRET'] continues to work.
app.config['SECRET'] = 'my secret key'
app.config['SECRET_KEY'] = app.config['SECRET']
app.config['TEMPLATES_AUTO_RELOAD'] = True

# Flask-MQTT broker settings (public test broker, no credentials, no TLS).
app.config['MQTT_BROKER_URL'] = 'test.mosquitto.org'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 60
app.config['MQTT_TLS_ENABLED'] = False

# Last-will message settings.
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2

# Parameters for SSL enabled
# app.config['MQTT_BROKER_PORT'] = 8883
# app.config['MQTT_TLS_ENABLED'] = True
# app.config['MQTT_TLS_INSECURE'] = True
# app.config['MQTT_TLS_CA_CERTS'] = 'ca.crt'

# Bind the extensions to the application.
mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)
@app.route('/')
def hello_world():
    """Serve the landing page rendered from ``index.html``."""
    return render_template('index.html')
@app.route('/ws')
def wevsocket():
    """Serve the WebSocket/MQTT demo page from ``websocket_mqtt_demo.html``.

    The function name (apparently a misspelling of "websocket") is kept as-is
    because Flask uses it as the endpoint name by default.
    """
    return render_template('websocket_mqtt_demo.html')
@socketio.on('publish')
def handle_publish(json_str):
    """Forward a Socket.IO ``publish`` event to the MQTT broker.

    Args:
        json_str: JSON text encoding an object with ``topic`` and
            ``message`` entries.

    The payload comes from the browser, so malformed JSON or a missing entry
    is logged and dropped instead of raising inside the event handler.
    """
    try:
        data = json.loads(json_str)
        topic, message = data['topic'], data['message']
    except (TypeError, ValueError, KeyError) as exc:
        app.logger.warning('Ignoring malformed publish request: %r', exc)
        return
    mqtt_ws.publish(topic, message)
@socketio.on('subscribe')
def handle_subscribe(json_str):
    """Subscribe the Flask-MQTT client to the topic named in a Socket.IO event.

    Args:
        json_str: JSON text encoding an object with a ``topic`` entry.

    The payload comes from the browser, so malformed JSON or a missing
    ``topic`` is logged and dropped instead of raising inside the handler.
    """
    try:
        topic = json.loads(json_str)['topic']
    except (TypeError, ValueError, KeyError) as exc:
        app.logger.warning('Ignoring malformed subscribe request: %r', exc)
        return
    mqtt_ws.subscribe(topic)
@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
    """Relay a received MQTT message to Socket.IO as ``mqtt_message``.

    Args:
        client: Unused.
        userdata: Unused.
        message: Received message; its ``topic`` and ``payload`` are read.
    """
    data = dict(
        topic=message.topic,
        # Payloads are arbitrary bytes; replace undecodable sequences rather
        # than letting UnicodeDecodeError escape from the callback.
        payload=message.payload.decode('utf-8', errors='replace'),
    )
    socketio.emit('mqtt_message', data=data)
@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
    """Print every log record reported through the Flask-MQTT log hook.

    All four callback arguments are printed unchanged.
    """
    print(client, userdata, level, buf)
if __name__ == '__main__':
    # The reloader is disabled on purpose: it runs the application in a second
    # process, which would open a duplicate MQTT connection to the broker.
    socketio.run(app, host='127.0.0.1', port=5001, use_reloader=False, debug=True)
三、Flask、Flask-SocketIO 加純 Python 代碼的 MQTT
#encoding: utf-8
from flask import Flask, render_template, request
import eventlet
import json
import paho.mqtt.client as mqtt_client
from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
# Apply eventlet's monkey patching before the app and its extensions are created.
eventlet.monkey_patch()

app = Flask(__name__)

# Flask reads its signing key from SECRET_KEY; a bare 'SECRET' entry is not a
# key Flask looks at. The original entry is kept so that any code reading
# app.config['SECRET'] continues to work.
app.config['SECRET'] = 'my secret key'
app.config['SECRET_KEY'] = app.config['SECRET']
app.config['TEMPLATES_AUTO_RELOAD'] = True

# Flask-MQTT broker settings (public test broker, no credentials, no TLS).
app.config['MQTT_BROKER_URL'] = 'test.mosquitto.org'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 60
app.config['MQTT_TLS_ENABLED'] = False

# Last-will message settings.
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2

# Parameters for SSL enabled
# app.config['MQTT_BROKER_PORT'] = 8883
# app.config['MQTT_TLS_ENABLED'] = True
# app.config['MQTT_TLS_INSECURE'] = True
# app.config['MQTT_TLS_CA_CERTS'] = 'ca.crt'

# Bind the extensions to the application.
mqtt_ws = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)
# Standalone paho client, created at import time so plain Flask routes can
# publish without going through Flask-MQTT.
# NOTE: this rebinds ``mqtt_client`` from the imported paho module to a client
# instance; the module is no longer reachable under this name afterwards.
mqtt_client = mqtt_client.Client()
mqtt_client.connect("test.mosquitto.org", 1883, 60)  # broker host, port 1883, 60-second keepalive
# Start paho's background network loop. Without a running loop the client
# never sends keepalive pings or processes broker replies, so the broker
# drops the connection and later publishes are lost.
mqtt_client.loop_start()
@app.route('/')
def hello_world():
    """Serve the landing page rendered from ``index.html``."""
    return render_template('index.html')
# Each request to this route publishes the payload 'data' to the 'chat' topic.
@app.route('/mqtts')
def mqtts():
    """Publish the fixed payload ``'data'`` to the ``chat`` topic.

    Uses the module-level standalone paho client and returns ``'ok'`` as the
    response body.
    """
    mqtt_client.publish('chat', 'data')
    return 'ok'
@app.route('/ws')
def wevsocket():
    """Serve the WebSocket/MQTT demo page from ``websocket_mqtt_demo.html``.

    The function name (apparently a misspelling of "websocket") is kept as-is
    because Flask uses it as the endpoint name by default.
    """
    return render_template('websocket_mqtt_demo.html')
@socketio.on('publish')
def handle_publish(json_str):
    """Forward a Socket.IO ``publish`` event to the MQTT broker.

    Args:
        json_str: JSON text encoding an object with ``topic`` and
            ``message`` entries.

    The payload comes from the browser, so malformed JSON or a missing entry
    is logged and dropped instead of raising inside the event handler.
    """
    try:
        data = json.loads(json_str)
        topic, message = data['topic'], data['message']
    except (TypeError, ValueError, KeyError) as exc:
        app.logger.warning('Ignoring malformed publish request: %r', exc)
        return
    mqtt_ws.publish(topic, message)
@socketio.on('subscribe')
def handle_subscribe(json_str):
    """Subscribe the Flask-MQTT client to the topic named in a Socket.IO event.

    Args:
        json_str: JSON text encoding an object with a ``topic`` entry.

    The payload comes from the browser, so malformed JSON or a missing
    ``topic`` is logged and dropped instead of raising inside the handler.
    """
    try:
        topic = json.loads(json_str)['topic']
    except (TypeError, ValueError, KeyError) as exc:
        app.logger.warning('Ignoring malformed subscribe request: %r', exc)
        return
    mqtt_ws.subscribe(topic)
@mqtt_ws.on_message()
def handle_mqtt_message(client, userdata, message):
    """Relay a received MQTT message to Socket.IO as ``mqtt_message``.

    Args:
        client: Unused.
        userdata: Unused.
        message: Received message; its ``topic`` and ``payload`` are read.
    """
    data = dict(
        topic=message.topic,
        # Payloads are arbitrary bytes; replace undecodable sequences rather
        # than letting UnicodeDecodeError escape from the callback.
        payload=message.payload.decode('utf-8', errors='replace'),
    )
    socketio.emit('mqtt_message', data=data)
@mqtt_ws.on_log()
def handle_logging(client, userdata, level, buf):
    """Print every log record reported through the Flask-MQTT log hook.

    All four callback arguments are printed unchanged.
    """
    print(client, userdata, level, buf)
if __name__ == '__main__':
    # The reloader is disabled on purpose: it runs the application in a second
    # process, which would open duplicate MQTT connections (both the
    # Flask-MQTT client and the standalone paho client are created at import).
    socketio.run(app, host='127.0.0.1', port=5001, use_reloader=False, debug=True)