基于mqtt協(xié)議的智能家居系統(tǒng)解決方案

應(yīng)用場(chǎng)景說明:

在經(jīng)濟(jì)高速發(fā)展的今天,現(xiàn)代人對(duì)自己的生活的要求越來越高,家電設(shè)備也迅猛增加,但是日常生活中,人們不擅長對(duì)于家電的管理,而造成了大量的不必要的能耗損失。在這樣的一種情況下,如果有一套智能家居系統(tǒng)能夠管理家庭電器的狀態(tài),我們也可以隨時(shí)的控制家電,這樣我們的生活效率將會(huì)有很大程度上的提高。

在這樣的一個(gè)需求的大背景下,我們又對(duì)設(shè)備和設(shè)備之間,人和設(shè)備之間,進(jìn)行了一番詳細(xì)的分析。首先我們來看設(shè)備和設(shè)備之間的需求。

1.假設(shè)在N市的A設(shè)備的狀態(tài)發(fā)生改變,我們需要遠(yuǎn)在M市的B設(shè)備的狀態(tài)也發(fā)生改變,這種需求我們稱為不同網(wǎng)段的設(shè)備間的聯(lián)動(dòng)需求。

2.假設(shè)在N市的A設(shè)備的狀態(tài)發(fā)生改變,我們需要當(dāng)前統(tǒng)一網(wǎng)段下的B設(shè)備也發(fā)生改變,這種需求我們稱為同一網(wǎng)段的設(shè)備間的聯(lián)動(dòng)需求。

3.假設(shè)主人遠(yuǎn)行,而忘記了自己家有沒有鎖門,有沒有關(guān)燈,有沒有不該開啟的電器設(shè)備關(guān)閉,這是我們需要得知家中設(shè)備運(yùn)行的狀態(tài),并調(diào)控到最佳狀態(tài)。我們稱之為人機(jī)遠(yuǎn)程調(diào)控需求。

4.假設(shè)在家庭與廣域網(wǎng)斷網(wǎng)的情況下,我們還可以得知和控制家庭的設(shè)備,而不是失去對(duì)于家庭設(shè)備間的控制。我們稱之為遠(yuǎn)程失聯(lián)需求。

基于國內(nèi)市場(chǎng)的大需求和我們自行分析的小需求下,我們?cè)O(shè)計(jì)了一套滿足于以上四點(diǎn)控制剛性需求的智能家居控制系統(tǒng)。

系統(tǒng)結(jié)構(gòu)說明

我們現(xiàn)在有了上面的需求分析,這時(shí)我們就可以對(duì)系統(tǒng)進(jìn)行選型和架構(gòu)了,我們對(duì)于設(shè)備間的聯(lián)動(dòng)需求進(jìn)行分析后選擇了一種物聯(lián)網(wǎng)廣泛使用的推送消息的協(xié)議機(jī)制(mqtt),然后對(duì)它進(jìn)行二次開發(fā)和封裝。下面我們就來看看系統(tǒng)的設(shè)計(jì)結(jié)構(gòu):

結(jié)構(gòu):

1.節(jié)點(diǎn)事件上報(bào)(publish nodeid event),這個(gè)場(chǎng)景用于當(dāng)人在現(xiàn)場(chǎng)對(duì)設(shè)備的狀態(tài)進(jìn)行了改變,這時(shí),該設(shè)備應(yīng)該向主服務(wù)器進(jìn)行通報(bào),事件的發(fā)生,以及當(dāng)前的狀態(tài),還有為了實(shí)現(xiàn)設(shè)備的熱插拔,當(dāng)設(shè)備連上這套系統(tǒng)后,它便會(huì)廣播上線通知,當(dāng)設(shè)備異常斷開系統(tǒng)后,會(huì)發(fā)送遺言離線通知,方便我們對(duì)節(jié)點(diǎn)事件異常進(jìn)行及時(shí)的處理。

2.節(jié)點(diǎn)屬性上報(bào)(publish nodeid property),當(dāng)人為的改變了設(shè)備后,主服務(wù)器和在線的控制端將會(huì)受到該設(shè)備節(jié)點(diǎn)的屬性上報(bào)通知,這個(gè)行為主要是及時(shí)的獲取設(shè)備點(diǎn)的狀態(tài)信息。當(dāng)設(shè)備剛上線是也會(huì)進(jìn)行屬性播報(bào),以便控制端熱加載設(shè)備。

3.節(jié)點(diǎn)方法被調(diào)用(subscribe nodeid call | publish 0 ack),當(dāng)N市的A設(shè)備狀態(tài)發(fā)生改變,M市的B設(shè)備也要發(fā)生狀態(tài)的改變,就會(huì)直接讓A去控制B設(shè)備,這時(shí)我們成A為控制器,B為執(zhí)行器。那么A就會(huì)調(diào)用控制遠(yuǎn)程設(shè)備命令,B就會(huì)收到call命令之后執(zhí)行命令并返回一個(gè)ack以確認(rèn)信息的無誤性。

4.系統(tǒng)廣播事件(subscribe 0 system),當(dāng)所有的設(shè)備同時(shí)接受統(tǒng)一命令的調(diào)控時(shí),我們?yōu)榱颂岣咝畔⑻幚淼男适褂孟到y(tǒng)廣播事件來統(tǒng)一調(diào)度。

在這五個(gè)控制總命令下,我們還將設(shè)計(jì)針對(duì)每種設(shè)備的控制子命令格式。從而達(dá)到既從屬分布式控制有歸屬于集中式控制系統(tǒng)。

智能家居互聯(lián)的通訊協(xié)議:

1.角色定義:?

節(jié)點(diǎn),設(shè)備,控制器,服務(wù)器

2.主題結(jié)構(gòu):

?yqmiot/<accountid>/<receiver>/<sender>/<command>

3.消息結(jié)構(gòu):

{

receiver:?,???#?接受者nodeid

sender:?,???????#?發(fā)送者nodeid

name:?,????????#?主命令(名字有帶商定)

action:?,???????#?子命令(可為null)

callseq:?,?????#?調(diào)用序號(hào)(多次調(diào)用時(shí)確定回包對(duì)應(yīng)的請(qǐng)求)?(非call和ack命令可以為null)

params:?,???????#?命令參數(shù)

#?seq:?,???????????#?包序號(hào)(用戶篩選重復(fù)數(shù)據(jù)包)?暫未使用

}

備注:receiver,sender,name?未來這三者在發(fā)送數(shù)據(jù)包中可能被省略,因主題中已經(jīng)存在。

屬性上報(bào)(property)

-command: "property"

-params:?設(shè)備屬性?({"name":?"hello",?"status":?"正忙呢",?"yqmiot.property.nodeid":?27888})

事件上報(bào)(event)

-command: "event"

-action:?事件名?("yqmiot.event.online",?"yqmiot.event.offline")

-params:?事件參數(shù)

方法調(diào)用(call)

-command: "call"

-action:?方法名?("yqmiot.method.ping",?"yqmiot.method.test")

-callseq:?調(diào)用序號(hào)(每次調(diào)用都必須唯一)

-params:?方法參數(shù)

調(diào)用響應(yīng)(ack)

-command: "ack"

-action:?call包中的action

-callseq:?call包中的seq

-params:?回應(yīng)參數(shù)

其他(暫未使用)

服務(wù)器 nodeid: 0

全頻道廣播?nodeid:?0xffffffff

全服廣播?accountid:?0,?nodeid:?0

我們把通信協(xié)議搭建好了之后,就來開始構(gòu)建整個(gè)系統(tǒng),接下來就是要使用編程語言進(jìn)行編程實(shí)現(xiàn)。從協(xié)議開始,一步一步構(gòu)建 整套系統(tǒng)的通訊層和應(yīng)用層,以及控制端。

系統(tǒng)的構(gòu)建:

1.設(shè)備控制端的構(gòu)建:

我們是基于可以運(yùn)行嵌入式linux系統(tǒng)的設(shè)備,對(duì)節(jié)點(diǎn)進(jìn)行控制。由于linux系統(tǒng)的便利性。我們使用了python這種腳本對(duì)設(shè)備客戶端進(jìn)行了編程處理,接下來我們一步步的看,被控器的客戶端構(gòu)建。

1.1.引入依賴包和常用參數(shù).

# -*- encoding: utf-8 -*-

importlogging

importtime

importsys

importgetopt

importjson

frompaho.mqtt.clientimportClientasMqtt

VERSION?="1.0.1"

"""

每個(gè)設(shè)備都擁有三類特性:屬性,事件,方法。

屬性表示設(shè)備的當(dāng)前狀態(tài),比如:電力狀態(tài),照明開關(guān)等。每當(dāng)屬性發(fā)生改變就會(huì)立即上報(bào)。

事件表示設(shè)備當(dāng)前發(fā)生了什么,按下按鈕,電力不足警告等。

方法則是設(shè)備對(duì)外提供的操作接口,通過它可以對(duì)設(shè)備進(jìn)行控制。比如:重啟,打開照明,關(guān)機(jī)等。

"""

YQMIOT_OK?=0

YQMIOT_TIMEOUT?=1

YQMIOT_BROADCAST_RECEIVER?=0#?廣播接受者id

#?系統(tǒng)命令

YQMIOT_COMMAND_PROPERTY?="property"#?屬性上報(bào)

YQMIOT_COMMAND_EVENT?="event"#?事件上報(bào)

YQMIOT_COMMAND_CALL?="call"#?方法調(diào)用

YQMIOT_COMMAND_ACK?="ack"#?方法響應(yīng)

#?系統(tǒng)事件

YQMIOT_EVENT_ONLINE?="yqmiot.event.online"#?上線通知

YQMIOT_EVENT_OFFLINE?="yqmiot.event.offline"#?下線通知

YQMIOT_EVENT_TEST?="yqmiot.event.test"#?按下測(cè)試按鈕

#?系統(tǒng)屬性

YQMIOT_PROPERTY_NODEID?="yqmiot.property.nodeid"#?節(jié)點(diǎn)id號(hào)

YQMIOT_PROPERTY_ACCOUNTID?="yqmiot.property.accountid"#?節(jié)點(diǎn)所在賬號(hào)id(頻道id)頻道隔離

YQMIOT_PROPERTY_MODEL?="yqmiot.property.model"#?設(shè)備所屬類型

YQMIOT_PROPERTY_VERSION?="yqmiot.property.version"#?設(shè)備所屬固件版本號(hào)

#?系統(tǒng)方法

YQMIOT_METHOD_PING?="yqmiot.method.ping"#?ping連通測(cè)試

YQMIOT_METHOD_TEST?="yqmiot.method.test"#?方法調(diào)用測(cè)試

logging.basicConfig(level=logging.DEBUG,

format='[%(asctime)s]?%(levelname)s?%(message)s',

datefmt='%Y-%m-%d?%H:%M:%S')

root?=?logging.getLogger()

root.setLevel(logging.NOTSET)

1.2.mqtt通訊層的基本封裝

classMqttClient(object):

"""Mqtt通訊封裝"""

def__init__(self,address):

ifnotisinstance(address,tuple)?orlen(address)?!=2:

raiseValueError("Invalid?address.")

defon_connect(client,userdata,flags,rc):

self.handleConnected()

defon_message(client,userdata,msg):

self.handleMessage(msg.topic,?msg.payload)

self.client?=?Mqtt()

self.address?=?address

self.client.on_connect?=?on_connect

self.client.on_message?=?on_message

defhandleConnected(self):

pass

defhandleMessage(self,topic,payload):

pass

defpublish(self,topic,payload=None,qos=0,retain=False):

self.client.publish(topic,?payload,?qos,?retain)

defsubscribe(self,topic,qos=0):

self.client.subscribe(topic,?qos)

defstart(self):

self.client.connect_async(self.address[0],self.address[1])

self.client.loop_start()

defstop(self):

self.client.loop_stop()

defusername_pw_set(self,username,password=None):

self.client.username_pw_set(username,?password)

defwill_set(self,topic,payload=None,qos=0,retain=False):

self.client.will_set(topic,?payload,?qos,?retain)

1.3.家居互聯(lián)通訊層封裝

classYqmiotBase(MqttClient):

"""月球貓互聯(lián)通訊基類"""

def__init__(self,address,accountid,nodeid,authkey=None,username=None,password=None):

"""username和password是mqtt賬號(hào)密碼。"""

super(YqmiotBase,self).__init__(address)

self.username?=?username

self.password?=?password

self.accountid?=?accountid

self.nodeid?=?nodeid

self.authkey?=?authkey#TODO

self.callMethodInfo?=?{}#

self.callMethodTimeout?=10*1000#?方法調(diào)用超時(shí)時(shí)間TODO處理多線程問題。調(diào)用超時(shí)

self.callseq?=0

ifself.accountid?<=0orself.nodeid?<=0:

raiseValueError("Invalid?accountid?or?nodeid.")

defhandleConnected(self):

super(YqmiotBase,self).handleConnected()

#?偵聽發(fā)送給自己的消息

topic?="yqmiot/{self.accountid}/{self.nodeid}/#".format(self=self)

self.subscribe(topic)

defhandleMessage(self,topic,payload):

super(YqmiotBase,self).handleMessage(topic,?payload)

try:

prefix,?account,?receiver,?sender,?command?=?topic.split("/")

account?=int(account)

receiver?=int(receiver)

sender?=int(sender)

except:

logging.error("Invalid?topic.?{}".format(topic))

return

#?if?prefix?!=?"yqmiot"?\

#?????or?account?!=?self.accountid?\

#?????or?receiver?!=?self.nodeid:?#TODO處理廣播

#?????logging.error("It's?not?my?topic.?{}".format(topic))

#?????return

try:

payload?=?json.loads(payload)

except:

logging.error("Invalid?payload.?{}".format(payload))

return

cmd?=?Command(

name=?command,

action=?payload.get("action"),

receiver=?receiver,

sender=?sender,

callseq=?payload.get("callseq"),

params=?payload.get("params"))

try:

self.handleCommand(cmd)

except:

logging.error("Error?processing?command.?{}".format(topic))

return

defsendCommand(self,cmd):

ifcmd:

try:

accountid?=self.accountid

receiver?=?cmd.receiverifcmd.receiver?!=NoneelseYQMIOT_BROADCAST_RECEIVER#?默認(rèn)接受者是服務(wù)器

sender?=self.nodeid

name?=?cmd.name

action?=?cmd.action

callseq?=?cmd.callseq

params?=?cmd.paramsifcmd.params?!=Noneelse{}

topic?="yqmiot/{}/{}/{}/{}".format(accountid,?receiver,?sender,?name)

payload?=?{"action":?cmd.action,"callseq":?callseq,"params":?params}

self.publish(topic,?json.dumps(payload))

exceptException,?e:

logging.error("Error?sending?command."+str(e))

else:

logging.error("Invalid?cmd.")

defhandleCommand(self,cmd):

ifcmd.name?==?YQMIOT_COMMAND_CALL:

self.handleCommandCall(cmd)

elifcmd.name?==?YQMIOT_COMMAND_ACK:

callseq?=?cmd.callseq

ifcallseq?inself.callMethodInfo:

info?=self.callMethodInfo.pop(callseq)

cmd.action?=?info["action"]

cmd.time?=?millis()?-?info["time"]

self.handleCommandAck(cmd)

else:

logging.error("Drop?unknown?command.")

else:

logging.error("Command?not?supported.")

defhandleCommandCall(self,cmd):

ifcmd.action?==?YQMIOT_METHOD_PING:

self.handleCommandCallPing(cmd)

else:

logging.warn("Could?not?find?method.")

defhandleCommandAck(self,cmd):

ifcmd.action?==?YQMIOT_METHOD_PING:

self.handleCommandCallPingAck(cmd)

defcallMethod(self,receiver,action,params=None):

ifreceiver?and?receiver?!=?YQMIOT_BROADCAST_RECEIVER?and?action:

try:

self.callseq?+=1

cmd?=?Command(

name=?YQMIOT_COMMAND_CALL,

action=?action,

receiver=?receiver,

callseq=self.callseq,

params=?params)

self.callMethodInfo[cmd.callseq]?=?{"action":?action,"callseq":?cmd.callseq,"time":?millis()}

self.sendCommand(cmd)

except:

logging.error("Error?calling?remote?action.")

else:

logging.error("Remote?action?parameter?is?incorrect.")

defcallMethodPing(self,receiver):

self.callMethod(receiver,?YQMIOT_METHOD_PING)

defhandleCommandCallPing(self,cmd):

self.sendCommand(cmd.reply())

defhandleCommandCallPingAck(self,cmd):

pass

1.4.互聯(lián)客戶端封裝

classYqmiotClient(YqmiotBase):

"""月球貓互聯(lián)客戶端

屬性定時(shí)上報(bào)

屬性變更上報(bào)

事件上報(bào)

處理方法調(diào)用,并回包"""

defstart(self):

#?離線通知

topic?="yqmiot/{}/{}/{}/{}".format(self.accountid,?YQMIOT_BROADCAST_RECEIVER,self.nodeid,?YQMIOT_COMMAND_EVENT)

payload?=?{"action":?YQMIOT_EVENT_OFFLINE}

self.will_set(topic,?json.dumps(payload))

super(YqmiotClient,self).start()

defhandleConnected(self):

super(YqmiotClient,self).handleConnected()

logging.info("Connect?server?successfully.")

#?上線通知

self.reportEvent(YQMIOT_EVENT_ONLINE)

#TODO推送下線遺言

defreportProperty(self,params):

"""屬性上報(bào)

params(dict)?設(shè)備屬性集"""

ifisinstance(params,dict):

try:

cmd?=?Command(

name=?YQMIOT_COMMAND_PROPERTY,

receiver=?YQMIOT_BROADCAST_RECEIVER,

params=?params)

self.sendCommand(cmd)

except:

logging.error("An?error?occurred?while?reporting?the?property.")

else:

raiseTypeError("Incorrect?params?type.")

defreportEvent(self,action,params=None):

"""事件上報(bào)

action?事件名

params?參數(shù)"""

ifaction:

try:

cmd?=?Command(

name=?YQMIOT_COMMAND_EVENT,

action=?action,

receiver=?YQMIOT_BROADCAST_RECEIVER,

params=?params)

self.sendCommand(cmd)

except:

logging.error("An?error?occurred?while?reporting?the?event.")

else:

raiseTypeError("Incorrect?action?type.")

1.5.家居系統(tǒng)互聯(lián)控制器封裝

classYqmiotController(YqmiotBase):

"""

月球貓互聯(lián)控制器

"""

#?訂閱廣播消息

defhandleConnected(self):

super(YqmiotController,self).handleConnected()

logging.info("Connect?server?successfully.")

#?偵聽設(shè)備上報(bào)

topic?="yqmiot/{self.accountid}/0/#".format(self=self)

self.subscribe(topic)

defhandleCommand(self,cmd):

ifcmd.name?==?YQMIOT_COMMAND_PROPERTY:

self.handleCommandProperty(cmd)

elifcmd.name?==?YQMIOT_COMMAND_EVENT:

self.handleCommandEvent(cmd)

else:

super(YqmiotController,self).handleCommand(cmd)

defhandleCommandProperty(self,cmd):

print"設(shè)備?{}?上報(bào)屬性:{}".format(cmd.sender,?cmd.params)

defhandleCommandEvent(self,cmd):

print"設(shè)備?{}?上報(bào)事件:{}?參數(shù):{}".format(cmd.sender,?cmd.action,?cmd.params)


到這里為止,我們的控制系統(tǒng)的客戶端已經(jīng)封裝完畢,但是這才剛剛起步,我們有了客戶端,那我們還需要遠(yuǎn)程控制器,我們?yōu)榱撕?jiǎn)便起見使用了web終端的方案。來進(jìn)行對(duì)設(shè)備客戶端的控制,由于代碼量很大我這里就簡(jiǎn)要的介紹一下。

在控制端中主要使用的mqtt推送協(xié)議,然后轉(zhuǎn)換成socket以便實(shí)時(shí)控制。因?yàn)槲覀兊募夹g(shù)棧使用的是vuejs,大家如果不了解可以先去了解了解,這是一種以數(shù)據(jù)為驅(qū)動(dòng)的web解決方案,告別了傳統(tǒng)的dom節(jié)點(diǎn)控制。使得運(yùn)行速度和性能得到了很大的提升。我們?cè)诳刂频玫絪ocket數(shù)據(jù)后,然后進(jìn)行分發(fā)進(jìn)入各種控制器,分別管理不同數(shù)據(jù)和業(yè)務(wù)邏輯的實(shí)現(xiàn)以及數(shù)據(jù)的調(diào)配。

實(shí)踐效果:


設(shè)備列表


設(shè)備詳情

好下面我們就來看看最后達(dá)到的控制效果吧!


智能家居控制系統(tǒng)測(cè)試視頻



智能家居互聯(lián)




項(xiàng)目地址:https://github.com/yqmiot

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容