Skip to main content

关于edgeHub的消息通讯方式

分类:  Azure物联网 标签:  #Azure #IoT Edge # 发布于: 2023-06-14 21:02:42

我们在上一章尝试向基于LinuxAzure IoT Edge Runtime部署自定义模块的时候,学习到消息路由的概念,我们这一节来深入学习一下系统模块edgeHub的通讯方式:

  • 消息路由
  • edgeHub本身作为一个消息代理服务。

消息路由

edgeHub模块管理模块之间,模块和IoT Hub以及其他子设备之间的通讯,在Azure IoT Hub中,可以通过edgeHub的module twin来定义消息的路由,检查edgeHubmodule twin, 可以观察到它有一个预期属性routes, 该属性用于定义消息是如何路由的。如如下的实例:

{
  "modulesContent": {
    "$edgeAgent": { ... },
    "$edgeHub": {
      "properties.desired": {
        "schemaVersion": "1.1",
        "routes": {
          "route1": "FROM <source> WHERE <condition> INTO <sink>",
          "route2": {
            "route": "FROM <source> WHERE <condition> INTO <sink>",
            "priority": 0,
            "timeToLiveSecs": 86400
          }
        },
        "storeAndForwardConfiguration": {
          "timeToLiveSecs": 10
        }
      }
    },
    "module1": { ... },
    "module2": { ... }
  }
}

消息路由的格式:FROM <source> WHERE <condition> INTO <sink>, 另外需要注意的是在edgeHub中定义的schemaVersion, 目前推荐的值是1.1

每条路有规则都必须有一个源<source>, 一个目的<sink>, 一个可选的过滤条件,每条路由您可以设定一个优先级,它可以确保消息处理的先后顺序。

消息路由的源<source>

消息路由的源用于指定消息是从哪里发出的,IoT Edge可以路由模块或者子设备的消息, 消息源可以指定如下的值:

消息源<source>注释
/*指代所有的device-to-cloud消息或者twin change notifications, 包括所有的模块或者子设备的消息
/twinChangeNotifications所有模块或者子设备的twin change消息,即(reported Properties)上报属性
/messages/*所有模块(子设备)的device-to-cloud 消息,或者没有设定output到下一个模块或者子设备
/messages/modules/*所有的模块的device-to-cloud
/messages/modules/<moduleid>/*某个指定模块的所有device-to-cloud消息
/messages/modules/<moduleId>/outputs/<output>某个指定的模块所有通过特定的output 输出的device-to-cloud消息

路由中的条件

消息路由中的条件是可选的,如果想传递消息源中所有的消息,只要不带子句WHERE就好了。或者您可以使用IoT Hub Query language来过滤。需要注意的是IoT Edge不支持基于twin Tag或者Twin property的查询, 只可以使用System PropertiesApplication PropertiesBody Proties这三种参数来构建查询条件,关于IoT Hub Query Language你可以参考一下文档:https://www.azuredeveloper.cn/article/how-to-use-azure-iot-hub-query-language, 和文档:https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-query-language, 以及文档:https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-routing-query-syntax?view=iotedge-2020-11

如下是一个消息的实例:

{ 
  "message": { 
    "systemProperties": { 
      "contentType": "application/json", 
      "contentEncoding": "UTF-8", 
      "iothub-message-source": "deviceMessages", 
      "iothub-enqueuedtime": "2017-05-08T18:55:31.8514657Z" 
    }, 
    "appProperties": { 
      "processingPath": "{cold | warm | hot}", 
      "verbose": "{true, false}", 
      "severity": 1-5, 
      "testDevice": "{true | false}" 
    }, 
    "body": "{\"Weather\":{\"Temperature\":50}}" 
  } 
}

从这个消息的实例可以看到systemPropertiesappPropertiesbody。 而在查询条件中按照如下的格式使用:

  • System properties$<propertyName> 或者 {$<propertyName>}
  • Application Properties<propertyName>
  • Body properties$body.<propertyName>

例如:

FROM /messages/* WHERE NOT IS_DEFINED($connectionModuleId) INTO $upstream

这里的$connectionMOduleId是系统属性值。

FROM /messages/* WHERE NOT IS_DEFINED(connectionModuleId) INTO $upstream

这里的connectionModuleId是应用属性

FROM /messages/* WHERE NOT IS_DEFINED($body.connectionModuleId) INTO $upstream

则是从body属性中进行过滤。

消息的目的<sink>

<sink>用于定义符合条件的消息会被发送到哪里,只有模块和iot Hub可以接收消息。

注意
消息无法路由到其他设备上,包括子设备。

Sink的可以选如下的值:

Sink注释
$upstream发送消息到IoT Hub
BrokeredEndpoint("/modules/<moudleId>/inputs/<inputs>")发送消息到某个指定的模块的指定名称的input

需要注意的是IoT Edge提供at-least-once的保证,即总是能保证消息至少有一次到达。如果消息不能传递,edgeHub会将该消息保存在本地。

注意
Edge Hub保存消息的时长是有限的,通过设置edgeHub的预期属性(desired properties)storeAndForwardConfiguration.timeToLiveSecs来更改保存的时长,默认是7200秒

消息路由的优先级和保存时长(time-to-live)

消息路由的定义可以仅仅是一个字符串定义路由,也可以定义为一个对象,该对象包含三个部分,字符串定义路由路径,一个整形定义优先级,另外一个整型值定义存活时长time-to-live

例如:

"route1": "FROM <source> WHERE <condition> INTO <sink>",

或者:

"route2": {
  "route": "FROM <source> WHERE <condition> INTO <sink>",
  "priority": 0,
  "timeToLiveSecs": 86400
}

Priority

优先级的值可以定义值范围从0-9, 其中0代表最高的优先级,消息会根据指定的终点压入队列,然后按照优先级从高到底来处理

time-to-live

如果没有明确指定该值,那么该值会从IoT Hub上继承,如果需要明确的指定,需要使用edgeHub的预期属性:storeAndForwardConfiguration.timeToLiveSecs来指定,单位是秒。

EdgeHub作为本地的消息代理服务(Broker)

这个时候edgeHub自己就是一个消息队列服务,客户端只需要使用对应的支持mqtt协议的库就可以订阅和发布消息了。借用官方的图来说明一下这个架构:


我们使用如下表来对edgeHub不同的通讯方式做一个对比: