MQTT
使用源碼安裝
下載 http://mosquitto.org/files/source/mosquitto-1.4.11.tar.gz
# 解壓
tar -zxfv mosquitto-1.4.11.tar.gz
# 進入目錄
cd mosquitto-1.4.11
# 編譯
make
# 安裝
sudo make install
其中會需要一些依賴(編譯過程找不到)
* openssl/ssl.h
sudo apt-get install libssl-dev
* ares.h
sudo apt-get install libc-ares-dev
* uuid/uuid.h
sudo apt-get install uuid-dev
* 找不到libmosquitto.so.1文件
sudo vi /etc/ld.so.conf.d/libc.conf or liblocal.conf
添加 /usr/local/lib64
保存,刷新 ldconfig
* 最后我們安裝 paho 用于連接MQTT服務
sudo pip3 install paho-mqtt
接下來就可以進行測試了
* 首先輸入 mosquitto -v 來啟動MQTT服務```
- 接著新建個py文件 touch mqtt_test.py 用于數(shù)據(jù)接收
! /usr/bin/env python3
-- coding: utf-8 --
import json
import time
import random
from datetime import datetime
import paho.mqtt.client as mqtt
HOST = "127.0.0.1"
PORT = 1883
CLIENT = mqtt.Client()
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc), userdata)
client.subscribe("C0001/#", 1) # 訂閱top
def on_message(client, userdata, msg):
if msg:
body = str(msg.payload, encoding="utf-8")
print("數(shù)據(jù)為: {0}".format(body))
else:
print("數(shù)據(jù)為空")
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
if name == "main":
"""mqtt_test"""
CLIENT.on_connect = on_connect
CLIENT.on_message = on_message
CLIENT.on_disconnect = on_disconnect
try:
CLIENT.connect(HOST, PORT, 60)
CLIENT.loop_forever()
except KeyboardInterrupt:
print("Interrupt received")
CLIENT.disconnect()
except Exception as error:
print("error: ", error)
CLIENT.disconnect()
- 接著新建個py文件 touch mqtt_send.py 用于數(shù)據(jù)發(fā)送
! /usr/bin/env python3
-- coding: utf-8 --
import time
import random
import paho.mqtt.client as mqtt
HOST = "127.0.0.1"
PORT = 1883
CLIENT = mqtt.Client()
if name == "main":
""""""
while 1:
CLIENT.connect(HOST, PORT, 60)
data_json = {
'id': 'SW0021',
'pid': 'W0003',
}
CLIENT.publish('C0001/{0}/{1}'.format(data_json['pid'], data_json['id']), str(data_json), 1)
CLIENT.disconnect()
time.sleep(3)
之后分別運行兩個py文件就可以收到數(shù)據(jù)了
###RabbitMQ
安裝RabbitMQ服務
```sudo apt-get install rabbitmq-server```
安裝pika 用于連接RabbitMQ服務
```sudo pip3 install pika```
touch rabbitmq_s.py
! /usr/bin/env python3
-- coding: utf-8 --
import pika
if name == "main":
# 建立一個實例
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # 默認端口5672,可不寫
)
# 聲明一個管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='sensor_data', durable=True)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
send_data = "OK"
channel.basic_publish(exchange='',
routing_key='sensor_data', # queue名字
body=send_data,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)) # 消息內(nèi)容
print(" [x] Sent: " + send_data)
connection.close() # 隊列關閉
touch rabbimqt_r.py
! /usr/bin/env python3
-- coding: utf-8 --
from datetime import datetime
import pika
def callback(ch, method, properties, body): # 四個參數(shù)為標準格式
body = str(body, encoding='utf-8')
print("RabbitMQ: [{0}] Received: {1}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), body))
ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
print("Rabbitmq 讀取OK")
if name == "main":
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()
# 為什么又聲明了一個‘sensor_data’隊列?
# 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個機器先運行,所以要聲明兩次。
channel.queue_declare(queue='sensor_data', durable=True)
# RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message, 需要添加更多的Consumer,或者創(chuàng)建更多的virtualHost來細化你的設計。
# channel.basic_qos(prefetch_count=1)
channel.basic_consume( # 消費消息
callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
queue='sensor_data', # 你要從那個隊列里收消息
# no_ack=True # 寫的話,如果接收消息,機器宕機消息就丟了
# 一般不寫。宕機則生產(chǎn)者檢測到發(fā)給其他消費者
)
print(' [*]RabbotMQ Wait Msg. To exit press CTRL+C')
channel.start_consuming() # 開始消費消息
之后分別運行兩個py文件就可以看到效果了,rabbitmq服務是自動運行的,可以使用 ***service rabbitmq-server xxx ***命令進行控制
文筆不好,還望見諒,本就是個人使用筆記的簡單記錄
***By: single430***