关于edgeHub的消息通讯方式
分类: Azure物联网 ◆ 标签: #Azure #IoT Edge # ◆ 发布于: 2023-06-14 21:02:42

我们在上一章尝试向基于Linux
的Azure IoT Edge Runtime
部署自定义模块的时候,学习到消息路由的概念,我们这一节来深入学习一下系统模块edgeHub
的通讯方式:
- 消息路由
edgeHub
本身作为一个消息代理服务。
消息路由
edgeHub
模块管理模块之间,模块和IoT Hub
以及其他子设备之间的通讯,在Azure IoT Hub
中,可以通过edgeHub
的module twin来定义消息的路由,检查edgeHub
的module twin
, 可以观察到它有一个预期属性routes
, 该属性用于定义消息是如何路由的。如如下的实例:
{ "modulesContent": { "$edgeAgent": { ... }, "$edgeHub": { "properties.desired": { "schemaVersion": "1.1", "routes": { "route1": "FROM <source> WHERE <condition> INTO <sink>", "route2": { "route": "FROM <source> WHERE <condition> INTO <sink>", "priority": 0, "timeToLiveSecs": 86400 } }, "storeAndForwardConfiguration": { "timeToLiveSecs": 10 } } }, "module1": { ... }, "module2": { ... } } }
消息路由的格式:FROM <source> WHERE <condition> INTO <sink>
, 另外需要注意的是在edgeHub
中定义的schemaVersion
, 目前推荐的值是1.1
。
每条路有规则都必须有一个源<source>
, 一个目的<sink>
, 一个可选的过滤条件,每条路由您可以设定一个优先级,它可以确保消息处理的先后顺序。
消息路由的源<source>
消息路由的源用于指定消息是从哪里发出的,IoT Edge
可以路由模块或者子设备的消息, 消息源可以指定如下的值:
消息源<source> | 注释 |
---|---|
/* | 指代所有的device-to-cloud消息或者twin change notifications , 包括所有的模块或者子设备的消息 |
/twinChangeNotifications | 所有模块或者子设备的twin change消息,即(reported Properties)上报属性 |
/messages/* | 所有模块(子设备)的device-to-cloud 消息,或者没有设定output到下一个模块或者子设备 |
/messages/modules/* | 所有的模块的device-to-cloud |
/messages/modules/<moduleid>/* | 某个指定模块的所有device-to-cloud消息 |
/messages/modules/<moduleId>/outputs/<output> | 某个指定的模块所有通过特定的output 输出的device-to-cloud消息 |
路由中的条件
消息路由中的条件是可选的,如果想传递消息源中所有的消息,只要不带子句WHERE
就好了。或者您可以使用IoT Hub Query language
来过滤。需要注意的是IoT Edge
不支持基于twin Tag
或者Twin property
的查询, 只可以使用System Properties
, Application Properties
, Body Proties
这三种参数来构建查询条件,关于IoT Hub Query Language
你可以参考一下文档:https://www.azuredeveloper.cn/article/how-to-use-azure-iot-hub-query-language, 和文档:https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-query-language, 以及文档:https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-routing-query-syntax?view=iotedge-2020-11
如下是一个消息的实例:
{ "message": { "systemProperties": { "contentType": "application/json", "contentEncoding": "UTF-8", "iothub-message-source": "deviceMessages", "iothub-enqueuedtime": "2017-05-08T18:55:31.8514657Z" }, "appProperties": { "processingPath": "{cold | warm | hot}", "verbose": "{true, false}", "severity": 1-5, "testDevice": "{true | false}" }, "body": "{\"Weather\":{\"Temperature\":50}}" } }
从这个消息的实例可以看到systemProperties
, appProperties
, body
。 而在查询条件中按照如下的格式使用:
System properties
:$<propertyName>
或者{$<propertyName>}
Application Properties
:<propertyName>
Body properties
:$body.<propertyName>
例如:
FROM /messages/* WHERE NOT IS_DEFINED($connectionModuleId) INTO $upstream
这里的$connectionMOduleId
是系统属性值。
FROM /messages/* WHERE NOT IS_DEFINED(connectionModuleId) INTO $upstream
这里的connectionModuleId
是应用属性
FROM /messages/* WHERE NOT IS_DEFINED($body.connectionModuleId) INTO $upstream
则是从body属性中进行过滤。
消息的目的<sink>
<sink>
用于定义符合条件的消息会被发送到哪里,只有模块和iot Hub
可以接收消息。
注意
消息无法路由到其他设备上,包括子设备。
Sink
的可以选如下的值:
Sink | 注释 |
---|---|
$upstream | 发送消息到IoT Hub |
BrokeredEndpoint("/modules/<moudleId>/inputs/<inputs>") | 发送消息到某个指定的模块的指定名称的input 里 |
需要注意的是IoT Edge
提供at-least-once
的保证,即总是能保证消息至少有一次到达。如果消息不能传递,edgeHub
会将该消息保存在本地。
注意
Edge Hub
保存消息的时长是有限的,通过设置edgeHub
的预期属性(desired properties)storeAndForwardConfiguration.timeToLiveSecs
来更改保存的时长,默认是7200秒
消息路由的优先级和保存时长(time-to-live
)
消息路由的定义可以仅仅是一个字符串定义路由,也可以定义为一个对象,该对象包含三个部分,字符串定义路由路径,一个整形定义优先级,另外一个整型值定义存活时长time-to-live
。
例如:
"route1": "FROM <source> WHERE <condition> INTO <sink>",
或者:
"route2": { "route": "FROM <source> WHERE <condition> INTO <sink>", "priority": 0, "timeToLiveSecs": 86400 }
Priority
优先级的值可以定义值范围从0-9
, 其中0
代表最高的优先级,消息会根据指定的终点压入队列,然后按照优先级从高到底来处理
time-to-live
如果没有明确指定该值,那么该值会从IoT Hub
上继承,如果需要明确的指定,需要使用edgeHub
的预期属性:storeAndForwardConfiguration.timeToLiveSecs
来指定,单位是秒。
EdgeHub
作为本地的消息代理服务(Broker
)
这个时候edgeHub
自己就是一个消息队列服务,客户端只需要使用对应的支持mqtt
协议的库就可以订阅和发布消息了。借用官方的图来说明一下这个架构:
我们使用如下表来对edgeHub
不同的通讯方式做一个对比: