KubeEdge分析-mapper與deviceTwin交互流程

整體架構(gòu)

官方文檔 https://github.com/kubeedge/kubeedge/blob/master/docs/mappers/modbus_mapper.md

kubeedge_arch.png

從架構(gòu)圖上可以看出,mapper和外界,包括DeviceTwin唯一的交互途徑就是通過(guò)edge node內(nèi)部的MQTT broker.

mapper通過(guò)訂閱、發(fā)布相關(guān)的topic,實(shí)現(xiàn)與EdgeCore或者其他自己定義的APP進(jìn)行交互

topic

先看下網(wǎng)關(guān)文檔中的說(shuō)明(https://docs.kubeedge.io/en/latest/guides/message_topics.html)

1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"

We will focus on the message expected on the first 3 topics.

"$hw/events/node/+/membership/get": This topics is used to get membership details of a node i.e the devices that are associated with the node. The response of the message is published on "$hw/events/node/+/membership/get/result" topic.
"$hw/events/device/+/state/update”: This topic is used to update the state of the device. + symbol can be replaced with ID of the device whose state is to be updated.
"$hw/events/device/+/twin/+": The two + symbols can be replaced by the deviceID on whose twin the operation is to be performed and any one of(update,cloud_updated,get) respectively.
Following is the explanation of the three suffix used:

update: this suffix is used to update the twin for the deviceID.
cloud_updated: this suffix is used to sync the twin status between edge and cloud.
get: is used to get twin status of a device. The response is published on "$hw/events/device/+/twin/get/result" topic.

另外,在eventbus的文檔中也有一段描述

- $hw/events/upload/#
- SYS/dis/upload_records
- SYS/dis/upload_records/+
- $hw/event/node/+/membership/get
- $hw/event/node/+/membership/get/+
- $hw/events/device/+/state/update
- $hw/events/device/+/state/update/+
- $hw/event/device/+/twin/+

在看下代碼,從modbus_mapper的constant.js中,可以看到mapper中的所有topic。

const defaultTopicPrefix = '$hw/events/device/';
const defaultDirectTopicPrefix = '$hw/devices/';
const twinDeltaTopic = defaultTopicPrefix + '+/twin/update/delta';
const twinUpdateTopic = '/twin/update';
const twinGetResTopic = defaultTopicPrefix + '+/twin/get/result';
const twinGetTopic = '/twin/get';
const directGetTopic = '/events/properties/get';

從代碼中可以看出,有兩類topic,一類是類似廣播類型的hw/events/device/ ,另一類是針對(duì)某個(gè)特定設(shè)備的hw/devices/。

另外,文檔和代碼中的topic是有區(qū)別的,代碼中并沒(méi)有node、SYS相關(guān)的topic

topic詳細(xì)分析

基本概念

mqtt的topic是支持通配符的(wildcard),有3類通配符,分別是

  • + 加號(hào),匹配一段
  • # 井號(hào),匹配多段
  • $ 美元符,用在首位,防止被+、#通配符匹配

mqtt client是可以訂閱多個(gè)topic的(傳入topic列表),不過(guò)modbus_mapper的實(shí)現(xiàn)中,并沒(méi)有用這種方式,而是每個(gè)client只用來(lái)發(fā)布、訂閱一個(gè)topic。

這種方式下,on_message就不需要在判斷收到的msg是相應(yīng)哪個(gè)topic的了,可以直接進(jìn)入處理流程。

node/+/membership

在server.go中,有相關(guān)的訂閱,但是在mapper中沒(méi)看到有推送,這里后續(xù)再研究

// onSubscribe will be called if the topic is matched in topic tree.
func (m *Server) onSubscribe(msg *packet.Message) {
    // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
    // for other, send to hub
    // for "SYS/dis/upload_records", no need to base64 topic
    var target string
    resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic))
    if strings.HasPrefix(msg.Topic, "$hw/events/device") || strings.HasPrefix(msg.Topic, "$hw/events/node") {
        target = modules.TwinGroup
    } else {
        target = modules.HubGroup
        if msg.Topic == "SYS/dis/upload_records" {
            resource = "SYS/dis/upload_records"
        }
    }
    // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
    message := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
        resource, "response").FillBody(string(msg.Payload))
    klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, resource))
    beehiveContext.SendToGroup(target, *message)
}

twinGetResTopic

$hw/events/device/+/twin/get/result

Mapper訂閱這個(gè)twinGetResTopic,注意中間的通配符加號(hào),這里匹配的應(yīng)該是任意設(shè)備ID

        mqtt_client.on('connect', ()=>{
            logger.info('connetced to edge mqtt with topic twinGet');
            mqtt_client.subscribe(constant.twinGetResTopic);
            for (let instance of devIns) {
                dt.getActuals(instance[0]);
            }
        });

對(duì)這個(gè)訂閱,收到的消息處理代碼如下

        mqtt_client.on('message', (topic, message)=>{
            try {
                var msgGet = JSON.parse(message.toString());
            } catch (err) {
                logger.error('unmarshal error');
                return;
            }
            let resources = topic.toString().split('/');
            let deviceID = resources[3];
            let dt = new DeviceTwin(mqtt_client);
            let devProtocol, devInstance;
            if (devPro.has(deviceID) && devIns.has(deviceID)) {
                devProtocol = devPro.get(deviceID);
                devInstance = devIns.get(deviceID);
            } else {
                logger.error('match visitor failed');
            }
            logger.info('recieve twinGet msg, set properties actual value map');
            if (resources.length === 7 && resources[5] === 'get' && msgGet != null && msgGet.code != 404 && typeof(devProtocol) != 'undefined' && typeof(devInstance) != 'undefined') {
                dt.setActuals(msgGet, (PropActuals)=>{
                    for (let actual of PropActuals) {
                        ActualVal.set(util.format('%s-%s', deviceID, actual[0]), actual[1]);
                    }
                });
                dt.setExpecteds(msgGet, (PropExpecteds)=>{
                    for (let expected of PropExpecteds) {
                        modbusProtocolTransfer(devProtocol.protocol, (transferedProtocol)=>{
                            if (modVistr.has(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol))) {
                                let visitor = modVistr.get(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol));
                                dealDeltaMsg(msgGet, expected[0], visitor, devProtocol, expected[1]);
                            }
                        });
                    }
                });
            }
        }

代碼中首先根據(jù)topic的名稱取出device id,然后根據(jù)本地緩存的device profile檢查是否存在這個(gè)id,如果存在,那么取出改設(shè)備對(duì)應(yīng)的協(xié)議、實(shí)例等信息,然后依次調(diào)用
setActuals和setExpecteds方法。

setActuals

取出topic消息體中個(gè)屬性的actual的值,然后放到ActualVal這個(gè)全局的map中保存。

(從代碼中看,setActuals并不會(huì)改變?cè)O(shè)備上的數(shù)據(jù),只會(huì)記錄在ActualVal。而ActualVal是會(huì)定期和設(shè)備上實(shí)際的數(shù)據(jù)進(jìn)行同步的,并且以設(shè)備數(shù)據(jù)為準(zhǔn),所以這里setActual的目的不明)

setExpecteds

取出topic消息體中個(gè)屬性的expected的值(不能同時(shí)有actual和expected,否則是不做處理的),然后調(diào)用協(xié)議驅(qū)動(dòng)(比如modbus協(xié)議驅(qū)動(dòng)),將值寫入到設(shè)備中。

twinGetTopic

除了訂閱和處理twinGetResTopic消息,index.js在初始化的時(shí)候,還干了一件事:遍歷設(shè)備,調(diào)用deviceTwin的getActuals

    // getActuals publish get devicetwin msg to edge mqtt
    getActuals(deviceID) {
        let payload_msg = {
            event_id: "",
            timestamp: new Date().getTime()
        };
        this.mqttClient.publish(constant.defaultTopicPrefix + deviceID + constant.twinGetTopic, JSON.stringify(payload_msg));
    }

topic由具體的deviceID一起組成,'$hw/events/device/123456789/twin/get
device的列表是由configMap傳入并維護(hù)的,mapper會(huì)為每個(gè)device都發(fā)布一個(gè)獨(dú)立的twinGetTopic

上一節(jié)分析的twinGetResTopic,實(shí)際就是對(duì)twinGetTopic的response。deviceID是從configMap中取出來(lái)的,因此,mapper是根據(jù)configMap中配置的device instance列表,發(fā)布twinGetTopic,edgecore收到topic后,發(fā)布twinGetResTopic。mapper通過(guò)訂閱twinGetResTopic就可以獲得所有device instance從云端發(fā)來(lái)的數(shù)據(jù)。

twinDeltaTopic

$hw/events/device/+/twin/update/delta

twinDeltaTopic是在一個(gè)新的client中訂閱的,和之前twinGetResTopic處理形式類似,,也是從topic中取出device的id,然后再緩存中找到該設(shè)備對(duì)應(yīng)的profile信息

然后從topic中的msg中取出數(shù)據(jù),遍歷每個(gè)key(也就是property的名稱)通過(guò)如下方法進(jìn)行處理:

                                DeviceTwin.syncExpected(msg, key, (value)=>{
                                    dealDeltaMsg(msg, key, visitor, devProtocol, value);
                                });
    // syncExpected check whether expected value should be update to device
    static syncExpected(delta, key, callback) {
        let deviceTwin = delta.twin[key];
        if (!delta.twin.hasOwnProperty(key)) {
            logger.error("Invalid device twin ", key);
            return;
        }
        if (!deviceTwin.hasOwnProperty('actual') ||
          (deviceTwin.hasOwnProperty('expected') && deviceTwin.expected.hasOwnProperty('metadata') && deviceTwin.actual.hasOwnProperty('metadata') && 
            deviceTwin.expected.metadata.timestamp > deviceTwin.actual.metadata.timestamp &&
            deviceTwin.expected.value !== deviceTwin.actual.value)) {
          callback(deviceTwin.expected.value);
        }
    }

這里邏輯還比較復(fù)雜,總體上就是deviceTwin這個(gè)對(duì)象中需要有expected,但沒(méi)有actual,并且expected的更新時(shí)間晚于actual,并且expected和actual值不等的時(shí)候,調(diào)用dealDeltaMsg對(duì)實(shí)際設(shè)備上的寄存器進(jìn)行更新。

deviceProfile更新

index.js會(huì)監(jiān)聽deviceProfile更新,當(dāng)cloud通過(guò)configmap更新deviceProfile時(shí),會(huì)重新創(chuàng)建一個(gè)mqtt的client,來(lái)監(jiān)聽twinGetResTopic的topic。

對(duì)topic的處理和之前寫的twinGetResTopic處理邏輯基本一致,只是少了setActuals的步驟,而只調(diào)用setExpecteds。

從這里也可以看出,setActuals只是mapper第一次啟動(dòng)的時(shí)候進(jìn)行一下初始化。

syncDeviceTwin定時(shí)任務(wù)

syncDeviceTwin這個(gè)定時(shí)任務(wù)每?jī)擅雸?zhí)行一次

定時(shí)任務(wù)中,遍歷本地緩存總的所有設(shè)備,通過(guò)modbus讀取設(shè)備上最新的數(shù)據(jù),然后更新本地的ActualVals緩存,

twinUpdateTopic

然后調(diào)用updateActual方法,通過(guò)$hw/events/device/123456789/twin/update topic推送device twin的更新

directGetTopic

接著調(diào)用UpdateDirectActuals方法,往$hw/devices/123456789/events/properties/get 的topic中推送最新的設(shè)備屬性數(shù)據(jù)

兩者區(qū)別

從上述分析可以看出,定時(shí)任務(wù)會(huì)通過(guò)mqtt推送兩類數(shù)據(jù),看一下他們的區(qū)別。

  • directGetTopic只在每次定時(shí)任務(wù)結(jié)束的時(shí)候調(diào)用一次,而twinUpdateTopic是每個(gè)設(shè)備屬性的值發(fā)生變化時(shí),就調(diào)用一次。
  • directGetTopic每次更新這個(gè)設(shè)備的所有屬性,twinUpdateTopic每次只更新一個(gè)設(shè)備的一個(gè)屬性
  • directGetTopic有些額外的字段,比如route、content與header字段(具體用途見edge core的device twin模塊分析)

Edge端

edged的eventbus會(huì)監(jiān)控邊緣MQTT的topic,根據(jù)topic名稱和內(nèi)容進(jìn)行相應(yīng)處理。

eventbus

首先確認(rèn)一點(diǎn),所有與MQTT broker的交互,都是通過(guò)eventbus來(lái)做的,eventbus從MQTT訂閱消息以后,再通過(guò)beehive發(fā)送到其他模塊,如devicetwin模塊

// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, message MQTT.Message) {
    klog.Infof("OnSubMessageReceived receive msg from topic: %s", message.Topic())
    // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
    // for other, send to hub
    // for "SYS/dis/upload_records", no need to base64 topic
    var target string
    resource := base64.URLEncoding.EncodeToString([]byte(message.Topic()))
    if strings.HasPrefix(message.Topic(), "$hw/events/device") || strings.HasPrefix(message.Topic(), "$hw/events/node") {
        target = modules.TwinGroup
    } else {
        target = modules.HubGroup
        if message.Topic() == "SYS/dis/upload_records" {
            resource = "SYS/dis/upload_records"
        }
    }
    // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
    msg := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
        resource, "response").FillBody(string(message.Payload()))
    klog.Info(fmt.Sprintf("received msg from mqttserver, deliver to %s with resource %s", target, resource))
    beehiveContext.SendToGroup(target, *msg)
}
// BuildRouter sets route and resource operation in message
func (msg *Message) BuildRouter(source, group, res, opr string) *Message {
    msg.SetRoute(source, group)
    msg.SetResourceOperation(res, opr)
    return msg
}

  • resource是通過(guò)base64編碼的完整的topic的名字

  • target對(duì)于"hw/events/device"和"hw/events/node"這兩個(gè)類型的topic,target是 twinGroup,否則是HubGroup

  • router key是 routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>

  • 根據(jù)BuildRouter的參數(shù)可以看出,這里的source是bus,group是user

devicetwin

參考官方文檔 https://github.com/kubeedge/kubeedge/blob/master/docs/modules/edge/devicetwin.md

deviceTwin.png

devicetwin從beehive中取到group為twin的消息,
然后調(diào)用classifyMsg來(lái)對(duì)消息進(jìn)行分類。這里的分類方式,對(duì)devicetwin的處理流程有比較大的幫助,所以仔細(xì)看下

func classifyMsg(message *dttype.DTMessage) bool {
    if EventActionMap == nil {
        initEventActionMap()
    }
    var identity string
    var action string
    msgSource := message.Msg.GetSource()
    if strings.Compare(msgSource, "bus") == 0 {
        idLoc := 3
        topic := message.Msg.GetResource()
        topicByte, err := base64.URLEncoding.DecodeString(topic)
        if err != nil {
            return false
        }
        topic = string(topicByte)

        klog.Infof("classify the msg with the topic %s", topic)
        splitString := strings.Split(topic, "/")
        if len(splitString) == 4 {
            if strings.HasPrefix(topic, dtcommon.LifeCycleConnectETPrefix) {
                action = dtcommon.LifeCycle
            } else if strings.HasPrefix(topic, dtcommon.LifeCycleDisconnectETPrefix) {
                action = dtcommon.LifeCycle
            } else {
                return false
            }
        } else {
            identity = splitString[idLoc]
            loc := strings.Index(topic, identity)
            nextLoc := loc + len(identity)
            prefix := topic[0:loc]
            suffix := topic[nextLoc:]
            klog.Infof("%s %s", prefix, suffix)
            if v, exist := EventActionMap[prefix][suffix]; exist {
                action = v
            } else {
                return false
            }
        }
        message.Msg.Content = []byte((message.Msg.Content).(string))
        message.Identity = identity
        message.Action = action
        klog.Infof("Classify the msg to action %s", action)
        return true
    } else if (strings.Compare(msgSource, "edgemgr") == 0) || (strings.Compare(msgSource, "devicecontroller") == 0) {
        switch message.Msg.Content.(type) {
        case []byte:
            klog.Info("Message content type is []byte, no need to marshal again")
        default:
            content, err := json.Marshal(message.Msg.Content)
            if err != nil {
                return false
            }
            message.Msg.Content = content
        }
        if strings.Contains(message.Msg.Router.Resource, "membership/detail") {
            message.Action = dtcommon.MemDetailResult
            return true
        } else if strings.Contains(message.Msg.Router.Resource, "membership") {
            message.Action = dtcommon.MemUpdated
            return true
        } else if strings.Contains(message.Msg.Router.Resource, "twin/cloud_updated") {
            message.Action = dtcommon.TwinCloudSync
            resources := strings.Split(message.Msg.Router.Resource, "/")
            message.Identity = resources[1]
            return true
        } else if strings.Contains(message.Msg.Router.Operation, "updated") {
            resources := strings.Split(message.Msg.Router.Resource, "/")
            if len(resources) == 2 && strings.Compare(resources[0], "device") == 0 {
                message.Action = dtcommon.DeviceUpdated
                message.Identity = resources[1]
            }
            return true
        }
        return false

    } else if strings.Compare(msgSource, "edgehub") == 0 {
        if strings.Compare(message.Msg.Router.Resource, "node/connection") == 0 {
            message.Action = dtcommon.LifeCycle
            return true
        }
        return false
    }
    return false
}

classifyMsg中首先判斷的是msg的source,從代碼中看,分別有bus、edgemgr、devicecontroller、edgehub四種。

從eventbus中過(guò)來(lái)的消息,就是bus;從edgehub過(guò)來(lái)的消息目前看就只有心跳;edgemgr的source從代碼中沒(méi)看到,目前應(yīng)該是用不到;devicecontroller過(guò)來(lái)的消息應(yīng)該是云端對(duì)device的操作;

這里在分析mapper與devicetwin的交互,所以先看bus的類型的消息。

對(duì)于"$hw/events/connected/%s"類型的消息,是心跳類型,返回false,也就是不需要處理

對(duì)于其他消息,也就是"$hw/events/device/123456789/twin/update"的消息,identity就是取的第四個(gè)字段,也就是device的ID

action則是根據(jù)前綴$hw/events/device/和后綴twin/update這兩部分查表(EventActionMap)查出來(lái)的,

通過(guò)這兩部分進(jìn)行查表,最終進(jìn)入到處理方法dealTwinUpdate方法中

dealTwinUpdate.png

遺留問(wèn)題

twinDeltaTopic和twinGetResTopic區(qū)別

twinDeltaTopic和twinGetResTopic都可以更新設(shè)備上的值,兩者的區(qū)別在哪里?

  • BuildDeviceTwinResult
  • BuildDeviceTwinDelta

directGetTopic沒(méi)有接受方

directGetTopic從代碼中看,是沒(méi)有接收方到,也就是說(shuō)這個(gè)消息不會(huì)被處理

mapper更新了所有屬性

從modbus mapper中可以看到msg twin的一個(gè)結(jié)構(gòu),即一個(gè)key是Property名稱的字典

        let reply_msg = {
            event_id: "",
            timestamp: new Date().getTime()
        };
        let twin = {};
        twin[property.name] = {
            actual: {
                value: String(value),
                metadata: {
                    timestamp: new Date().getTime()
                }
            },
            metadata: {
                tyep: property.dataType
            }
        };
        reply_msg.twin = twin;

所以,最終eventbus就是從modbus mapper中取出msg twin來(lái)進(jìn)行后續(xù)處理。

但這里其實(shí)是有個(gè)問(wèn)題的,modbus mapper的當(dāng)前實(shí)現(xiàn)上,沒(méi)有判斷一個(gè)屬性是否是twin屬性,
直接將所有屬性都做為twin屬性給publish了,這個(gè)問(wèn)題在后續(xù)應(yīng)該會(huì)進(jìn)行修復(fù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容