MQTT,RabbitMQ初步使用 - Ubuntu

MQTT

使用源碼安裝

下載 http://mosquitto.org/files/source/mosquitto-1.4.11.tar.gz
# 解壓
tar -zxfv mosquitto-1.4.11.tar.gz
# 進入目錄
cd mosquitto-1.4.11
# 編譯
make
# 安裝
sudo make install

其中會需要一些依賴(編譯過程找不到)

* openssl/ssl.h
  sudo apt-get install libssl-dev
* ares.h
  sudo apt-get install libc-ares-dev
* uuid/uuid.h
  sudo apt-get install uuid-dev
* 找不到libmosquitto.so.1文件
  sudo vi /etc/ld.so.conf.d/libc.conf or liblocal.conf
  添加 /usr/local/lib64
  保存,刷新 ldconfig
* 最后我們安裝 paho 用于連接MQTT服務
  sudo pip3 install paho-mqtt

接下來就可以進行測試了

* 首先輸入 mosquitto -v 來啟動MQTT服務```
  • 接著新建個py文件 touch mqtt_test.py 用于數(shù)據(jù)接收

! /usr/bin/env python3

-- coding: utf-8 --

import json
import time
import random
from datetime import datetime

import paho.mqtt.client as mqtt

HOST = "127.0.0.1"
PORT = 1883
CLIENT = mqtt.Client()

def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc), userdata)
client.subscribe("C0001/#", 1) # 訂閱top

def on_message(client, userdata, msg):
if msg:
body = str(msg.payload, encoding="utf-8")
print("數(shù)據(jù)為: {0}".format(body))
else:
print("數(shù)據(jù)為空")

def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")

if name == "main":
"""mqtt_test"""

CLIENT.on_connect = on_connect
CLIENT.on_message = on_message
CLIENT.on_disconnect = on_disconnect
try:
    CLIENT.connect(HOST, PORT, 60)
    CLIENT.loop_forever()
except KeyboardInterrupt:
    print("Interrupt received")
    CLIENT.disconnect()
except Exception as error:
    print("error: ", error)
    CLIENT.disconnect()
  • 接著新建個py文件 touch mqtt_send.py 用于數(shù)據(jù)發(fā)送

! /usr/bin/env python3

-- coding: utf-8 --

import time
import random

import paho.mqtt.client as mqtt

HOST = "127.0.0.1"
PORT = 1883
CLIENT = mqtt.Client()

if name == "main":
""""""
while 1:
CLIENT.connect(HOST, PORT, 60)
data_json = {
'id': 'SW0021',
'pid': 'W0003',
}
CLIENT.publish('C0001/{0}/{1}'.format(data_json['pid'], data_json['id']), str(data_json), 1)
CLIENT.disconnect()
time.sleep(3)

之后分別運行兩個py文件就可以收到數(shù)據(jù)了

###RabbitMQ
安裝RabbitMQ服務
```sudo apt-get install rabbitmq-server```
安裝pika 用于連接RabbitMQ服務
```sudo pip3 install pika```

touch rabbitmq_s.py

! /usr/bin/env python3

-- coding: utf-8 --

import pika

if name == "main":
# 建立一個實例
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost') # 默認端口5672,可不寫
)
# 聲明一個管道,在管道里發(fā)消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='sensor_data', durable=True)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
send_data = "OK"
channel.basic_publish(exchange='',
routing_key='sensor_data', # queue名字
body=send_data,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)) # 消息內(nèi)容

print(" [x] Sent: " + send_data)
connection.close()  # 隊列關閉

touch rabbimqt_r.py

! /usr/bin/env python3

-- coding: utf-8 --

from datetime import datetime
import pika

def callback(ch, method, properties, body): # 四個參數(shù)為標準格式
body = str(body, encoding='utf-8')
print("RabbitMQ: [{0}] Received: {1}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), body))
ch.basic_ack(delivery_tag=method.delivery_tag) # 告訴生成者,消息處理完成
print("Rabbitmq 讀取OK")

if name == "main":
# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# 聲明管道
channel = connection.channel()

# 為什么又聲明了一個‘sensor_data’隊列?
# 如果確定已經(jīng)聲明了,可以不聲明。但是你不知道那個機器先運行,所以要聲明兩次。
channel.queue_declare(queue='sensor_data', durable=True)

# RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message, 需要添加更多的Consumer,或者創(chuàng)建更多的virtualHost來細化你的設計。
# channel.basic_qos(prefetch_count=1)

channel.basic_consume(  # 消費消息
    callback,  # 如果收到消息,就調(diào)用callback函數(shù)來處理消息
    queue='sensor_data',  # 你要從那個隊列里收消息
    # no_ack=True  # 寫的話,如果接收消息,機器宕機消息就丟了
    # 一般不寫。宕機則生產(chǎn)者檢測到發(fā)給其他消費者
)

print(' [*]RabbotMQ Wait Msg. To exit press CTRL+C')
channel.start_consuming()  # 開始消費消息
之后分別運行兩個py文件就可以看到效果了,rabbitmq服務是自動運行的,可以使用 ***service rabbitmq-server xxx ***命令進行控制


文筆不好,還望見諒,本就是個人使用筆記的簡單記錄

***By: single430***
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容